Now that we’ve built the block_on() function, it’s time to go a step further and turn it into a real executor. We want our executor to run not just one future at a time but many futures concurrently!

This blog post is inspired by juliex, a minimal executor and one of the first that pioneered async/await support in Rust. Today we’re writing a more modern and cleaner version of juliex from scratch.

The goal for our executor is to have only simple and completely safe code while delivering performance that rivals existing best-in-class executors.

Crates we’ll use as dependencies are crossbeam, async-task, once_cell, futures, and num_cpus.

The interface

The executor is going to have just one function that spawns a future:

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    todo!()
}

The returned JoinHandle<R> is a type that implements Future<Output = R> and retrieves its output once the task has completed.

Note the similarities between this spawn() function and std::thread::spawn() — they’re almost equivalent, except one spawns an async task and the other spawns a thread.

Here’s a simple example spawning a task and awaiting its output:

fn main() {
    futures::executor::block_on(async {
        let handle = spawn(async { 1 + 2 });
        assert_eq!(handle.await, 3);
    });
}

Passing the output to JoinHandle

Since JoinHandle is a type implementing Future, let’s be lazy for now and simply define it as an alias for a pinned boxed future:

type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;

This works for now, but don’t fret: later on we’ll rewrite it cleanly as a fresh struct and implement Future for it manually.

The output of the spawned future has to be sent to JoinHandle somehow. One way to do that is to create a oneshot channel and send the output through the channel when the future completes. The JoinHandle is then a future that awaits a message from the channel:

use futures::channel::oneshot;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };

    // TODO: allocate the future as a task and push it into the queue

    Box::pin(async { r.await.unwrap() })
}

The next step is allocating the wrapper future on the heap and pushing it into some kind of global task queue so that it gets processed by the executor. We call such an allocated future a task.

The anatomy of a task

A task consists of a future and its state. We need to keep track of the state to know whether the task is scheduled for running, whether it is currently running, whether it has completed, and so on.

Here’s the definition for our Task type:

struct Task {
    state: AtomicUsize,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

We haven’t decided yet what exactly state is, but it will be some kind of AtomicUsize that can be updated from any thread. Let’s figure that out later.

The output type of the future is () — that is because the spawn() function has wrapped the original future into one that sends the output into the oneshot channel and then simply returns with ().

The future is pinned on the heap. It has to be because only pinned futures can be polled. But why is it also wrapped into a Mutex?

Every Waker associated with the task will hold a Task reference so that it can wake the task by pushing it into the global task queue. Therein lies the problem: Task instances are shared among threads, but polling the future requires mutable access to it. Solution: we wrap the future into a mutex to get mutable access to it.

If all this sounds confusing, don’t worry, it’ll make more sense once we finish the whole executor!

Moving on. Let’s complete the spawn() function by allocating a Task holding the future and its state:

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };

    let task = Arc::new(Task {
        state: AtomicUsize::new(0),
        future: Mutex::new(Box::pin(future)),
    });
    QUEUE.send(task).unwrap();

    Box::pin(async { r.await.unwrap() })
}

Once the task is allocated, we push it into QUEUE, the global queue containing runnable tasks. The spawn() function is now complete, so let’s define QUEUE next…

Executor threads

Since we’re building an executor, there must be a background thread pool that takes runnable tasks from the queue and runs them, i.e. polls their futures.

Let’s define the global task queue and spawn the executor threads the first time the queue is accessed:

use crossbeam::channel;
use once_cell::sync::Lazy;

static QUEUE: Lazy<channel::Sender<Arc<Task>>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Arc<Task>>();

    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }

    sender
});

Pretty simple — an executor thread is literally a one-liner! The task queue is an unbounded channel, and executor threads receive tasks from it and run each one.

The number of executor threads equals the number of cores on the system, which is retrieved by the num_cpus crate.

Now that we have the task queue and a thread pool, the last missing piece to implement is the run() method.

Task execution

Running a task simply means polling its future. We already know how to poll futures from the previous blog post, where we implemented block_on().

The run() method looks something like this:

impl Task {
    fn run(self: Arc<Task>) {
        let waker = todo!();

        let cx = &mut Context::from_waker(&waker);
        self.future.try_lock().unwrap().as_mut().poll(cx);
    }
}

Note that we need to lock the future to get mutable access and poll it. By design, no other thread will hold the lock at the same time, so try_lock() must always succeed.

