When a 'static lifetime is used as a trait bound, as we do here, it doesn’t mean that the future we pass in must actually live until the end of the program. It means that the future must be able to live that long, or, put another way, that its lifetime can’t be constrained by any shorter-lived borrow.

Most often, when you encounter something that requires a 'static bound, it simply means that you’ll have to give up ownership of the thing you pass in. If you pass in any references, they need to have a 'static lifetime. It’s less difficult to satisfy this constraint than you might expect.
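To make the 'static bound a little more concrete, here is a minimal sketch. The describe function is a hypothetical helper that is not part of the example project; it only borrows the shape of the bound we put on spawn.

```rust
use std::fmt::Display;

// Hypothetical helper (not part of the runtime): it accepts any value
// whose type contains no borrows shorter than 'static.
fn describe<T: Display + 'static>(value: T) -> String {
    format!("got: {value}")
}

fn main() {
    // An owned String satisfies the bound even though it is dropped
    // long before the program ends.
    let owned = String::from("owned");
    println!("{}", describe(owned));

    // A string literal is a &'static str, so this reference is fine too.
    println!("{}", describe("literal"));

    // A reference to a local variable does not satisfy the bound.
    // Uncommenting the next two lines gives a compile error:
    // let local = String::from("local");
    // describe(&local);
}
```

The owned String is accepted because nothing it contains borrows from a shorter-lived value, which is what the bound really asks for.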

The final part of step 2 will be to define and implement the Executor struct itself.

The Executor struct is very simple, and there is only one line of code to add:

ch08/b-reactor-executor/src/runtime/executor.rs
pub struct Executor;

Since all the state we need for our example is held in ExecutorCore, which is a static thread-local variable, our Executor struct doesn’t need any state. This also means that we don’t strictly need a struct at all, but to keep the API somewhat familiar, we do it anyway.
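The idea of a stateless struct acting as a handle to thread-local state can be sketched in isolation. The names Handle and STATE below are made up for illustration and only stand in for Executor and the thread-local that holds ExecutorCore.

```rust
use std::cell::RefCell;

thread_local! {
    // Hypothetical per-thread state, standing in for ExecutorCore.
    static STATE: RefCell<Vec<usize>> = RefCell::new(Vec::new());
}

// A zero-sized handle: it carries no data of its own.
pub struct Handle;

impl Handle {
    pub fn push(&self, id: usize) {
        STATE.with(|s| s.borrow_mut().push(id));
    }

    pub fn len(&self) -> usize {
        STATE.with(|s| s.borrow().len())
    }
}

fn main() {
    let a = Handle;
    let b = Handle;
    a.push(1);
    b.push(2);
    // Both handles operate on the same thread-local state.
    println!("items seen through a: {}", a.len());
    // The struct itself takes up no space.
    println!("size of Handle: {}", std::mem::size_of::<Handle>());
    // A different thread gets its own, lazily initialized copy.
    let other = std::thread::spawn(|| Handle.len()).join().unwrap();
    println!("items on another thread: {other}");
}
```

Since every handle on the same thread sees the same data, the struct mostly exists to give the API a familiar shape.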

Most of the executor implementation is a handful of simple helper methods that end up in a block_on function, which is where the interesting parts really happen.

Since these helper methods are short and easy to understand, I’ll present them all here and just briefly go over what they do:

Note

We open the impl Executor block here but will not close it until we’ve finished implementing the block_on function.

ch08/b-reactor-executor/src/runtime/executor.rs
impl Executor {
    pub fn new() -> Self {
        Self {}
    }
    fn pop_ready(&self) -> Option<usize> {
        CURRENT_EXEC.with(|q| q.ready_queue.lock().map(|mut q| q.pop()).unwrap())
    }
    fn get_future(&self, id: usize) -> Option<Task> {
        CURRENT_EXEC.with(|q| q.tasks.borrow_mut().remove(&id))
    }
    fn get_waker(&self, id: usize) -> Waker {
        Waker {
            id,
            thread: thread::current(),
            ready_queue: CURRENT_EXEC.with(|q| q.ready_queue.clone()),
        }
    }
    fn insert_task(&self, id: usize, task: Task) {
        CURRENT_EXEC.with(|q| q.tasks.borrow_mut().insert(id, task));
    }
    fn task_count(&self) -> usize {
        CURRENT_EXEC.with(|q| q.tasks.borrow().len())
    }

So, we have six methods here:

  • new – Creates a new Executor instance. For simplicity, we have no initialization here, and everything is done lazily by design in the thread_local! macro.
  • pop_ready – This function takes a lock on ready_queue and pops off an ID that’s ready from the back of the Vec. Calling pop here means that we also remove the item from the collection. As a side note, since Waker pushes its ID to the back of ready_queue and we pop off from the back as well, we essentially get a Last In First Out (LIFO) queue. Using something such as VecDeque from the standard library would easily allow us to choose the order in which we remove items from the queue if we wish to change that behavior.
  • get_future – This function takes the ID of a top-level future as an argument, removes the future from the tasks collection, and returns it (if the task is found). This means that if the task returns NotReady (signaling that we’re not done with it), we need to remember to add it back to the collection again.
  • get_waker – This function creates a new Waker instance.
  • insert_task – This function takes an id and a Task as arguments and inserts them into our tasks collection.
  • task_count – This function simply returns a count of how many tasks we have in the tasks collection.
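The difference in ordering mentioned for pop_ready can be sketched as follows. The drain_lifo and drain_fifo functions are hypothetical helpers that are not part of the executor; they only show the order in which IDs come out of a Vec compared to a VecDeque.

```rust
use std::collections::VecDeque;

// Pushing to the back and popping from the back, as our ready queue
// does, yields Last In First Out order.
fn drain_lifo(ids: &[usize]) -> Vec<usize> {
    let mut queue: Vec<usize> = ids.to_vec();
    let mut order = Vec::new();
    while let Some(id) = queue.pop() {
        order.push(id);
    }
    order
}

// With a VecDeque we can pop from the front instead, which yields
// First In First Out order.
fn drain_fifo(ids: &[usize]) -> Vec<usize> {
    let mut queue: VecDeque<usize> = ids.iter().copied().collect();
    let mut order = Vec::new();
    while let Some(id) = queue.pop_front() {
        order.push(id);
    }
    order
}

fn main() {
    let ids = [1, 2, 3];
    println!("LIFO: {:?}", drain_lifo(&ids)); // prints "LIFO: [3, 2, 1]"
    println!("FIFO: {:?}", drain_fifo(&ids)); // prints "FIFO: [1, 2, 3]"
}
```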

The final part of the Executor implementation is the block_on function. This is also where we close the impl Executor block:

ch08/b-reactor-executor/src/runtime/executor.rs
    pub fn block_on<F>(&mut self, future: F)
    where
        F: Future<Output = String> + 'static,
    {
        spawn(future);
        loop {
            while let Some(id) = self.pop_ready() {
                let mut future = match self.get_future(id) {
                    Some(f) => f,
                    // guard against false wakeups
                    None => continue,
                };
                let waker = self.get_waker(id);
                match future.poll(&waker) {
                    PollState::NotReady => self.insert_task(id, future),
                    PollState::Ready(_) => continue,
                }
            }
            let task_count = self.task_count();
            let name = thread::current().name().unwrap_or_default().to_string();
            if task_count > 0 {
                println!("{name}: {task_count} pending tasks. Sleep until notified.");
                thread::park();
            } else {
                println!("{name}: All tasks are finished");
                break;
            }
        }
    }
}
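The park-and-wake part of block_on can be tried out on its own. The following is a minimal sketch that uses only the standard library; wait_for_wakeup and the ID 42 are made up for illustration, and a plain thread plays the role that a Waker would play in our runtime.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical helper: parks the current thread until another thread
// pushes an ID to a shared ready queue and unparks us.
fn wait_for_wakeup() -> usize {
    let ready_queue: Arc<Mutex<Vec<usize>>> = Arc::new(Mutex::new(Vec::new()));
    let executor_thread = thread::current();
    let queue = ready_queue.clone();

    // Stand-in for a Waker: push the ID first, then unpark the executor.
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        queue.lock().unwrap().push(42);
        executor_thread.unpark();
    });

    let id = loop {
        // The lock guard is a temporary that is dropped at the end of
        // this statement, so we never park while holding the lock.
        if let Some(id) = ready_queue.lock().unwrap().pop() {
            break id;
        }
        // park can return spuriously, so we always loop back and
        // re-check the queue instead of assuming there is work to do.
        thread::park();
    };
    handle.join().unwrap();
    id
}

fn main() {
    println!("woken up with id {}", wait_for_wakeup());
}
```

Re-checking the queue after every return from park is the same idea as the outer loop in block_on: a wakeup is treated as a hint that something might be ready, not as a guarantee.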
