diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 183c5e6a..42bd8226 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -15,10 +15,10 @@ mod waker; use core::cell::Cell; use core::future::Future; +use core::mem; use core::pin::Pin; use core::ptr::NonNull; use core::task::{Context, Poll}; -use core::{mem, ptr}; use atomic_polyfill::{AtomicU32, Ordering}; use critical_section::CriticalSection; @@ -46,8 +46,8 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 - pub(crate) poll_fn: UninitCell, // Valid if STATE_SPAWNED + pub(crate) executor: Cell>, + poll_fn: Cell>, #[cfg(feature = "integrated-timers")] pub(crate) expires_at: Cell, @@ -55,22 +55,6 @@ pub(crate) struct TaskHeader { pub(crate) timer_queue_item: timer_queue::TimerQueueItem, } -impl TaskHeader { - const fn new() -> Self { - Self { - state: AtomicU32::new(0), - run_queue_item: RunQueueItem::new(), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - - #[cfg(feature = "integrated-timers")] - expires_at: Cell::new(Instant::from_ticks(0)), - #[cfg(feature = "integrated-timers")] - timer_queue_item: timer_queue::TimerQueueItem::new(), - } - } -} - /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Clone, Copy)] pub struct TaskRef { @@ -128,7 +112,18 @@ impl TaskStorage { /// Create a new TaskStorage, in not-spawned state. pub const fn new() -> Self { Self { - raw: TaskHeader::new(), + raw: TaskHeader { + state: AtomicU32::new(0), + run_queue_item: RunQueueItem::new(), + executor: Cell::new(None), + // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` + poll_fn: Cell::new(None), + + #[cfg(feature = "integrated-timers")] + expires_at: Cell::new(Instant::from_ticks(0)), + #[cfg(feature = "integrated-timers")] + timer_queue_item: timer_queue::TimerQueueItem::new(), + }, future: UninitCell::uninit(), } } @@ -147,26 +142,14 @@ impl TaskStorage { /// Once the task has finished running, you may spawn it again. It is allowed to spawn it /// on a different executor. pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - if self.spawn_mark_used() { - return unsafe { SpawnToken::::new(self.spawn_initialize(future)) }; + let task = AvailableTask::claim(self); + match task { + Some(task) => { + let task = task.initialize(future); + unsafe { SpawnToken::::new(task) } + } + None => SpawnToken::new_failed(), } - - SpawnToken::::new_failed() - } - - fn spawn_mark_used(&'static self) -> bool { - let state = STATE_SPAWNED | STATE_RUN_QUEUED; - self.raw - .state - .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - } - - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef { - // Initialize the task - self.raw.poll_fn.write(Self::poll); - self.future.write(future()); - TaskRef::new(self) } unsafe fn poll(p: TaskRef) { @@ -191,6 +174,28 @@ impl TaskStorage { unsafe impl Sync for TaskStorage {} +struct AvailableTask { + task: &'static TaskStorage, +} + +impl AvailableTask { + fn claim(task: &'static TaskStorage) -> Option { + task.raw + .state + .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) + .ok() + .map(|_| Self { task }) + } + + fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { + unsafe { + self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); + self.task.future.write(future()); + } + TaskRef::new(self.task) + } +} + /// Raw storage that can hold up to N tasks of the same type. /// /// This is essentially a `[TaskStorage; N]`. @@ -214,13 +219,14 @@ impl TaskPool { /// is currently free. If none is free, a "poisoned" SpawnToken is returned, /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - for task in &self.pool { - if task.spawn_mark_used() { - return unsafe { SpawnToken::::new(task.spawn_initialize(future)) }; + let task = self.pool.iter().find_map(AvailableTask::claim); + match task { + Some(task) => { + let task = task.initialize(future); + unsafe { SpawnToken::::new(task) } } + None => SpawnToken::new_failed(), } - - SpawnToken::::new_failed() } /// Like spawn(), but allows the task to be send-spawned if the args are Send even if @@ -262,13 +268,14 @@ impl TaskPool { // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken`. - for task in &self.pool { - if task.spawn_mark_used() { - return SpawnToken::::new(task.spawn_initialize(future)); + let task = self.pool.iter().find_map(AvailableTask::claim); + match task { + Some(task) => { + let task = task.initialize(future); + unsafe { SpawnToken::::new(task) } } + None => SpawnToken::new_failed(), } - - SpawnToken::::new_failed() } } @@ -353,7 +360,7 @@ impl Executor { /// In this case, the task's Future must be Send. This is because this is effectively /// sending the task to the executor thread. pub(super) unsafe fn spawn(&'static self, task: TaskRef) { - task.header().executor.set(self); + task.header().executor.set(Some(self)); #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); @@ -405,7 +412,7 @@ impl Executor { trace::task_exec_begin(p.as_ptr() as u32); // Run the task - task.poll_fn.read()(p); + task.poll_fn.get().unwrap_unchecked()(p); #[cfg(feature = "rtos-trace")] trace::task_exec_end(); @@ -462,7 +469,7 @@ pub fn wake_task(task: TaskRef) { // We have just marked the task as scheduled, so enqueue it. unsafe { - let executor = &*header.executor.get(); + let executor = header.executor.get().unwrap_unchecked(); executor.enqueue(cs, task); } }) diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index ed582218..2b1f6b6f 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs @@ -25,9 +25,3 @@ impl UninitCell { ptr::drop_in_place(self.as_mut_ptr()) } } - -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 650ea06c..7c0a0183 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -89,10 +89,10 @@ impl Spawner { /// /// Panics if the current executor is not an Embassy executor. pub async fn for_current_executor() -> Self { - poll_fn(|cx| unsafe { + poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); - let executor = task.header().executor.get(); - Poll::Ready(Self::new(&*executor)) + let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; + Poll::Ready(Self::new(executor)) }) .await } @@ -165,10 +165,10 @@ impl SendSpawner { /// /// Panics if the current executor is not an Embassy executor. pub async fn for_current_executor() -> Self { - poll_fn(|cx| unsafe { + poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); - let executor = task.header().executor.get(); - Poll::Ready(Self::new(&*executor)) + let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; + Poll::Ready(Self::new(executor)) }) .await }