So You Want to Build a Language VM - Part 29 - Clustering: Part 3

Explores making clustering thread-safe

Intro

If you tried to compile the code from tutorial 28, you probably saw a lot of errors about not being able to send Strings across mpsc channels. The reason behind this is worth a paragraph or three.

mpsc Channels

In Rust, we have something called mpsc channels, short for multi-producer, single-consumer. Think of it as 100 people talking to 1 person: that 1 person receives and processes each message.

Here’s a simple example of creating a channel:

use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
}

And that’s it! A few things to note, though:

  1. The tx variable is the producer (or transmit) side

  2. The rx variable is the consumer (or receiver) side

Simple Usage Example

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    println!("rx got: {}", rx.recv().unwrap());
}

The Multi Part

There’s a big difference between the tx side and the rx side: you can do let other_sender = tx.clone(), and anything you send down that clone will go to the same receiver.

A common way to handle that is to have the receiver loop forever, something like:

thread::spawn(move || {
    loop {
        if let Ok(msg) = rx.recv() {
            println!("{}", msg);
        }
    }
});
Important
We CANNOT clone the rx end like we can the tx end. We can only move it.

The recv() call blocks, so we won’t burn CPU. Whenever a message arrives, we just handle it.

Our Problem

We need a central repository of clients per VM. But we listen for those clients on separate threads. Those errors the compiler is showing us indicate a fundamental flaw in our architecture.

Note
The Rust model of ownership is a pretty good model in general, and the strictness of the compiler helps enforce architectures that tend to be decent.

The Manager

We have this connection manager struct, and right now, one VM binds to one socket, which means each VM needs a manager. So let’s address that first. In vm.rs, add a field:

pub connection_manager: Arc<RwLock<Manager>>,

Note that we wrap it in an Arc and RwLock so that all of our client threads can access it. Down in the new() function for the VM, add this:

connection_manager: Arc::new(RwLock::new(Manager::new())),

You probably see an error like this:

error[E0277]: `std::sync::mpsc::Sender<std::string::String>` cannot be shared between threads safely
  --> src/scheduler/mod.rs:20:9
   |
20 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^ `std::sync::mpsc::Sender<std::string::String>` cannot be shared between threads safely

Our problematic function is this one:

/// Takes a VM and runs it in a background thread
pub fn get_thread(&mut self, mut vm: VM) -> thread::JoinHandle<Vec<VMEvent>> {
    thread::spawn(move || {
        let events = vm.run();
        println!("VM Events");
        println!("--------------------------");
        for event in &events {
            println!("{:#?}", event);
        }
        events
    })
}

It is trying to move a VM into a thread so it can start executing. But the VM contains a Manager, which contains ClusterClients, which contain mpsc channels. And remember, we can’t send those willy-nilly across threads.

For fun, let’s see what happens if we wrap the channels in Arc<Mutex<>>. In src/cluster/client.rs let’s replace:

rx: Option<Receiver<String>>,
tx: Option<Sender<String>>,
with:
rx: Option<Arc<Mutex<Receiver<String>>>>,
tx: Option<Arc<Mutex<Sender<String>>>>,

Now you probably see errors like:

error[E0599]: no method named `recv` found for type `std::sync::Arc<std::sync::Mutex<std::sync::mpsc::Receiver<std::string::String>>>` in the current scope
  --> src/cluster/client.rs:66:28
   |
