So You Want to Build a Language VM - Part 28 - Clustering: Part 2
Finishes basic clustering
Intro
Hi, me again! In our last tutorial, we added a separate TCP server to the Iridium VM. In this one, we’re going to finish up the client part, so that two Iridium VMs can talk to one another. Make sure you start this tutorial from this tag: https://gitlab.com/subnetzero/iridium/tags/0.0.27
Important: The code at the end of this tutorial is not yet working. We still have one more tutorial to go!
Join Cluster Command
For this tutorial, let’s add some variation to our lives and start by making the command to join a cluster. Right now, there is a !start_cluster command. Let’s add a !join_cluster command.
REPL
This is only a few steps:
1. In src/repl/mod.rs, find the function called execute_command, and add !join_cluster to it. Something like: "!join_cluster" => self.join_cluster(&args[1..]),
2. Make a stub function at the end.
The stub function should look like:
fn join_cluster(&mut self, _args: &[&str]) {
    debug!("Attempting to join cluster...");
}
Cluster Module
Since we are doing a full mesh, every node is a server, and every node connects as a client to every other node. This implies a few things:
We need to keep a list of all known nodes
We need to track the network connectivity to all our connected nodes
We need to keep the list of nodes in sync across all nodes
Note: This is where we get to start doing some cool distributed systems stuff. =)
A Bit of Setup
- Make the directory src/cluster if you haven’t already
- Create an empty file client.rs
- Create an empty file server.rs
- Create an empty file mod.rs
- Create an empty file named manager.rs
In the mod.rs file, put:
pub mod server;
pub mod client;
pub mod manager;
type NodeAlias = String;
List of Nodes
Let’s start here. We can make a simple Manager struct that handles this for us. It will have the responsibility of tracking each connected client, adding new ones, and removing dead ones. Create src/cluster/manager.rs, and put the following in it. Since it’s a larger chunk, I’ll explain in comments in the code.
use std::collections::HashMap;

use cluster::client::ClusterClient;
use cluster::NodeAlias;

// Basic struct declaration. Right now, it has a simple HashMap of ClusterClients mapped by their NodeAlias.
// Remember: NodeAlias is just a type alias for String.
pub struct Manager {
    clients: HashMap<NodeAlias, ClusterClient>,
}

impl Manager {
    /// Basic constructor for the Manager
    pub fn new() -> Manager {
        Manager {
            clients: HashMap::new(),
        }
    }

    /// Adds a client, which will be another cluster member. We should really return something other than a boolean,
    /// but we'll have to revisit that when we redo how we handle errors.
    pub fn add_client(&mut self, alias: NodeAlias, client: ClusterClient) -> bool {
        if self.clients.contains_key(&alias) {
            error!("Tried to add a client that already existed");
            return false;
        }
        self.clients.insert(alias, client);
        true
    }

    /// Deletes a client by alias. Same story with the bool as above. Note that
    /// `remove` returns an Option, so we map it to a bool that tells the caller
    /// whether anything was actually removed.
    pub fn del_client(&mut self, alias: NodeAlias) -> bool {
        self.clients.remove(&alias).is_some()
    }
}

// And of course some tests
#[cfg(test)]
mod test {
    use super::Manager;

    #[test]
    fn test_create_manager() {
        let _test_manager = Manager::new();
    }
}
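If you want to exercise the add and delete paths as well, here’s a sketch of a test you could drop into the same test module. It assumes the two-argument ClusterClient::new(stream, alias) constructor we end up with later in this tutorial, and it binds a throwaway listener just to get a real TcpStream:

#[test]
fn test_add_and_del_client() {
    use std::net::{TcpListener, TcpStream};
    use cluster::client::ClusterClient;

    // Bind to port 0 so the OS picks any free port, then connect to ourselves
    // purely to obtain a usable TcpStream for the ClusterClient
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let stream = TcpStream::connect(addr).unwrap();

    let mut manager = Manager::new();
    let client = ClusterClient::new(stream, "node0".to_string());
    assert!(manager.add_client("node0".to_string(), client));
    assert!(manager.del_client("node0".to_string()));
}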
Messages
This is where things start to get tricky. We need a protocol to communicate back and forth between cluster members. All we can send are 0s and 1s. How is the receiver supposed to know where in that stream the node alias is, for example? A protocol lets us define these things.
Making a Protocol
There are a ton of already existing protocols. JSON is one. Protobuf. Flatbuf. We could also make up our own.
As an example, we could say that the first 64 bytes are the node alias (sketched in the code after this list). This has a few repercussions:
Node aliases can only be 64 bytes or less
We potentially waste space if the node alias is less than 64 bytes
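To make that trade-off concrete, here’s a minimal sketch of what fixed-width framing might look like. This isn’t code we’ll actually use, and the function name is just illustrative:

fn encode_alias(alias: &str) -> Option<[u8; 64]> {
    let bytes = alias.as_bytes();
    // Repercussion 1: aliases longer than 64 bytes simply cannot be encoded
    if bytes.len() > 64 {
        return None;
    }
    // Repercussion 2: shorter aliases leave the rest of the field as wasted zero padding
    let mut field = [0u8; 64];
    field[..bytes.len()].copy_from_slice(bytes);
    Some(field)
}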
But…
I started implementing this part with Cap’n Proto, but in the end, I think it is too complex for a tutorial. So let’s do this a different way: JSON! But first, let’s go over the process of forming and joining a cluster.
Forming a Cluster
One node has to bootstrap the cluster. Right now, that means running !start_cluster. Later on, we’ll do this with CLI flags or env vars. Starting a cluster binds that particular Iridium VM process to a specific port, and it begins listening for data.
We now have a cluster. A cluster of 1, but still.
Joining a Cluster
Now we can start a second Iridium VM and use !join_cluster with the bootstrap node’s IP and port as arguments. It will connect. Now, a few things have to happen here. Think of it as a handshake, or the nodes introducing themselves to each other. If our nodes are called A and B, it would go something like:
B: "Hello A! My name is B!" A: "Hello B! My name is A! Here are all the other nodes I know about in this cluster!"
Of course, if we add in encryption, authentication, and authorization, that interaction grows quite a bit more complicated. For the rest of this tutorial, let’s just get the basic version working.
Messages
We’ll represent each different message type using enums:
pub enum IridiumMessage {
    Hello{alias: String},
    HelloAck{alias: String, nodes: Vec<(String, String, String)>}
}
The Hello contains the node alias of the node that wants to join the cluster. HelloAck responds with its own alias, and a list of all the nodes, their IPs, and ports. Our joiner will then need to go through each of those nodes and introduce itself to them. But for now, let’s get the joiner making and sending a Hello.
Go ahead and open up src/cluster/message.rs and in it put:
pub enum IridiumMessage{
    Hello{alias: String},
    HelloAck{alias: String, nodes: Vec<(String, String, String)>}
}
And don’t forget to add a pub mod message; in mod.rs.
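Since we’ve settled on JSON, it’s worth a quick look at what these messages will look like on the wire. This is just a sketch: it assumes we add the serde, serde_derive, and serde_json crates to Cargo.toml (with the usual extern crate declarations) and derive Serialize/Deserialize on the enum. The actual wiring comes later.

#[derive(Serialize, Deserialize, Debug)]
pub enum IridiumMessage {
    Hello { alias: String },
    HelloAck { alias: String, nodes: Vec<(String, String, String)> },
}

fn example() {
    let msg = IridiumMessage::Hello { alias: "node1".to_string() };
    // serde's default "externally tagged" representation wraps the payload
    // in the variant name, so this prints: {"Hello":{"alias":"node1"}}
    let encoded = serde_json::to_string(&msg).unwrap();
    println!("{}", encoded);
    // ...and the receiver can turn the bytes back into an IridiumMessage
    let _decoded: IridiumMessage = serde_json::from_str(&encoded).unwrap();
}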
The Client
Open up src/cluster/client.rs. To the ClusterClient impl, add the following function:
pub fn send_hello(&mut self) {
    // Hello is a variant of our IridiumMessage enum, and carries our own alias.
    // Serializing it and writing it down the socket comes in the next tutorial,
    // so the leading underscore keeps the compiler quiet about the unused variable.
    let _msg = IridiumMessage::Hello{alias: self.alias.clone()};
}

(You’ll need a use cluster::message::IridiumMessage; at the top of client.rs.) Notice that send_hello reads self.alias, which means the ClusterClient struct needs a new alias field:
pub struct ClusterClient {
    alias: String,
    reader: BufReader<TcpStream>,
    writer: BufWriter<TcpStream>,
    rx: Option<Receiver<String>>,
    tx: Option<Sender<String>>,
    raw_stream: TcpStream,
}
And then in the impl, we need to change the new function up:
pub fn new(stream: TcpStream, alias: String) -> ClusterClient {
    // TODO: Handle this better
    let reader = stream.try_clone().unwrap();
    let writer = stream.try_clone().unwrap();
    let (tx, rx) = channel();
    ClusterClient {
        reader: BufReader::new(reader),
        writer: BufWriter::new(writer),
        raw_stream: stream,
        tx: Some(tx),
        rx: Some(rx),
        alias: alias,
    }
}
Now let’s head over to repl/mod.rs, and down to the join_cluster method. I’ve commented the code inline.
fn join_cluster(&mut self, args: &[&str]) {
    self.send_message(format!("Attempting to join cluster..."));
    // Extract the IP and Port arguments passed in
    let ip = args[0];
    let port = args[1];
    // Convert them to a form we can use as a SocketAddr
    let addr = ip.to_owned() + ":" + port;
    // Attempt to make the actual connection
    if let Ok(stream) = TcpStream::connect(addr) {
        self.send_message(format!("Connected to cluster!"));
        // Add the remote node to our list of connected clients. We don't learn
        // the remote node's alias until the handshake is done, so for now we
        // use our own alias for both the client and the key. We'll fix this up
        // in the next tutorial.
        if let Some(ref a) = self.vm.alias {
            let cc = cluster::client::ClusterClient::new(stream, a.to_string());
            self.connection_manager.add_client(a.to_string(), cc);
        }
    } else {
        self.send_message(format!("Could not connect to cluster!"));
    }
}
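One caveat: args[0] and args[1] will panic if the user types !join_cluster without both arguments. A guard at the top of the function, something like this sketch, keeps the REPL from crashing:

// At the top of join_cluster, before we index into args
if args.len() != 2 {
    self.send_message(format!("Usage: !join_cluster <ip> <port>"));
    return;
}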
Testing
For basic testing, I open a terminal and split the screen vertically. In one pane, I do this:
$ iridium --node-alias=node1
Welcome to Iridium! Let's be productive!
>>> !start_cluster
Starting cluster server!
>>>

And in the other:

$ iridium --node-alias=node2
Welcome to Iridium! Let's be productive!
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
>>>
Listing Members
Let’s add in a little utility command that lists all the cluster members we know about. First, we need to add a function to src/cluster/manager.rs to let us get a list of aliases:
pub fn get_client_names(&self) -> Vec<String> {
    let mut results = vec![];
    for (alias, _) in &self.clients {
        results.push(alias.to_owned());
    }
    results
}
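As an aside, the same function can be written more compactly with iterator adapters, if you prefer that style; the two are equivalent:

pub fn get_client_names(&self) -> Vec<String> {
    // keys() iterates over the aliases; clone each one and collect into a Vec
    self.clients.keys().cloned().collect()
}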
In src/repl/mod.rs, add a command called !cluster_members, wired up the same way as !join_cluster. It should call a function, cluster_members, that looks like:

fn cluster_members(&mut self, _args: &[&str]) {
    self.send_message(format!("Listing Known Nodes:"));
    let cluster_members = self.connection_manager.get_client_names();
    self.send_message(format!("{:#?}", cluster_members));
}
Now if we start two Iridium VMs, join them, and list the members, we see…an empty list!
Wrapping Up
This is a good place to stop, as we’ll need to make some more changes to get all this hooked up. But we now have nodes that can join each other!
If you need some assistance with any of the topics in the tutorials, or just devops and application development in general, we offer consulting services. Check it out over here or click Services along the top.