So You Want to Build a Language VM - Part 31 - Making Clustering Make Sense

Intro

Sorry everyone! I’ve been busy, hence the delay. Back in tutorial 29, we had two nodes talking to each other, but we got a lot of random text on the screen. Let’s figure out why!

The Problem

>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', libcore/option.rs:345:21
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
   1: std::sys_common::backtrace::print
   2: std::panicking::default_hook::{{closure}}
   3: std::panicking::default_hook
   4: std::panicking::rust_panic_with_hook
   5: std::panicking::continue_panic_fmt
   6: rust_begin_unwind
   7: core::panicking::panic_fmt
   8: core::panicking::panic
   9: <core::option::Option<T>>::unwrap
  10: iridium::cluster::client::ClusterClient::send_hello
  11: iridium::repl::REPL::join_cluster
  12: iridium::repl::REPL::execute_command
  13: iridium::repl::REPL::run
  14: iridium::main
  15: std::rt::lang_start::{{closure}}
  16: std::panicking::try::do_call
  17: __rust_maybe_catch_panic
  18: std::rt::lang_start_internal
  19: std::rt::lang_start
  20: main

What is going on in send_hello? Well, we never specified an alias! An iridium node can be started without one, and our ClusterClient initialization looks like:

ClusterClient {
    reader: BufReader::new(reader),
    writer: BufWriter::new(writer),
    raw_stream: stream,
    _tx: Some(Arc::new(Mutex::new(tx))),
    rx: Some(Arc::new(Mutex::new(rx))),
    alias: None,
}
Let’s have the alias be the UUID of the VM. We could add a parameter to the new function of ClusterClient, or we could do a more builder-pattern type thing. I’m going to opt for the builder pattern, as I like it.

Add a new function to ClusterClient:

/// Sets the alias of the ClusterClient and returns it
pub fn with_alias(mut self, alias: String) -> Self {
    self.alias = Some(alias);
    self
}
In src/vm.rs, we need to make the id attribute public:
pub id: Uuid,
Now, over in src/repl/mod.rs, in the join_cluster function, we have this line:
let mut cc = cluster::client::ClusterClient::new(stream);
Let’s change it to incorporate our new function:
let mut cc = cluster::client::ClusterClient::new(stream).with_alias(self.vm.id.to_string());

Now let’s see what happens!

>>> Alias is: "aa4356b8-f0a0-4250-b5e5-eaabd265e9b7\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}....that goes on for awhile"

But hey, we got the alias portion!

The Second Problem

Why is it doing this? Well, remember that buffer we declared over in src/cluster/server.rs? Let’s look at the section:

thread::spawn(move || {
    let mut buf = [0; 1024];
    let mut client = ClusterClient::new(stream);
    let _bytes_read = client.reader.read(&mut buf);
    let alias = String::from_utf8_lossy(&buf);
    println!("Alias is: {:?}", alias);
    client.run();
});
The alias only took up so many bytes, but we are converting all 1024 into a string! Awkward! What we want is to take a slice and use only the number of bytes we read.

You can see we even have a handy-dandy variable, _bytes_read, that has the info. A slight modification to our code…​

thread::spawn(move || {
    let mut buf = [0; 1024];
    let mut client = ClusterClient::new(stream);
    let bytes_read = client.reader.read(&mut buf).unwrap();
    let alias = String::from_utf8_lossy(&buf[0..bytes_read]);
    println!("Alias is: {:?}", alias);
    client.run();
});

And now let’s try again. On the server side:

Fletchers-MBP:iridium fletcher$ iridium
>>> Welcome to Iridium! Let's be productive!
!start_cluster
Started cluster server!

Then on the client side:

Welcome to Iridium! Let's be productive!
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
>>>

Then back on the server side:

>>> Alias is: "57be0d98-7634-411b-944f-0d1934ead78d"

Yay! We only read the number of bytes received.

Important
Yes, this means that each read only overwrites the buffer up to the number of bytes received. If we read 512 bytes, the last 512 bytes of the buffer would still hold leftover data from the previous read.

Listing Members

If, on the VM we used to join the cluster, we run !cluster_members, we get:

>>> !cluster_members
Listing Known Nodes:
>>> [
    ""
]

And if we run it on the VM we used to start the cluster, we get:

