So You Want to Build a Language VM - Part 32 - More Clustering?!

More Clustering?!

Intro

Hello again everyone! In this tutorial, we’re going to continue to work on clustering. When we left off in the last tutorial, we had the joiner node sending a hello message and the server node adding it to its list. The next tasks are:

  1. Send a hello back

  2. Send a list of all known nodes to the new joiner

Full-mesh Network

Remember how I mentioned we’d be doing a full mesh network? I realized an illustration might be handy, so more beautiful text art!

         ┌───────────────────────────────┐
         │                               │
         │                               ▼
   ┌───────────┐                   ┌───────────┐
   │           │                   │           │
   │           │                   │           │
┌─▶│   VM 1    │         ┌─────────│   VM 2    │
│  │           │         │         │           │
│  │           │         │         │           │
│  └───────────┘         │         └───────────┘
│        │               │               ▲
│        │               │               │
│        │               ▼               │
│        │         ┌───────────┐         │
│        │         │           │         │
│        │         │           │         │
│        └────────▶│   VM 3    │─────────┘
│                  │           │
│                  │           │
│                  └───────────┘
│                        │
│                        │
└────────────────────────┘

See how each node is connected to every other node? This has advantages and disadvantages:

Advantages

  1. Relatively simple from a coding standpoint. You just send the entire list to any new joiner. When one leaves, everyone knows about it and removes it from their list.

  2. With so many connections, the cluster has a high level of resiliency to transient network issues.

  3. Full-mesh topologies are easy to think about.

Disadvantages

  1. TCP connections to other nodes take a surprising amount of resources. In terms of memory, each connection can cost anywhere from a few hundred bytes to a few kilobytes.

  2. Heartbeats

That last one bears a longer explanation, so it gets a subsection!

Heartbeats

In the diagram above, how does VM2 know that VM1 is up and running? They send messages back and forth at regular intervals to communicate their status. VM2 does the same with VM3, VM1 with VM3, and…​well, you get the idea.

The more nodes you add, the more heartbeats are required simply for the cluster to function: a full mesh of n nodes has n(n-1)/2 connections, so 50 nodes means 1,225 of them, each heartbeating in both directions. Historically, there’s been a limit of about 50 nodes to an Erlang cluster. One important thing to realize, though, is that for the purposes of the heartbeats of those 50 nodes, it doesn’t matter if each one is a tiny 2-CPU, 2 GB RAM machine or a 128-CPU, 5 TB RAM monster. This is one case where you can combine vertical and horizontal scaling and get a lot of mileage.

Other Models

Yes, there are other models that would let us scale well beyond ~50 nodes. But we aren’t going to use them.

Yet.

These are more complex to implement, and for right now, this model will work fine for our purposes.

To the Code!

Most of the work we’ll be doing is in the src/cluster/client.rs file. We have the function send_hello, which is fine. The receiving side, however, is the run function (which kicks off with a call to recv_loop), shown below:

pub fn run(&mut self) {
    self.recv_loop();
    let mut buf = String::new();
    loop {
        match self.reader.read_line(&mut buf) {
            Ok(_) => {
                // trim_right returns a trimmed slice rather than mutating
                // buf, so for now the message is read and discarded
                let _ = buf.trim_right();
                // read_line appends, so clear the buffer for the next read
                buf.clear();
            }
            Err(e) => {
                println!("Error receiving: {:#?}", e);
            }
        }
    }
}

Note how in our run function we enter an infinite loop trying to read from a socket. Right now, all we do is trim the buffer, but we actually need to process the message. Which means we need a protocol!

Protocol

A protocol is simply a defined way for two things to communicate. We could use JSON and send that back and forth; we could use YAML, Protobuf, Cap’n Proto, or many others. Since we are doing this to learn low-level coding, let’s make our own simple protocol.

The Blank Canvas

We are ârtįstés! We can do whatever we want! Let’s start by allocating 1 byte at the front of our messages to indicate the type of message. That gives us 2^8, or 256, types of messages.

It would look like this: [0, 0, 0, 0, 0, 0, 0, 0]. That’s 8 bits, or 1 byte. If we treat it as a u8, it is equal to 0.

What about a message that sends a list of all known nodes? Why, that can be number 1: [0, 0, 0, 0, 0, 0, 0, 1].

But The List…​

This is where we have to extend our protocol. We can say that the byte that follows contains the number of nodes in the cluster. If it is a three-node cluster, we’d then have:

[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1]

If our receiver sees a message of type 1, then we code in logic that tells it to expect the number of nodes in the next byte. After that, the node can send its entire list.

Suppose we used a UUID for each node. We know a UUID takes up 128 bits. As a new joiner to the cluster, when we receive the message of type 1, followed by 3, we know that we should expect 128 * 3 = 384 more bits (48 bytes).
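
To make the byte layout concrete, here is a hand-rolled framer for the scheme just described. This is purely illustrative; as the next section explains, we won’t actually build messages by hand:

// Illustrative only: manually framing a "node list" message.
// Layout: type byte (1) + count byte + 16 raw bytes per UUID.
fn frame_node_list(node_ids: &[[u8; 16]]) -> Vec<u8> {
    let mut message = Vec::new();
    message.push(1u8);                  // message type 1: node list
    message.push(node_ids.len() as u8); // node count; must fit in one byte
    for id in node_ids {
        message.extend_from_slice(id);  // each UUID is 16 bytes (128 bits)
    }
    message
}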

Well This Sounds Tedious

Well, yes. Happily, we can do all this rather easily by using Rust to serialize data. The heavyweight tool in Rust for that is called serde. We’ll use it to serialize our messages into something called bincode, which is a compact Rust-specific encoding scheme.

Note
serde has support for TOML, JSON, YAML, and a bunch of others. We’re using bincode for ease and compactness.
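
If you haven’t used them before, here is a minimal, standalone bincode round trip (separate from any Iridium code) to show what these crates do:

#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate bincode;

// A throwaway struct just to demonstrate serialization
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Ping {
    seq: u32,
}

fn main() {
    // Serialize to a compact Vec<u8>, then deserialize it back
    let encoded: Vec<u8> = bincode::serialize(&Ping { seq: 7 }).unwrap();
    let decoded: Ping = bincode::deserialize(&encoded).unwrap();
    assert_eq!(decoded, Ping { seq: 7 });
}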

The Tasks

  1. Define a message enum

  2. Write a message interpreter

Message Enum

If you take a look in src/cluster/message.rs, you’ll see we already started on this. It looks like this:

pub enum IridiumMessage {
    Hello {
        alias: String,
    },
    HelloAck {
        alias: (String, String, String),
        nodes: Vec<(String, String, String)>,
    },
}

Note how we are sending a Vector of tuples containing three Strings. The first is the alias, the second the IP, the third the port. The HelloAck’s own alias field uses the same tuple shape, so the responding node can identify itself.

Note
We could take advantage of type aliasing here to make this more clear. Feel free to try, and take a look at the source if you need some help.
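
One possible aliasing scheme (the names here are my own, not necessarily what the source uses):

// Hypothetical aliases; check the source for the actual names
type NodeAlias = String;
type NodeIP = String;
type NodePort = String;
type NodeInfo = (NodeAlias, NodeIP, NodePort);

pub enum IridiumMessage {
    Hello { alias: NodeAlias },
    HelloAck { alias: NodeInfo, nodes: Vec<NodeInfo> },
}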

We need to add three dependencies to Cargo.toml:

bincode = "1.0.1"
serde = "1.0.80"
serde_derive = "1.0.80"

bincode adds the encoding functionality, serde adds the core serialization functionality, and serde_derive lets us derive the Serialize and Deserialize traits needed by bincode.

After adding those to Cargo.toml, we’ll need to add them to src/bin/iridium.rs and src/lib.rs like so:

extern crate bincode;
#[macro_use]
extern crate serde_derive;
extern crate serde;

Serialization

Now we can change our enum to be:

#[derive(Serialize, Deserialize, Debug)]
pub enum IridiumMessage {
    Hello {
        alias: String,
    },
    HelloAck {
        alias: (String, String, String),
        nodes: Vec<(String, String, String)>,
    },
}
Notice how we can now derive the needed traits.

Generating Messages

Back over in message.rs, we can now implement functions to generate and serialize messages. To generate the hello message, we can do:

use bincode::{serialize, Result};

impl IridiumMessage {
    pub fn hello(alias: &str) -> Result<Vec<u8>> {
        let new_message = IridiumMessage::Hello {
            alias: alias.into(),
        };
        // serialize hands back a bincode::Result<Vec<u8>>
        serialize(&new_message)
    }
}

What we’ll get back is a Result wrapping a Vec<u8> that contains the serialized message, which we can send over a socket. If you look in client.rs, you can see our first send_hello function. We can replace it with:

pub fn send_hello(&mut self) {
    match self.alias {
        Some(ref alias) => {
            if let Ok(hello) = IridiumMessage::hello(alias) {
                // write_all ensures the whole message goes out; a bare
                // write is allowed to send only part of the buffer
                if self.raw_stream.write_all(&hello).is_ok() {
                    trace!("Hello sent!");
                } else {
                    error!("Error sending hello");
                }
            }
        },
        None => {
            error!("Node has no ID to send hello");
        }
    }
}
This uses our IridiumMessage enum to generate and send the message.

Hello Ack

We need a function to respond to the hello. Try to implement it yourself, but check out message.rs if you get stuck. Note that I added the following convenience functions in client.rs to help with it:

pub fn alias_as_string(&self) -> Option<String>;
pub fn ip_as_string(&self) -> Option<String>;
pub fn port_as_string(&self) -> Option<String>;
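
If you just want to compare notes, here is a sketch of how the generator might look, mirroring hello above (the real version lives in message.rs):

impl IridiumMessage {
    // Sketch only; assumes the same bincode imports as hello()
    pub fn hello_ack(
        alias: (String, String, String),
        nodes: Vec<(String, String, String)>,
    ) -> Result<Vec<u8>> {
        let new_message = IridiumMessage::HelloAck { alias, nodes };
        serialize(&new_message)
    }
}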

Receiving Messages

The hard part of this is done. We have a receive loop in place that listens for incoming packets. All we need is a deserializer, which we can put in the IridiumMessage impl as well.

pub fn process_message(message: &[u8]) -> Result<IridiumMessage> {
    // deserialize also comes from bincode; add it to the use line above
    deserialize(message)
}

Yup, that’s really it. =)
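
As a quick sanity check, the two halves now compose (error handling elided, and the alias is an arbitrary example):

let bytes = IridiumMessage::hello("some-node-alias").unwrap();
let message = IridiumMessage::process_message(&bytes).unwrap();
// message is now IridiumMessage::Hello { alias: "some-node-alias" }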

Over in client.rs, we need to change our run function a bit:

pub fn run(&mut self) {
    self.recv_loop();
    loop {
        let result: bincode::Result<IridiumMessage> = bincode::deserialize_from(&mut self.reader);
        match result {
            Ok(ref message) => {
                match message {
                    &IridiumMessage::HelloAck{ref nodes, ref alias} => {
                        debug!("Received list of nodes: {:?} from {:?}", nodes, alias);
                    },
                    _ => {
                        error!("Unknown message received");
                    }
                }
                debug!("Received message: {:?}", message);
            },
            Err(e) => {
                error!("Error deserializing Iridium message: {:?}", e);
            }
        }
    }
}
Don’t forget to add a use bincode; at the top! And note how we match on the type of message we decode here.

We also have to change our listen function in server.rs a bit:

pub fn listen(my_alias: String, addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        let tmp_alias = my_alias.clone();
        let cmgr = connection_manager.clone();
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut client = ClusterClient::new(stream);
            let result: bincode::Result<IridiumMessage> = bincode::deserialize_from(&mut client.reader);
            match result {
                Ok(message) => {
                    match message {
                        IridiumMessage::Hello{alias} => {
                            debug!("Found a hello message with alias: {:?}", alias);
                            let mut cmgr_lock = cmgr.write().unwrap();
                            let mut members: Vec<(String, String, String)> = Vec::new();

                            // Build the list of known cluster members as a Vector of
                            // (alias, IP, port) tuples to send back to the new joiner
                            for (key, value) in &cmgr_lock.clients {
                                if let Ok(client) = value.read() {
                                    let tuple = (key.to_string(), client.ip_as_string().unwrap(), client.port_as_string().unwrap());
                                    members.push(tuple);
                                }
                            }
                            let hello_ack = IridiumMessage::HelloAck {
                                nodes: members,
                                alias: (tmp_alias.clone(), addr.ip().to_string(), addr.port().to_string())
                            };

                            client.write_bytes(&bincode::serialize(&hello_ack).unwrap());
                            cmgr_lock.add_client(alias.to_string(), client);
                        },
                        _ => {
                            error!("Non-hello message received from node trying to join");
                        }
                    }
                },
                Err(e) => {
                    error!("Error deserializing Iridium message: {:?}", e);
                }
            }
        });
    }
}

Note
This can be cleaned up a lot.

The problem we have is that this is the bootstrapping process: a node has to say Hello, then get back a list of nodes, before anything else can happen. It is easier to do this logic here, and then once we are sure everything is in order, we can use the client functions to send.

At this point, we should be able to start up two Iridium VMs, have one try to join the other, see the hello arrive, and get back a list of nodes. Let’s give it a try!

From the VM where I started the cluster:

Welcome to Iridium! Let's be productive!
>>> !start_cluster
Started cluster server!
>>>  INFO 2018-10-28T03:20:32Z: iridium::cluster::server: Initializing Cluster server...
 INFO 2018-10-28T03:20:41Z: iridium::cluster::server: New Node connected!
DEBUG 2018-10-28T03:20:41Z: iridium::cluster::server: Found a hello message with alias: "b1ff4bc1-0d72-4d1e-95f3-f384b33f5c2e"

And from another VM where I joined the original node:

>>> !join_cluster localhost 2254
Attempting to join cluster...
Connected to cluster!
TRACE 2018-10-28T03:20:41Z: iridium::cluster::message: Generating hello message
TRACE 2018-10-28T03:20:41Z: iridium::cluster::client: Hello sent: [0, 0, 0, 0, 36, 0, 0, 0, 0, 0, 0, 0, 98, 49, 102, 102, 52, 98, 99, 49, 45, 48, 100, 55, 50, 45, 52, 100, 49, 101, 45, 57, 53, 102, 51, 45, 102, 51, 56, 52, 98, 51, 51, 102, 53, 99, 50, 101]
>>> DEBUG 2018-10-28T03:20:41Z: iridium::cluster::client: Received list of nodes: [] from ("", "127.0.0.1", "2254")
DEBUG 2018-10-28T03:20:41Z: iridium::cluster::client: Received message: HelloAck { alias: ("", "127.0.0.1", "2254"), nodes: [] }

Yay! It worked! We are successfully deserializing bincoded structs sent over the network.

Because we only have two nodes, and a node doesn’t add itself to its own connection list, the node list comes back empty; the server’s own identity has to be sent separately, in the alias field.

End

We’re getting closer! Clustering is finicky, but we’re almost there. The listen function is getting pretty awful, so we’ll probably refactor it sooner rather than later.

We also need to handle disconnections. If you close one of the VMs right now, you’ll see a stream of error messages. But that’s another tutorial!

