The final part of our example is the Reactor. Our Reactor will:
- Efficiently wait and handle events that our runtime is interested in
- Store a collection of Waker types and make sure to wake the correct Waker when it gets a notification on a source it’s tracking
- Provide the necessary mechanisms for leaf futures such as HttpGetFuture, to register and deregister interests in events
- Provide a way for leaf futures to store the last received Waker
When we’re done with this step, we should have everything we need for our runtime, so let’s get to it.
Start by opening the reactor.rs file.
The first thing we do is add the dependencies we need:
ch08/b-reactor-executor/src/runtime/reactor.rs
use crate::runtime::Waker;
use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, OnceLock,
},
thread,
};
After we’ve added our dependencies, we create a type alias called Wakers that aliases the type for our wakers collection:
ch08/b-reactor-executor/src/runtime/reactor.rs
type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
The next line will declare a static variable called REACTOR:
ch08/b-reactor-executor/src/runtime/reactor.rs
static REACTOR: OnceLock<Reactor> = OnceLock::new();
This variable will hold a OnceLock<Reactor>. In contrast to our CURRENT_EXEC static variable, this will be possible to access from different threads. OnceLock allows us to define a static variable that we can write to once so that we can initialize it when we start our Reactor. By doing so, we also make sure that there can only be a single instance of this specific reactor running in our program.
The variable will be private to this module, so we create a public function allowing other parts of our program to access it:
ch08/b-reactor-executor/src/runtime/reactor.rs
pub fn reactor() -> &’static Reactor {
REACTOR.get().expect(“Called outside an runtime context”)
}
The next thing we do is define our Reactor struct:
ch08/b-reactor-executor/src/runtime/reactor.rs
pub struct Reactor {
wakers: Wakers,
registry: Registry,
next_id: AtomicUsize,
}
This will be all the state our Reactor struct needs to hold:
- wakers – A HashMap of Waker objects, each identified by an integer
- registry – Holds a Registry instance so that we can interact with the event queue in mio
- next_id – Stores the next available ID so that we can track which event occurred and which Waker should be woken
The implementation of Reactor is actually quite simple. It’s only four short methods for interacting with the Reactor instance, so I’ll present them all here and give a brief explanation next:
ch08/b-reactor-executor/src/runtime/reactor.rs
impl Reactor {
pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
self.registry.register(stream, Token(id), interest).unwrap();
}
pub fn set_waker(&self, waker: &Waker, id: usize) {
let _ = self
.wakers
.lock()
.map(|mut w| w.insert(id, waker.clone()).is_none())
.unwrap();
}
pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
self.registry.deregister(stream).unwrap();
}
pub fn next_id(&self) -> usize {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
}
Let’s briefly explain what these four methods do:
- register – This method is a thin wrapper around Registry::register, which we know from Chapter 4. The one thing to make a note of here is that we pass in an id property so that we can identify which event has occurred when we receive a notification later on.
- set_waker – This method adds a Waker to our HashMap using the provided id property as a key to identify it. If there is a Waker there already, we replace it and drop the old one. An important point to remember is that we should always store the most recent Waker so that this function can be called multiple times, even though there is already a Waker associated with the TcpStream.
- deregister – This function does two things. First, it removes the Waker from our wakers collection. Then, it deregisters the TcpStream from our Poll instance.
- I want to remind you at this point that while we only work with TcpStream in our examples, this could, in theory, be done with anything that implements mio’s Source trait, so the same thought process is valid in a much broader context than what we deal with here.
- next_id – This simply gets the current next_id value and increments the counter atomically. We don’t care about any happens before/after relationships happening here; we only care about not handing out the same value twice, so Ordering::Relaxed will suffice here. Memory ordering in atomic operations is a complex topic that we won’t be able to dive into in this book, but if you want to know more about the different memory orderings in Rust and what they mean, the official documentation is the right place to start: https://doc.rust-lang.org/stable/std/sync/atomic/enum.Ordering.html.