From bb68f5d0e83eec609ef0016baa4710b30ce49d62 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 17 Mar 2021 01:55:01 +0100 Subject: [PATCH] Add optimized single-word WakerRegistration, add AtomicWakerRegistration. --- embassy/Cargo.toml | 2 + embassy/src/util/mod.rs | 2 + embassy/src/util/waker.rs | 75 +++++++++++++++++--------- embassy/src/util/waker_agnostic.rs | 87 ++++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 24 deletions(-) create mode 100644 embassy/src/util/waker_agnostic.rs diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index c58fcbf2..39ad64a0 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml @@ -13,6 +13,8 @@ defmt-info = [] defmt-warn = [] defmt-error = [] +executor-agnostic = [] + [dependencies] defmt = { version = "0.2.0", optional = true } log = { version = "0.4.11", optional = true } diff --git a/embassy/src/util/mod.rs b/embassy/src/util/mod.rs index ae434a8b..e64e7f1f 100644 --- a/embassy/src/util/mod.rs +++ b/embassy/src/util/mod.rs @@ -3,6 +3,8 @@ mod forever; mod mutex; mod portal; mod signal; + +#[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] mod waker; pub use drop_bomb::*; diff --git a/embassy/src/util/waker.rs b/embassy/src/util/waker.rs index 68e45cf1..1f2d3a77 100644 --- a/embassy/src/util/waker.rs +++ b/embassy/src/util/waker.rs @@ -1,11 +1,14 @@ -use core::mem; -use core::task::Context; +use core::ptr::{self, NonNull}; use core::task::Waker; +use atomic_polyfill::{AtomicPtr, Ordering}; + +use crate::executor::raw::{task_from_waker, wake_task, Task}; + /// Utility struct to register and wake a waker. #[derive(Debug)] pub struct WakerRegistration { - waker: Option, + waker: Option>, } impl WakerRegistration { @@ -15,37 +18,61 @@ impl WakerRegistration { /// Register a waker. Overwrites the previous waker, if any. pub fn register(&mut self, w: &Waker) { + let w = unsafe { task_from_waker(w) }; match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(w)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } + // Optimization: If both the old and new Wakers wake the same task, do nothing. + Some(w2) if w == w2 => {} + Some(w2) => { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + + unsafe { wake_task(w2) } + self.waker = Some(w); } + None => self.waker = Some(w), } } /// Wake the registered waker, if any. pub fn wake(&mut self) { if let Some(w) = self.waker.take() { - w.wake() + unsafe { wake_task(w) } + } + } +} + +pub struct AtomicWakerRegistration { + waker: AtomicPtr, +} + +impl AtomicWakerRegistration { + pub const fn new() -> Self { + Self { + waker: AtomicPtr::new(ptr::null_mut()), } } - pub fn context(&self) -> Option> { - self.waker.as_ref().map(|w| Context::from_waker(w)) + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + let w = unsafe { task_from_waker(w) }; + let w2 = self.waker.swap(w.as_ptr(), Ordering::Relaxed); + if !w2.is_null() && w2 != w.as_ptr() { + unsafe { wake_task(NonNull::new_unchecked(w2)) }; + } + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + let w2 = self.waker.swap(ptr::null_mut(), Ordering::Relaxed); + if !w2.is_null() { + unsafe { wake_task(NonNull::new_unchecked(w2)) }; + } } } diff --git a/embassy/src/util/waker_agnostic.rs b/embassy/src/util/waker_agnostic.rs new file mode 100644 index 00000000..b4234c0f --- /dev/null +++ b/embassy/src/util/waker_agnostic.rs @@ -0,0 +1,87 @@ +use core::cell::Cell; +use core::mem; +use core::task::Waker; + +use cortex_m::interrupt::Mutex; + +/// Utility struct to register and wake a waker. +#[derive(Debug)] +pub struct WakerRegistration { + waker: Option, +} + +impl WakerRegistration { + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(w)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + w.wake() + } + } +} + +/// Utility struct to register and wake a waker. +pub struct AtomicWakerRegistration { + waker: Mutex>>, +} + +impl AtomicWakerRegistration { + pub const fn new() -> Self { + Self { + waker: Mutex::new(Cell::new(None)), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + cortex_m::interrupt::free(|cs| { + let cell = self.waker.borrow(cs); + cell.set(match cell.replace(None) { + Some(w2) if (w2.will_wake(w)) => Some(w2), + Some(w2) => { + w2.wake(); + Some(w.clone()) + } + None => Some(w.clone()), + }) + }) + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + cortex_m::interrupt::free(|cs| { + let cell = self.waker.borrow(cs); + if let Some(w) = cell.replace(None) { + w.wake() + } + }) + } +}