But how do we create a waker? We’re going to use async_task::waker_fn() like last time, but what is the wake function supposed to do?

We can’t push an Arc<Task> into QUEUE just like that. Here are potential race conditions we should think about:

  • What if a task completes and is then woken? A Waker can outlive its associated future, and we don’t want to end up with completed tasks in the queue.
  • What if a task is woken twice in a row before it gets run? We don’t want to have two references to the same task in the queue.
  • What if a task is woken while it’s running? If we push a reference to it into the queue, another executor thread might take it and attempt to run it at the same time.

If we think hard about it, we come up with two simple rules that solve all of these problems elegantly:

  1. The wake function schedules the task if it wasn’t woken already and if it’s not currently running.
  2. If the task was woken while it was running, the executor thread reschedules it.

Let’s sketch these rules out:

impl Task {
    fn run(self: Arc<Task>) {
        let waker = async_task::waker_fn(|| {
            todo!("schedule if the task is not woken already and is not running");
        });

        let cx = &mut Context::from_waker(&waker);
        self.future.try_lock().unwrap().as_mut().poll(cx);

        todo!("schedule if the task was woken while running");
    }
}

Remember the state field of type AtomicUsize we defined inside Task? Now is the time to store some useful data in it. There are two pieces of information about each task that will help us implement waking:

  1. Has the task been woken already?
  2. Is the task currently running?

Both of those are true/false values, and we can represent them with two bits inside the state field:

const WOKEN: usize = 0b01;
const RUNNING: usize = 0b10;

The wake function sets the WOKEN bit. If both bits have previously been 0 (i.e. the task was neither woken nor running), then we schedule the task by pushing its reference into the queue:

let task = self.clone();
let waker = async_task::waker_fn(move || {
    if task.state.fetch_or(WOKEN, Ordering::SeqCst) == 0 {
        QUEUE.send(task.clone()).unwrap();
    }
});

Just before polling the future, we unset the WOKEN bit and set the RUNNING bit:

self.state.store(RUNNING, Ordering::SeqCst);
let cx = &mut Context::from_waker(&waker);
let poll = self.future.try_lock().unwrap().as_mut().poll(cx);

After polling the future, we unset the RUNNING bit and check if the previous state had bits WOKEN and RUNNING set (i.e. the task was woken while running). If so, we reschedule the task:

if poll.is_pending() {
    if self.state.fetch_and(!RUNNING, Ordering::SeqCst) == WOKEN | RUNNING {
        QUEUE.send(self).unwrap();
    }
}

Interestingly, if the task completes (i.e. its future is not pending anymore), we leave it in the running state forever. That way future wakeups can’t reschedule the completed task.

And that’s all. Done! We have a real executor now — see the complete implementation in v1.rs.

A touch of magic

If you found the Task struct and its state transitions intimidating, I feel you. But there is good news. You’ll be relieved to hear none of that mess needs to be done by hand because async-task can do it for us!

We basically need to replace Arc<Task> with async_task::Task<()> and replace the oneshot channel with async_task::JoinHandle<R, ()>.

This is how we simplify spawning:

type Task = async_task::Task<()>;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}

The async_task::spawn() constructor takes three arguments:

  • The spawned future.
  • A schedule function that pushes the task into the queue. This function will be invoked either by the waker or by the run() method after polling the future.
  • An arbitrary piece of metadata called tag that is kept inside the task. Let’s not worry about it in this blog post and simply store () as the tag, i.e. nothing.

The constructor then returns two values:

  • An async_task::Task<()>. The () type is the tag.
  • An async_task::JoinHandle<R, ()>. Again, the () type is the tag. This join handle is a Future that outputs Option<R>, where an output of None indicates the task has panicked or was cancelled.

If you’re wondering about the schedule() method, it just invokes the schedule function on the task to push it into the queue. We could’ve also pushed the task into QUEUE by ourselves; the end result is the same.

Putting all pieces together, we end up with this remarkably simple executor:

static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();

    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }

    sender
});

type Task = async_task::Task<()>;
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}

The complete code can be found in v2.rs.

The benefit of using async_task::spawn() here is not just simplicity: it is also more efficient and more robust than hand-rolling our own Task. To name one example of robustness, async_task::Task drops the future immediately after it completes rather than when all task references get dropped.

