Okay, so we’ll build on the last example and set things up the same way as before. Create a new project called c-async-await and copy Cargo.toml and everything in the src folder over from the previous example.

The first thing we’ll do is go to future.rs and add a join_all function below our existing code:

ch07/c-async-await/src/future.rs
pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
    let futures = futures.into_iter().map(|f| (false, f)).collect();
    JoinAll {
        futures,
        finished_count: 0,
    }
}

This function takes a collection of futures as an argument and returns a JoinAll<F> future.

The function simply creates a new collection. In this collection, each item is a tuple consisting of a bool value indicating whether the future is resolved or not (initially false) and the original future we received.

Next, we have the definition of our JoinAll struct:

ch07/c-async-await/src/future.rs
pub struct JoinAll<F: Future> {
    futures: Vec<(bool, F)>,
    finished_count: usize,
}

This struct will simply store the collection we created and a finished_count. The finished_count field will make it a little bit easier to keep track of how many futures have been resolved.

As we’re getting used to by now, most of the interesting parts happen in the Future implementation for JoinAll:

ch07/c-async-await/src/future.rs
impl<F: Future> Future for JoinAll<F> {
    type Output = String;
    fn poll(&mut self) -> PollState<Self::Output> {
        for (finished, fut) in self.futures.iter_mut() {
            if *finished {
                continue;
            }
            match fut.poll() {
                PollState::Ready(_) => {
                    *finished = true;
                    self.finished_count += 1;
                }
                PollState::NotReady => continue,
            }
        }
        if self.finished_count == self.futures.len() {
            PollState::Ready(String::new())
        } else {
            PollState::NotReady
        }
    }
}

We set Output to String. This might strike you as strange since we don’t actually return anything from this implementation. The reason is that corofy will only work with futures that return a String (it’s one of its many, many shortcomings), so we just accept that and return an empty string on completion.

Next up is our poll implementation. The first thing we do is to loop over each (flag, future) tuple:
for (finished, fut) in self.futures.iter_mut()

Inside the loop, we first check if the flag for this future is set to finished. If it is, we simply go to the next item in the collection.

If it’s not finished, we poll the future.

If we get PollState::Ready back, we set the flag for this future to true so that we won’t poll it again and we increase the finished count.
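To see why the flag matters, here is a minimal sketch of a single pass over the collection. It assumes PollState and Future look like the definitions from our earlier examples (they're repeated here as stand-ins so the snippet compiles on its own). OneShot, poll_pass, and poll_counts_after are hypothetical names made up for this illustration. OneShot panics if it's polled after it has resolved, so the sketch only runs to completion because the flag makes us skip finished futures:

```rust
// Stand-ins for the types in future.rs (assumed to match the earlier examples).
pub enum PollState<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Output;
    fn poll(&mut self) -> PollState<Self::Output>;
}

// Hypothetical future: returns NotReady `remaining` times, then Ready.
// It counts its polls and refuses to be polled after it has resolved.
pub struct OneShot {
    remaining: u32,
    polls: u32,
    done: bool,
}

impl OneShot {
    pub fn new(remaining: u32) -> Self {
        OneShot { remaining, polls: 0, done: false }
    }
}

impl Future for OneShot {
    type Output = String;
    fn poll(&mut self) -> PollState<Self::Output> {
        assert!(!self.done, "polled after completion");
        self.polls += 1;
        if self.remaining == 0 {
            self.done = true;
            PollState::Ready(String::new())
        } else {
            self.remaining -= 1;
            PollState::NotReady
        }
    }
}

// One pass over the (flag, future) tuples, mirroring the loop in JoinAll::poll.
// Returns how many futures resolved during this pass.
pub fn poll_pass<F: Future>(futures: &mut [(bool, F)]) -> usize {
    let mut newly_finished = 0;
    for (finished, fut) in futures.iter_mut() {
        if *finished {
            continue; // already resolved, so we never touch it again
        }
        if let PollState::Ready(_) = fut.poll() {
            *finished = true;
            newly_finished += 1;
        }
    }
    newly_finished
}

// Runs `passes` passes and reports how often each future was actually polled.
pub fn poll_counts_after(passes: usize) -> Vec<u32> {
    let mut futures = vec![(false, OneShot::new(0)), (false, OneShot::new(2))];
    for _ in 0..passes {
        poll_pass(&mut futures);
    }
    futures.iter().map(|(_, f)| f.polls).collect()
}

fn main() {
    println!("{:?}", poll_counts_after(5));
}
```

Even though we make five passes, the first future is polled once and the second three times. Without the flag, the second pass would poll the already-resolved future and hit the assertion.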

Note

It’s worth noting that the join_all implementation we create here will not work in any meaningful way with futures that return a value. In our case, we simply throw the value away, but remember, we’re trying to keep this as simple as possible for now and the only thing we want to show is the concurrency aspect of calling join_all.

By comparison, the join_all function in the futures crate (futures::future::join_all) collects all the returned values in a Vec<T> and returns them when the JoinAll future resolves.
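If we did want to keep the values, one possible approach is sketched below. This is not the implementation we use in this chapter, and it's not how any particular library does it. JoinAllValues, join_all_values, Delayed, and block_on are hypothetical names, and PollState and Future are stand-ins assumed to match our earlier definitions. Since Output is a Vec rather than a String, this variant would not work with corofy:

```rust
// Stand-ins for the types in future.rs (assumed to match the earlier examples).
pub enum PollState<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Output;
    fn poll(&mut self) -> PollState<Self::Output>;
}

// Hypothetical variant of JoinAll that stores each result instead of a bool flag.
pub struct JoinAllValues<F: Future> {
    futures: Vec<F>,
    results: Vec<Option<F::Output>>,
    finished_count: usize,
}

pub fn join_all_values<F: Future>(futures: Vec<F>) -> JoinAllValues<F> {
    // One empty slot per future; a filled slot doubles as the "finished" flag.
    let results = futures.iter().map(|_| None).collect();
    JoinAllValues { futures, results, finished_count: 0 }
}

impl<F: Future> Future for JoinAllValues<F> {
    type Output = Vec<F::Output>;
    // Note: like many futures, this must not be polled again after returning Ready.
    fn poll(&mut self) -> PollState<Self::Output> {
        for (fut, slot) in self.futures.iter_mut().zip(self.results.iter_mut()) {
            if slot.is_some() {
                continue; // this future has already resolved
            }
            if let PollState::Ready(value) = fut.poll() {
                *slot = Some(value);
                self.finished_count += 1;
            }
        }
        if self.finished_count == self.futures.len() {
            // Every slot is filled, so we can move the values out in input order.
            let values = self
                .results
                .iter_mut()
                .map(|slot| slot.take().expect("every slot is filled"))
                .collect();
            PollState::Ready(values)
        } else {
            PollState::NotReady
        }
    }
}

// Hypothetical test future: yields `value` after `remaining` NotReady polls.
pub struct Delayed {
    pub remaining: u32,
    pub value: u32,
}

impl Future for Delayed {
    type Output = u32;
    fn poll(&mut self) -> PollState<Self::Output> {
        if self.remaining == 0 {
            PollState::Ready(self.value)
        } else {
            self.remaining -= 1;
            PollState::NotReady
        }
    }
}

// Hypothetical driver that polls in a busy loop until the future resolves.
pub fn block_on<F: Future>(mut fut: F) -> F::Output {
    loop {
        if let PollState::Ready(value) = fut.poll() {
            return value;
        }
    }
}

fn main() {
    let joined = join_all_values(vec![
        Delayed { remaining: 2, value: 10 },
        Delayed { remaining: 0, value: 20 },
    ]);
    println!("{:?}", block_on(joined));
}
```

The values come back in the same order as the futures we passed in, regardless of the order in which they resolved.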

If we get PollState::NotReady, we simply continue to the next future in the collection.

After iterating through the entire collection, we check whether we’ve resolved all the futures we originally received by comparing finished_count to the length of the collection: if self.finished_count == self.futures.len().

If all our futures have been resolved, we return PollState::Ready with an empty string (to make corofy happy). If there are still unresolved futures, we return PollState::NotReady.
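To try the whole thing in isolation, the pieces above can be put together in a small stand-alone sketch. PollState and Future are repeated as stand-ins assumed to match our earlier definitions, and join_all and JoinAll are the same as in this section. CountDown and polls_until_ready are hypothetical helpers made up for this illustration; in the real example, our runtime drives the future rather than a busy loop:

```rust
// Stand-ins for the types in future.rs (assumed to match the earlier examples).
pub enum PollState<T> {
    Ready(T),
    NotReady,
}

pub trait Future {
    type Output;
    fn poll(&mut self) -> PollState<Self::Output>;
}

pub struct JoinAll<F: Future> {
    futures: Vec<(bool, F)>,
    finished_count: usize,
}

pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
    let futures = futures.into_iter().map(|f| (false, f)).collect();
    JoinAll { futures, finished_count: 0 }
}

impl<F: Future> Future for JoinAll<F> {
    type Output = String;
    fn poll(&mut self) -> PollState<Self::Output> {
        for (finished, fut) in self.futures.iter_mut() {
            if *finished {
                continue;
            }
            match fut.poll() {
                PollState::Ready(_) => {
                    *finished = true;
                    self.finished_count += 1;
                }
                PollState::NotReady => continue,
            }
        }
        if self.finished_count == self.futures.len() {
            PollState::Ready(String::new())
        } else {
            PollState::NotReady
        }
    }
}

// Hypothetical leaf future: returns NotReady `remaining` times, then Ready.
pub struct CountDown {
    pub remaining: u32,
}

impl Future for CountDown {
    type Output = String;
    fn poll(&mut self) -> PollState<Self::Output> {
        if self.remaining == 0 {
            PollState::Ready(String::from("done"))
        } else {
            self.remaining -= 1;
            PollState::NotReady
        }
    }
}

// Hypothetical driver: polls in a busy loop and returns how many polls it took.
pub fn polls_until_ready<F: Future>(mut fut: F) -> usize {
    let mut polls = 0;
    loop {
        polls += 1;
        if let PollState::Ready(_) = fut.poll() {
            return polls;
        }
    }
}

fn main() {
    let joined = join_all(vec![
        CountDown { remaining: 1 },
        CountDown { remaining: 3 },
        CountDown { remaining: 2 },
    ]);
    println!("resolved after {} polls", polls_until_ready(joined));
}
```

Since every unfinished future makes progress on each poll of JoinAll, the joined future resolves on the same poll as its slowest child (the fourth poll here) rather than after the sum of all the children's polls. That's the concurrency aspect we're after.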
