So You Want to Build a Language VM - Part 27 - Clustering: Part 1

Begins adding in clustering

Intro

Hello everyone! The delay in tutorials was the result of the website re-work (courtesy of Bitdream), which I hope is less…​awful. In the interim, I added a few features and fixed some bugs that would not have made for exciting tutorials, so you’ll want to start this tutorial from the latest master. In this tutorial, we’re going to get different instances of the VM talking to each other over TCP. They won’t do much, but they’ll at least be able to connect.

Why?

Good question!

Redundancy

If your application is running on more than one physically separate server and one of them fails, the others can continue to operate. In the case of a network of Iridium VMs, making them aware of each other allows for intelligent handling of failures.

Scaling

If you get a traffic spike, more machines running your application can be brought up.

Other Stuff…​

There are a few other cool things I have in mind, but I’m not sure if they’ll work yet, so I’ll hold off until later. =)

How?

The same way we added multi-user support! We’ll open TCP channels between the node members.

Cluster Design

This is where it gets a bit more complicated. Distributed computing is a hard problem, and there are many ways to do it. Current buzzwords in the industry include:

  • Raft

  • Paxos

  • Leaderless

  • Consistent hashing

A lot of the issues in distributed systems arise from building distributed database systems. Since that isn’t what we’re building, we have less to worry about. Plus, we have the Erlang BEAM VM design we can just steal! Which is kind of handy.

Terminology

Before we get started, let’s define some terms.

  1. A Node is a running Iridium VM

  2. A Host is a physical or virtual server running Linux or some other OS

  3. A Cluster is a collection of Nodes that can talk to each other over some shared network

  4. A Topology is how a cluster is connected together. Are all nodes connected to all other nodes? Are there proxy nodes?

Important
A Host can run more than one Node.

Naming a Node

We need to be able to uniquely identify each node within a cluster. That is, we don’t want two nodes named Murgatroyd. This uniqueness is important because we are going to pass Messages between nodes, and part of that is being able to uniquely identify the node a message is destined for.

Generating IDs

One option is to just use random UUIDs. In fact, we added a field for a UUID in the VM struct a while back, and we are populating it with a random UUID. If we are OK with that, then we’re already done with this step!
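
If you want to see what one looks like, here is a standalone snippet using the uuid crate; this is just an illustration, not the VM’s actual constructor:

use uuid::Uuid;

fn main() {
    // Uuid::new_v4() produces a random, version 4 UUID; this is what the VM
    // stores in its `id` field when it is created.
    let id = Uuid::new_v4();
    println!("Node ID: {}", id);
}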

Specific IDs

While UUIDs are cool, they can be, uh, difficult to remember. It would be nice to provide a CLI flag to specify the Node ID. The issue with this is that someone could give an entirely different Iridium VM the same name. That isn’t good for auditability, which is one of the nice things about the current system of generating a random UUID.

Aliases

So what we need are aliases. An alias is an arbitrary string that a user can provide when they start an Iridium VM. If they provide one, the Node will be addressable by either its alias or its UUID.

Open up src/bin/cli.yml, and let’s add an optional node-alias flag. Just below the DATA_ROOT_DIR option, we can add:

- DATA_ROOT_DIR:
    help: Root directory where the Iridium VM should store its data. Defaults to /var/lib/iridium.
    required: false
    takes_value: true
    long: data-root-dir
- NODE_ALIAS:
    help: An alias that can be used to refer to a running VM across a network
    required: false
    takes_value: true
    long: node-alias

And then in iridium.rs, we add a check for it:

let alias = matches.value_of("NODE_ALIAS").unwrap_or("");

Then at last, we add the new alias field to the VM struct in src/vm.rs:

pub struct VM {
    /// Array that simulates having hardware registers
    pub registers: [i32; 32],
    /// Array that simulates having floating point hardware registers
    pub float_registers: [f64; 32],
    /// Program counter that tracks which byte is being executed
    pc: usize,
    /// The bytecode of the program being run
    pub program: Vec<u8>,
    /// Used for heap memory
    heap: Vec<u8>,
    /// Used to represent the stack
    stack: Vec<u8>,
    /// Contains the remainder of modulo division ops
    remainder: usize,
    /// Contains the result of the last comparison operation
    equal_flag: bool,
    /// Loop counter field, used with the `LOOP` instruction
    loop_counter: usize,
    /// Contains the read-only section data
    ro_data: Vec<u8>,
    /// Is a unique, randomly generated UUID for identifying this VM
    id: Uuid,
    /// An alias that can be specified by the user and used to refer to the Node
    alias: Option<String>,
    /// Keeps a list of events for a particular VM
    events: Vec<VMEvent>,
    /// Number of logical cores the system reports
    pub logical_cores: usize,
}

Let’s use a builder-type function to pass in the alias. In the VM implementation, we can add this function:

/// Sets the alias of the VM. An empty string means no alias was provided.
pub fn with_alias(mut self, alias: String) -> Self {
    if alias == "" {
        self.alias = None;
    } else {
        self.alias = Some(alias);
    }
    self
}
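
As a quick sanity check, usage looks something like this (a sketch, assuming VM::new() defaults the alias field to None):

let named = VM::new().with_alias("Murgatroyd".to_string());
// named now has alias == Some("Murgatroyd")
let anonymous = VM::new().with_alias("".to_string());
// anonymous still has alias == None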

Later on, in the src/bin/iridium.rs file where we instantiate a VM, we’ll chain with_alias onto VM::new(); that change is shown in the VM Instantiation Change section below.

That takes care of aliasing! Now let’s move on to the networking side and add another builder-style function to the VM struct.

pub fn with_cluster_bind(mut self, server_addr: String, server_port: String) -> Self {
    self.server_addr = Some(server_addr);
    self.server_port = Some(server_port);
    self
}

This will set the fields up with the strings we got from the CLI, if the user passed them.

Server to Server

This is not much different from how we did the multi-user REPLs. We do not want to reuse that socket, though, since this one will be dedicated to server-to-server communications. So let’s add the ability to specify a separate address and port for it.

Let’s add in two more options to cli.yml:

- SERVER_LISTEN_PORT:
    help: Which port Iridium should listen on for remote connections from other Iridium VMs. Defaults to 2254.
    required: false
    takes_value: true
    long: server-bind-port
    short: P
- SERVER_LISTEN_HOST:
    help: Which address Iridium should listen on for remote connections from other Iridium VMs. Defaults to "127.0.0.1".
    required: false
    takes_value: true
    long: server-bind-host
    short: H

Then get their values in iridium.rs:

let server_addr = matches.value_of("SERVER_LISTEN_HOST").unwrap_or("127.0.0.1");
let server_port = matches.value_of("SERVER_LISTEN_PORT").unwrap_or("2254");

And, of course, we’ll need to add yet more fields to the VM, remembering to initialize them to None in VM::new(). =)

    /// Server address that the VM will bind to for server-to-server communications
    server_addr: Option<String>,
    /// Port the server will bind to for server-to-server communications
    server_port: Option<String>,

VM Instantiation Change

To make things a bit cleaner and easier for a VM to bind to a socket, we need to change this:

let mut vm = VM::new();
to:
let mut vm = VM::new().with_alias(alias.to_string()).with_cluster_bind(server_addr.into(), server_port.into());
in two places. One is where we invoke the VM with an INPUT_FILE (around line 64 in iridium.rs) and the other is where we create a VM for use with the REPL, around line 83.

This is a minor refactor to make our lives a bit easier.

REPL Change

The second one also requires us to make a small change to the REPL:

let mut repl = REPL::new(vm);

We now want to pass a VM instance in. In repl.rs, that means we need to change the signature to:

pub fn new(vm: VM) -> REPL
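
Here is a minimal sketch of what the updated constructor could look like, assuming the struct’s VM field is named vm and keeping the command_buffer field from the earlier REPL tutorials (any other fields you have get initialized the same way):

pub fn new(vm: VM) -> REPL {
    // Sketch: field names besides `vm` follow the earlier tutorials.
    REPL {
        vm,
        command_buffer: vec![],
    }
}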

Yay, More Threading!

Yes, we’re going to use threading for this: each connection to another node gets its own thread. There is a tradeoff here, though, because we’re also going to use a full-mesh network.

Full-Mesh Network

If we have a cluster of 5 Nodes, there are a lot of different Topologies we can choose from.

Ring

In this topology, the node connections would look like this:

A -> B -> C -> D -> E -> A

Each Node is connected to two other Nodes. If we lose one node, we still have a path between the remaining nodes, but it is less optimal.

Full Mesh

A full mesh topology would look like:

A -> (B, C, D, E)
B -> (A, C, D, E)
C -> (A, B, D, E)
D -> (A, B, C, E)
E -> (A, B, C, D)

Each node is connected to every other node. This has the advantage of being very resilient, but it is also expensive. Each node has to maintain N-1 connections, so the total number of links in the cluster is N(N-1)/2, which is O(N^2): with 5 nodes that’s 10 links, but with 50 nodes it’s 1,225. So not great.

For now, though, it will serve our purposes, and we can optimize it with various techniques, such as using a partial-mesh network, later on.

Transitivity

Suppose we have a 3 Node cluster: A, B, and C. If a new node A connects to C, and C is already connected to B, then A will also connect to B. Or, put another way: whenever a new node joins the cluster, it gets a list of all the nodes in the cluster and connects to each of them.
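
As a rough sketch of what that join step could look like, here is a purely illustrative helper; the function name, the bootstrap address parameter, and the "list-members" wire format are all made up for this example:

use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;

// Hypothetical join routine: connect to one known node, ask it for the list of
// current cluster members, then open a connection to each of them.
fn join_cluster(bootstrap: &str) -> std::io::Result<Vec<TcpStream>> {
    let mut seed = TcpStream::connect(bootstrap)?;
    seed.write_all(b"list-members\n")?;

    // Assume the seed node replies with one comma-separated line of addresses.
    let mut members = String::new();
    BufReader::new(seed.try_clone()?).read_line(&mut members)?;

    let mut connections = vec![seed];
    for addr in members.trim().split(',').filter(|a| !a.is_empty()) {
        connections.push(TcpStream::connect(addr)?);
    }
    Ok(connections)
}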

Implementation

OK, let’s implement this. First up, we need:

  1. A background thread that listens for connections

  2. Something that watches for topology changes (nodes entering/leaving the cluster) and responds appropriately

Let’s start with…​

Listening Socket

We’ve done this before, so it’s a good starting point. Let’s do it in a new module called cluster. In src/, make a cluster/ directory, and in there make three files: mod.rs, client.rs, and server.rs. We’ll fill in mod.rs first (see below), then start on the server side.
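
mod.rs just needs to declare the two submodules; a minimal version (the one in the repository may contain a bit more) looks like this:

pub mod client;
pub mod server;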

Server

This is quite simple, and very similar to the remote/server.rs code:

use std::net::{TcpListener, SocketAddr};
use std::thread;
use cluster::client::ClusterClient;

pub fn listen(addr: SocketAddr) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(|| {
            let mut client = ClusterClient::new(stream);
            client.run();
        });
    }
}

We create a listener and start listening for connections. When a new client (Iridium VM in this case) connects, we spawn a new thread, start the cluster client up, and go back to listening.

Client

This one is a bit longer, so I’m going to make comments in the code itself to explain what is going on.

use std::io::{BufRead, Write};
use std::io::{BufReader, BufWriter};
use std::net::TcpStream;
use std::thread;
use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender};

/// This represents another Iridium VM that has joined this server. On their end, we would be the ClusterClient.
/// The main difference between this and `Client` is that we don't create a `REPL`, as we don't need one.
pub struct ClusterClient {
    // Wrap the stream in a BufReader to make it easier to read
    reader: BufReader<TcpStream>,
    // Wrap the stream in a BufWriter to make it easier to write
    writer: BufWriter<TcpStream>,
    // These are standard mpsc channels.
    // We'll start a thread that watches for messages coming in on this channel from other parts of *our* application
    // to be sent out to the ClusterClient
    rx: Option<Receiver<String>>,
    // If something wants to send something to this client, they can clone the `tx` channel.
    tx: Option<Sender<String>>,
    raw_stream: TcpStream,
}

impl ClusterClient {
    /// Creates and returns a new ClusterClient that sets up the stream clones and mpsc channels
    pub fn new(stream: TcpStream) -> ClusterClient {
        // TODO: Handle this better
        let reader = stream.try_clone().unwrap();
        let writer = stream.try_clone().unwrap();
        let (tx, rx) = channel();
        ClusterClient {
            reader: BufReader::new(reader),
            writer: BufWriter::new(writer),
            raw_stream: stream,
            tx: Some(tx),
            rx: Some(rx)
        }
    }

    // Writes a message as bytes to the connected ClusterClient
    fn w(&mut self, msg: &str) -> bool {
        match self.writer.write_all(msg.as_bytes()) {
            Ok(_) => match self.writer.flush() {
                Ok(_) => true,
                Err(e) => {
                    println!("Error flushing to client: {}", e);
                    false
                }
            },
            Err(e) => {
                println!("Error writing to client: {}", e);
                false
            }
        }
    }

    // This is a background loop that watches for incoming messages on the mpsc channel. When it receives one, it sends it out to the ClusterClient.
    fn recv_loop(&mut self) {
        // We take the rx channel so that we can move ownership into the thread and loop on it
        let chan = self.rx.take().unwrap();
        let mut writer = self.raw_stream.try_clone().unwrap();
        let _t = thread::spawn(move || {
            loop {
                match chan.recv() {
                    Ok(msg) => {
                        match writer.write_all(msg.as_bytes()) {
                            Ok(_) => {}
                            Err(_e) => {}
                        };
                        match writer.flush() {
                            Ok(_) => {}
                            Err(_e) => {}
                        }
                    }
                    Err(_e) => {}
                }
            }
        });
    }

    /// Main function to call to set up the ClusterClient
    pub fn run(&mut self) {
        // Starts the recv_loop in a background thread
        self.recv_loop();
        let mut buf = String::new();
        // Loop over the incoming data, waiting for data.
        loop {
            match self.reader.read_line(&mut buf) {
                Ok(_) => {
                    // `trim_right` returns a new slice rather than modifying `buf`
                    // in place, and we don't act on the data yet anyway, so just
                    // clear the buffer for the next read.
                    buf.clear();
                }
                Err(e) => {
                    println!("Error receiving: {:#?}", e);
                }
            }
        }
    }
}

Phew! Sorry for the long block of code. To summarize:

  1. VM starts up

  2. A thread binds to a TCP socket and listens for incoming connections

  3. New connections are handed off to a new thread, which starts that client’s recv loop

A New Command

We don’t yet have a way to start up the cluster. For that, let’s head into src/repl/mod.rs and add a new command called !start_cluster. I will leave that implementation as a fun exercise, but you can reference the source code if you get stuck: https://gitlab.com/subnetzero/iridium/tags/0.0.27
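
If you want a nudge, here is one rough shape the handler could take inside the REPL’s command dispatch. This is a sketch, not the implementation from the repository; it assumes the file imports std::net::SocketAddr, std::thread, and the cluster::server module, and that the REPL can read the VM’s bind address and port (for example via public fields or getter methods):

// Hypothetical dispatch arm; the field access on self.vm assumes the VM
// exposes the address and port we stored earlier.
"!start_cluster" => {
    println!("Starting cluster server!");
    let addr = self.vm.server_addr.clone().unwrap_or_else(|| "127.0.0.1".to_string());
    let port = self.vm.server_port.clone().unwrap_or_else(|| "2254".to_string());
    let socket_addr: SocketAddr = format!("{}:{}", addr, port).parse().unwrap();
    thread::spawn(move || {
        server::listen(socket_addr);
    });
}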

In the next tutorial, we’ll work on passing messages between cluster members. See you then!