In addition to that, async-task offers useful features like tags and cancellation, but let’s not talk about those today. It’s also worth mentioning async-task is a #[no_std] crate that can even be used without the standard library.

Improved JoinHandle

If you look at our latest executor closely, there is one remaining instance of inefficiency: the redundant Box::pin() allocation for the join handle.

It’d be great if we could use the following type alias, but we can’t because async_task::JoinHandle<R> outputs Option<R>, whereas our JoinHandle<R> outputs R:

type JoinHandle<R> = async_task::JoinHandle<R, ()>;

Instead, let’s wrap async_task::JoinHandle into a new struct that panics if the task panicked or if it was cancelled:

struct JoinHandle<R>(async_task::JoinHandle<R, ()>);

impl<R> Future for JoinHandle<R> {
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
        }
    }
}

The complete executor implementation can be found in v3.rs.

Handling panics

So far we haven’t really thought much about what happens when a task panics, i.e. when a panic occurs inside an invocation of poll(). Right now, the run() method simply propagates the panic into the executor. We should consider whether that’s really what we want.

It’d be wise to handle those panics somehow. For example, we could simply ignore panics and move on. That way, they are still printed on the screen but won’t crash the whole process — panicking threads work exactly the same way.

To ignore panics, we wrap run() into catch_unwind():

use std::panic::catch_unwind;

static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();

    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || {
            receiver.iter().for_each(|task| {
                let _ = catch_unwind(|| task.run());
            })
        });
    }

    sender
});

The complete executor that ignores panics can be found in v4.rs.

There are many sensible panic-handling strategies, and async-task’s repository provides several as examples. It’s easy to implement any kind of panic handling strategy you want, and it’s totally up to you to decide which one is best!

How fast is this executor?

The current code is short, simple, and safe. But how fast is it?

A task allocated by async_task::spawn() is just a single allocation storing the task state, the future, and the output of the future when it completes. There are no other hidden costs — spawning is virtually as fast as it can possibly be!

Other executors like async-std and tokio allocate tasks exactly the same way. The basis for our executor is essentially an optimal implementation, and now we’re just one step away from being competitive with popular executors: work stealing.

Right now, all our executor threads share the same task queue. If all threads are hammering the queue at the same time, performance will suffer due to contention. The idea behind work stealing is to assign a distinct queue to each executor thread. That way, an executor thread only needs to steal tasks from other queues when its own queue is empty, meaning contention occurs only rarely rather than all the time.

I’ll talk more about work stealing in another blog post.

Correctness

Concurrency is hard, everybody is telling us. The Go language provides a built-in race detector, tokio has created its own concurrency checker, loom, to look for concurrency bugs, and crossbeam has in some cases even resorted to formal proofs. Sounds scary!

But we can just sit back, relax, and not worry about it. None of the race detectors, sanitizers, or even miri or loom, can catch memory-safety bugs in our executor. The reason is that we have written only safe code, and safe code is memory safe, i.e. it can’t contain data races. Rust’s type system has already proven our executor free of data races (though, of course, it can’t rule out logic bugs).

The burden of ensuring memory safety is entirely on the dependencies, more specifically async-task and crossbeam. Rest assured, both take correctness very seriously: async-task has an extensive test suite covering all the edge cases, crossbeam’s channel has lots of tests and even passes the Go and std::sync::mpsc test suites, its work-stealing deque is based on a formally proven implementation, and its epoch-based garbage collector has a proof of correctness.

Executors are for everyone

Ever since Alex and Aaron first designed zero-cost futures in 2016, the plan was for each spawned future to incur the cost of just a single allocation:

There is one allocation needed per “task”, which usually works out to one per connection.

However, single-allocation tasks were a white lie — it took us years until we actually got them. Consider that tokio 0.1 spawns by allocating the future, then allocating task state, and finally allocating a oneshot channel. That’s three allocations per spawn!

Then, in August 2019, async-task was announced. For the first time ever, we managed to squash the future, task state, and a channel into just a single allocation. The reason it took so long is that manually allocating tasks and managing their state transitions is incredibly complicated. But now that it’s been done, you don’t have to worry about any of it ever again.

Soon after that, in October 2019, tokio also adopted the same approach with an implementation similar to async-task.

These days, anyone can trivially build an efficient executor with single-allocation tasks. What used to be rocket science now isn’t anymore.