So, we need to find a different way for our executor to sleep and get woken up that doesn’t rely directly on Poll.

It turns out that this is quite easy. The standard library gives us what we need to get something working. By calling std::thread::current(), we can get a Thread object. This object is a handle to the current thread, and it gives us access to a few methods, one of which is unpark.

The standard library also gives us a method called std::thread::park(), which simply asks the OS scheduler to park our thread until we ask for it to get unparked later on.

It turns out that if we combine these, we have a way to both park and unpark the executor, which is exactly what we need.

Let’s create a Waker type based on this. In our example, we’ll define the Waker inside the executor module since that’s where we create this exact type of Waker, but you could argue that it belongs to the future module since it’s a part of the Future trait.

Important note

Our Waker relies on calling park/unpark on the Thread type from the standard library. This is OK for our example since it’s easy to understand, but given that any part of the code (including any libraries you use) can get a handle to the same thread by calling std::thread::current() and call park/unpark on it, it’s not a robust solution. If unrelated parts of the code call park/unpark on the same thread, we can miss wakeups or end up in deadlocks. Most production libraries create their own Parker type or rely on something such as crossbeam::sync::Parker (https://docs.rs/crossbeam/latest/crossbeam/sync/struct.Parker.html) instead.

We won’t implement Waker as a trait since passing trait objects around will significantly increase the complexity of our example, and it’s not in line with the current design of Future and Waker in Rust either.

Open the executor.rs file located inside the runtime folder, and let’s add all the imports we’re going to need right from the start:

ch08/b-reactor-executor/src/runtime/executor.rs
use crate::future::{Future, PollState};
use std::{
    cell::{Cell, RefCell},
    collections::HashMap,
    sync::{Arc, Mutex},
    thread::{self, Thread},
};

The next thing we add is our Waker:

ch08/b-reactor-executor/src/runtime/executor.rs
#[derive(Clone)]
pub struct Waker {
    thread: Thread,
    id: usize,
    ready_queue: Arc<Mutex<Vec<usize>>>,
}

The Waker will hold three things for us:

  • thread – A handle to the Thread object we mentioned earlier.
  • id – An usize that identifies which task this Waker is associated with.
  • ready_queue – This is a reference that can be shared between threads to a Vec<usize>, where usize represents the ID of a task that’s in the ready queue. We share this object with the executor so that we can push the task ID associated with the Waker onto that queue when it’s ready.

The implementation of our Waker will be quite simple:

ch08/b-reactor-executor/src/runtime/executor.rs
impl Waker {
    pub fn wake(&self) {
        self.ready_queue
            .lock()
            .map(|mut q| q.push(self.id))
            .unwrap();
        self.thread.unpark();
    }
}

When Waker::wake is called, we first take a lock on the Mutex that protects the ready queue we share with the executor. We then push the id value that identifies the task that this Waker is associated with onto the ready queue.

After that’s done, we call unpark on the executor thread and wake it up. It will now find the task associated with this Waker in the ready queue and call poll on it.

It’s worth mentioning that many designs take a shared reference (for example, an Arc<…>) to the future/task itself, and push that onto the queue. By doing so, they skip a level of indirection that we get here by representing the task as a usize instead of passing in a reference to it.

However, I personally think this way of doing it is easier to understand and reason about, and the end result will be the same.

Leave a Reply

Your email address will not be published. Required fields are marked *