An Adventure In Rust's Async Runtime Model
December 2021
Abstract
In Rust, we need to provide our own async runtime, because the Rust team decided not to include one in std, as it would nearly double the size of the standard library. We'll have to build a runtime ourselves or pull one in as a library in order to use this feature. At this point, there are tokio-rs and async-std for you to use directly. Both have reached production-level quality and are pretty easy to use; tokio-rs in particular has been widely used in production.
While we have some awesome runtime libraries, it makes me wonder how those runtimes actually work and what the runtime model looks like. You know, sometimes we just want to DIY some cool stuff, so here I am.
The code of this project can be found at smb374/thread-poll-server.
1. The Model
The execution model is rather simple to explain (but we all know simple stuff can be hard to do well):
- Async functions can be spawned using a global Spawner. The Spawner wraps the function into a Task, which contains the boxed future, then notifies the Executor to run it. Spawned tasks are queued in notification order.
- The Executor keeps running queued tasks until every task has reached its end state, Pending or Ready(T). Pending tasks sleep until someone, usually the Reactor, wakes them up. Ready(T) means the task produced a return value of type T; the task then ends and returns.
- The Executor notifies the Reactor to wait for readiness events. Once it gets some events, the Reactor requeues the corresponding tasks onto the Executor's queue.
Sounds simple, right? But there are quite a few pitfalls that can make you shoot yourself in the foot:
- Global variables are simply a headache to write in Rust, because their types need to satisfy Send + Sync, which is checked by the compiler.
- Even if you're single-threaded, you still need to take locking & synchronization into consideration, because Rust can't tell whether you're multi-threading or not; it assumes every program is multi-threaded.
- Rust only provides the Future trait, meaning that you'll have to implement all the rest of this stuff yourself.
- This model isn't even a standard: there's no standardized model or procedure for how to run an async task, plus Future is lazy.
- etc.
There are different ways to drive a lazy Future to completion, but this general concept is used by most of the runtimes (I guess).
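To make "lazy" concrete: constructing a future runs none of its body; something has to poll it. A minimal, hypothetical illustration:

// Futures are lazy: constructing one runs no code.
async fn hello() {
    println!("hello");
}

fn main() {
    let fut = hello(); // just builds a state machine; prints nothing
    drop(fut);         // never polled, so "hello" never prints
}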
In the following sections I'll talk about my design approach for the Reactor, the single-threaded runtime, and the multi-threaded runtime.
2. Reactor
The Reactor takes the job of collecting readiness events and waking up the corresponding tasks.
Mostly, these kinds of events are tied to IO events or timer events.
I've only implemented the IO reactor, with the help of the mio library, which is a wrapper over the OS's IO multiplexing facility.
The only thing it needs to do is wait for events and wake up the corresponding tasks, that's all. A global Registry accepts registration and deregistration with the reactor, and a global map accepts adding/removing the wakers corresponding to each event.
Note that the poll method can only be accessed through a mutable reference, and Rust guarantees that there will be at most one mutable reference at a time, so we'll have to dedicate a separate polling thread to the Reactor in the multi-threaded runtime.
The Reactor's wait code:
// src/lib/reactor.rs
pub fn wait(&mut self, timeout: Option<Duration>) -> io::Result<()> {
    // poll for IO events, block until one event appears.
    self.poll.poll(&mut self.events, timeout)?;
    let mut guard = WAKER_MAP.lock(); // lock global waker map.
    let wakers_ref = guard.deref_mut();
    for e in self.events.iter() { // iterate the events
        // find token in waker map
        if let Some(waker_set) = wakers_ref.get_mut(&e.token()) {
            // wake up read waiting tasks
            if e.is_readable() && !waker_set.read.is_empty() {
                // drain all the wakers to clean the vec at the same time.
                waker_set.read.drain(..).for_each(|w| w.wake_by_ref());
            }
            // wake up write waiting tasks
            if e.is_writable() && !waker_set.write.is_empty() {
                waker_set.write.drain(..).for_each(|w| w.wake_by_ref());
            }
        }
    }
    Ok(())
}
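The snippet above assumes a global WAKER_MAP keyed by mio token. A minimal, hypothetical sketch of what it could look like (I'm assuming once_cell and parking_lot here; the repo's actual definitions may differ):

use std::{collections::HashMap, task::Waker};
use mio::Token;
use once_cell::sync::Lazy;
use parking_lot::Mutex;

// Per-token wakers, split by interest.
pub struct WakerSet {
    pub read: Vec<Waker>,
    pub write: Vec<Waker>,
}

// Global token -> wakers map that `wait` locks and drains.
pub static WAKER_MAP: Lazy<Mutex<HashMap<Token, WakerSet>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));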
3. Single-threaded runtime
The single-threaded runtime can be described briefly by the following steps:
- The Executor is started by calling block_on on an async function, which spawns the task & starts the main loop.
- The main loop receives the block_on task and any other spawned tasks.
- Every task received runs until its underlying future returns either Poll::Pending or Poll::Ready(()).
- After all tasks are either asleep or finished, the Executor blocks and waits for the Reactor to finish waiting for events.
- The Reactor wakes up the corresponding tasks according to the events it received.
- Tasks that are woken up send themselves back to the Executor.
The actual main loop & block_on code:
// src/lib/single_thread.rs
fn run(&self) {
    // set up the reactor
    let mut reactor = reactor::Reactor::new();
    reactor.setup_registry();
    loop {
        // try to receive any task in the queue, non-blocking.
        // Gets `mpsc::TryRecvError::Empty` when no task is in the queue,
        // meaning that all the tasks are either finished or asleep.
        match self.rx.try_recv() {
            Ok(msg) => match msg {
                // run task
                Message::Run(task) => task.run(),
                // received disconnect message: clean up and exit
                Message::Close => break,
            },
            Err(mpsc::TryRecvError::Empty) => {
                // block on the reactor until IO events are harvested
                reactor.wait(None).unwrap();
            }
            // no one is connected, bye.
            Err(mpsc::TryRecvError::Disconnected) => break,
        }
    }
}

pub fn block_on<F>(&self, future: F)
where
    F: Future<Output = ()> + 'static + Send,
{
    spawn(future);
    self.run()
}
For the rest of the parts (e.g. struct definitions, imports, misc functions), please refer to src/lib/single_thread.rs & src/lib/reactor.rs.
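If you don't want to open the repo, here is a minimal, hypothetical sketch of the Task machinery behind task.run() and the wakers, using the futures crate's ArcWake helper (assumed shapes; the repo's actual definitions may differ):

use std::{
    future::Future,
    pin::Pin,
    sync::{mpsc, Arc},
    task::Context,
};
use futures::task::{waker_ref, ArcWake};
use parking_lot::Mutex;

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

pub enum Message {
    Run(Arc<Task>),
    Close,
}

pub struct Task {
    future: Mutex<Option<BoxedFuture>>,
    tx: mpsc::Sender<Message>, // sends the task back to the executor
}

impl Task {
    pub fn run(self: Arc<Self>) {
        let mut slot = self.future.lock();
        if let Some(mut fut) = slot.take() {
            let waker = waker_ref(&self);
            let mut cx = Context::from_waker(&waker);
            // Poll once; if still Pending, put the future back so the
            // next wakeup can continue where it left off.
            if fut.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(fut);
            }
        }
    }
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Requeue the task onto the executor's queue.
        let _ = arc_self.tx.send(Message::Run(arc_self.clone()));
    }
}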
These two files together are under 300 SLOC: pretty short and simple to implement and understand.
The problematic part is when you need to extend it to a multi-threaded runtime.
4. Multi-threaded runtime
I think we all agree that things get more complex than expected when it comes to multi-threading. At least that's totally true when writing an async runtime.
The first problem we'll encounter is task scheduling. How to distribute tasks to all the worker threads has always been a problem. Plus, different strategies suit different scenarios, so it's hard to decide which to use.
The second problem is that you have to take atomic operations, locking, and starvation into consideration. A simple round-robin or hash distribution seems fair enough, but it's only fair at schedule time. In reality, threads may starve for tasks, which becomes a problem when designing server software that may face the C10K problem, where you definitely don't want a thread to just chill and do nothing. Also, atomic operations and locking are pretty expensive, and avoiding them as much as possible is also a hard problem.
I've designed two schedulers: the simple Round Robin scheduler and the more complicated Work Stealing scheduler. I'm not very good at multi-threading, so if you think there's a better way, don't hesitate to tell me your thoughts; simply opening an issue is a huge help.
4-1. The execution model
The brief execution model can be described by the following steps:
- The Spawner spawns tasks and sends them to the Scheduler.
- The Scheduler distributes the tasks to the worker threads using the chosen strategy.
- Each worker thread runs all of its tasks until there's nothing left to run, then waits for wakeups or newly scheduled tasks.
- Meanwhile, the poll thread has the Reactor continuously poll for readiness events and wake up tasks.
- It's up to each woken task whether it wakes up on its worker thread or reschedules itself.
4-2. The Scheduler trait and the Executor
To make it easy to switch or implement scheduling strategies, we define a public Scheduler trait:
// src/lib/schedulers/mod.rs
pub trait Scheduler {
    fn init(size: usize) -> (Spawner, Self);
    fn schedule(&mut self, future: BoxedFuture);
    fn reschedule(&mut self, task: Arc<Task>);
    fn shutdown(self);
    fn receiver(&self) -> &Receiver<ScheduleMessage>;
}
This trait defines the behaviours a Scheduler must satisfy to run with the Executor.
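For context, here is a minimal, hypothetical sketch of the ScheduleMessage/Spawner plumbing that the trait and the snippets below assume (names and shapes assumed, not the repo's exact code):

use std::{future::Future, pin::Pin, sync::Arc};
use crossbeam::channel::Sender;
use once_cell::sync::Lazy;
use parking_lot::Mutex;

pub struct Task; // placeholder for the real task type

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

pub enum ScheduleMessage {
    Schedule(BoxedFuture),
    Reschedule(Arc<Task>),
    Shutdown,
}

#[derive(Clone)]
pub struct Spawner {
    tx: Sender<ScheduleMessage>,
}

// Global spawner slot, filled in by Executor::new.
pub static SPAWNER: Lazy<Mutex<Option<Spawner>>> = Lazy::new(|| Mutex::new(None));

pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    if let Some(spawner) = SPAWNER.lock().as_ref() {
        // box the future and hand it to the scheduler
        spawner
            .tx
            .send(ScheduleMessage::Schedule(Box::pin(future)))
            .expect("executor is gone");
    }
}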
Take a look at the Executor's code:
// src/lib/multi_thread.rs
impl<S: Scheduler> Executor<S> {
    pub fn new() -> Self {
        // get the number of CPU cores, reserving one for the poll thread
        // (but keep at least one worker)
        let cpus = num_cpus::get();
        let size = if cpus <= 1 { 1 } else { cpus - 1 };
        // set up scheduler & spawner
        let (spawner, scheduler) = S::init(size);
        let (tx, rx) = channel::unbounded();
        // set up the global SPAWNER
        SPAWNER.lock().deref_mut().replace(spawner);
        // create the poll thread
        let poll_thread_handle = thread::spawn(move || Self::poll_thread(rx));
        Self {
            scheduler,
            poll_thread_notifier: tx,
            poll_thread_handle,
        }
    }
    fn poll_thread(rx: Receiver<()>) {
        // The poll thread blocks continuously until a readiness event arrives.
        // It exits automatically on error or on a shutdown notification.
        let mut reactor = reactor::Reactor::new();
        reactor.setup_registry();
        loop {
            match rx.try_recv() {
                // exit signal
                Ok(()) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            };
            // blocking wait for readiness events
            if let Err(e) = reactor.wait(None) {
                eprintln!("reactor wait error: {}, exit poll thread", e);
                break;
            }
        }
    }
    fn run(mut self) {
        // The main loop continuously schedules tasks
        // until shutdown or error.
        loop {
            // blocking receive of schedule messages
            match self.scheduler.receiver().recv() {
                Ok(msg) => match msg {
                    ScheduleMessage::Schedule(future) => self.scheduler.schedule(future),
                    ScheduleMessage::Reschedule(task) => self.scheduler.reschedule(task),
                    ScheduleMessage::Shutdown => break,
                },
                Err(_) => {
                    eprintln!("exit...");
                    break;
                }
            }
        }
        // shut down worker threads
        self.scheduler.shutdown();
        // shut down the poll thread
        self.poll_thread_notifier
            .send(())
            .expect("Failed to send notify");
        let _ = self.poll_thread_handle.join();
    }
    // The `block_on` function
    pub fn block_on<F>(self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        spawn(future);
        self.run();
    }
}
The logic of the Executor is similar to the single-threaded runtime's, except that it needs to spawn a polling thread and the tasks are scheduled onto worker threads.
However, it is up to the scheduler implementation to spawn the worker threads and schedule the tasks.
I've made two scheduler implementations, RoundRobinScheduler and WorkStealingScheduler.
Note that when implementing both schedulers, I used crossbeam's channels rather than the std::mpsc ones, for channel multiplexing and broadcasting.
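As a quick usage sketch (hypothetical; assuming the executor and scheduler types above are in scope), booting the multi-threaded runtime and running a root future looks roughly like this:

fn main() {
    // pick a scheduling strategy at the type level
    let executor: Executor<WorkStealingScheduler> = Executor::new();
    // spawn the root future and drive everything to completion
    executor.block_on(async {
        // async entry point, e.g. the echo server's accept loop
    });
}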
4-3. The RoundRobinScheduler
This scheduler is a pretty simple one: it just cycles through all the worker threads and spreads tasks evenly when scheduling, but there's no guarantee that the actual workload of each worker thread is fair.
The code is relatively easy to understand, so I won't explain further; just read the code.
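To give a feel for the idea anyway, here is a minimal, hypothetical round-robin dispatcher (not the repo's exact code):

use std::sync::mpsc;

// Minimal round-robin dispatch: keep a cursor and send each new
// item to the next worker in turn.
struct RoundRobin<T> {
    workers: Vec<mpsc::Sender<T>>, // one channel per worker thread
    cursor: usize,
}

impl<T> RoundRobin<T> {
    fn dispatch(&mut self, item: T) {
        let idx = self.cursor;
        self.cursor = (self.cursor + 1) % self.workers.len();
        self.workers[idx].send(item).expect("worker channel closed");
    }
}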
4-4. The WorkStealingScheduler
This scheduler is a much more complex one, compared to RoundRobinScheduler.
The whole work stealing process of my implementation:
- Each worker thread will pop as many tasks as it can from its Worker queue and execute them.
- When the worker queue is empty, it will try to receive as many woken tasks as possible, then push them into the Worker queue.
- If no woken task was pushed, it will try to steal tasks from the scheduler's Injector queue or from other workers' Worker queues.
- If no task was stolen, it will block waiting for either a woken task, a broadcast message from the scheduler informing it that the Injector queue has new tasks, or a close message to exit the loop.
  - Note that batch stealing from the Injector is done by steal_batch_and_pop(&self.worker): it will try to steal multiple tasks and, on success, pop one out as the return value.

The code for stealing:
// src/lib/schedulers/work_stealing.rs
fn steal_task(&self) -> Option<Arc<Task>> {
    // an infinite iterator that makes *one* steal attempt per call
    std::iter::repeat_with(|| {
        self.injector
            .steal_batch_and_pop(&self.worker)
            .or_else(|| self.stealers.iter().map(|s| s.steal()).collect())
    })
    // find the first non-Retry steal result
    .find(|s| !s.is_retry())
    // Steal<T> -> Option<T>
    .and_then(|s| s.success())
}
The whole code can be viewed here; the main logic is as described above.
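To tie the list together, here is a minimal, hypothetical skeleton of one worker thread's loop (assumed names, simplified; the real code blocks on its wakeup/broadcast channels instead of exiting):

use crossbeam::deque::{Injector, Stealer, Worker};

fn worker_loop<T>(
    local: &Worker<T>,
    injector: &Injector<T>,
    stealers: &[Stealer<T>],
    run: impl Fn(T),
) {
    loop {
        // 1. drain the local queue first
        while let Some(task) = local.pop() {
            run(task);
        }
        // 2. otherwise try the injector, then other workers' queues
        let stolen = std::iter::repeat_with(|| {
            injector
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        .find(|s| !s.is_retry())
        .and_then(|s| s.success());
        match stolen {
            Some(task) => run(task),
            // 3. nothing anywhere: a real worker would now block waiting
            //    for wakeups or scheduler messages; here we simply stop
            None => break,
        }
    }
}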
Conclusion
We have successfully gone through the hard parts of implementing an async runtime (the Executor and the Runner).
The runtime-compatible library part simply grabs the corresponding types from std, wraps them, and runs them in non-blocking mode: register the event on the registry and add a waker on io::ErrorKind::WouldBlock, then return Poll::Pending; otherwise return the result with Poll::Ready(T).
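A hypothetical sketch of that wrap pattern (assumed names and shapes, not the repo's exact code):

use std::io::{self, Read};
use std::task::{Context, Poll, Waker};

// Try a non-blocking read; on WouldBlock, park the waker so the
// reactor can requeue the task on the next readable event.
fn poll_read_sketch<S: Read>(
    stream: &mut S,               // already non-blocking & registered
    read_wakers: &mut Vec<Waker>, // e.g. the WakerSet's read list
    cx: &mut Context<'_>,
    buf: &mut [u8],
) -> Poll<io::Result<usize>> {
    match stream.read(buf) {
        Ok(n) => Poll::Ready(Ok(n)),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            read_wakers.push(cx.waker().clone());
            Poll::Pending
        }
        Err(e) => Poll::Ready(Err(e)),
    }
}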
I've made AsyncTcpListener & AsyncTcpStream with async functions, and used them to serve an echo server.
Try it and open issues. If you like the repo, give it a star.
Also, I need some help with the multi-threaded runtime's internals; if you can help with that part I would be grateful.
See you at the next post!