From 3df66c44e3bd84df659e2f4d30bb18128cd89427 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sat, 26 Dec 2020 16:42:44 +0100 Subject: [PATCH] Split executor into multiple files, remove old timers implementation. --- embassy/src/executor/executor.rs | 301 ------------------------- embassy/src/executor/mod.rs | 231 ++++++++++++++++++- embassy/src/executor/run_queue.rs | 70 ++++++ embassy/src/executor/timer_executor.rs | 77 ------- embassy/src/executor/util.rs | 32 +++ embassy/src/time/mod.rs | 2 - embassy/src/time/timer.rs | 63 ------ 7 files changed, 325 insertions(+), 451 deletions(-) delete mode 100644 embassy/src/executor/executor.rs create mode 100644 embassy/src/executor/run_queue.rs delete mode 100644 embassy/src/executor/timer_executor.rs create mode 100644 embassy/src/executor/util.rs delete mode 100644 embassy/src/time/timer.rs diff --git a/embassy/src/executor/executor.rs b/embassy/src/executor/executor.rs deleted file mode 100644 index 81a91577..00000000 --- a/embassy/src/executor/executor.rs +++ /dev/null @@ -1,301 +0,0 @@ -use core::cell::Cell; -use core::cell::UnsafeCell; -use core::future::Future; -use core::marker::PhantomData; -use core::mem; -use core::mem::MaybeUninit; -use core::pin::Pin; -use core::ptr; -use core::ptr::NonNull; -use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering}; -use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; - -//============= -// UninitCell - -struct UninitCell(MaybeUninit>); -impl UninitCell { - const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - - unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - unsafe fn as_mut(&self) -> &mut T { - &mut *self.as_mut_ptr() - } - - unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) - } - - unsafe fn drop_in_place(&self) { - ptr::drop_in_place(self.as_mut_ptr()) - } -} - -impl UninitCell { - unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} - -//============= -// Data structures - -const STATE_RUNNING: u32 = 1 << 0; -const STATE_QUEUED: u32 = 1 << 1; - -struct Header { - state: AtomicU32, - next: AtomicPtr
, - executor: Cell<*const Executor>, - poll_fn: UninitCell, // Valid if STATE_RUNNING -} - -// repr(C) is needed to guarantee that header is located at offset 0 -// This makes it safe to cast between Header and Task pointers. -#[repr(C)] -pub struct Task { - header: Header, - future: UninitCell, // Valid if STATE_RUNNING -} - -#[derive(Copy, Clone, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum SpawnError { - Busy, -} - -//============= -// Atomic task queue using a very, very simple lock-free linked-list queue: -// -// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -// -// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -// null. Then the batch is iterated following the next pointers until null is reached. -// -// Note that batches will be iterated in the opposite order as they were enqueued. This should -// be OK for our use case. Hopefully it doesn't create executor fairness problems. - -struct Queue { - head: AtomicPtr
, -} - -impl Queue { - const fn new() -> Self { - Self { - head: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - unsafe fn enqueue(&self, item: *mut Header) -> bool { - let mut prev = self.head.load(Ordering::Acquire); - loop { - (*item).next.store(prev, Ordering::Relaxed); - match self - .head - .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) - { - Ok(_) => break, - Err(next_prev) => prev = next_prev, - } - } - - prev.is_null() - } - - unsafe fn dequeue_all(&self, on_task: impl Fn(*mut Header)) { - let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); - - while !task.is_null() { - // If the task re-enqueues itself, the `next` pointer will get overwritten. - // Therefore, first read the next pointer, and only then process the task. - let next = (*task).next.load(Ordering::Relaxed); - - on_task(task); - - task = next - } - } -} - -//============= -// Waker - -static WAKER_VTABLE: RawWakerVTable = - RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); - -unsafe fn waker_clone(p: *const ()) -> RawWaker { - RawWaker::new(p, &WAKER_VTABLE) -} - -unsafe fn waker_wake(p: *const ()) { - let header = &*(p as *const Header); - - let mut current = header.state.load(Ordering::Acquire); - loop { - // If already scheduled, or if not started, - if (current & STATE_QUEUED != 0) || (current & STATE_RUNNING == 0) { - return; - } - - // Mark it as scheduled - let new = current | STATE_QUEUED; - - match header - .state - .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) - { - Ok(_) => break, - Err(next_current) => current = next_current, - } - } - - // We have just marked the task as scheduled, so enqueue it. - let executor = &*header.executor.get(); - executor.enqueue(p as *mut Header); -} - -unsafe fn waker_drop(_: *const ()) { - // nop -} - -//============= -// Task - -impl Task { - pub const fn new() -> Self { - Self { - header: Header { - state: AtomicU32::new(0), - next: AtomicPtr::new(ptr::null_mut()), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - }, - future: UninitCell::uninit(), - } - } - - pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { - for task in pool { - let state = STATE_RUNNING | STATE_QUEUED; - if task - .header - .state - .compare_and_swap(0, state, Ordering::AcqRel) - == 0 - { - // Initialize the task - task.header.poll_fn.write(Self::poll); - task.future.write(future()); - - return SpawnToken { - header: Some(NonNull::new_unchecked(&task.header as *const Header as _)), - }; - } - } - - return SpawnToken { header: None }; - } - - unsafe fn poll(p: *mut Header) { - let this = &*(p as *const Task); - - let future = Pin::new_unchecked(this.future.as_mut()); - let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); - let mut cx = Context::from_waker(&waker); - match future.poll(&mut cx) { - Poll::Ready(_) => { - this.future.drop_in_place(); - this.header - .state - .fetch_and(!STATE_RUNNING, Ordering::AcqRel); - } - Poll::Pending => {} - } - } -} - -unsafe impl Sync for Task {} - -//============= -// Spawn token - -#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] -pub struct SpawnToken { - header: Option>, -} - -impl Drop for SpawnToken { - fn drop(&mut self) { - // TODO deallocate the task instead. - panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") - } -} - -//============= -// Executor - -pub struct Executor { - queue: Queue, - signal_fn: fn(), - not_send: PhantomData<*mut ()>, -} - -impl Executor { - pub const fn new(signal_fn: fn()) -> Self { - Self { - queue: Queue::new(), - signal_fn: signal_fn, - not_send: PhantomData, - } - } - - unsafe fn enqueue(&self, item: *mut Header) { - if self.queue.enqueue(item) { - (self.signal_fn)() - } - } - - /// Spawn a future on this executor. - pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { - let header = token.header; - mem::forget(token); - - match header { - Some(header) => unsafe { - let header = header.as_ref(); - header.executor.set(self); - self.enqueue(header as *const _ as _); - Ok(()) - }, - None => Err(SpawnError::Busy), - } - } - - /// Runs the executor until the queue is empty. - pub fn run(&self) { - unsafe { - self.queue.dequeue_all(|p| { - let header = &*p; - - let state = header.state.fetch_and(!STATE_QUEUED, Ordering::AcqRel); - if state & STATE_RUNNING == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } - - // Run the task - header.poll_fn.read()(p as _); - }); - } - } -} diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs index 1a68bdfd..435c97db 100644 --- a/embassy/src/executor/mod.rs +++ b/embassy/src/executor/mod.rs @@ -1,9 +1,224 @@ -mod executor; -mod timer_executor; - -// for time::Timer -pub(crate) use timer_executor::current_timer_queue; - pub use embassy_macros::task; -pub use executor::{Executor, SpawnError, SpawnToken, Task}; -pub use timer_executor::TimerExecutor; + +use core::cell::Cell; +use core::future::Future; +use core::marker::PhantomData; +use core::mem; +use core::pin::Pin; +use core::ptr; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicU32, Ordering}; +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +mod run_queue; +mod util; + +use self::run_queue::{RunQueue, RunQueueItem}; +use self::util::UninitCell; + +/// Task is spawned and future hasn't finished running yet. +const STATE_RUNNING: u32 = 1 << 0; +/// Task is in the executor run queue +const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +const STATE_TIMER_QUEUED: u32 = 1 << 2; + +pub(crate) struct TaskHeader { + state: AtomicU32, + run_queue_item: RunQueueItem, + executor: Cell<*const Executor>, // Valid if state != 0 + poll_fn: UninitCell, // Valid if STATE_RUNNING +} + +// repr(C) is needed to guarantee that header is located at offset 0 +// This makes it safe to cast between Header and Task pointers. +#[repr(C)] +pub struct Task { + header: TaskHeader, + future: UninitCell, // Valid if STATE_RUNNING +} + +#[derive(Copy, Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum SpawnError { + Busy, +} + +//============= +// Waker + +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); + +unsafe fn waker_clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &WAKER_VTABLE) +} + +unsafe fn waker_wake(p: *const ()) { + let header = &*(p as *const TaskHeader); + + let mut current = header.state.load(Ordering::Acquire); + loop { + // If already scheduled, or if not started, + if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) { + return; + } + + // Mark it as scheduled + let new = current | STATE_RUN_QUEUED; + + match header + .state + .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(next_current) => current = next_current, + } + } + + // We have just marked the task as scheduled, so enqueue it. + let executor = &*header.executor.get(); + executor.enqueue(p as *mut TaskHeader); +} + +unsafe fn waker_drop(_: *const ()) { + // nop +} + +//============= +// Task + +impl Task { + pub const fn new() -> Self { + Self { + header: TaskHeader { + state: AtomicU32::new(0), + run_queue_item: RunQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + }, + future: UninitCell::uninit(), + } + } + + pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { + for task in pool { + let state = STATE_RUNNING | STATE_RUN_QUEUED; + if task + .header + .state + .compare_and_swap(0, state, Ordering::AcqRel) + == 0 + { + // Initialize the task + task.header.poll_fn.write(Self::poll); + task.future.write(future()); + + return SpawnToken { + header: Some(NonNull::new_unchecked( + &task.header as *const TaskHeader as _, + )), + }; + } + } + + return SpawnToken { header: None }; + } + + unsafe fn poll(p: *mut TaskHeader) { + let this = &*(p as *const Task); + + let future = Pin::new_unchecked(this.future.as_mut()); + let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); + let mut cx = Context::from_waker(&waker); + match future.poll(&mut cx) { + Poll::Ready(_) => { + this.future.drop_in_place(); + this.header + .state + .fetch_and(!STATE_RUNNING, Ordering::AcqRel); + } + Poll::Pending => {} + } + } +} + +unsafe impl Sync for Task {} + +//============= +// Spawn token + +#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] +pub struct SpawnToken { + header: Option>, +} + +impl Drop for SpawnToken { + fn drop(&mut self) { + // TODO deallocate the task instead. + panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") + } +} + +//============= +// Executor + +pub struct Executor { + run_queue: RunQueue, + signal_fn: fn(), + not_send: PhantomData<*mut ()>, +} + +impl Executor { + pub const fn new(signal_fn: fn()) -> Self { + Self { + run_queue: RunQueue::new(), + signal_fn: signal_fn, + not_send: PhantomData, + } + } + + unsafe fn enqueue(&self, item: *mut TaskHeader) { + if self.run_queue.enqueue(item) { + (self.signal_fn)() + } + } + + /// Spawn a future on this executor. + pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { + let header = token.header; + mem::forget(token); + + match header { + Some(header) => unsafe { + let header = header.as_ref(); + header.executor.set(self); + self.enqueue(header as *const _ as _); + Ok(()) + }, + None => Err(SpawnError::Busy), + } + } + + /// Runs the executor until the queue is empty. + pub fn run(&self) { + unsafe { + self.run_queue.dequeue_all(|p| { + let header = &*p; + + let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_RUNNING == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + // Run the task + header.poll_fn.read()(p as _); + }); + } + } +} diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs new file mode 100644 index 00000000..1cdecee3 --- /dev/null +++ b/embassy/src/executor/run_queue.rs @@ -0,0 +1,70 @@ +use core::ptr; +use core::sync::atomic::{AtomicPtr, Ordering}; + +use super::TaskHeader; + +pub(crate) struct RunQueueItem { + next: AtomicPtr, +} + +impl RunQueueItem { + pub const fn new() -> Self { + Self { + next: AtomicPtr::new(ptr::null_mut()), + } + } +} + +/// Atomic task queue using a very, very simple lock-free linked-list queue: +/// +/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +/// +/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +/// null. Then the batch is iterated following the next pointers until null is reached. +/// +/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK +/// for our purposes: it can't crate fairness problems since the next batch won't run until the +/// current batch is completely processed, so even if a task enqueues itself instantly (for example +/// by waking its own waker) can't prevent other tasks from running. +pub(crate) struct RunQueue { + head: AtomicPtr, +} + +impl RunQueue { + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool { + let mut prev = self.head.load(Ordering::Acquire); + loop { + (*item).run_queue_item.next.store(prev, Ordering::Relaxed); + match self + .head + .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(next_prev) => prev = next_prev, + } + } + + prev.is_null() + } + + pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) { + let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + + while !task.is_null() { + // If the task re-enqueues itself, the `next` pointer will get overwritten. + // Therefore, first read the next pointer, and only then process the task. + let next = (*task).run_queue_item.next.load(Ordering::Relaxed); + + on_task(task); + + task = next + } + } +} diff --git a/embassy/src/executor/timer_executor.rs b/embassy/src/executor/timer_executor.rs deleted file mode 100644 index 1f89490f..00000000 --- a/embassy/src/executor/timer_executor.rs +++ /dev/null @@ -1,77 +0,0 @@ -use super::executor::{Executor, SpawnError, SpawnToken}; -use core::ptr; -use core::sync::atomic::{AtomicPtr, Ordering}; -use futures_intrusive::timer as fi; - -use crate::time::Alarm; - -pub(crate) struct IntrusiveClock; - -impl fi::Clock for IntrusiveClock { - fn now(&self) -> u64 { - crate::time::now() - } -} - -pub(crate) type TimerQueue = fi::LocalTimerService; - -pub struct TimerExecutor { - inner: Executor, - alarm: A, - timer_queue: TimerQueue, -} - -impl TimerExecutor { - pub fn new(alarm: A, signal_fn: fn()) -> Self { - alarm.set_callback(signal_fn); - Self { - inner: Executor::new(signal_fn), - alarm, - timer_queue: TimerQueue::new(&IntrusiveClock), - } - } - - /// Spawn a future on this executor. - /// - /// safety: can only be called from the executor thread - pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { - self.inner.spawn(token) - } - - /// Runs the executor until the queue is empty. - /// - /// safety: can only be called from the executor thread - pub fn run(&'static self) { - with_timer_queue(&self.timer_queue, || { - self.timer_queue.check_expirations(); - self.inner.run(); - - match self.timer_queue.next_expiration() { - // If this is in the past, set_alarm will immediately trigger the alarm, - // which will make the wfe immediately return so we do another loop iteration. - Some(at) => self.alarm.set(at), - None => self.alarm.clear(), - } - }) - } -} - -static CURRENT_TIMER_QUEUE: AtomicPtr = AtomicPtr::new(ptr::null_mut()); - -fn with_timer_queue(svc: &'static TimerQueue, f: impl FnOnce() -> R) -> R { - let svc = svc as *const _ as *mut _; - let prev_svc = CURRENT_TIMER_QUEUE.swap(svc, Ordering::Relaxed); - let r = f(); - let svc2 = CURRENT_TIMER_QUEUE.swap(prev_svc, Ordering::Relaxed); - assert_eq!(svc, svc2); - r -} - -pub(crate) fn current_timer_queue() -> &'static TimerQueue { - unsafe { - CURRENT_TIMER_QUEUE - .load(Ordering::Relaxed) - .as_ref() - .unwrap() - } -} diff --git a/embassy/src/executor/util.rs b/embassy/src/executor/util.rs new file mode 100644 index 00000000..ca15b695 --- /dev/null +++ b/embassy/src/executor/util.rs @@ -0,0 +1,32 @@ +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::ptr; + +pub(crate) struct UninitCell(MaybeUninit>); +impl UninitCell { + pub const fn uninit() -> Self { + Self(MaybeUninit::uninit()) + } + + pub unsafe fn as_mut_ptr(&self) -> *mut T { + (*self.0.as_ptr()).get() + } + + pub unsafe fn as_mut(&self) -> &mut T { + &mut *self.as_mut_ptr() + } + + pub unsafe fn write(&self, val: T) { + ptr::write(self.as_mut_ptr(), val) + } + + pub unsafe fn drop_in_place(&self) { + ptr::drop_in_place(self.as_mut_ptr()) + } +} + +impl UninitCell { + pub unsafe fn read(&self) -> T { + ptr::read(self.as_mut_ptr()) + } +} diff --git a/embassy/src/time/mod.rs b/embassy/src/time/mod.rs index 2b463155..89683837 100644 --- a/embassy/src/time/mod.rs +++ b/embassy/src/time/mod.rs @@ -1,11 +1,9 @@ mod duration; mod instant; -mod timer; mod traits; pub use duration::Duration; pub use instant::Instant; -pub use timer::{Ticker, Timer}; pub use traits::*; use crate::fmt::*; diff --git a/embassy/src/time/timer.rs b/embassy/src/time/timer.rs deleted file mode 100644 index 8756368c..00000000 --- a/embassy/src/time/timer.rs +++ /dev/null @@ -1,63 +0,0 @@ -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; -use futures::Stream; -use futures_intrusive::timer::{LocalTimer, LocalTimerFuture}; - -use super::{Duration, Instant}; -use crate::executor::current_timer_queue; - -pub struct Timer { - inner: LocalTimerFuture<'static>, -} - -impl Timer { - pub fn at(when: Instant) -> Self { - Self { - inner: current_timer_queue().deadline(when.as_ticks()), - } - } - - pub fn after(dur: Duration) -> Self { - Self::at(Instant::now() + dur) - } -} - -impl Future for Timer { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx) - } -} - -pub struct Ticker { - inner: LocalTimerFuture<'static>, - next: Instant, - dur: Duration, -} - -impl Ticker { - pub fn every(dur: Duration) -> Self { - let next = Instant::now() + dur; - Self { - inner: current_timer_queue().deadline(next.as_ticks()), - next, - dur, - } - } -} - -impl Stream for Ticker { - type Item = (); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = unsafe { self.get_unchecked_mut() }; - match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) { - Poll::Ready(_) => { - this.next += this.dur; - this.inner = current_timer_queue().deadline(this.next.as_ticks()); - Poll::Ready(Some(())) - } - Poll::Pending => Poll::Pending, - } - } -}