!cluster_members
Listing Known Nodes:
>>> []
We need to finish up the introduction process!

Receiving the Hello

Let’s start from the point of receiving the alias. In our function signature for listen, over in src/cluster/server.rs, we have:

pub fn listen(addr: SocketAddr, _connection_manager: Arc<RwLock<Manager>>) {
Change _connection_manager to connection_manager, and then update the function body to look like this:
pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        let mut cmgr = connection_manager.clone();
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut buf = [0; 1024];
            let mut client = ClusterClient::new(stream);
            let bytes_read = client.reader.read(&mut buf).unwrap();
            let alias = String::from_utf8_lossy(&buf[0..bytes_read]);
            cmgr.write().unwrap().add_client(alias.to_string(), client);
        });
    }
}
Two important things to note:

  1. We clone the connection_manager whenever we get a new connection. Since this is an Arc, we’re incrementing the reference count. When it goes out of scope, the reference count will go down. We do this because if we moved connection_manager itself into the thread, we couldn’t clone it for the next connection.

  2. Note how we are not calling client.run(); that’s because we pass ownership of the client to the connection manager. We’ll move that logic there.

Cluster Manager

Head over to src/cluster/manager.rs. Our add_client function looks like this:

pub fn add_client(&mut self, alias: NodeAlias, mut client: ClusterClient) -> bool {
    if self.clients.contains_key(&alias) {
        error!("Tried to add a client that already existed");
        return false;
    }
    debug!("Adding {}", alias);
    client.run();
    self.clients.insert(alias, client);
    true
}
Now we call run when the client is added. Let’s see if that makes a difference:
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
And then it will hang. Why? Well, remember how we moved the call to run into the connection manager? add_client never starts run in a separate thread, so the call blocks and never returns.

The Fix

I tried a few different things, such as starting run from within the add_client function. But since we’re working across threads no matter what, we need to wrap the client in an Arc.

Head to src/cluster/manager.rs and change the Manager struct to:

#[derive(Default)]
pub struct Manager {
    clients: HashMap<NodeAlias, Arc<RwLock<ClusterClient>>>,
}
Now let’s add a function to our Manager:
pub fn get_client(&mut self, alias: NodeAlias) -> Option<Arc<RwLock<ClusterClient>>> {
    self.clients.get(&alias).cloned()
}
NOTE: We’ll have to pretty this up later…​

And next we need to modify the add_client function:

pub fn add_client(&mut self, alias: NodeAlias, client: ClusterClient) -> bool {
    if self.clients.contains_key(&alias) {
        error!("Tried to add a client that already existed");
        return false;
    }
    let client = Arc::new(RwLock::new(client));
    self.clients.insert(alias.clone(), client);
    let cloned_client = self.get_client(alias).unwrap();
    thread::spawn(move || {
        cloned_client.write().unwrap().run();
    });
    true
}
What we’re doing here is wrapping the client in an Arc and RwLock, inserting it into our HashMap, and then spawning its run function in a background thread. Finally, we need to change our listen function in src/cluster/server.rs:
pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        let mut cmgr = connection_manager.clone();
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut buf = [0; 1024];
            let mut client = ClusterClient::new(stream);
            let bytes_read = client.reader.read(&mut buf).unwrap();
            let alias = String::from_utf8_lossy(&buf[0..bytes_read]);
            let mut cmgr_lock = cmgr.write().unwrap();
            cmgr_lock.add_client(alias.to_string(), client);
        });
    }
}
Note how we no longer call run here. Let’s see what happens now if we try to cluster:
Fletchers-MBP:~ fletcher$ iridium
Welcome to Iridium! Let's be productive!
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
>>>
Yay! Now let’s list the members…​
>>> !cluster_members
Listing Known Nodes:
>>> [
    ""
]
And if we list the cluster members on the VM we used to start the cluster:
>>> !cluster_members
Listing Known Nodes:
>>> [
    "65fda5a6-8b30-4cf1-926f-6fec0d842461"
]
Phew!

End

The reason we do not see any nodes on the joiner is because we haven’t written any code to send the other known nodes back to a client when it joins. We’ll do that next tutorial!


If you need some assistance with any of the topics in the tutorials, or just devops and application development in general, we offer consulting services. Check it out over here or click Services along the top.