66 |                 match chan.recv() {
   |                            ^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0599`.
error: Could not compile `iridium`.
warning: build failed, waiting for other jobs to finish...

If you replace Mutex with RwLock, you’ll notice that it doesn’t work. Why not? Well, I will steal from the Rust docs:

This type of lock allows a number of readers or at most one writer at any point in time. The write portion of this lock typically allows modification of the underlying data (exclusive access) and the read portion of this lock typically allows for read-only access (shared access).

In comparison, a Mutex does not distinguish between readers or writers that acquire the lock, therefore blocking any threads waiting for the lock to become available. An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.

The priority policy of the lock is dependent on the underlying operating system's implementation, and this type does not guarantee that any particular policy will be used.

The type parameter T represents the data that this lock protects. It is required that T satisfies Send to be shared across threads and Sync to allow concurrent access through readers. The RAII guards returned from the locking methods implement Deref (and DerefMut for the write methods) to allow access to the content of the lock.

The last paragraph is key: T (in this case, our channels) only implements Send, not Sync. A Mutex<T> is Sync whenever T is Send, so wrapping the channel ends in a Mutex lets us share them across threads; an RwLock<T> is only Sync when T is itself Sync, which our channels are not.

Resolving recv() Issue

Now let’s see if we can resolve that issue. In src/cluster/client.rs, we can replace the recv_loop() function with:

fn recv_loop(&mut self) {
    let chan = self.rx.take().unwrap();
    let mut writer = self.raw_stream.try_clone().unwrap();
    let _t = thread::spawn(move || {
        loop {
            if let Ok(locked_rx) = chan.lock() {
                match locked_rx.recv() {
                    Ok(msg) => {
                        match writer.write_all(msg.as_bytes()) {
                            Ok(_) => {}
                            Err(_e) => {}
                        };
                        match writer.flush() {
                            Ok(_) => {}
                            Err(_e) => {}
                        };
                    }
                    Err(_e) => {}
                }
            }
        }
    });
}

Since we now have a Mutex around the chan, we need to lock it in order to call recv on it. Since this client is the only one using this particular rx end, this is fine.

Compiles?!

Finally! It compiles! We still don’t quite have it where we want it, though.

Accepting a new ClusterClient

Right now, we listen for connections in src/cluster/server.rs, in the listen function:

pub fn listen(addr: SocketAddr) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut client = ClusterClient::new(stream);
            client.run();
        });
    }
}

See how we don’t have the VM anywhere in there? Or the Manager? So we can’t add it there. Let’s try passing in the Manager. In src/vm.rs, in bind_cluster_server, we have this:

pub fn bind_cluster_server(&mut self) {
    if let Some(ref addr) = self.server_addr {
        if let Some(ref port) = self.server_port {
            let socket_addr: SocketAddr = (addr.to_string() + ":" + port).parse().unwrap();
            thread::spawn(move || {
                cluster::server::listen(socket_addr);
            });
        } else {
            error!("Unable to bind to cluster server address: {}", addr);
        }
    } else {
        error!("Unable to bind to cluster server port: {:?}", self.server_port);
    }
}
Let’s change it to:
pub fn bind_cluster_server(&mut self) {
    if let Some(ref addr) = self.server_addr {
        if let Some(ref port) = self.server_port {
            let socket_addr: SocketAddr = (addr.to_string() + ":" + port).parse().unwrap();
            // Note that we have to make a clone here before we move it into the thread
            let clone = self.connection_manager.clone();
            thread::spawn(move || {
                // Otherwise, we'd be trying to move the whole thing out of the VM, not an Arc
                cluster::server::listen(socket_addr, clone);
            });
        } else {
            error!("Unable to bind to cluster server address: {}", addr);
        }
    } else {
        error!("Unable to bind to cluster server port: {:?}", self.server_port);
    }
}

Of course, we then need to change the signature of the listen function in src/cluster/server.rs to:

pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {

And don’t forget to add:

use cluster::manager::Manager;

If you run cargo test, it should compile now. Yay!

Adding the ClusterClient to the Manager

Now if we head over to src/cluster/server.rs, we have an Arc<RwLock<Manager>> we can use to add the client.

Except…​

Because they just connected, we don’t know their alias yet.

Who…​.are you?

Because our HashMap uses the node alias as the key, we need SOMETHING to put there. There are two obvious options:

  1. We have the ClusterClient send its alias first

  2. We generate a random UUID for the alias, until the client sends us its proper one

In server.rs, note this line:

client.run();

We COULD do something like this:

thread::spawn(move || {
    let mut buf = String::new();
    let mut client = ClusterClient::new(stream);
    // Once this call succeeds, we'll hopefully have the node alias in the string buffer
    let bytes_read = client.read(&mut buf);
    client.run();
});

Of course, this means we need to write the node alias…so head over to src/repl/mod.rs. The join_cluster function for ClusterClient looks like:

fn join_cluster(&mut self, args: &[&str]) {
    self.send_message(format!("Attempting to join cluster..."));
    let ip = args[0];
    let port = args[1];
    let addr = ip.to_owned() + ":" + port;
    if let Ok(stream) = TcpStream::connect(addr) {
        self.send_message(format!("Connected to cluster!"));
        let mut cc = cluster::client::ClusterClient::new(stream);
        if let Some(ref a) = self.vm.alias {
            if let Ok(mut lock) = self.vm.connection_manager.write() {
                lock.add_client(a.to_string(), cc);
            }
        }
    } else {
        self.send_message(format!("Could not connect to cluster!"));
    }
}
Let’s try adding a call to send_hello (defined in the ClusterClient), like this:
fn join_cluster(&mut self, args: &[&str]) {
    self.send_message(format!("Attempting to join cluster..."));
    let ip = args[0];
    let port = args[1];
    let addr = ip.to_owned() + ":" + port;
    if let Ok(stream) = TcpStream::connect(addr) {
        self.send_message(format!("Connected to cluster!"));
        let mut cc = cluster::client::ClusterClient::new(stream);
        cc.send_hello();
        if let Some(ref a) = self.vm.alias {
            if let Ok(mut lock) = self.vm.connection_manager.write() {
                lock.add_client(a.to_string(), cc);
            }
        }
    } else {
        self.send_message(format!("Could not connect to cluster!"));
    }
}

And naturally, the compiler wants us to change the listen function in src/cluster/server.rs to:

pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut buf = [0; 1024];
            let mut client = ClusterClient::new(stream);
            let bytes_read = client.reader.read(&mut buf);
            let alias = String::from_utf8_lossy(&buf);
            println!("Alias is: {:?}", alias);
            client.run();
        });
    }
}

The change is the pre-allocated array of 1024 bytes into which data is read. With all that, the tests should pass!

Connection Test

Now start two VMs, !start_cluster on one, !join_cluster on the other, and you’ll probably see something like:

Started cluster server!
>>> Alias is: "\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}…"

End

We’ll get into why in the next tutorial. =) We’re getting there!


If you need some assistance with any of the topics in the tutorials, or just devops and application development in general, we offer consulting services. Check it out over here or click Services along the top.