From 39e5677621186b6ba440920d58c3ade5144b8ead Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 15 Oct 2021 23:38:44 +0200 Subject: [PATCH 1/2] executor: Use critical sections instead of atomic CAS loops --- embassy/src/executor/raw/mod.rs | 38 ++++++++++++--------------- embassy/src/executor/raw/run_queue.rs | 18 ++++--------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index 08de7773..bc20db33 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs @@ -20,6 +20,7 @@ use core::pin::Pin; use core::ptr::NonNull; use core::task::{Context, Poll}; use core::{mem, ptr}; +use critical_section::CriticalSection; use self::run_queue::{RunQueue, RunQueueItem}; use self::util::UninitCell; @@ -71,30 +72,22 @@ impl TaskHeader { } pub(crate) unsafe fn enqueue(&self) { - let mut current = self.state.load(Ordering::Acquire); - loop { + critical_section::with(|cs| { + let state = self.state.load(Ordering::Relaxed); + // If already scheduled, or if not started, - if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) { + if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { return; } // Mark it as scheduled - let new = current | STATE_RUN_QUEUED; + self.state + .store(state | STATE_RUN_QUEUED, Ordering::Relaxed); - match self.state.compare_exchange_weak( - current, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(next_current) => current = next_current, - } - } - - // We have just marked the task as scheduled, so enqueue it. - let executor = &*self.executor.get(); - executor.enqueue(self as *const TaskHeader as *mut TaskHeader); + // We have just marked the task as scheduled, so enqueue it. + let executor = &*self.executor.get(); + executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader); + }) } } @@ -264,8 +257,8 @@ impl Executor { /// - `task` must be a valid pointer to a spawned task. /// - `task` must be set up to run in this executor. /// - `task` must NOT be already enqueued (in this executor or another one). - unsafe fn enqueue(&self, task: *mut TaskHeader) { - if self.run_queue.enqueue(task) { + unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) { + if self.run_queue.enqueue(cs, task) { (self.signal_fn)(self.signal_ctx) } } @@ -282,7 +275,10 @@ impl Executor { pub(super) unsafe fn spawn(&'static self, task: NonNull) { let task = task.as_ref(); task.executor.set(self); - self.enqueue(task as *const _ as _); + + critical_section::with(|cs| { + self.enqueue(cs, task as *const _ as _); + }) } /// Poll all queued tasks in this executor. diff --git a/embassy/src/executor/raw/run_queue.rs b/embassy/src/executor/raw/run_queue.rs index 24624e1b..3d68d659 100644 --- a/embassy/src/executor/raw/run_queue.rs +++ b/embassy/src/executor/raw/run_queue.rs @@ -1,6 +1,7 @@ use atomic_polyfill::{AtomicPtr, Ordering}; use core::ptr; use core::ptr::NonNull; +use critical_section::CriticalSection; use super::TaskHeader; @@ -43,19 +44,10 @@ impl RunQueue { /// # Safety /// /// `item` must NOT be already enqueued in any queue. - pub(crate) unsafe fn enqueue(&self, task: *mut TaskHeader) -> bool { - let mut prev = self.head.load(Ordering::Acquire); - loop { - (*task).run_queue_item.next.store(prev, Ordering::Relaxed); - match self - .head - .compare_exchange_weak(prev, task, Ordering::AcqRel, Ordering::Acquire) - { - Ok(_) => break, - Err(next_prev) => prev = next_prev, - } - } - + pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool { + let prev = self.head.load(Ordering::Relaxed); + (*task).run_queue_item.next.store(prev, Ordering::Relaxed); + self.head.store(task, Ordering::Relaxed); prev.is_null() } From d32477f5a19496911a2a95b002bb7cddf5b9e605 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sat, 16 Oct 2021 01:49:30 +0200 Subject: [PATCH 2/2] executor: Inline enqueue fns --- embassy/src/executor/raw/mod.rs | 1 + embassy/src/executor/raw/run_queue.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index bc20db33..5ee38715 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs @@ -257,6 +257,7 @@ impl Executor { /// - `task` must be a valid pointer to a spawned task. /// - `task` must be set up to run in this executor. /// - `task` must NOT be already enqueued (in this executor or another one). + #[inline(always)] unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) { if self.run_queue.enqueue(cs, task) { (self.signal_fn)(self.signal_ctx) diff --git a/embassy/src/executor/raw/run_queue.rs b/embassy/src/executor/raw/run_queue.rs index 3d68d659..7aa2079f 100644 --- a/embassy/src/executor/raw/run_queue.rs +++ b/embassy/src/executor/raw/run_queue.rs @@ -44,6 +44,7 @@ impl RunQueue { /// # Safety /// /// `item` must NOT be already enqueued in any queue. + #[inline(always)] pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool { let prev = self.head.load(Ordering::Relaxed); (*task).run_queue_item.next.store(prev, Ordering::Relaxed);