So You Want to Build a Language VM - Part 27 - Clustering: Part 1
Begins adding in clustering
Intro
Hello everyone! The delay in tutorials was the result of the website re-work (courtesy of Bitdream), which I hope is less…awful. In the interim, I added a few features and fixed some bugs that would not have made for exciting tutorials, so you'll want to start this tutorial from the latest master.
In this tutorial, we're going to get different instances of the VM talking to each other over TCP. They won't do much, but they'll at least be able to connect.
Why?
Good question!
Redundancy
If your application runs on more than one physically separate server and one of them fails, the others can continue to operate. In the case of a network of Iridium VMs, making them aware of each other allows for intelligent handling of failures.
Scaling
If you get a traffic spike, more machines running your application can be brought up.
Other Stuff…
There are a few other cool things I have in mind, but I'm not sure if they'll work yet, so I'll hold off until later. =)
How?
The same way we added multi-user support! We’ll open TCP channels between the node members.
Cluster Design
This is where it gets a bit more complicated. Distributed computing is a hard problem, and there are many ways to do it. Current buzzwords in the industry include:
Raft
Paxos
Leaderless
Consistent hashing
A lot of the issues in distributed systems arise from building distributed database systems. Since that isn’t what we’re building, we have less to worry about. Plus, we have the Erlang BEAM VM design we can just steal! Which is kind of handy.
Terminology
Before we get started, let’s define some terms.
A Node is a running Iridium VM.
A Host is a physical or virtual server running Linux or some other OS.
A Cluster is a collection of Nodes that can talk to each other over some shared network.
A Topology is how a cluster is connected together. Are all nodes connected to all other nodes? Are there proxy nodes?
Important: A Host can run more than one Node.
Naming a Node
We need to be able to uniquely identify each node within a cluster. That is, we don't want two nodes named Murgatroyd. This uniqueness is important because we are going to pass Messages between nodes, and part of that is being able to uniquely identify the destination node.
Generating IDs
One option is to just use random UUIDs. In fact, we added a field for a UUID to the VM struct a while back, and we populate it with a random UUID. If we are OK with that, then we're already done with this step!
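For reference, generating that random ID with the uuid crate (which the VM already uses for its id field) looks roughly like this; the wrapping function is just for illustration:

use uuid::Uuid;

/// Illustrative helper: a version 4 UUID is random, which is what the VM's
/// `id` field gets populated with.
fn generate_node_id() -> Uuid {
    Uuid::new_v4()
}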
Specific IDs
While UUIDs are cool, they can be, uh, difficult to remember. It would be nice to provide a CLI flag to specify the Node ID. The issue with that is that someone could give an entirely different Iridium VM the same name, which isn't good for auditability; guaranteed uniqueness is one of the nice things about the current system of generating a random UUID.
Aliases
So what we need are aliases: an arbitrary string that a user can provide when they start an Iridium VM. If they provide an alias, the Node will be addressable by either its alias or its UUID.
Open up src/bin/cli.yml, and let's add an optional node-alias flag. Just below the DATA_ROOT_DIR option, we can add:
- DATA_ROOT_DIR:
help: Root directory where the Iridium VM should store its data. Defaults to /var/lib/iridium.
required: false
takes_value: true
long: data-root-dir
- NODE_ALIAS:
help: An alias that can be used to refer to a running VM across a network
required: false
takes_value: true
long: node-alias
And then in iridium.rs, we add a check for it:
let alias = matches.value_of("NODE_ALIAS").unwrap_or("");
Then, at last, we add the new alias field to the VM struct in src/vm.rs:
pub struct VM {
/// Array that simulates having hardware registers
pub registers: [i32; 32],
/// Array that simulates having floating point hardware registers
pub float_registers: [f64; 32],
/// Program counter that tracks which byte is being executed
pc: usize,
/// The bytecode of the program being run
pub program: Vec<u8>,
/// Used for heap memory
heap: Vec<u8>,
/// Used to represent the stack
stack: Vec<u8>,
/// Contains the remainder of modulo division ops
remainder: usize,
/// Contains the result of the last comparison operation
equal_flag: bool,
/// Loop counter field, used with the `LOOP` instruction
loop_counter: usize,
/// Contains the read-only section data
ro_data: Vec<u8>,
/// Is a unique, randomly generated UUID for identifying this VM
id: Uuid,
/// An alias that can be specified by the user and used to refer to the Node
alias: Option<String>,
/// Keeps a list of events for a particular VM
events: Vec<VMEvent>,
/// Number of logical cores the system reports
pub logical_cores: usize,
}
Let’s use a builder-type function to pass in the alias. In the VM implementation, we can add this function:
pub fn with_alias(mut self, alias: String) -> Self {
    // An empty string means the user didn't provide an alias
    if alias.is_empty() {
        self.alias = None;
    } else {
        self.alias = Some(alias);
    }
    self
}
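A quick test sketch (assuming it lives in vm.rs's test module, where the private alias field is visible) makes the intended behavior explicit:

#[test]
fn test_with_alias() {
    // A non-empty alias is stored...
    let vm = VM::new().with_alias("Murgatroyd".to_string());
    assert_eq!(vm.alias, Some("Murgatroyd".to_string()));

    // ...while an empty string leaves the alias unset
    let vm = VM::new().with_alias("".to_string());
    assert_eq!(vm.alias, None);
}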
And then in the src/bin/iridium.rs file, where we instantiate a VM, we can pass the alias along:
let mut vm = VM::new().with_alias(alias.to_string());
That takes care of the aliasing part! Let's move on to the networking part by adding another builder-style function to the VM struct.
pub fn with_cluster_bind(mut self, server_addr: String, server_port: String) -> Self {
self.server_addr = Some(server_addr);
self.server_port = Some(server_port);
self
}
This will set the fields up with the strings we got from the CLI, if the user passed them.
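Another quick test sketch, with the same test-module assumption as before, shows what we expect:

#[test]
fn test_with_cluster_bind() {
    let vm = VM::new().with_cluster_bind("127.0.0.1".to_string(), "2254".to_string());
    assert_eq!(vm.server_addr, Some("127.0.0.1".to_string()));
    assert_eq!(vm.server_port, Some("2254".to_string()));
}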
Server to Server
This is not much different from how we did the multi-user REPLs. We do not want to use the same socket, as this will be for dedicated server-to-server communications. So let’s add in the ability to specify an address and port for server-to-server communications.
Let's add in two more options to cli.yml:
- SERVER_LISTEN_PORT:
help: Which port Iridium should listen for remote connections on from other Iridium VMs. Defaults to 2254.
required: false
takes_value: true
long: server-bind-port
short: P
- SERVER_LISTEN_HOST:
help: Which address Iridium should listen for remote connections on from other Iridium VMs. Defaults to "127.0.0.1".
required: false
takes_value: true
long: server-bind-host
short: H
Then get their values in iridium.rs:
let server_addr = matches.value_of("SERVER_LISTEN_HOST").unwrap_or("127.0.0.1");
let server_port = matches.value_of("SERVER_LISTEN_PORT").unwrap_or("2254");
And, of course, we’ll need to add yet more fields to the VM. =)
/// Server address that the VM will bind to for server-to-server communications
server_addr: Option<String>,
/// Port the server will bind to for server-to-server communications
server_port: Option<String>,
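Don't forget that VM::new() has to initialize the new fields as well. The existing constructor isn't reproduced in this post, so here is a sketch of what it might look like with alias, server_addr, and server_port added; the defaults for the other fields (and the use of num_cpus::get() for logical_cores) are assumptions based on the struct definition above:

pub fn new() -> VM {
    VM {
        registers: [0; 32],
        float_registers: [0.0; 32],
        pc: 0,
        program: vec![],
        heap: vec![],
        stack: vec![],
        remainder: 0,
        equal_flag: false,
        loop_counter: 0,
        ro_data: vec![],
        id: Uuid::new_v4(),
        events: Vec::new(),
        logical_cores: num_cpus::get(),
        // New fields: all optional, so they default to None
        alias: None,
        server_addr: None,
        server_port: None,
    }
}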
VM Instantiation Change
To make things a bit cleaner and easier for a VM to bind to a socket, we need to change this:
let mut vm = VM::new();
to this:
let mut vm = VM::new().with_alias(alias.to_string()).with_cluster_bind(server_addr.into(), server_port.into());
We do this in two places: one is where we handle INPUT_FILE (around line 64 in iridium.rs), and the other is where we create a VM for use with the REPL, around line 83. This is a minor refactor to make our lives a bit easier.
REPL Change
The second one also requires us to make a small change to the REPL:
let mut repl = REPL::new(vm);
We now want to pass a VM instance in. In repl.rs, that means we need to change the signature to:
pub fn new(vm: VM) -> REPL
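Inside repl.rs that boils down to storing the VM we're handed instead of building one in the constructor. A minimal sketch of the shape of the change, assuming the REPL keeps the VM in a vm field and has a command_buffer like earlier parts of the series (any other fields stay the same):

pub struct REPL {
    // History of the commands the user has entered
    command_buffer: Vec<String>,
    // The VM the REPL drives; previously the REPL built this itself, now it is passed in
    vm: VM,
}

impl REPL {
    /// Creates a new REPL around an already-configured VM
    pub fn new(vm: VM) -> REPL {
        REPL {
            command_buffer: vec![],
            vm,
        }
    }
}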
Yay, More Threading!
Yes, we’re going to use threading for this. This has a tradeoff, though. We’re also going to use a full-mesh network.
Full-Mesh Network
If we have a cluster of 5 Nodes, there are a lot of different Topologies we can have.
Ring
In this topology, the node connections would look like this:
A -> B -> C -> D -> E -> A
Each Node is connected to two other Nodes. If we lose one node, we still have a path between the remaining nodes, but it is less optimal.
Full Mesh
A full mesh topology would look like:
A -> (B, C, D, E)
B -> (A, C, D, E)
C -> (D, A, B, E)
D -> (A, B, C, E)
E -> (A, B, C, D)
Each node is connected to every other node. This has the advantage of being very resilient, but it is also expensive. As the number of nodes grows, the number of connections grows quadratically: each node maintains N-1 connections, and the cluster as a whole has N(N-1)/2 links, so it is O(N²), where N is the number of Nodes. So not great.
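To make that growth concrete, here's a quick helper (just an illustration, not part of the VM) that computes the number of links for a given cluster size:

/// Number of distinct links in a full mesh of `n` nodes: n * (n - 1) / 2.
fn full_mesh_links(n: usize) -> usize {
    if n < 2 {
        return 0;
    }
    n * (n - 1) / 2
}

fn main() {
    // 5 nodes -> 10 links, 50 nodes -> 1,225 links, 500 nodes -> 124,750 links
    for n in [5usize, 50, 500].iter() {
        println!("{} nodes => {} links", n, full_mesh_links(*n));
    }
}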
For now, though, it will serve our purposes, and we can optimize it with various techniques, such as using a partial-mesh network, later on.
Transitivity
Suppose we have a 3-Node cluster: A, B, and C. If A connects to C, A will also end up connected to B. Or, put another way, whenever a new node joins the cluster, it will get a list of all the nodes in the cluster and connect to each of them, as sketched below.
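The member-list exchange itself comes in a later tutorial, but the client side of "connect to everyone you're told about" is just a loop over TcpStream::connect. A sketch (the connect_to_all helper and the plain host:port strings are illustrative assumptions, not code from the repository):

use std::net::TcpStream;

/// Given the member list a new node receives when it joins, try to open a
/// connection to every member, skipping any that can't be reached.
fn connect_to_all(members: &[String]) -> Vec<TcpStream> {
    members
        .iter()
        .filter_map(|addr| match TcpStream::connect(addr.as_str()) {
            Ok(stream) => Some(stream),
            Err(e) => {
                println!("Could not connect to {}: {}", addr, e);
                None
            }
        })
        .collect()
}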
Implementation
OK, let’s implement this. First up, we need:
A background thread that listens for connections
Something that watches for topology changes (nodes entering/leaving the cluster) and responds appropriately
Let’s start with…
Listening Socket
We've done this before, so it's a good starting point. Let's do it in a new module called cluster. In src/, make a cluster/ directory, and in there make three files: mod.rs, client.rs, and server.rs. Let's start on the server side.
Server
This is quite simple, and very similar to the remote/server.rs code:
use std::net::{TcpListener, SocketAddr};
use std::thread;
use cluster::client::ClusterClient;
pub fn listen(addr: SocketAddr) {
info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
for stream in listener.incoming() {
info!("New Node connected!");
let stream = stream.unwrap();
thread::spawn(|| {
let mut client = ClusterClient::new(stream);
client.run();
});
}
}
We create a listener and start listening for connections. When a new client (Iridium VM in this case) connects, we spawn a new thread, start the cluster client up, and go back to listening.
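Before moving on, here's one possible way the server_addr and server_port strings could be wired to this listener; the start_cluster_server helper name is mine, not something in the repository:

use std::net::SocketAddr;
use std::thread;

/// Turn the host/port strings from the CLI into a SocketAddr and start the
/// cluster listener on a background thread. (Illustrative helper only.)
pub fn start_cluster_server(bind_host: &str, bind_port: &str) {
    let addr: SocketAddr = format!("{}:{}", bind_host, bind_port)
        .parse()
        .expect("Unable to parse cluster bind address");
    thread::spawn(move || {
        listen(addr);
    });
}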
Client
This one is a bit longer, so I’m going to make comments in the code itself to explain what is going on.
use std::io::{BufRead, Write};
use std::io::{BufReader, BufWriter};
use std::net::TcpStream;
use std::thread;
use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender};
/// This represents another Iridium VM that has joined this server. On their end, we would be the ClusterClient.
/// The main difference between this and `Client` is that we don't create a `REPL`, as we don't need one.
pub struct ClusterClient {
// Wrap the stream in a BufReader to make it easier to read
reader: BufReader<TcpStream>,
// Wrap the stream in a BufWriter to make it easier to write
writer: BufWriter<TcpStream>,
// These are standard mpsc channels.
    // We'll start a thread that watches for messages coming in on this channel from other parts of *our* application
// to be sent out to the ClusterClient
rx: Option<Receiver<String>>,
//If something wants to send something to this client, they can clone the `tx` channel.
tx: Option<Sender<String>>,
raw_stream: TcpStream,
}
impl ClusterClient {
/// Creates and returns a new ClusterClient that sets up the stream clones and mpsc channels
pub fn new(stream: TcpStream) -> ClusterClient {
// TODO: Handle this better
let reader = stream.try_clone().unwrap();
let writer = stream.try_clone().unwrap();
let (tx, rx) = channel();
ClusterClient {
reader: BufReader::new(reader),
writer: BufWriter::new(writer),
raw_stream: stream,
tx: Some(tx),
rx: Some(rx)
}
}
// Writes a message as bytes to the connected ClusterClient
fn w(&mut self, msg: &str) -> bool {
match self.writer.write_all(msg.as_bytes()) {
Ok(_) => match self.writer.flush() {
Ok(_) => true,
Err(e) => {
println!("Error flushing to client: {}", e);
false
}
},
Err(e) => {
println!("Error writing to client: {}", e);
false
}
}
}
// This is a background loop that watches for incoming messages on the mpsc channel. When it receives one, it sends it out to the ClusterClient.
fn recv_loop(&mut self) {
// We take the rx channel so that we can move ownership into the thread and loop on it
let chan = self.rx.take().unwrap();
let mut writer = self.raw_stream.try_clone().unwrap();
let _t = thread::spawn(move || {
loop {
match chan.recv() {
Ok(msg) => {
match writer.write_all(msg.as_bytes()) {
Ok(_) => {}
Err(_e) => {}
};
match writer.flush() {
Ok(_) => {}
Err(_e) => {}
}
}
Err(_e) => {}
}
}
});
}
/// Main function to call to set up the ClusterClient
pub fn run(&mut self) {
// Starts the recv_loop in a background thread
self.recv_loop();
let mut buf = String::new();
        // Loop over the incoming data, waiting for data.
        loop {
            match self.reader.read_line(&mut buf) {
                Ok(0) => {
                    // A read of 0 bytes means the other side closed the connection
                    break;
                }
                Ok(_) => {
                    // `trim_end` gives us the line without the trailing newline; we'll
                    // actually do something with the data in a later tutorial
                    let _message = buf.trim_end();
                    buf.clear();
                }
                Err(e) => {
                    println!("Error receiving: {:#?}", e);
                    break;
                }
            }
        }
}
}
Phew! Sorry for the long block of code. To summarize:
VM starts up
A thread binds to a TCP socket and listens for incoming connections
New connections are handed off to a new thread that starts its recv loop
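One thing worth noting: other parts of the VM will eventually need a way to push messages out to a connected client, and tx is private. A small accessor along these lines could expose a cloned Sender; this method is an assumption about where we're headed, not something in the tutorial's code yet:

impl ClusterClient {
    /// Returns a clone of the sending half of the channel, so other threads can
    /// queue up messages that `recv_loop` will write out to this client.
    pub fn tx(&self) -> Option<Sender<String>> {
        self.tx.clone()
    }
}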
A New Command
We don't yet have a way to start up the cluster. For that, let's head into src/repl/mod.rs and add a new command called !start_cluster. I will leave that implementation as a fun exercise, but you can reference the source code if you get stuck: https://gitlab.com/subnetzero/iridium/tags/0.0.27
In the next tutorial, we’ll work on passing messages between cluster members. See you then!