The first thing we do is give HttpGetFuture an identity. It’s the source of events we want to track with our Reactor, so we want it to have the same ID until we’re done with it:
ch08/b-reactor-executor/src/http.rs
struct HttpGetFuture {
    stream: Option<mio::net::TcpStream>,
    buffer: Vec<u8>,
    path: String,
    id: usize,
}
We also need to retrieve a new ID from the reactor when the future is created:
ch08/b-reactor-executor/src/http.rs
impl HttpGetFuture {
    fn new(path: String) -> Self {
        let id = reactor().next_id();
        Self {
            stream: None,
            buffer: vec![],
            path,
            id,
        }
    }
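How next_id produces its values is not shown in this section. The following is a minimal sketch, assuming the reactor hands out IDs from an atomic counter. The names IdSource and Tracked are hypothetical stand-ins for the reactor and for HttpGetFuture, and the starting value of 1 is also an assumption:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the part of the reactor that hands out IDs.
struct IdSource {
    next_id: AtomicUsize,
}

impl IdSource {
    fn new() -> Self {
        // Assumption: IDs start at 1.
        Self { next_id: AtomicUsize::new(1) }
    }

    /// Returns the current counter value and increments it, so every
    /// caller gets a distinct ID, even across threads.
    fn next_id(&self) -> usize {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}

/// Hypothetical future-like type that takes its ID once, at creation,
/// and keeps it for its whole lifetime.
struct Tracked {
    id: usize,
}

impl Tracked {
    fn new(ids: &IdSource) -> Self {
        Self { id: ids.next_id() }
    }
}
```

Because the ID is taken in the constructor and stored in the struct, every later call that mentions this future can pass the same value.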
Next, we have to locate the poll implementation for HttpGetFuture.
The first time the future gets polled, we need to register interest with our Poll instance and register the Waker we receive with the Reactor. Since we don’t register directly with Registry anymore, we remove that line of code and add these new lines instead:
ch08/b-reactor-executor/src/http.rs
if self.stream.is_none() {
    println!("FIRST POLL – START OPERATION");
    self.write_request();
    let stream = self.stream.as_mut().unwrap();
    runtime::reactor().register(stream, Interest::READABLE, self.id);
    runtime::reactor().set_waker(waker, self.id);
}
Lastly, we need to make some minor changes to how we handle the different conditions when reading from TcpStream:
ch08/b-reactor-executor/src/http.rs
match self.stream.as_mut().unwrap().read(&mut buff) {
    Ok(0) => {
        let s = String::from_utf8_lossy(&self.buffer);
        runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
        break PollState::Ready(s.to_string());
    }
    Ok(n) => {
        self.buffer.extend(&buff[0..n]);
        continue;
    }
    Err(e) if e.kind() == ErrorKind::WouldBlock => {
        runtime::reactor().set_waker(waker, self.id);
        break PollState::NotReady;
    }
    Err(e) => panic!("{e:?}"),
}
The first change is to deregister the stream from our Poll instance when we’re done.
The second change is a little more subtle. If you read the documentation for Future::poll in Rust (https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll) carefully, you’ll see that it’s expected that the Waker from the most recent call should be scheduled to wake up. That means that every time we get a WouldBlock error, we need to make sure we store the most recent Waker.
The reason is that the future could have moved to a different executor in between calls, and we need to wake up the correct one (it won’t be possible to move futures like those in our example, but let’s play by the same rules).
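The "most recent Waker wins" rule can be illustrated in isolation. The sketch below is not the chapter's Reactor: it assumes a plain table keyed by future ID, and it uses std::task::Waker rather than the Waker type from our runtime module. WakerTable, CountingWaker, notify, and demo are hypothetical names introduced only for this illustration:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Counts how often it is woken (stands in for a task on some executor).
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical waker table keyed by future ID.
#[derive(Default)]
struct WakerTable {
    wakers: Mutex<HashMap<usize, Waker>>,
}

impl WakerTable {
    /// Stores `waker` for `id`, replacing the waker from any earlier poll.
    fn set_waker(&self, waker: &Waker, id: usize) {
        self.wakers.lock().unwrap().insert(id, waker.clone());
    }

    /// Called when an event for `id` arrives: wakes only the stored waker.
    fn notify(&self, id: usize) {
        if let Some(waker) = self.wakers.lock().unwrap().get(&id) {
            waker.wake_by_ref();
        }
    }
}

/// Simulates two polls of the same future with different wakers, then
/// one event. Returns (wake count of first waker, wake count of second).
fn demo() -> (usize, usize) {
    let first = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let second = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let table = WakerTable::default();

    table.set_waker(&Waker::from(first.clone()), 7); // first poll
    table.set_waker(&Waker::from(second.clone()), 7); // later poll
    table.notify(7); // event arrives for future 7

    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

Since insert overwrites the entry for the same ID, only the waker from the later poll is notified, which is the behavior the Future::poll documentation asks for.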
And that’s it!
Congratulations! You’ve now created a fully working runtime based on the reactor-executor pattern. Well done!
Now, it’s time to test it and run a few experiments with it.
Let’s go back to main.rs and change the main function so that we get our program running correctly with our new runtime.
First of all, let’s remove the dependency on the Runtime struct and make sure our imports look like this:
ch08/b-reactor-executor/src/main.rs
mod future;
mod http;
mod runtime;
use future::{Future, PollState};
use runtime::Waker;
Next, we need to make sure that we initialize our runtime and pass in our future to executor.block_on. Our main function should look like this:
ch08/b-reactor-executor/src/main.rs
fn main() {
    let mut executor = runtime::init();
    executor.block_on(async_main());
}
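To give a feel for what a block_on call does, here is a deliberately reduced sketch. It is not our executor: it assumes a single future, uses the standard library's Future, Context, and Waker instead of the traits from our future and runtime modules, and has no task queue. ThreadWaker and YieldOnce are hypothetical names used only here:

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls one future to completion, parking the thread between polls.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Nothing to do: sleep until the waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

/// Example future: pending on the first poll (after requesting a wakeup),
/// ready with 42 on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.0 {
            Poll::Ready(42)
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

Calling block_on(YieldOnce(false)) polls twice and returns 42. The parked thread in this sketch plays the same role as the sleeping executor that waits to be notified in our runtime.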
And finally, let’s try it out by running it:
cargo run
You should get the following output:
Program starting
FIRST POLL – START OPERATION
main: 1 pending tasks.
Sleep until notified.
HTTP/1.1 200 OK
content-length: 15
connection: close
content-type: text/plain; charset=utf-8
date: Thu, xx xxx xxxx 15:38:08 GMT
HelloAsyncAwait
FIRST POLL – START OPERATION
main: 1 pending tasks.
Sleep until notified.
HTTP/1.1 200 OK
content-length: 15
connection: close
content-type: text/plain; charset=utf-8
date: Thu, xx xxx xxxx 15:38:08 GMT
HelloAsyncAwait
main: All tasks are finished
Great, it’s working just as expected!
However, we’re not really using any of the new capabilities of our runtime yet, so before we leave this chapter, let’s have some fun and see what it can do.