Recently there has been a lot of progress in the Rust ecosystem towards a robust asynchronous stack. In this article we'll look at what futures and async are, take a tour of what's available, play with some examples, and talk about how the pieces fit together.
We'll start with the futures crate, move on to futures_cpupool, and then eventually get to tokio. We'll assume you have some programming knowledge and have at least thought about trying Rust before.
What are Futures and Async?
When writing code to do some action which may take some time, such as requesting a resource from a remote host, it is not always desirable to block further execution of our program. This is particularly true in the case where we are writing a web server, or performing a large number of complex calculations.
One way of handling this is to spawn threads and split tasks into discrete pieces to distribute across them. This is not always as convenient or as easy as it may sound. Suddenly we are forced to figure out how tasks are best split, how to allocate resources, how to guard against race conditions, and how to manage inter-thread communication.
As a community we've learnt some techniques over the years, such as CPU pools, which allow a number of threads to cooperate on a task, and 'future' values, which resolve to their intended value once some computation has finished. These give us useful and powerful tools that make it easier, safer, and more fun to write such code.
How We Got Here
Much of this async work has been in development since green threads were removed from Rust around the 0.7 release. There have been several projects related to futures and async since then, and their influence can be felt in what we have now and in what is on the horizon.
Much of the async story today is founded on the ideas and lessons learned from std's green threads and from projects such as mio. mio in particular is a foundation of nearly the entire async area of Rust. If you're interested in seeing high-quality systems code, I highly recommend paying some attention to this project.
Futures are the focus of much community effort. Tokio recently announced its first release, while futures have been relatively stable for a couple of months. This has spurred a large amount of development within the community to support and leverage these new capabilities.
The futures crate offers a number of structures and abstractions for building asynchronous code. By itself it's quite simple, and thanks to its clever design it is fundamentally a zero-cost abstraction. This is important to keep in mind as we work through our examples: writing code with futures should add no performance overhead compared to equivalent code written without them.
When compiled, futures boil down to state machines, while still letting us write our code in a relatively familiar 'callback style'. If you've already been using Rust, working with futures will feel very similar to working with iterators.
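To make the state-machine idea concrete, here is a minimal sketch using the standard library's std::future::Future trait. This is not the futures crate API used in this article, though both drive a future by repeatedly calling poll. Countdown, NoopWaker, and block_on are hypothetical names, and block_on is a naive busy-polling executor meant only for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A hand-written future: its whole state is the number of polls left before it is ready.
// Each call to poll either advances that state or finishes.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again; a real future would arrange this when its I/O is ready.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing, because block_on below simply polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical, naive executor for illustration only: polls until Ready and counts the polls.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return (value, polls);
        }
    }
}

fn main() {
    let (value, polls) = block_on(Countdown { remaining: 3 });
    println!("{} after {} polls", value, polls);
}
```

Nothing runs until something calls poll, and each poll moves the state machine one step; that is the same laziness the futures crate relies on.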
One of the things we'll commonly do in this section is "sleep a little bit" on a thread to simulate some asynchronous action. In order to do this let's create a little function:
```rust
extern crate rand;

use std::thread;
use std::time::Duration;

// This function sleeps for a bit, then returns how long it slept.
```
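If you'd rather not depend on rand, here is a std-only sketch of the same helper. It assumes that hashing nothing with a fresh RandomState is random enough to pick a delay; that is a choice made for illustration, not how the article's version works.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::thread;
use std::time::Duration;

// Sleeps for a pseudo-random 0 to 999 ms, then returns how long it slept (in ms).
// Each RandomState gets fresh random keys, which stands in for the rand crate here.
pub fn sleep_a_little_bit() -> u64 {
    let millis = RandomState::new().build_hasher().finish() % 1000;
    thread::sleep(Duration::from_millis(millis));
    millis
}

fn main() {
    println!("slept for {} ms", sleep_a_little_bit());
}
```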
Now that we've got that established, we can use it in our first future. We'll use a oneshot to delegate a task to another thread, then pick up the result back in the main thread. A oneshot is essentially a single-use channel: a value can be sent from the sender to the receiver, and then the channel is closed.
```rust
extern crate futures;
extern crate fun_with_futures;

use std::thread;
use futures::Future;
use futures::sync::oneshot;
use fun_with_futures::sleep_a_little_bit;
```
If we run this example we'll see something like:
```
--> START
+++ WAITED 542
<-- END
542
```
The output is exactly what we might expect if we were doing this with std channels, and the code looks similar as well. At this point we're barely using futures at all, so it's no surprise that nothing unusual is happening here. Futures start to come into their own in more complicated tasks.
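For comparison, here is roughly the same flow with std::sync::mpsc, using the channel exactly once. one_shot is a hypothetical name, and the delay is a parameter rather than a call to sleep_a_little_bit() so that the result is predictable.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Sends exactly one value from a worker thread and waits for it, mirroring the
// oneshot example with std's multi-use channel used only once.
fn one_shot(millis: u64) -> u64 {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        println!("--> START");
        thread::sleep(Duration::from_millis(millis));
        println!("+++ WAITED {}", millis);
        // The single send; tx is dropped when this thread ends, closing the channel.
        tx.send(millis).expect("receiver dropped");
        println!("<-- END");
    });
    // Block the current thread until the one value arrives.
    rx.recv().expect("sender dropped without sending")
}

fn main() {
    println!("{}", one_shot(50));
}
```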
Next, let's look at how we can work with a set of futures. In this example we'll spawn a number of threads, have them do some long running task, and then collect all of the results into a vector.
```rust
extern crate futures;
extern crate fun_with_futures;

use std::thread;
use futures::Future;
use futures::future::join_all;
use fun_with_futures::sleep_a_little_bit;

const NUM_OF_TASKS: usize = 10;
```
The map call behaves just like map on an Option<T> or a Result<T, E>: it transforms some Future<T> into some Future<U>. Running this example produces output similar to the following:
```
0 --> START
1 --> START
3 --> START
2 --> START
4 --> START
5 --> START
6 --> START
7 --> START
8 --> START
9 --> START
4 +++ WAITED 124
4 <-- END
1 +++ WAITED 130
1 <-- END
0 +++ WAITED 174
0 <-- END
2 +++ WAITED 268
2 <-- END
6 +++ WAITED 445
6 <-- END
3 +++ WAITED 467
3 <-- END
9 +++ WAITED 690
9 <-- END
8 +++ WAITED 694
8 <-- END
5 +++ WAITED 743
5 <-- END
7 +++ WAITED 802
7 <-- END
Job is done. Values returned in order: true
```
In this example we can observe that all of the futures started, waited various amounts of time, then finished. They did not finish in order, but the resulting vector did come out in the correct order. Again, this result is not entirely surprising, and we could have done something very similar with std threads.
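As a point of comparison, this std-only sketch (run_all is a hypothetical name) gets the same ordering guarantee from plain threads: tasks finish whenever their sleep ends, but joining the JoinHandles in spawn order puts the results back in order.

```rust
use std::thread;
use std::time::Duration;

// Spawns one thread per task. Each sleeps a different amount, so they finish out
// of order, yet joining the handles in spawn order yields the results in order.
fn run_all(delays_ms: &[u64]) -> Vec<usize> {
    let handles: Vec<_> = delays_ms
        .iter()
        .copied()
        .enumerate()
        .map(|(i, ms)| {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(ms));
                i
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .collect()
}

fn main() {
    // The later tasks sleep less and finish first, but the output is still [0, 1, 2, 3].
    println!("{:?}", run_all(&[30, 10, 20, 0]));
}
```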
Remember, futures are a basic building block, not a batteries included solution. They are intended to be built on top of.
We'll cover one more example, which will feel fairly familiar to people who have used the channels from std, and then we'll start doing more interesting stuff.
The next primitive from futures we'll use is futures::sync::mpsc::channel, which behaves similarly to std::sync::mpsc::channel. We'll build a channel, pass the sender tx off to another thread, and then fire a series of messages along the channel. The rx side of the channel is a Stream, and can then be used similarly to an iterator.
```rust
extern crate futures;
extern crate fun_with_futures;

use std::thread;
use futures::Stream;
use futures::sync::mpsc;
use futures::Sink;
use fun_with_futures::sleep_a_little_bit;

const BUFFER_SIZE: usize = 10;
```
The output will be similar to the following:
```
--> START THREAD
+++ THREAD WAITED 166
+++ FOLDING 166 INTO 0
+++ THREAD WAITED 554
+++ FOLDING 554 INTO 166
+++ THREAD WAITED 583
+++ FOLDING 583 INTO 720
+++ THREAD WAITED 175
+++ FOLDING 175 INTO 1303
+++ THREAD WAITED 136
+++ FOLDING 136 INTO 1478
+++ THREAD WAITED 155
+++ FOLDING 155 INTO 1614
+++ THREAD WAITED 90
+++ FOLDING 90 INTO 1769
+++ THREAD WAITED 986
+++ FOLDING 986 INTO 1859
+++ THREAD WAITED 830
+++ FOLDING 830 INTO 2845
+++ THREAD WAITED 21
<-- END THREAD
+++ FOLDING 21 INTO 3675
SUM 3696
```
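The same fold can be sketched with std's bounded sync_channel, whose Receiver can be consumed as an ordinary iterator. sum_over_channel is a hypothetical name; feeding it the waits from the run above reproduces the same sum.

```rust
use std::sync::mpsc;
use std::thread;

// Sends every value over a bounded channel from a producer thread and folds them on
// the receiving side. Dropping tx at the end of the producer ends the receiver's iterator.
fn sum_over_channel(values: Vec<u64>) -> u64 {
    let (tx, rx) = mpsc::sync_channel(10); // bounded, like BUFFER_SIZE
    let producer = thread::spawn(move || {
        for v in values {
            tx.send(v).expect("receiver dropped");
        }
        // tx is dropped here, which closes the channel.
    });
    let sum = rx.iter().fold(0, |acc, v| {
        println!("+++ FOLDING {} INTO {}", v, acc);
        acc + v
    });
    producer.join().expect("producer panicked");
    sum
}

fn main() {
    let waits = vec![166, 554, 583, 175, 136, 155, 90, 986, 830, 21];
    println!("SUM {}", sum_over_channel(waits));
}
```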
Now that we're familiar with these basic ideas, we can move on to working with CPU pools.
Dipping Our Toes Into the CPU Pool
Earlier we alluded to the idea of using futures with a CPU pool where we didn't have to manage which thread did which work, but in our examples so far we've still had to manually spin up threads. So let's fix that.
The futures_cpupool crate offers us this functionality. Let's take a look at a basic example which does nearly the same thing as our second example above:
```rust
extern crate futures;
extern crate fun_with_futures;
extern crate futures_cpupool;

use futures_cpupool::Builder;
use fun_with_futures::sleep_a_little_bit;

// Feel free to change me!
const NUM_OF_TASKS: usize = 10;
```
This outputs something similar to the following:
```
0 --> START
1 --> START
2 --> START
3 --> START
4 --> START
5 --> START
6 --> START
7 --> START
0 <-- WAITED 37
8 --> START
2 <-- WAITED 86
9 --> START
3 <-- WAITED 110
6 <-- WAITED 326
9 <-- WAITED 458
4 <-- WAITED 568
5 <-- WAITED 748
7 <-- WAITED 757
1 <-- WAITED 794
8 <-- WAITED 838
Job is done. Values returned in order: true
```
This time we didn't need to manage threads or decide which thread does what; the pool handled it for us. This is pretty handy. We can also be fairly arbitrary in what we hand to the pool. For example, different spawned tasks can return different types:
```rust
extern crate futures;
extern crate fun_with_futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::Builder;
use fun_with_futures::sleep_a_little_bit;
```
This means we can create a CPU pool and delegate arbitrary tasks to it throughout the lifetime of our program. You may also want to create multiple pools to prevent starvation: for example, a pool with 2 workers for network connections and another with 2 workers for delayed jobs.
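To show what a pool is doing for us, here is a deliberately small std-only sketch of a fixed-size thread pool. Pool and its spawn method are hypothetical and are not futures_cpupool's API: workers share one job queue, and each spawned task gets its own result channel, which is what lets different tasks return different types.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical fixed-size pool: `size` workers pull boxed jobs from one shared queue.
struct Pool {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl Pool {
    fn new(size: usize) -> Pool {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held while waiting for a job and released before running it.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // queue closed: shut this worker down
                    }
                })
            })
            .collect();
        Pool { sender: Some(sender), workers }
    }

    // Each task gets its own result channel, so tasks can return different types.
    fn spawn<T, F>(&self, f: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            let _ = tx.send(f());
        });
        self.sender.as_ref().unwrap().send(job).expect("pool shut down");
        rx
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        drop(self.sender.take()); // close the queue so the workers exit
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

fn main() {
    let pool = Pool::new(2);
    let number = pool.spawn(|| 40 + 2);
    let text = pool.spawn(|| String::from("meow"));
    println!("{} {}", number.recv().unwrap(), text.recv().unwrap());
}
```

Creating two such pools with two workers each would give the separation described above, since a long job in one pool can't occupy the other pool's workers.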
Visiting Tokio's Core
The tokio project has been developed closely alongside futures, and the two projects share many authors. Tokio has a number of crates intended for building asynchronous applications. The tokio-core crate contains things like the main event loop, TCP handlers, and timeouts. Crates such as tokio-service build on top of these constructs.
We'll start with just tokio-core and make a simple HTTP GET request. Note that since we're not using an HTTP client library, we need to handle all the details ourselves. Here's what that looks like:
```rust
extern crate futures;
extern crate tokio_core;

use std::net::ToSocketAddrs;
use futures::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;

const DOMAIN: &'static str = "google.com";
const PORT: u16 = 80;
```
This looks a bit different from what we've previously been doing with our futures. Core::new() is how we create an event loop, or 'Reactor', from which we can get Handles that we use when issuing tasks such as TcpStream::connect. You can learn more about the basics of the Reactor here.
Working with the types provided by Tokio is slightly different from working with those provided by std, but many of the ideas and concepts are the same. Many of the changes are due to the differences between synchronous and asynchronous I/O.
Near the end of the example we have core.run(), which is a way to fire off one-off tasks and get a return value from them, similar to how we were using futures earlier. Tokio also provides spawn_fn(), whose tasks are executed in the background and must handle their own errors, making it ideal for tasks such as responding to new connections.
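For contrast with the asynchronous version, here is a blocking std-only sketch of the same request. The exact request (HTTP/1.0 with only a Host header) is an assumption chosen so that the server closes the connection when it's done, and build_request and fetch are hypothetical names.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

const DOMAIN: &str = "google.com";
const PORT: u16 = 80;

// Builds a minimal HTTP/1.0 GET request by hand; the empty line ends the headers.
fn build_request(host: &str, path: &str) -> String {
    format!("GET {} HTTP/1.0\r\nHost: {}\r\n\r\n", path, host)
}

// Blocking version: resolving, connecting, writing, and reading each park this thread.
fn fetch(host: &str, port: u16) -> io::Result<String> {
    let addr = (host, port)
        .to_socket_addrs()?
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no address found"))?;
    let mut stream = TcpStream::connect_timeout(&addr, Duration::from_secs(5))?;
    stream.set_read_timeout(Some(Duration::from_secs(5)))?;
    stream.write_all(build_request(host, "/").as_bytes())?;
    let mut response = Vec::new();
    // Under HTTP/1.0 without keep-alive, the server closes the connection when done.
    stream.read_to_end(&mut response)?;
    Ok(String::from_utf8_lossy(&response).into_owned())
}

fn main() {
    match fetch(DOMAIN, PORT) {
        // Print just the status line of the response.
        Ok(response) => println!("{}", response.lines().next().unwrap_or("")),
        Err(e) => eprintln!("request failed: {}", e),
    }
}
```

Every call here blocks the calling thread, so serving many such requests at once would need many threads; the event loop lets a single thread juggle them instead.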
Let's play with spawn_fn() in our next example by making a simple server.
```rust
extern crate futures;
extern crate tokio_core;

use std::net::SocketAddr;
use futures::Future;
use futures::Stream;
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;

const LISTEN_TO: &'static str = "0.0.0.0:8080";

// Throw some data at it with `echo "Test" | nc 127.0.0.1 8080`
```
Running echo "Test" | nc 127.0.0.1 8080 in another terminal echoes back the data sent.
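For comparison, here's a blocking std-only echo server that spawns one thread per connection, the approach an event loop like Tokio's lets us avoid. serve, echo, and round_trip are hypothetical names; round_trip binds to an OS-assigned port on 127.0.0.1 rather than LISTEN_TO so that it can run as a self-contained demo.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

// Copies everything the client sends straight back until the client stops sending.
fn echo(mut stream: TcpStream) -> io::Result<u64> {
    let mut reader = stream.try_clone()?;
    io::copy(&mut reader, &mut stream)
}

// Accepts connections forever, handling each one on its own OS thread.
fn serve(listener: TcpListener) {
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                thread::spawn(move || {
                    if let Err(e) = echo(stream) {
                        eprintln!("connection error: {}", e);
                    }
                });
            }
            Err(e) => eprintln!("accept failed: {}", e),
        }
    }
}

// Demo: start the server on a free local port, send one message, and read the echo.
fn round_trip(message: &str) -> io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    thread::spawn(move || serve(listener));

    let mut client = TcpStream::connect(addr)?;
    client.write_all(message.as_bytes())?;
    // Closing our write half is what lets the server's copy loop finish.
    client.shutdown(Shutdown::Write)?;
    let mut reply = String::new();
    client.read_to_string(&mut reply)?;
    Ok(reply)
}

fn main() -> io::Result<()> {
    print!("{}", round_trip("Test\n")?);
    Ok(())
}
```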
However, this server isn't very useful, or idiomatic! For example, we can't use something like curl to interact with it. It's a start, though. Let's do better.
We'll improve on our previous example by using a Codec. Codecs allow us to describe how to encode and decode the frames carried by a TcpStream (or anything else that can be framed()). A codec asks us to define an In and an Out type, as well as decode and encode functions.
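To make that shape concrete, here is a simplified, hypothetical Codec trait written with only std. It mirrors the idea of In/Out types plus decode and encode, but it is not tokio-core's actual trait signature, and PassThrough is an illustrative codec.

```rust
use std::io;
use std::mem;

// A simplified, hypothetical codec trait (std only): decode pulls one frame out of the
// bytes received so far, or returns None if more input is needed; encode appends a
// frame's bytes to the outgoing buffer.
trait Codec {
    type In;
    type Out;
    fn decode(&mut self, buf: &mut Vec<u8>) -> io::Result<Option<Self::In>>;
    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
}

// Treats whatever bytes have arrived as one message and writes messages back unchanged.
struct PassThrough;

impl Codec for PassThrough {
    type In = Vec<u8>;
    type Out = Vec<u8>;

    fn decode(&mut self, buf: &mut Vec<u8>) -> io::Result<Option<Vec<u8>>> {
        if buf.is_empty() {
            Ok(None)
        } else {
            // Take everything, leaving the buffer empty for the next read.
            Ok(Some(mem::take(buf)))
        }
    }

    fn encode(&mut self, msg: Vec<u8>, buf: &mut Vec<u8>) -> io::Result<()> {
        buf.extend_from_slice(&msg);
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut codec = PassThrough;
    let mut inbound = b"Test\n".to_vec();
    let mut outbound = Vec::new();
    // Echo: every decoded frame is encoded straight back out.
    while let Some(frame) = codec.decode(&mut inbound)? {
        codec.encode(frame, &mut outbound)?;
    }
    print!("{}", String::from_utf8_lossy(&outbound));
    Ok(())
}
```

The point of the split is that the connection-handling code only ever sees whole In and Out values; how bytes are grouped into frames lives entirely in the codec.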
Here's what our example with EchoCodec could look like:
```rust
extern crate futures;
extern crate tokio_core;

use std::net::SocketAddr;
use futures::Future;
use futures::Stream;
use std::io::Result;
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;

const LISTEN_TO: &'static str = "0.0.0.0:8080";

// Codecs can have state, but this one doesn't.
struct EchoCodec;

// Throw some data at it with `echo "Test" | nc 127.0.0.1 8080`
```
If you test this out, you'll find the same behaviour as in the previous example. With a little change we can even make it handle lines instead of entire messages. We'll borrow some code from Tokio's examples to do this:
```rust
// Codecs can have state, but this one doesn't.
```
Running this example, you can connect with nc 127.0.0.1 8080, send some lines of text, and get them back. Notice how we didn't have to change the rest of the code at all to make this change? All we had to do was change the behaviour of the codec.
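Here's a std-only sketch of the line-splitting logic such a codec needs. decode_line and encode_line are hypothetical helpers, not the code borrowed from Tokio's examples; the key behaviour is that a partial line stays in the buffer until its newline arrives.

```rust
// Hypothetical helper: removes one '\n'-terminated line from the front of the buffer
// and leaves any partial line in place until more bytes arrive.
fn decode_line(buf: &mut Vec<u8>) -> Option<String> {
    // No newline yet means the line is incomplete: wait for more bytes.
    let newline = buf.iter().position(|&b| b == b'\n')?;
    // Remove the line plus its terminator from the buffer...
    let mut line: Vec<u8> = buf.drain(..=newline).collect();
    line.pop(); // ...then drop the '\n' itself.
    if line.last() == Some(&b'\r') {
        line.pop(); // tolerate "\r\n" line endings too
    }
    Some(String::from_utf8_lossy(&line).into_owned())
}

// Encoding is the reverse: the text followed by a newline.
fn encode_line(line: &str, buf: &mut Vec<u8>) {
    buf.extend_from_slice(line.as_bytes());
    buf.push(b'\n');
}

fn main() {
    // Simulate bytes arriving in two chunks that split a line in half.
    let mut buf = b"hello\nwor".to_vec();
    while let Some(line) = decode_line(&mut buf) {
        println!("got line: {}", line);
    }
    buf.extend_from_slice(b"ld\n");
    while let Some(line) = decode_line(&mut buf) {
        println!("got line: {}", line);
    }
    let mut out = Vec::new();
    encode_line("hello", &mut out);
    println!("encoded {} bytes", out.len());
}
```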
Now let's try making a codec that encodes and decodes HTTP 1.1 GET and POST requests made from curl, which we can build on later:
```rust
// Codecs can have state, but this one doesn't.
```
Let's be honest with ourselves here: this is very naive and obviously incomplete! However, it works for curl -vvvv localhost:8080 and curl -vvvv localhost:8080 --data hello, and we got to learn a bit about how to use Tokio. So far so good! Let's build on it.
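To show the kind of work a naive HTTP decoder does, here is a std-only sketch (Request and decode_request are hypothetical names). It assumes the body length comes from a Content-Length header, which is what curl sends with --data, and it ignores everything else a real HTTP parser must handle.

```rust
// A hypothetical, deliberately naive request type: just what the curl tests need.
#[derive(Debug, PartialEq)]
struct Request {
    method: String,
    path: String,
    body: Vec<u8>,
}

// Returns Ok(None) until the whole request (headers plus Content-Length body) is buffered,
// then removes that request from the buffer.
fn decode_request(buf: &mut Vec<u8>) -> Result<Option<Request>, String> {
    let header_end = match buf.windows(4).position(|w| w == &b"\r\n\r\n"[..]) {
        Some(i) => i,
        None => return Ok(None), // headers not finished yet
    };
    let head = String::from_utf8_lossy(&buf[..header_end]).into_owned();
    let mut lines = head.split("\r\n");
    let mut parts = lines.next().unwrap_or("").split_whitespace();
    let method = parts.next().ok_or("missing method")?.to_string();
    let path = parts.next().ok_or("missing path")?.to_string();

    // Header names are case-insensitive; a missing Content-Length means no body.
    let content_length = lines
        .filter_map(|line| line.split_once(':'))
        .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
        .map(|(_, value)| value.trim().parse::<usize>().map_err(|e| e.to_string()))
        .transpose()?
        .unwrap_or(0);

    let body_start = header_end + 4;
    if buf.len() < body_start + content_length {
        return Ok(None); // body not fully arrived yet
    }
    let body = buf[body_start..body_start + content_length].to_vec();
    buf.drain(..body_start + content_length);
    Ok(Some(Request { method, path, body }))
}

fn main() {
    let mut buf =
        b"POST /cats HTTP/1.1\r\nHost: localhost:8080\r\nContent-Length: 4\r\n\r\nmeow".to_vec();
    match decode_request(&mut buf) {
        Ok(Some(req)) => println!("{} {} {}", req.method, req.path, String::from_utf8_lossy(&req.body)),
        Ok(None) => println!("need more bytes"),
        Err(e) => println!("bad request: {}", e),
    }
}
```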
Services and Protocols
tokio-core is, by design, minimal. The tokio-proto and tokio-service crates build atop it to create common abstractions which we can use for various applications.
In this next example we'll build on our naive HTTP server from the previous example. Since the code is getting longer, we'll work with isolated pieces and show the full example at the very end.
Our goal will be to build an ultra-simple web server that responds to GET/POST requests. A POST to /cats with the data meow should return a 200 OK with the old value (if any) as the data, and any later GET to /cats should return meow until the value is replaced. Our goal is simplicity and learning, not robustness or perfection.
First, we'll define our protocol. We'll use a simple pipelined, non-streaming protocol. This code is fairly generic and will look much the same across different implementations.
tokio-proto allows us to define the general style of our protocol, such as pipelined or multiplexed, and streaming or non-streaming. The tokio-proto docs provide a good explanation of the differences.
```rust
use tokio_proto::pipeline::ServerProto;

// Like codecs, protocols can carry state too!
```
The service is less generic, and handles our little "database" inside of it. Because the Proto and the Codec already handle most of the complicated bits, it's fairly straightforward. Services are reusable abstractions that operate over protocols.
Here's what our little HTTP example looks like:
```rust
// Surprise! Services can also carry state.
```
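The heart of the service is the little "database". This std-only sketch shows just that logic, with CatService as a hypothetical name. A tokio-service Service takes &self in call, so a real one would need interior mutability (for example a Mutex around the HashMap); the sketch uses &mut self to stay short. Returning 404 for an unknown path and 405 for other methods is our assumption.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the service logic: the "database" maps a path to the
// last body POSTed there. call returns (status line, response body).
struct CatService {
    store: HashMap<String, String>,
}

impl CatService {
    fn new() -> Self {
        CatService { store: HashMap::new() }
    }

    fn call(&mut self, method: &str, path: &str, body: &str) -> (&'static str, String) {
        match method {
            // HashMap::insert hands back the previous value, which is exactly the
            // "old value (if any)" that a POST should respond with.
            "POST" => {
                let old = self.store.insert(path.to_string(), body.to_string());
                ("200 OK", old.unwrap_or_default())
            }
            // Assumption: an unknown path answers 404 with an empty body.
            "GET" => match self.store.get(path) {
                Some(value) => ("200 OK", value.clone()),
                None => ("404 Not Found", String::new()),
            },
            _ => ("405 Method Not Allowed", String::new()),
        }
    }
}

fn main() {
    let mut service = CatService::new();
    println!("{:?}", service.call("POST", "/cats", "meow"));
    println!("{:?}", service.call("GET", "/cats", ""));
    println!("{:?}", service.call("POST", "/cats", "purr"));
}
```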
At this point we have all the pieces and just need to put them together. This requires some changes to our main() function, like so:
```rust
// Throw some data at it with `curl 127.0.0.1:8080/foo` and `curl 127.0.0.1:8080 --data bar`
```
Testing it out:
This is great, we've created a little network connected database with a "REST-ish" API. I hope this has taught you a bit about Futures and Tokio, and inspired you to play around further!
This post was supported in part through a time allocation from Asquera and can be found here. Thanks!
Complete Tokio Example
```rust
extern crate futures;
extern crate tokio_core;
extern crate tokio_proto;
extern crate tokio_service;

use std::net::SocketAddr;
use std::collections::HashMap;
use tokio_proto::TcpServer;
use tokio_proto::pipeline::ServerProto;
use tokio_service::Service;

const LISTEN_TO: &'static str = "0.0.0.0:8080";

// Codecs can have state, but this one doesn't.

// Like codecs, protocols can carry state too!

// Surprise! Services can also carry state.

// Throw some data at it with `curl 127.0.0.1:8080/foo` and `curl 127.0.0.1:8080 --data bar`
```