From 41d558a5f40bbea865f2ba0899b34baed9c1c0d1 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Mon, 20 Mar 2023 16:20:51 -0500 Subject: [PATCH 1/3] executor: Allow TaskStorage to auto-implement `Sync` --- embassy-executor/src/raw/mod.rs | 156 +++++++++++++++++------- embassy-executor/src/raw/timer_queue.rs | 14 ++- embassy-executor/src/raw/util.rs | 29 +++++ embassy-executor/src/spawner.rs | 12 +- 4 files changed, 154 insertions(+), 57 deletions(-) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 42bd8226..938492c2 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -13,8 +13,8 @@ mod timer_queue; pub(crate) mod util; mod waker; -use core::cell::Cell; use core::future::Future; +use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; @@ -30,7 +30,7 @@ use embassy_time::Instant; use rtos_trace::trace; use self::run_queue::{RunQueue, RunQueueItem}; -use self::util::UninitCell; +use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; use super::SpawnToken; @@ -46,11 +46,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell>, - poll_fn: Cell>, + pub(crate) executor: SyncUnsafeCell>, + poll_fn: SyncUnsafeCell>, #[cfg(feature = "integrated-timers")] - pub(crate) expires_at: Cell, + pub(crate) expires_at: SyncUnsafeCell, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue_item: timer_queue::TimerQueueItem, } @@ -61,6 +61,9 @@ pub struct TaskRef { ptr: NonNull, } +unsafe impl Send for TaskRef where &'static TaskHeader: Send {} +unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {} + impl TaskRef { fn new(task: &'static TaskStorage) -> Self { Self { @@ -115,12 +118,12 @@ impl TaskStorage { raw: TaskHeader { state: AtomicU32::new(0), run_queue_item: RunQueueItem::new(), - executor: Cell::new(None), + executor: SyncUnsafeCell::new(None), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` - poll_fn: Cell::new(None), + poll_fn: SyncUnsafeCell::new(None), #[cfg(feature = "integrated-timers")] - expires_at: Cell::new(Instant::from_ticks(0)), + expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)), #[cfg(feature = "integrated-timers")] timer_queue_item: timer_queue::TimerQueueItem::new(), }, @@ -170,9 +173,15 @@ impl TaskStorage { // it's a noop for our waker. mem::forget(waker); } -} -unsafe impl Sync for TaskStorage {} + #[doc(hidden)] + #[allow(dead_code)] + fn _assert_sync(self) { + fn assert_sync(_: T) {} + + assert_sync(self) + } +} struct AvailableTask { task: &'static TaskStorage, @@ -279,29 +288,13 @@ impl TaskPool { } } -/// Raw executor. -/// -/// This is the core of the Embassy executor. It is low-level, requiring manual -/// handling of wakeups and task polling. If you can, prefer using one of the -/// [higher level executors](crate::Executor). -/// -/// The raw executor leaves it up to you to handle wakeups and scheduling: -/// -/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks -/// that "want to run"). -/// - You must supply a `signal_fn`. The executor will call it to notify you it has work -/// to do. You must arrange for `poll()` to be called as soon as possible. -/// -/// `signal_fn` can be called from *any* context: any thread, any interrupt priority -/// level, etc. It may be called synchronously from any `Executor` method call as well. -/// You must deal with this correctly. -/// -/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates -/// the requirement for `poll` to not be called reentrantly. -pub struct Executor { +struct SignalCtx(*mut ()); +unsafe impl Sync for SignalCtx {} + +pub(crate) struct SyncExecutor { run_queue: RunQueue, signal_fn: fn(*mut ()), - signal_ctx: *mut (), + signal_ctx: SignalCtx, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue: timer_queue::TimerQueue, @@ -309,14 +302,8 @@ pub struct Executor { alarm: AlarmHandle, } -impl Executor { - /// Create a new executor. - /// - /// When the executor has work to do, it will call `signal_fn` with - /// `signal_ctx` as argument. - /// - /// See [`Executor`] docs for details on `signal_fn`. - pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { +impl SyncExecutor { + pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { #[cfg(feature = "integrated-timers")] let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; #[cfg(feature = "integrated-timers")] @@ -325,7 +312,7 @@ impl Executor { Self { run_queue: RunQueue::new(), signal_fn, - signal_ctx, + signal_ctx: SignalCtx(signal_ctx), #[cfg(feature = "integrated-timers")] timer_queue: timer_queue::TimerQueue::new(), @@ -346,7 +333,7 @@ impl Executor { trace::task_ready_begin(task.as_ptr() as u32); if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx) + (self.signal_fn)(self.signal_ctx.0) } } @@ -387,7 +374,8 @@ impl Executor { /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. - pub unsafe fn poll(&'static self) { + pub(crate) unsafe fn poll(&'static self) { + #[allow(clippy::never_loop)] loop { #[cfg(feature = "integrated-timers")] self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); @@ -441,6 +429,84 @@ impl Executor { #[cfg(feature = "rtos-trace")] trace::system_idle(); } +} + +/// Raw executor. +/// +/// This is the core of the Embassy executor. It is low-level, requiring manual +/// handling of wakeups and task polling. If you can, prefer using one of the +/// [higher level executors](crate::Executor). +/// +/// The raw executor leaves it up to you to handle wakeups and scheduling: +/// +/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks +/// that "want to run"). +/// - You must supply a `signal_fn`. The executor will call it to notify you it has work +/// to do. You must arrange for `poll()` to be called as soon as possible. +/// +/// `signal_fn` can be called from *any* context: any thread, any interrupt priority +/// level, etc. It may be called synchronously from any `Executor` method call as well. +/// You must deal with this correctly. +/// +/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates +/// the requirement for `poll` to not be called reentrantly. +#[repr(transparent)] +pub struct Executor { + pub(crate) inner: SyncExecutor, + + _not_sync: PhantomData<*mut ()>, +} + +impl Executor { + pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { + mem::transmute(inner) + } + /// Create a new executor. + /// + /// When the executor has work to do, it will call `signal_fn` with + /// `signal_ctx` as argument. + /// + /// See [`Executor`] docs for details on `signal_fn`. + pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + Self { + inner: SyncExecutor::new(signal_fn, signal_ctx), + _not_sync: PhantomData, + } + } + + /// Spawn a task in this executor. + /// + /// # Safety + /// + /// `task` must be a valid pointer to an initialized but not-already-spawned task. + /// + /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. + /// In this case, the task's Future must be Send. This is because this is effectively + /// sending the task to the executor thread. + pub(super) unsafe fn spawn(&'static self, task: TaskRef) { + self.inner.spawn(task) + } + + /// Poll all queued tasks in this executor. + /// + /// This loops over all tasks that are queued to be polled (i.e. they're + /// freshly spawned or they've been woken). Other tasks are not polled. + /// + /// You must call `poll` after receiving a call to `signal_fn`. It is OK + /// to call `poll` even when not requested by `signal_fn`, but it wastes + /// energy. + /// + /// # Safety + /// + /// You must NOT call `poll` reentrantly on the same executor. + /// + /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you + /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to + /// somehow schedule for `poll()` to be called later, at a time you know for sure there's + /// no `poll()` already running. + pub unsafe fn poll(&'static self) { + self.inner.poll() + } /// Get a spawner that spawns tasks in this executor. /// @@ -483,8 +549,10 @@ impl embassy_time::queue::TimerQueue for TimerQueue { fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { let task = waker::task_from_waker(waker); let task = task.header(); - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); + unsafe { + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); + } } } diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 57d6d3cd..dc71c95b 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -1,28 +1,32 @@ -use core::cell::Cell; use core::cmp::min; use atomic_polyfill::Ordering; use embassy_time::Instant; use super::{TaskRef, STATE_TIMER_QUEUED}; +use crate::raw::util::SyncUnsafeCell; pub(crate) struct TimerQueueItem { - next: Cell>, + next: SyncUnsafeCell>, } impl TimerQueueItem { pub const fn new() -> Self { - Self { next: Cell::new(None) } + Self { + next: SyncUnsafeCell::new(None), + } } } pub(crate) struct TimerQueue { - head: Cell>, + head: SyncUnsafeCell>, } impl TimerQueue { pub const fn new() -> Self { - Self { head: Cell::new(None) } + Self { + head: SyncUnsafeCell::new(None), + } } pub(crate) unsafe fn update(&self, p: TaskRef) { diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index 2b1f6b6f..e2e8f4df 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs @@ -25,3 +25,32 @@ impl UninitCell { ptr::drop_in_place(self.as_mut_ptr()) } } + +unsafe impl Sync for UninitCell {} + +#[repr(transparent)] +pub struct SyncUnsafeCell { + value: UnsafeCell, +} + +unsafe impl Sync for SyncUnsafeCell {} + +impl SyncUnsafeCell { + #[inline] + pub const fn new(value: T) -> Self { + Self { + value: UnsafeCell::new(value), + } + } + + pub unsafe fn set(&self, value: T) { + *self.value.get() = value; + } + + pub unsafe fn get(&self) -> T + where + T: Copy, + { + *self.value.get() + } +} diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 7c0a0183..2b622404 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -92,6 +92,7 @@ impl Spawner { poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; + let executor = unsafe { raw::Executor::wrap(executor) }; Poll::Ready(Self::new(executor)) }) .await @@ -130,9 +131,7 @@ impl Spawner { /// spawner to other threads, but the spawner loses the ability to spawn /// non-Send tasks. pub fn make_send(&self) -> SendSpawner { - SendSpawner { - executor: self.executor, - } + SendSpawner::new(&self.executor.inner) } } @@ -145,14 +144,11 @@ impl Spawner { /// If you want to spawn non-Send tasks, use [Spawner]. #[derive(Copy, Clone)] pub struct SendSpawner { - executor: &'static raw::Executor, + executor: &'static raw::SyncExecutor, } -unsafe impl Send for SendSpawner {} -unsafe impl Sync for SendSpawner {} - impl SendSpawner { - pub(crate) fn new(executor: &'static raw::Executor) -> Self { + pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self { Self { executor } } From 805bca1f5aab8f95bf37007eb9be9016bc0dd8c1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 27 Mar 2023 00:20:24 +0200 Subject: [PATCH 2/3] executor: deduplicate doc comments. --- embassy-executor/src/raw/mod.rs | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 938492c2..0120334b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -337,15 +337,6 @@ impl SyncExecutor { } } - /// Spawn a task in this executor. - /// - /// # Safety - /// - /// `task` must be a valid pointer to an initialized but not-already-spawned task. - /// - /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. - /// In this case, the task's Future must be Send. This is because this is effectively - /// sending the task to the executor thread. pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); @@ -357,23 +348,9 @@ impl SyncExecutor { }) } - /// Poll all queued tasks in this executor. - /// - /// This loops over all tasks that are queued to be polled (i.e. they're - /// freshly spawned or they've been woken). Other tasks are not polled. - /// - /// You must call `poll` after receiving a call to `signal_fn`. It is OK - /// to call `poll` even when not requested by `signal_fn`, but it wastes - /// energy. - /// /// # Safety /// - /// You must NOT call `poll` reentrantly on the same executor. - /// - /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you - /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to - /// somehow schedule for `poll()` to be called later, at a time you know for sure there's - /// no `poll()` already running. + /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { #[allow(clippy::never_loop)] loop { From 21400da073d7173e4c2445cbbcd2cd430f120ad1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 27 Mar 2023 00:22:00 +0200 Subject: [PATCH 3/3] executor: Use AtomicPtr for signal_ctx, removes 1 unsafe. --- embassy-executor/src/raw/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 0120334b..15ff18fc 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -18,6 +18,7 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; +use core::sync::atomic::AtomicPtr; use core::task::{Context, Poll}; use atomic_polyfill::{AtomicU32, Ordering}; @@ -288,13 +289,10 @@ impl TaskPool { } } -struct SignalCtx(*mut ()); -unsafe impl Sync for SignalCtx {} - pub(crate) struct SyncExecutor { run_queue: RunQueue, signal_fn: fn(*mut ()), - signal_ctx: SignalCtx, + signal_ctx: AtomicPtr<()>, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue: timer_queue::TimerQueue, @@ -312,7 +310,7 @@ impl SyncExecutor { Self { run_queue: RunQueue::new(), signal_fn, - signal_ctx: SignalCtx(signal_ctx), + signal_ctx: AtomicPtr::new(signal_ctx), #[cfg(feature = "integrated-timers")] timer_queue: timer_queue::TimerQueue::new(), @@ -333,7 +331,7 @@ impl SyncExecutor { trace::task_ready_begin(task.as_ptr() as u32); if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx.0) + (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed)) } }