One of my next goals in my Raft project is to tame the tick() with mio. In this post, we'll explore what it is, what it can do, and why it matters. First things first: What is MIO?
MIO is a lightweight IO library for Rust with a focus on adding as little overhead as possible over the OS abstractions.
Now you're probably thinking... "So what? We have plenty of shiny stuff in the new std::io..." and you're right! But hold off on judgement until you read this:
Features
- Event loop backed by
epoll,kqueue.- Zero allocations at runtime
- Non-blocking TCP, UDP and Unix domain sockets
- High performance timer system
- Thread safe message channel for cross thread communication
Okay, here it's starting to get interesting. epoll, and kqueue are event notification interfaces available in Linux, and the BSDs (Darwin included). Zero allocations are great for performance, and non-blocking sockets are insanely useful for network applications.
For a Raft context, the ability to use evented timers is applicable for heartbeats and election timings.
If you've used node.js, io.js, or Python's Twisted framework event loops might be familiar to you. Yes I know callback hell sucks! Fear not, this is Rust, not some scruffy loosely-typed, garbage-collected, non-blocking language!
The General Idea
MIO works off this general concept:
- Make a thread for the
EventLoop. - Register interest with
IOHandles. - Supply the loop with a
Handler. - Run your code.
Hello, World
The following program is a nice, simple example of what MIO looks like:
extern crate mio;
use mio::*;
use std::fmt::Debug;
fn main() {
// Create an event loop
let mut event_loop = EventLoop::<(), String>::new().unwrap();
let sender = event_loop.channel();
sender.send("Hello".to_string()).unwrap();
// Start it
event_loop.run(&mut BearHandler).unwrap();
}
struct BearHandler;
impl<T, M: Send + Debug> Handler<T, M> for BearHandler {
/// A message has been delivered
fn notify(&mut self, _reactor: &mut EventLoop<T, M>, msg: M) {
println!("{:?}", msg);
}
}
A lot is going on here. Let's take a closer look.
EventLoop::<(), String>::new();
This creates a new EventLoop. The Event loop uses T tokens of type (), and channels that consume and pass M: Send of type String. You can use anything that implements Send for messages.
event_loop.channel();
This gives us a channel to send Strings. When we send() later our BearHandler wakes up and invokes notify.
From there, notify prints out the message. In a real application, this is where your business logic would be to determine what to do with the message.
Getting a Handle(r)
A Handler can implement some or all of the following functions:
pub trait Handler<T: Token, M: Send> {
/// A registered IoHandle has available data to read
fn readable(&mut self, reactor: &mut EventLoop<T, M>, hint: ReadHint, token: T);
/// A registered IoHandle is available to write to
fn writable(&mut self, reactor: &mut EventLoop<T, M>, token: T);
/// A registered timer has expired
fn timeout(&mut self, reactor: &mut EventLoop<T, M>, token: T);
/// A message has been delivered
fn notify(&mut self, reactor: &mut EventLoop<T, M>, msg: M);
/// A signal has been delivered to the process
fn signal(&mut self, reactor: &mut EventLoop<T, M>, info: mio::SigInfo);
}
So this is a lot more then just sending some Strings around! Did you doubt me?
What's really cool is your Handler can have a data backing since it's just a trait that you can implement yourself! Let's do that now:
extern crate mio;
use mio::*;
fn main() {
// Create an event loop
let mut event_loop = EventLoop::<(), u64>::new().unwrap();
let sender = event_loop.channel();
for i in 0.. 5 {
sender.send(i).unwrap();
}
// Start it
event_loop.run(&mut BearHandler(0)).unwrap();
}
struct BearHandler(u64);
impl<T> Handler<T, u64> for BearHandler {
fn notify(&mut self, _reactor: &mut EventLoop<T, u64>, msg: u64) {
self.0 += msg;
println!("Message: {}, Total: {}", msg, self.0);
}
}
Output:
Message: 0, Total: 0
Message: 1, Total: 1
Message: 2, Total: 3
Message: 3, Total: 6
Message: 4, Total: 10
In this case, our BearHandler was a humble u64 tuple which we mutated, but you could easily make this a more complicated struct.
Registered Interest
Just sending messages isn't particularly interesting, let's wire up some new interests.
Alright, so let's take our humble little BearHandler and build it into a bit of a state mutation game:
- Each time a timer fire (every 250ms) send a UDP packet to some socket.
- Each time that socket gets hit decrement a
countby 1. - Each time on the channel increment it by that much.
First, we'll modify the BearHandler to reflect these changes. First, we'll make it a proper struct and let it store a pair of sockets as well as it's count.
Second, we'll implement the readable, timeout, and notify. Note in the readable we take care to drain the socket. Also, note how the timeout doesn't need to reset itself, we can clear it with clear_timeout if we want.
impl Handler<Token, u64> for BearHandler {
fn readable(&mut self, _reactor: &mut EventLoop<Token, u64>, _token: Token, _hint: ReadHint) {
let mut buffer = buf::RingBuf::new(1024);
// Drain socket, otherwise infinite loop!
net::TryRecv::recv_from(&self.listener, &mut buffer.writer()).unwrap();
self.count -= 1;
println!("Decremented, Total: {}", self.count);
}
fn timeout(&mut self, reactor: &mut EventLoop<Token, u64>, _token: Token) {
self.sender.send_to(&[0], "127.0.0.1:12345").unwrap();
// Reset
reactor.timeout(TIMEOUT, Duration::milliseconds(250)).unwrap();
println!("Timeout");
}
fn notify(&mut self, _reactor: &mut EventLoop<Token, u64>, msg: u64) {
self.count += msg;
println!("Increment by: {}, Total: {}", msg, self.count);
}
}
Next, registering the various events, we'll use some of MIO's mio::net items:
const LISTENER: Token = Token(0);
const TIMEOUT: Token = Token(1);
fn main() {
// Create an event loop
let mut event_loop = EventLoop::<Token, u64>::new().unwrap();
// Register Interest
let listener = UdpSocket::bind("127.0.0.1:12345").unwrap();
event_loop.register(&listener, LISTENER).unwrap(); // Token lets us distinguish.
// Increments
let incrementer = event_loop.channel();
for i in 0.. 5 {
incrementer.send(i).unwrap();
}
// Decrements
event_loop.timeout(TIMEOUT, Duration::milliseconds(250)).unwrap();
// Start it
let sender = UdpSocket::bind("127.0.0.1:12346").unwrap();
event_loop.run(&mut BearHandler {
count: 0,
listener: listener,
sender: sender
}).unwrap();
}
Output:
Increment by: 0, Total: 0
Increment by: 1, Total: 1
Increment by: 2, Total: 3
Increment by: 3, Total: 6
Increment by: 4, Total: 10
Timeout
Decremented, Total: 9
Timeout
Decremented, Total: 8
Timeout
Decremented, Total: 7
Timeout
Decremented, Total: 6
Timeout
Decremented, Total: 5
Timeout
Decremented, Total: 4
Timeout
Decremented, Total: 3
Timeout
Decremented, Total: 2
Timeout
Decremented, Total: 1
Timeout
Decremented, Total: 0
By the way, in our examples MIO will take over the main thread and block. In a normal application you'll want to kick it off into a new thread when you
start()it.
Learn More
- I found Wycats'
mio-bookrepo very useful. - As always, the docs were a great help.
- This post is discussed on Reddit.
- This post is also discussed on Hacker News!
Help Out!
We're tracking progress on integrating MIO into Raft with this issue. Feel free to weigh in or help out!
A huge thanks to @danburkert for their contributions this week!