So You Want to Build a Language VM - Part 28 - Clustering: Part 2

Finishes basic clustering

Intro

Hi, me again! In our last tutorial, we added a separate TCP server to the Iridium VM. In this one, we’re going to finish up the client part, so that two Iridium VMs can talk to one another. Make sure you start this tutorial from this tag: https://gitlab.com/subnetzero/iridium/tags/0.0.27

Important
The code at the end of this tutorial won’t fully work yet. We still have one more tutorial to go!

Join Cluster Command

For this tutorial, let’s add some variation to our lives and start by making the command to join a cluster. Right now, there is a !start_cluster command. Let’s add a !join_cluster command.

REPL

This is only a few steps:

  1. In src/repl/mod.rs, find the function called execute_command, and add !join_cluster to it. Something like: "!join_cluster" => self.join_cluster(&args[1..]),

  2. Make a stub function at the end

The stub function should look like:

fn join_cluster(&mut self, _args: &[&str]) {
    debug!("Attempting to join cluster...");
}
We’ll add to it along the way, of course.

Cluster Module

Since we are doing a full mesh, every node is a server, and every node connects as a client to every other node. This implies a few things:

  1. We need to keep a list of all known nodes

  2. We need to track the network connectivity to all our connected nodes

  3. We need to keep the list of nodes in sync across all nodes

Note
This is where we get to start doing some cool distributed systems stuff. =)

A Bit of Setup

  1. Make the directory src/cluster if you haven’t already

  2. Create an empty file client.rs

  3. Create an empty file server.rs

  4. Create an empty file mod.rs

  5. Create an empty file named manager.rs

In the mod.rs file, put:

pub mod server;
pub mod client;
pub mod manager;

type NodeAlias = String;

List of Nodes

Let’s start here. We can make a simple Manager struct that handles this for us. It will have the responsibility of tracking each connected client, adding new ones, and removing dead ones. Open src/cluster/manager.rs, and put the following in it. Since it’s a larger chunk, I’ll explain in comments in the code.

use std::collections::HashMap;

use cluster::client::ClusterClient;
use cluster::NodeAlias;

// Basic struct declaration. Right now, it has a simple HashMap of ClusterClients mapped by their NodeAlias
// Remember: NodeAlias is just a type alias for String.
pub struct Manager {
    clients: HashMap<NodeAlias, ClusterClient>
}

impl Manager {
    /// Basic constructor for the Manager
    pub fn new() -> Manager {
        Manager {
            clients: HashMap::new()
        }
    }

    /// Adds a client, which will be another cluster member. We should really return something other than a boolean,
    /// but we'll have to revisit that when we redo how we handle errors.
    pub fn add_client(&mut self, alias: NodeAlias, client: ClusterClient) -> bool {
        if self.clients.contains_key(&alias) {
            error!("Tried to add a client that already existed");
            return false;
        }
        self.clients.insert(alias, client);
        true
    }

    /// Delete a client by alias. Same story with the bool as above.
    pub fn del_client(&mut self, alias: NodeAlias) -> bool {
        self.clients.remove(&alias).is_some()
    }
}

// And of course some tests
#[cfg(test)]
mod test {
    use super::Manager;

    #[test]
    fn test_create_manager() {
        let _test_manager = Manager::new();
    }
}

Messages

This is where things start to get tricky. We need a protocol to communicate back and forth between cluster members, because all we can send over the wire are 0s and 1s. How is the receiver supposed to know where in that stream the node alias is, for example? A protocol lets us define these things.

Making a Protocol

There are plenty of existing protocols: JSON, Protobuf, FlatBuffers. We could also make up our own.

As an example, we could say that the first 64 bytes are the node alias (there’s a quick sketch of this after the list below). This has a few repercussions:

  1. Node aliases can only be 64 bytes or less

  2. We potentially waste space if the node alias is less than 64 bytes
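
To make the fixed-width idea concrete, here’s a minimal sketch of what encoding and decoding such a field could look like. The encode_alias and decode_alias helpers are hypothetical, just for illustration; we won’t actually use this scheme:

const ALIAS_WIDTH: usize = 64;

/// Pads an alias out to exactly 64 bytes, so the receiver always knows
/// where the alias field ends. Returns None if the alias is too long.
fn encode_alias(alias: &str) -> Option<[u8; ALIAS_WIDTH]> {
    let bytes = alias.as_bytes();
    if bytes.len() > ALIAS_WIDTH {
        return None;
    }
    let mut buf = [0u8; ALIAS_WIDTH];
    buf[..bytes.len()].copy_from_slice(bytes);
    Some(buf)
}

/// Reads an alias back out of the fixed-width field, trimming the
/// zero-byte padding.
fn decode_alias(buf: &[u8; ALIAS_WIDTH]) -> String {
    let end = buf.iter().position(|&b| b == 0).unwrap_or(ALIAS_WIDTH);
    String::from_utf8_lossy(&buf[..end]).into_owned()
}

Every message would carry all 64 bytes whether the alias needs them or not, which is exactly the trade-off in the list above.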

But…

I started implementing this part with Cap’n Proto, but in the end, I think it is too complex for a tutorial. So let’s do this a different way: JSON! But first, let’s go over the process of forming and joining a cluster.

Forming a Cluster

One node has to bootstrap the cluster. Right now, that means running !start_cluster. Later on, we’ll do this with CLI flags or env vars. Starting a cluster binds that particular Iridium VM process to a specific port, and it begins listening for data.

We now have a cluster. A cluster of 1, but still.
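
If you want a mental model of what the bootstrap node is doing, the listening half boils down to something like the sketch below. This is simplified, not the exact code; the real server from the last tutorial lives in src/cluster/server.rs:

use std::net::TcpListener;
use std::thread;

// Bind to the cluster port and accept connections forever. Each new
// connection is a prospective cluster member.
fn listen(addr: &str) {
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        thread::spawn(move || {
            // In the real server, we wrap `stream` in a ClusterClient
            // and start its read loop here.
            let _ = stream;
        });
    }
}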

Joining a Cluster

Now we can start a second Iridium VM and use !join_cluster with the bootstrap node’s IP and port as arguments. It will connect. Now, a few things have to happen here. Think of it as a handshake, or the nodes introducing themselves to each other. If our nodes are called A and B, it would go something like:

B: "Hello A! My name is B!" A: "Hello B! My name is A! Here are all the other nodes I know about in this cluster!"

Of course, if we add in encryption, authentication, and authorization, that interaction grows quite a bit more complicated. For the rest of this tutorial, let’s just get the basic exchange working.

Defining the Messages

We’ll represent each different message type using enums:

pub enum IridiumMessage {
    Hello{alias: String},
    HelloAck{alias: String, nodes: Vec<(String, String, String)>}
}

The Hello contains the node alias of the node that wants to join the cluster. HelloAck responds with its own alias, and a list of all the nodes, their IPs, and ports. Our joiner will then need to go through each of those nodes, and introduce itself to them. But for now, let’s get the joiner making and sending a Hello.
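
As a preview, handling a HelloAck on the joiner’s side will look roughly like this sketch. process_hello_ack is a hypothetical function name; the real version lands in the next tutorial:

use std::net::TcpStream;

// Hypothetical handler: connect to every node the bootstrap node told
// us about, so the mesh stays fully connected.
fn process_hello_ack(nodes: Vec<(String, String, String)>) {
    for (alias, ip, port) in nodes {
        let addr = ip + ":" + &port;
        if let Ok(stream) = TcpStream::connect(addr.as_str()) {
            // Wrap the stream in a ClusterClient, send our own Hello,
            // and register the new client under `alias`.
            let _ = (stream, alias);
        }
    }
}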

Go ahead and create src/cluster/message.rs and in it put:

pub enum IridiumMessage {
    Hello{alias: String},
    HelloAck{alias: String, nodes: Vec<(String, String, String)>}
}

And don’t forget to add a pub mod message; in mod.rs.
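
Since we’ve settled on JSON, one way to get these enums on and off the wire is serde’s derive support. The snippet below is a standalone sketch that assumes serde, serde_derive, and serde_json have been added to Cargo.toml; we’ll do the real wiring in the next tutorial:

#[macro_use]
extern crate serde_derive;
extern crate serde_json;

#[derive(Serialize, Deserialize, Debug)]
pub enum IridiumMessage {
    Hello { alias: String },
    HelloAck { alias: String, nodes: Vec<(String, String, String)> },
}

fn main() {
    let msg = IridiumMessage::Hello { alias: "node2".to_string() };
    // Serialize to a JSON string we can write to a socket...
    let encoded = serde_json::to_string(&msg).unwrap();
    println!("{}", encoded); // prints: {"Hello":{"alias":"node2"}}
    // ...and parse it back on the receiving side.
    let decoded: IridiumMessage = serde_json::from_str(&encoded).unwrap();
    println!("{:?}", decoded);
}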

The Client

Open up src/cluster/client.rs. To the ClusterClient impl, add the following function:

pub fn send_hello(&self) {
    let msg = IridiumMessage::Hello{alias: ....};
}

Crap. We don’t have the alias stored in the ClusterClient. So let’s add a field to the struct:

pub struct ClusterClient {
    alias: String,
    reader: BufReader<TcpStream>,
    writer: BufWriter<TcpStream>,
    rx: Option<Receiver<String>>,
    tx: Option<Sender<String>>,
    raw_stream: TcpStream,
}

And then in the impl, we need to change the new function up:

pub fn new(stream: TcpStream, alias: String) -> ClusterClient {
    // TODO: Handle this better
    let reader = stream.try_clone().unwrap();
    let writer = stream.try_clone().unwrap();
    let (tx, rx) = channel();
    ClusterClient {
        reader: BufReader::new(reader),
        writer: BufWriter::new(writer),
        raw_stream: stream,
        tx: Some(tx),
        rx: Some(rx),
        alias: alias,
    }
}
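
With the alias stored on the struct, a first cut at send_hello could look like the sketch below. It assumes the serde setup sketched earlier, plus use cluster::message::IridiumMessage; and use std::io::Write; at the top of client.rs; note that it now takes &mut self, since it writes to the stream:

pub fn send_hello(&mut self) {
    let msg = IridiumMessage::Hello {
        alias: self.alias.clone(),
    };
    // TODO: handle these errors instead of unwrapping
    let mut encoded = serde_json::to_string(&msg).unwrap();
    // Newline-delimit the message so the other side can use read_line
    encoded.push('\n');
    self.writer.write_all(encoded.as_bytes()).unwrap();
    self.writer.flush().unwrap();
}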

Now let’s head over to repl/mod.rs, and down to the join_cluster method. I’ve commented the code inline.

fn join_cluster(&mut self, args: &[&str]) {
    self.send_message(format!("Attempting to join cluster..."));
    // Extract the IP and port arguments passed in
    let ip = args[0];
    let port = args[1];
    // Build an address string we can hand to TcpStream::connect
    let addr = ip.to_owned() + ":" + port;
    // Attempt to make the actual connection
    if let Ok(stream) = TcpStream::connect(addr.as_str()) {
        self.send_message(format!("Connected to cluster!"));
        // Add the remote node to our list of connected clients. Note that
        // ClusterClient::new now takes an alias as well; for now we use our
        // own alias, and we'll sort out a proper alias exchange in the next
        // tutorial.
        if let Some(ref a) = self.vm.alias {
            let cc = cluster::client::ClusterClient::new(stream, a.to_string());
            self.connection_manager.add_client(a.to_string(), cc);
        }
    } else {
        self.send_message(format!("Could not connect to cluster!"));
    }
}

Testing

For basic testing, I open a terminal, and split the screen vertically. In one, I do this:

$ iridium --node-alias=node1
Welcome to Iridium! Let's be productive!
>>> !start_cluster
Starting cluster server!
>>>
And in the other:
$ iridium --node-alias=node2
Welcome to Iridium! Let's be productive!
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
>>>

Listing Members

Let’s add in a little utility command that lists all the cluster members we know about. First, we need to add a function to src/cluster/manager.rs to let us get a list of aliases:

pub fn get_client_names(&self) -> Vec<String> {
    let mut results = vec![];
    for (alias, _) in &self.clients {
        results.push(alias.to_owned());
    }
    results
}
In src/repl/mod.rs, add a command called !cluster_members. It should call a function, cluster_members, that looks like this:
fn cluster_members(&mut self, _args: &[&str]) {
    self.send_message(format!("Listing Known Nodes:"));
    let cluster_members = self.connection_manager.get_client_names();
    self.send_message(format!("{:#?}", cluster_members));
}

Now if we start two Iridium VMs, join them, and list the members, we see… an empty list!

Wrapping Up

This is a good place to stop, as we’ll need to make some more changes to get all this hooked up. But we now have nodes that can join each other!

