So You Want to Build a Language VM - Part 33 - Cluster Syncing

Cluster Syncing

Intro

I don’t know about you, but I’m getting tired of all this clustering. But, the end is in sight! For real this time! I promise!

When we ended the last tutorial, we were able to send bincode-encoded messages. A cluster member could join another cluster, and receive a list of the other nodes back.

Our next task is for the new node to take each node it receives and establish its own, independent TCP connection to it.

The Code

As you probably suspected, we’ll be working mainly in src/cluster/client.rs.

There’s an important thing to keep in mind, though. If we have a 10-node cluster, we don’t want to send a Hello to each node and get back the same node list 10 times.

The Join Message

To deal with this, we can make another message type that does not trigger a response with all other nodes in the cluster. We can add that to message.rs like so:

#[derive(Serialize, Deserialize, Debug)]
pub enum IridiumMessage {
    Hello {
        alias: String,
    },
    HelloAck {
        alias: (String, String, String),
        nodes: Vec<(String, String, String)>,
    },
    Join {
        alias: (String, String, String)
    }
}

In fact, this is a good time to change our (String, String, String) into a type alias. In cluster/mod.rs, add this:

type NodeAlias = String;
type NodeIP = String;
type NodePort = String;
type NodeInfo = (NodeAlias, NodeIP, NodePort);

Now we can change our IridiumMessage enum to look like:

#[derive(Serialize, Deserialize, Debug)]
pub enum IridiumMessage {
    Hello {
        alias: NodeAlias,
        port: NodePort,
    },
    HelloAck {
        alias: NodeInfo,
        nodes: Vec<NodeInfo>,
    },
    Join {
        alias: NodeAlias,
        port: NodePort,
    },
}

A bit neater! Also, note the addition of port to both the Hello and Join messages. Why we have to send this information along is important enough to merit a longer description!

Important

Each VM listens on a specific port, with the default being 2254. If we have two physical machines, each running one VM, then Node A can connect to Node B’s IP on port 2254. When it does this, there is another port involved: the source port. Node A can’t send packets from port 2254, since that port is already in use, so the OS chooses a random, high-numbered port (often over 30000). These are sometimes called ephemeral ports.

Here is the important part: Node A cannot receive new connections on that ephemeral port! It exists only so Node A can receive responses from Node B over the connection it initiated. So when we publish a node, we want to publish the port other nodes can connect to it on, not one of the ephemeral ports.
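If you want to see this in action outside of Iridium, a few lines of standalone Rust show both ends of a connection (a sketch; the address here is hypothetical):

use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Dial a node listening on the well-known cluster port
    let stream = TcpStream::connect("127.0.0.1:2254")?;
    // peer_addr() is the address we dialed; local_addr() shows the
    // ephemeral source port the OS picked for this outbound connection.
    // No one can connect *to* us on that ephemeral port.
    println!("connected to   {}", stream.peer_addr()?);
    println!("connected from {}", stream.local_addr()?);
    Ok(())
}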

Figuring out all this port publishing took quite some time to get right, but it is working ok-ish for now.

Join Generation Function

Now let’s add a function to the IridiumMessage impl to generate a Join message.

/// Generates a join message
pub fn join(alias: &str, port: &str) -> Result<Vec<u8>> {
    trace!("Generating join message!");
    let new_message = IridiumMessage::Join {
        alias: alias.into(),
        port: port.into(),
    };
    serialize(&new_message)
}

Nothing too special here. Just remember that when we call this function, the port we pass is the port our node wants to receive connections from other nodes on. This is specified via the -P flag on the CLI.
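As a quick sanity check, a round trip looks like this (a sketch; the alias and port are made up, and it assumes bincode’s deserialize on the receiving side):

// Build a Join for a node aliased "node-a" that accepts connections on 2254
let bytes = IridiumMessage::join("node-a", "2254").unwrap();

// The receiving server turns the bytes back into the enum and matches on it
let message: IridiumMessage = bincode::deserialize(&bytes).unwrap();
if let IridiumMessage::Join { alias, port } = message {
    debug!("Got join from {} on port {}", alias, port);
}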

Sending out the Join

Of course, now we need to do some work to send this message out to all the nodes we receive. Right now, we handle this in the run function of client.rs. Specifically, this part:

match message {
    &IridiumMessage::HelloAck {
        ref nodes,
        ref alias,
    } => {
        debug!("Received list of nodes: {:?} from {:?}", nodes, alias);
    }
    // <snip: the other match arms>
}

That is, we just print out the list of nodes. So let’s write the code to send a join message to each of those. For now, we’ll just drop it straight into the run function; we’ll have to factor it out soon, as run is getting quite long.

// Build the Join message once; it is the same for every remote node
let join_message = if let Some(ref alias) = self.alias_as_string() {
    IridiumMessage::join(alias, &self.port_as_string().unwrap())
} else {
    error!("Unable to get my own alias to send a join message to other cluster members");
    continue;
};
let join_message = join_message.unwrap();

for node in nodes {
    let remote_alias = &node.0;
    let remote_ip = &node.1;
    let remote_port = &node.2;
    let addr = remote_ip.to_owned() + ":" + remote_port;
    if let Ok(stream) = TcpStream::connect(addr) {
        // Wrap the new connection in a ClusterClient, hand it the shared
        // connection manager, and send the Join
        let mut cluster_client = ClusterClient::new(stream, self.connection_manager.clone(), self.bind_port.clone().unwrap());
        cluster_client.write_bytes(&join_message);
        // Track the new connection under its (alias, ip, port) tuple
        if let Ok(mut cm) = self.connection_manager.write() {
            let client_tuple = (remote_alias.to_string(), cluster_client.ip_as_string().unwrap(), cluster_client.port_as_string().unwrap());
            cm.add_client(client_tuple, cluster_client);
        }
    } else {
        error!("Unable to establish connection to: {:?}", node);
    }
}
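By the way, the write_bytes call above is one of the small helpers on ClusterClient. It isn’t shown in this article, but it is roughly a write-then-flush over the buffered writer (a sketch, not the exact code from the repository):

use std::io::Write;

impl ClusterClient {
    /// Writes the raw bytes to the remote end, then flushes the buffered writer
    pub fn write_bytes(&mut self, msg: &[u8]) {
        if let Err(e) = self.writer.write_all(msg) {
            error!("Error writing bytes to remote client: {:?}", e);
        }
        if let Err(e) = self.writer.flush() {
            error!("Error flushing writer to remote client: {:?}", e);
        }
    }
}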

We had to make a few changes to the ClusterClient data structure:

#[derive(Debug)]
pub struct ClusterClient {
    alias: Option<String>,
    pub reader: BufReader<TcpStream>,
    pub writer: BufWriter<TcpStream>,
    pub connection_manager: Arc<RwLock<Manager>>,
    pub bind_port: Option<String>,
    rx: Option<Arc<Mutex<Receiver<String>>>>,
    _tx: Option<Arc<Mutex<Sender<String>>>>,
    pub raw_stream: TcpStream,
}

Note the addition of the bind_port and connection_manager fields. This was the easiest way to give ClusterClients access to that data in the short term. Since a ClusterClient can react to messages, it may need to add or remove clients, and it also needs to know the port on which the node wants to receive connections.

A Few Convenience Functions

If you look in client.rs, you’ll note that I’ve added a few convenience functions to get the local and remote IP and port as Strings.
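I won’t reproduce them all, but they are thin wrappers around the socket addresses, along these lines (a sketch; see the repository for the exact versions):

impl ClusterClient {
    /// IP of the remote end of this connection, as a String
    pub fn remote_ip_as_string(&self) -> Option<String> {
        self.raw_stream
            .peer_addr()
            .ok()
            .map(|addr| addr.ip().to_string())
    }

    /// Port of the remote end of this connection, as a String
    pub fn remote_port_as_string(&self) -> Option<String> {
        self.raw_stream
            .peer_addr()
            .ok()
            .map(|addr| addr.port().to_string())
    }
}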

Handling the Join on the Server

If you look in server.rs, you’ll see this bit around line 59:

// Handles another node sending a Join message. In this case, we don't
// want to send back a list of all known nodes.
IridiumMessage::Join { alias, port } => {
    debug!("Received join message from alias: {:?}", alias);
    if let Ok(mut connection_manager) = cmgr.write() {
        debug!("Adding new client {} to connection manager", alias);
        let client_tuple = (alias.to_string(), client.remote_ip_as_string().unwrap(), port);
        connection_manager.add_client(client_tuple, client);
    } else {
        error!("Unable to add {} to connection manager", alias);
    }
}

All the server does is add the client; unlike with a Hello, it does not respond with a list of known nodes.

Change to the Hash of Clients

Prior to this, we were using the alias alone as the key. Due to locking complexities, I changed the key to a tuple of (alias, ip, port). That way we don’t need to acquire a lock on each client just to build the client list. This is an annoying workaround for the fact that each client is running a receive loop in the background and holds a lock on itself. We’ll need a better way to handle this.
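For reference, the relevant part of the Manager now looks something like this (a sketch, using the NodeInfo alias from earlier; the real version lives in the cluster module):

use std::collections::HashMap;

pub struct Manager {
    // Keyed by the full (alias, ip, port) tuple so we can list clients
    // without having to lock each individual client
    clients: HashMap<NodeInfo, ClusterClient>,
}

impl Manager {
    pub fn add_client(&mut self, key: NodeInfo, client: ClusterClient) {
        self.clients.insert(key, client);
    }
}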

End

If you look through all the changes, you’ll see a bunch of debug log statements added and a few other minor things. The topics covered in this article are the most important, though. If you look in the new scripts/ directory, you can find a script that uses tmux to create a test 3-node cluster on a single machine. If you are on OSX, you can use Homebrew to install tmux. I hope to turn this into a docker-compose setup soon.

The code as it was at the end of this tutorial is available here: https://gitlab.com/subnetzero/iridium/tags/0.0.33

Until next time!
