So You Want to Build a Language VM - Part 29 - Clustering: Part 3
Explores making clustering thread-safe
Intro
If you tried to compile the code from tutorial 28, you probably saw a lot of errors about not being able to send Strings across mpsc channels. The reason behind this is worth a paragraph or three.
mpsc Channels
In Rust, we have something called mpsc channels; mpsc stands for multi-producer, single-consumer. Think of it as 100 people talking to 1 person. That 1 person receives and processes each message.
Here’s a simple example of creating a channel:
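use std::sync::mpsc;

fn main() {
    // Nothing is sent in this snippet, so the compiler cannot infer the
    // message type; we spell it out explicitly.
    let (tx, rx) = mpsc::channel::<i32>();
}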
And that’s it! A few things to note, though:
The tx variable is the producer (or transmit) side
The rx variable is the consumer (or receiver) side
Simple Usage Example
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    println!("rx got: {}", rx.recv().unwrap());
}
The Multi Part
There’s a big difference between the tx side and the rx side: you can clone the tx side with let other_sender = tx.clone(). Anything you send through the clone goes to the same receiver.
A common way to handle that is to have the receiver loop forever, something like:
thread::spawn(move || {
    loop {
        if let Ok(msg) = rx.recv() {
            println!("{}", msg);
        }
    }
});
Important: We CANNOT clone the rx end like we can the tx end. We can only move it.
The recv() call blocks, so we won’t burn CPU. Whenever a message arrives, we just handle it.
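To make the "multi" part concrete, here is a minimal sketch of several producers feeding one receiver. The function name fan_in and the message text are made up for illustration; the only std calls used are mpsc::channel, Sender::clone, and Receiver::iter.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send one message, then collects
/// everything on the single receiver. (`fan_in` is a hypothetical name.)
fn fan_in(producers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let mut handles = Vec::new();

    for id in 0..producers {
        // Each producer thread gets its own clone of the sending half.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(format!("hello from producer {}", id)).unwrap();
        }));
    }

    // Drop the original sender so the channel closes once every clone is gone.
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }

    // Iterating the receiver blocks for each message and stops when all
    // senders have been dropped.
    let mut received: Vec<String> = rx.iter().collect();
    received.sort(); // arrival order depends on thread scheduling
    received
}

fn main() {
    for msg in fan_in(3) {
        println!("rx got: {}", msg);
    }
}
```

Note that the receiver is never cloned; it stays with whoever collects the messages.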
Our Problem
We need a central repository of clients per VM. But we listen for those clients on separate threads. Those errors the compiler is showing us indicate a fundamental flaw in our architecture.
Note: The Rust model of ownership is a pretty good model in general, and the strictness of the compiler helps enforce architectures that tend to be decent.
The Manager
We have this connection manager struct. Right now, one VM binds to one socket, which means each VM needs a manager. So let’s address that first. In vm.rs, add a field:
pub connection_manager: Arc<RwLock<Manager>>,
Note that we wrap it in an Arc and an RwLock so that all of our client threads can access it. Down in the new() function for the VM, add this:
connection_manager: Arc::new(RwLock::new(Manager::new())),
You probably see an error like this:
error[E0277]: `std::sync::mpsc::Sender<std::string::String>` cannot be shared between threads safely
--> src/scheduler/mod.rs:20:9
|
20 | thread::spawn(move || {
| ^^^^^^^^^^^^^ `std::sync::mpsc::Sender<std::string::String>` cannot be shared between threads safely
Our problematic function is this one:
/// Takes a VM and runs it in a background thread
pub fn get_thread(&mut self, mut vm: VM) -> thread::JoinHandle<Vec<VMEvent>> {
    thread::spawn(move || {
        let events = vm.run();
        println!("VM Events");
        println!("--------------------------");
        for event in &events {
            println!("{:#?}", event);
        }
        events
    })
}
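It is trying to move a VM into a thread so it can start executing. But the VM contains a Manager, which contains ClusterClients, which contain mpsc channels. And remember, we can’t share those willy-nilly across threads: the Arc<RwLock<Manager>> can only move into the thread if everything inside the Manager is both Send and Sync.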
For fun, let’s see what happens if we wrap the channels in Arc<Mutex<>>. In src/cluster/client.rs, let’s replace:
rx: Option<Receiver<String>>,
tx: Option<Sender<String>>,
with:
rx: Option<Arc<Mutex<Receiver<String>>>>,
tx: Option<Arc<Mutex<Sender<String>>>>,
Now you probably see errors like:
error[E0599]: no method named `recv` found for type `std::sync::Arc<std::sync::Mutex<std::sync::mpsc::Receiver<std::string::String>>>` in the current scope
--> src/cluster/client.rs:66:28
|
66 | match chan.recv() {
| ^^^^
error: aborting due to previous error
For more information about this error, try `rustc --explain E0599`.
error: Could not compile `iridium`.
warning: build failed, waiting for other jobs to finish...
If you replace Mutex with RwLock, you’ll note that it doesn’t work. Why not? Well, I will steal from the Rust docs:
This type of lock allows a number of readers or at most one writer at any point in time. The write portion of this lock typically allows modification of the underlying data (exclusive access) and the read portion of this lock typically allows for read-only access (shared access).
In comparison, a Mutex does not distinguish between readers or writers that acquire the lock, therefore blocking any threads waiting for the lock to become available. An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.
The priority policy of the lock is dependent on the underlying operating system's implementation, and this type does not guarantee that any particular policy will be used.
The type parameter T represents the data that this lock protects. It is required that T satisfies Send to be shared across threads and Sync to allow concurrent access through readers. The RAII guards returned from the locking methods implement Deref (and DerefMut for the write methods) to allow access to the content of the lock.
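The last paragraph is key: T (in this case, our channels) only implements Send, not Sync. Mutex<T> is Sync as long as T is Send, so wrapping the channels in a Mutex lets them be shared across threads. RwLock<T> is only Sync if T is already Sync, which is why it does not help here.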
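If you want to check the Send and Sync claims yourself, one way is a pair of empty generic functions that only compile when the type has the marker trait. This is a small sketch; assert_send, assert_sync, and shared_receiver_demo are hypothetical helper names, not part of the Iridium code.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// Compile-time checks: a call only compiles if T has the named marker trait.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn shared_receiver_demo() -> String {
    // A bare Receiver can be moved to another thread...
    assert_send::<Receiver<String>>();
    // ...but it cannot be shared. Uncommenting the next line fails to compile:
    // assert_sync::<Receiver<String>>();

    // Wrapping it in a Mutex makes it Sync, so an Arc around it is Send.
    assert_sync::<Mutex<Receiver<String>>>();
    assert_send::<Arc<Mutex<Receiver<String>>>>();

    let (tx, rx) = mpsc::channel::<String>();
    let shared = Arc::new(Mutex::new(rx));
    let worker_rx = Arc::clone(&shared);

    let handle = thread::spawn(move || {
        // Lock first, then call recv() on the guarded receiver.
        let guard = worker_rx.lock().unwrap();
        let msg = guard.recv().unwrap();
        msg
    });

    tx.send("ping".to_string()).unwrap();
    handle.join().unwrap()
}

fn main() {
    println!("worker got: {}", shared_receiver_demo());
}
```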
Resolving recv() Issue
Now let’s see if we can resolve that issue. In src/cluster/client.rs, we can replace the recv_loop() function with:
fn recv_loop(&mut self) {
    let chan = self.rx.take().unwrap();
    let mut writer = self.raw_stream.try_clone().unwrap();
    let _t = thread::spawn(move || {
        loop {
            if let Ok(locked_rx) = chan.lock() {
                match locked_rx.recv() {
                    Ok(msg) => {
                        match writer.write_all(msg.as_bytes()) {
                            Ok(_) => {}
                            Err(_e) => {}
                        };
                        match writer.flush() {
                            Ok(_) => {}
                            Err(_e) => {}
                        };
                    }
                    Err(_e) => {}
                }
            }
        }
    });
}
Since we now have a Mutex around the chan, we need to lock it in order to call recv on it. Since this client is the only one using this particular rx end, this is fine.
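Here is a standalone sketch of the same lock-then-recv pattern that you can run without the rest of the VM. It is not the tutorial’s recv_loop: forward_all and run_demo are hypothetical names, a Vec<u8> stands in for the TcpStream, and unlike the loop above it stops once the channel closes or a write fails.

```rust
use std::io::Write;
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Forwards every message from the locked receiver into `writer` and hands
/// the writer back once the channel closes. (Hypothetical helper.)
fn forward_all<W: Write + Send + 'static>(
    chan: Arc<Mutex<Receiver<String>>>,
    mut writer: W,
) -> thread::JoinHandle<W> {
    thread::spawn(move || {
        loop {
            // Take the lock for a single recv(); the guard is dropped at the
            // end of the match arm.
            let next = match chan.lock() {
                Ok(locked_rx) => locked_rx.recv(),
                Err(_) => break, // the mutex was poisoned
            };
            match next {
                Ok(msg) => {
                    if writer.write_all(msg.as_bytes()).is_err() || writer.flush().is_err() {
                        break; // the peer went away
                    }
                }
                Err(_) => break, // every sender has been dropped
            }
        }
        writer
    })
}

fn run_demo() -> String {
    let (tx, rx) = mpsc::channel::<String>();
    // A Vec<u8> implements Write, so it can stand in for a socket here.
    let handle = forward_all(Arc::new(Mutex::new(rx)), Vec::<u8>::new());
    tx.send("hello ".to_string()).unwrap();
    tx.send("world".to_string()).unwrap();
    drop(tx); // closing the channel lets the loop finish
    let written = handle.join().unwrap();
    String::from_utf8(written).unwrap()
}

fn main() {
    println!("{}", run_demo());
}
```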
Compiles?!
Finally! It compiles! We still don’t quite have it where we want it, though.
Accepting a new ClusterClient
Right now, we listen for connections in src/cluster/server.rs, in the listen function:
pub fn listen(addr: SocketAddr) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut client = ClusterClient::new(stream);
            client.run();
        });
    }
}
See how we don’t have the VM anywhere in there? Or the Manager? So we can’t add the client there. Let’s try passing in the Manager. In src/vm.rs, in bind_cluster_server, we have this:
pub fn bind_cluster_server(&mut self) {
    if let Some(ref addr) = self.server_addr {
        if let Some(ref port) = self.server_port {
            let socket_addr: SocketAddr = (addr.to_string() + ":" + port).parse().unwrap();
            thread::spawn(move || {
                cluster::server::listen(socket_addr);
            });
        } else {
            error!("Unable to bind to cluster server address: {}", addr);
        }
    } else {
        error!("Unable to bind to cluster server port: {:?}", self.server_port);
    }
}
We can change it to pass a clone of the Arc into the listener thread:
pub fn bind_cluster_server(&mut self) {
    if let Some(ref addr) = self.server_addr {
        if let Some(ref port) = self.server_port {
            let socket_addr: SocketAddr = (addr.to_string() + ":" + port).parse().unwrap();
            // Note that we have to make a clone here before we move it into the thread
            let clone = self.connection_manager.clone();
            thread::spawn(move || {
                // Otherwise, we'd be trying to move the whole thing out of the VM, not an Arc
                cluster::server::listen(socket_addr, clone);
            });
        } else {
            error!("Unable to bind to cluster server address: {}", addr);
        }
    } else {
        error!("Unable to bind to cluster server port: {:?}", self.server_port);
    }
}
Of course, we then need to change the signature of the listen function in src/cluster/server.rs to:
pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
And don’t forget to add:
use cluster::manager::Manager;
If you run cargo test, it should compile now. Yay!
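The clone-then-move step is easy to try in isolation. The sketch below uses a stand-in Manager that maps aliases to plain strings (the real one stores ClusterClients), and the alias and address values are placeholders; register_from_thread is a hypothetical name.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Stand-in for the tutorial's Manager; the real one holds ClusterClients.
#[derive(Default)]
struct Manager {
    clients: HashMap<String, String>,
}

impl Manager {
    /// Returns true if the alias was not registered before.
    fn add_client(&mut self, alias: String, addr: String) -> bool {
        self.clients.insert(alias, addr).is_none()
    }
}

fn register_from_thread() -> usize {
    let connection_manager = Arc::new(RwLock::new(Manager::default()));

    // Clone the Arc (a cheap pointer copy) and move only the clone into the
    // thread; the original stays with its owner.
    let clone = Arc::clone(&connection_manager);
    let handle = thread::spawn(move || {
        let mut lock = clone.write().unwrap();
        // Placeholder alias and address, purely for illustration.
        lock.add_client("node-1".to_string(), "placeholder-address".to_string());
    });
    handle.join().unwrap();

    // The original handle sees the entry added by the other thread.
    let count = connection_manager.read().unwrap().clients.len();
    count
}

fn main() {
    println!("clients registered: {}", register_from_thread());
}
```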
Adding the ClusterClient to the Manager
Now if we head over to src/cluster/server.rs, we have an Arc<RwLock<Manager>> we can use to add the client.
Except…
Because they just connected, we don’t know their alias yet.
Who….are you?
Because our HashMap uses the node alias as the key, we need SOMETHING to put there. There are two obvious options:
We have the ClusterClient send its alias first
We generate a random UUID for the alias, until the client sends us its proper one
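As a rough sketch of the first option, the server side could read one chunk from a freshly connected peer and treat it as the alias. This is an assumption about how such a handshake might look, not the Iridium implementation: read_alias is a hypothetical helper, and a byte slice stands in for the TcpStream so the example runs without a network.

```rust
use std::io::{self, Read};

/// Reads the first chunk from a new connection and treats it as the alias.
/// (Hypothetical helper; assumes the alias fits in one read of 1024 bytes.)
fn read_alias<R: Read>(reader: &mut R) -> io::Result<String> {
    let mut buf = [0u8; 1024];
    let bytes_read = reader.read(&mut buf)?;
    // Only the first `bytes_read` bytes were filled in by the peer.
    let alias = String::from_utf8_lossy(&buf[..bytes_read]);
    Ok(alias.trim().to_string())
}

fn main() -> io::Result<()> {
    // A byte slice implements Read, so it can play the role of the stream.
    let mut fake_stream: &[u8] = b"node-1\n";
    let alias = read_alias(&mut fake_stream)?;
    println!("Alias is: {:?}", alias);
    Ok(())
}
```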
In server.rs, note this line:
client.run();
We COULD do something like this:
thread::spawn(move || {
    let mut buf = String::new();
    let mut client = ClusterClient::new(stream);
    // Once this call succeeds, we'll hopefully have the node alias in the string buffer
    let bytes_read = client.read(&mut buf);
    client.run();
});
Of course, this means we need to write the node alias, so head over to src/repl/mod.rs. The join_cluster function, which creates the ClusterClient, looks like:
fn join_cluster(&mut self, args: &[&str]) {
    self.send_message(format!("Attempting to join cluster..."));
    let ip = args[0];
    let port = args[1];
    let addr = ip.to_owned() + ":" + port;
    if let Ok(stream) = TcpStream::connect(addr) {
        self.send_message(format!("Connected to cluster!"));
        let mut cc = cluster::client::ClusterClient::new(stream);
        if let Some(ref a) = self.vm.alias {
            if let Ok(mut lock) = self.vm.connection_manager.write() {
                lock.add_client(a.to_string(), cc);
            }
        }
    } else {
        self.send_message(format!("Could not connect to cluster!"));
    }
}
We can add a call to send_hello (defined in the ClusterClient), like this:
fn join_cluster(&mut self, args: &[&str]) {
    self.send_message(format!("Attempting to join cluster..."));
    let ip = args[0];
    let port = args[1];
    let addr = ip.to_owned() + ":" + port;
    if let Ok(stream) = TcpStream::connect(addr) {
        self.send_message(format!("Connected to cluster!"));
        let mut cc = cluster::client::ClusterClient::new(stream);
        cc.send_hello();
        if let Some(ref a) = self.vm.alias {
            if let Ok(mut lock) = self.vm.connection_manager.write() {
                lock.add_client(a.to_string(), cc);
            }
        }
    } else {
        self.send_message(format!("Could not connect to cluster!"));
    }
}
And naturally, the compiler wants us to change the listen function in src/cluster/server.rs to:
pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut buf = [0; 1024];
            let mut client = ClusterClient::new(stream);
            let bytes_read = client.reader.read(&mut buf);
            let alias = String::from_utf8_lossy(&buf);
            println!("Alias is: {:?}", alias);
            client.run();
        });
    }
}
The change is the pre-allocated 1024-byte buffer into which data is read. With all that, the tests should pass!
Connection Test
Start two VMs, run !start_cluster on one and !join_cluster on the other, and you’ll probably see something like:
Started cluster server!
>>> Alias is: "\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u
End
We’ll get into why next tutorial. =) We’re getting there!