So You Want to Build a Language VM - Part 31 - Making Clustering Make Sense
Making the Clustering Make Sense
Intro
Sorry everyone! I’ve been busy, hence the delay. Back in tutorial 29, we had two nodes talking to each other, but we got a lot of random text on the screen. Let’s figure out why!
The Problem
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', libcore/option.rs:345:21
stack backtrace:
0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
1: std::sys_common::backtrace::print
2: std::panicking::default_hook::{{closure}}
3: std::panicking::default_hook
4: std::panicking::rust_panic_with_hook
5: std::panicking::continue_panic_fmt
6: rust_begin_unwind
7: core::panicking::panic_fmt
8: core::panicking::panic
9: <core::option::Option<T>>::unwrap
10: iridium::cluster::client::ClusterClient::send_hello
11: iridium::repl::REPL::join_cluster
12: iridium::repl::REPL::execute_command
13: iridium::repl::REPL::run
14: iridium::main
15: std::rt::lang_start::{{closure}}
16: std::panicking::try::do_call
17: __rust_maybe_catch_panic
18: std::rt::lang_start_internal
19: std::rt::lang_start
20: main
What is going on in send_hello? Well, we seem to have not specified an alias! We can start up an Iridium node with no alias. Our ClusterClient initialization looks like:
ClusterClient {
    reader: BufReader::new(reader),
    writer: BufWriter::new(writer),
    raw_stream: stream,
    _tx: Some(Arc::new(Mutex::new(tx))),
    rx: Some(Arc::new(Mutex::new(rx))),
    alias: None,
}
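For reference, the struct behind that initializer looks roughly like this. The field names come straight from the code above, but the exact types (especially the String message type on the channels) are my guesses, not the Iridium source:

use std::io::{BufReader, BufWriter};
use std::net::TcpStream;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};

pub struct ClusterClient {
    pub reader: BufReader<TcpStream>,
    pub writer: BufWriter<TcpStream>,
    raw_stream: TcpStream,
    _tx: Option<Arc<Mutex<Sender<String>>>>,
    rx: Option<Arc<Mutex<Receiver<String>>>>,
    // Stays None until we learn the remote node's alias
    alias: Option<String>,
}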
We could pass an alias in to the new function of ClusterClient, or we could do a more builder-pattern type thing. I’m going to opt for the builder pattern, as I like it. Add a new function to ClusterClient:
/// Sets the alias of the ClusterClient and returns it
pub fn with_alias(mut self, alias: String) -> Self {
    self.alias = Some(alias);
    self
}
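Because with_alias takes self by value and returns Self, it can be chained straight onto a constructor. Here is a tiny self-contained illustration of the pattern using a made-up Node type (my example, not Iridium code):

#[derive(Debug, Default)]
struct Node {
    alias: Option<String>,
}

impl Node {
    fn new() -> Self {
        Node::default()
    }

    // Consumes the value and hands it back, so calls can be chained.
    fn with_alias(mut self, alias: String) -> Self {
        self.alias = Some(alias);
        self
    }
}

fn main() {
    let node = Node::new().with_alias("node-1".to_string());
    println!("{:?}", node); // Node { alias: Some("node-1") }
}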
Over in src/vm.rs, we need to make the id attribute public:

pub id: Uuid,
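For context, that id is a UUID generated when the VM is created, presumably with the uuid crate's new_v4 (that part is my assumption; the tutorial only shows the field). A quick standalone look at the API we are leaning on:

use uuid::Uuid;

fn main() {
    // Needs the `uuid` crate with its "v4" feature enabled in Cargo.toml.
    let id = Uuid::new_v4();
    // This string form is what join_cluster will send as the node's alias.
    println!("{}", id.to_string());
}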
Then in src/repl/mod.rs, in the join_cluster function, we have this line:

let mut cc = cluster::client::ClusterClient::new(stream);

Change it to:

let mut cc = cluster::client::ClusterClient::new(stream).with_alias(self.vm.id.to_string());
Now let’s see what happens!
>>> Alias is: "aa4356b8-f0a0-4250-b5e5-eaabd265e9b7\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}....that goes on for awhile"
But hey, we got the alias portion!
The Second Problem
Why is it doing this? Well, remember that buffer we declared over in src/cluster/server.rs? Let’s look at the section:
thread::spawn(move || {
    let mut buf = [0; 1024];
    let mut client = ClusterClient::new(stream);
    let _bytes_read = client.reader.read(&mut buf);
    let alias = String::from_utf8_lossy(&buf);
    println!("Alias is: {:?}", alias);
    client.run();
});
You can see we even have a handy-dandy variable, _bytes_read, that has the info we need. A slight modification to our code…
thread::spawn(move || {
    let mut buf = [0; 1024];
    let mut client = ClusterClient::new(stream);
    let bytes_read = client.reader.read(&mut buf).unwrap();
    let alias = String::from_utf8_lossy(&buf[0..bytes_read]);
    println!("Alias is: {:?}", alias);
    client.run();
});
And now let’s try again. On the server side:
Fletchers-MBP:iridium fletcher$ iridium
>>> Welcome to Iridium! Let's be productive!
!start_cluster
Started cluster server!
Then on the client side:
Welcome to Iridium! Let's be productive!
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
>>>
Then back on the server side:
>>> Alias is: "57be0d98-7634-411b-944f-0d1934ead78d"
Yay! We only read the number of bytes received.
Important: Yes, this means that each time we receive data, we only overwrite data up to a certain point in the buffer. If we read 512 bytes, the last 512 bytes of the buffer would still be the bytes from the previous input.
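If you want to see that stale-buffer effect in isolation, here is a small self-contained sketch (my example, not from the tutorial). It reads two payloads of different lengths into the same fixed buffer, using an in-memory Cursor as a stand-in for the TcpStream:

use std::io::{Cursor, Read};

fn main() {
    let mut buf = [0u8; 1024];

    // First read: 29 bytes land at the front of the buffer.
    let mut first = Cursor::new(b"first message from node alpha".to_vec());
    let n = first.read(&mut buf).unwrap();
    println!("{:?}", String::from_utf8_lossy(&buf[0..n]));

    // Second read: only 5 bytes, so only the first 5 bytes get overwritten.
    let mut second = Cursor::new(b"short".to_vec());
    let n = second.read(&mut buf).unwrap();
    // Slicing by the number of bytes read gives exactly "short"...
    println!("{:?}", String::from_utf8_lossy(&buf[0..n]));
    // ...but the rest of the buffer still holds the tail of the first message,
    // followed by the untouched zero bytes we saw printed as \u{0} in the REPL.
    println!("{:?}", String::from_utf8_lossy(&buf[0..40]));
}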
Listing Members
If, on the VM we used to join the cluster, we run !cluster_members, we get:
>>> !cluster_members
Listing Known Nodes:
>>> [
""
]
And if we run it on the VM we used to start the cluster, we get:
!cluster_members
Listing Known Nodes:
>>> []
Receiving the Hello
Let’s start from the point of receiving the alias. In our function signature for listen, over in src/cluster/server.rs, we have:
pub fn listen(addr: SocketAddr, _connection_manager: Arc<RwLock<Manager>>) {
We need to rename that parameter to connection_manager, and then change the function body to look like this:

pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        let mut cmgr = connection_manager.clone();
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut buf = [0; 1024];
            let mut client = ClusterClient::new(stream);
            let bytes_read = client.reader.read(&mut buf).unwrap();
            let alias = String::from_utf8_lossy(&buf[0..bytes_read]);
            cmgr.write().unwrap().add_client(alias.to_string(), client);
        });
    }
}
We clone the connection_manager whenever we get a new connection. Since it is an Arc, cloning just increments the reference count, and when the clone goes out of scope the count drops back down. We do this because if we moved connection_manager itself into the thread, we couldn’t make more clones for later connections. Also note how we are not calling client.run(); that’s because we pass ownership of the client to the connection manager. We’ll move that logic there.
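If the reference counting is hard to picture, here is a minimal self-contained sketch of the same pattern (my example, not Iridium code): one clone of the Arc per spawned thread, with the original handle still usable in the loop. A Vec<String> stands in for the Manager:

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let connection_manager = Arc::new(RwLock::new(Vec::<String>::new()));
    let mut handles = Vec::new();

    for i in 0..3 {
        // Each "connection" gets its own clone, bumping the strong count by one.
        let cmgr = connection_manager.clone();
        handles.push(thread::spawn(move || {
            // The clone is moved into the thread and dropped when it finishes.
            cmgr.write().unwrap().push(format!("node-{}", i));
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
    // The original Arc is still here; all three writes are visible through it.
    println!("{:?}", *connection_manager.read().unwrap());
}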
Cluster Manager
Head over to src/cluster/manager.rs. With the run call moved into it, our add_client function now looks like this:
pub fn add_client(&mut self, alias: NodeAlias, mut client: ClusterClient) -> bool {
    if self.clients.contains_key(&alias) {
        error!("Tried to add a client that already existed");
        return false;
    }
    debug!("Adding {}", alias);
    client.run();
    self.clients.insert(alias, client);
    true
}
Note the client.run() call in there. If we now start a cluster on one node and join from another, the join itself still looks fine:

>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!

But remember how we moved the call to run into the connection manager? add_client doesn’t start it in a separate thread, so run blocks right there: the client is never inserted into the HashMap, and the manager’s write lock is never released.

The Fix
I tried a few different things, such as starting run in the add_client function. But since we’re doing things across threads no matter what, we need to wrap the client in an Arc.
Head to src/cluster/manager.rs and change the Manager struct to:
#[derive(Default)]
pub struct Manager {
    clients: HashMap<NodeAlias, Arc<RwLock<ClusterClient>>>,
}
We also need a get_client function on Manager:

pub fn get_client(&mut self, alias: NodeAlias) -> Option<Arc<RwLock<ClusterClient>>> {
    self.clients.get_mut(&alias).map(|client| client.clone())
}
And next we need to modify the add_client function:
pub fn add_client(&mut self, alias: NodeAlias, client: ClusterClient) -> bool {
    if self.clients.contains_key(&alias) {
        error!("Tried to add a client that already existed");
        return false;
    }
    let client = Arc::new(RwLock::new(client));
    self.clients.insert(alias.clone(), client);
    let cloned_client = self.get_client(alias).unwrap();
    thread::spawn(move || {
        cloned_client.write().unwrap().run();
    });
    true
}
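A quick design note: instead of inserting the client and then fetching it back out with get_client, we could clone the Arc before inserting it. Here is a self-contained sketch of that alternative with stand-in types (my variation, not the tutorial's code):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Stand-ins for the tutorial's types so the sketch compiles on its own.
type NodeAlias = String;
struct ClusterClient;
impl ClusterClient {
    fn run(&mut self) { /* would loop over the socket in the real client */ }
}

#[derive(Default)]
struct Manager {
    clients: HashMap<NodeAlias, Arc<RwLock<ClusterClient>>>,
}

impl Manager {
    fn add_client(&mut self, alias: NodeAlias, client: ClusterClient) -> bool {
        if self.clients.contains_key(&alias) {
            return false;
        }
        let client = Arc::new(RwLock::new(client));
        // One handle goes into the map...
        self.clients.insert(alias, client.clone());
        // ...and the other is moved into the background thread that runs it.
        thread::spawn(move || {
            client.write().unwrap().run();
        });
        true
    }
}

fn main() {
    let mut manager = Manager::default();
    manager.add_client("node-1".to_string(), ClusterClient);
}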
Either way, we’re wrapping the client in an Arc and RwLock, adding a reference to our HashMap, and then spawning its run function in a background thread. Finally, we need to change our listen function in src/cluster/server.rs:

pub fn listen(addr: SocketAddr, connection_manager: Arc<RwLock<Manager>>) {
    info!("Initializing Cluster server...");
    let listener = TcpListener::bind(addr).unwrap();
    for stream in listener.incoming() {
        let mut cmgr = connection_manager.clone();
        info!("New Node connected!");
        let stream = stream.unwrap();
        thread::spawn(move || {
            let mut buf = [0; 1024];
            let mut client = ClusterClient::new(stream);
            let bytes_read = client.reader.read(&mut buf).unwrap();
            let alias = String::from_utf8_lossy(&buf[0..bytes_read]);
            let mut cmgr_lock = cmgr.write().unwrap();
            cmgr_lock.add_client(alias.to_string(), client);
        });
    }
}
Note that, as before, we are not calling run here; the manager’s add_client now spawns it on its own thread. Let’s see what happens now if we try to cluster:

Fletchers-MBP:~ fletcher$ iridium
Welcome to Iridium! Let's be productive!
>>> !join_cluster 127.0.0.1 2254
Attempting to join cluster...
Connected to cluster!
>>>
>>> !cluster_members
Listing Known Nodes:
>>> [
""
]
And on the node that started the cluster:
>>> !cluster_members
Listing Known Nodes:
>>> [
"65fda5a6-8b30-4cf1-926f-6fec0d842461"
]
End
The reason we do not see any nodes on the joiner is because we haven’t written any code to send the other known nodes back to a client when it joins. We’ll do that next tutorial!
If you need some assistance with any of the topics in the tutorials, or just devops and application development in general, we offer consulting services. Check it out over here or click Services along the top.