Now that our Reactor is set up, we only have two short functions left. The first one is event_loop, which will hold the logic for our event loop that waits and reacts to new events:
ch08/b-reactor-executor/src/runtime/reactor.rs
fn event_loop(mut poll: Poll, wakers: Wakers) {
let mut events = Events::with_capacity(100);
loop {
poll.poll(&mut events, None).unwrap();
for e in events.iter() {
let Token(id) = e.token();
let wakers = wakers.lock().unwrap();
if let Some(waker) = wakers.get(&id) {
waker.wake();
}
}
}
}
This function takes a Poll instance and a Wakers collection as arguments. Let’s go through it step by step:
- The first thing we do is create an events collection. This should be familiar since we did the exact same thing in Chapter 4.
- The next thing we do is create a loop that in our case will continue to loop for eternity. This makes our example short and simple, but it has the downside that we have no way of shutting our event loop down once it’s started. Fixing that is not especially difficult, but since it won’t be necessary for our example, we don’t cover this here.
- Inside the loop, we call Poll::poll with a timeout of None, which means it will never time out and block until it receives an event notification.
- When the call returns, we loop through every event we receive.
- If we receive an event, it means that something we registered interest in happened, so we get the id we passed in when we first registered an interest in events on this TcpStream.
- Lastly, we try to get the associated Waker and call Waker::wake on it. We guard ourselves from the fact that the Waker may have been removed from our collection already, in which case we do nothing.
It’s worth noting that we can filter events if we want to here. Tokio provides some methods on the Event object to check several things about the event it reported. For our use in this example, we don’t need to filter events.
Finally, the last function is the second public function in this module and the one that initializes and starts the runtime:
ch08/b-reactor-executor/src/runtime/runtime.rs
pub fn start() {
use thread::spawn;
let wakers = Arc::new(Mutex::new(HashMap::new()));
let poll = Poll::new().unwrap();
let registry = poll.registry().try_clone().unwrap();
let next_id = AtomicUsize::new(1);
let reactor = Reactor {
wakers: wakers.clone(),
registry,
next_id,
};
REACTOR.set(reactor).ok().expect(“Reactor already running”);
spawn(move || event_loop(poll, wakers));
}
The start method should be fairly easy to understand. The first thing we do is create our Wakers collection and our Poll instance. From the Poll instance, we get an owned version of Registry. We initialize next_id to 1 (for debugging purposes, I wanted to initialize it to a different start value than our Executor) and create our Reactor object.
Then, we set the static variable we named REACTOR by giving it our Reactor instance.
The last thing is probably the most important one to pay attention to. We spawn a new OS thread and start our event_loop function on that one. This also means that we pass on our Poll instance to the event loop thread for good.
Now, the best practice would be to store the JoinHandle returned from spawn so that we can join the thread later on, but our thread has no way to shut down the event loop anyway, so joining it later makes little sense, and we simply discard the handle.
I don’t know if you agree with me, but the logic here is not that complex when we break it down into smaller pieces. Since we know how epoll and mio work already, the rest is pretty easy to understand.
Now, we’re not done yet. We still have some small changes to make to our HttpGetFuture leaf future since it doesn’t register with the reactor at the moment. Let’s fix that.
Start by opening the http.rs file.
Since we already added the correct imports when we opened the file to adapt everything to the new Future interface, there are only a few places we need to change that so this leaf future integrates nicely with our reactor.