executor: Replace unsound critical sections with atomics

This commit is contained in:
Grant Miller 2023-04-02 14:11:31 -05:00
parent eed2b12325
commit 8290236ed6
2 changed files with 25 additions and 21 deletions

View File

@ -22,7 +22,6 @@ use core::ptr::NonNull;
use core::task::{Context, Poll}; use core::task::{Context, Poll};
use atomic_polyfill::{AtomicU32, Ordering}; use atomic_polyfill::{AtomicU32, Ordering};
use critical_section::CriticalSection;
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
use embassy_time::driver::{self, AlarmHandle}; use embassy_time::driver::{self, AlarmHandle};
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
@ -373,11 +372,11 @@ impl SyncExecutor {
/// - `task` must be set up to run in this executor. /// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one). /// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)] #[inline(always)]
unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) { unsafe fn enqueue(&self, task: TaskRef) {
#[cfg(feature = "rtos-trace")] #[cfg(feature = "rtos-trace")]
trace::task_ready_begin(task.as_ptr() as u32); trace::task_ready_begin(task.as_ptr() as u32);
if self.run_queue.enqueue(cs, task) { if self.run_queue.enqueue(task) {
self.pender.pend(); self.pender.pend();
} }
} }
@ -394,9 +393,7 @@ impl SyncExecutor {
#[cfg(feature = "rtos-trace")] #[cfg(feature = "rtos-trace")]
trace::task_new(task.as_ptr() as u32); trace::task_new(task.as_ptr() as u32);
critical_section::with(|cs| { self.enqueue(task);
self.enqueue(cs, task);
})
} }
/// # Safety /// # Safety
@ -552,24 +549,25 @@ impl Executor {
/// ///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task(task: TaskRef) { pub fn wake_task(task: TaskRef) {
critical_section::with(|cs| { let header = task.header();
let header = task.header();
let state = header.state.load(Ordering::Relaxed);
let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If already scheduled, or if not started, // If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
return; None
} else {
// Mark it as scheduled
Some(state | STATE_RUN_QUEUED)
} }
});
// Mark it as scheduled if res.is_ok() {
header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
// We have just marked the task as scheduled, so enqueue it. // We have just marked the task as scheduled, so enqueue it.
unsafe { unsafe {
let executor = header.executor.get().unwrap_unchecked(); let executor = header.executor.get().unwrap_unchecked();
executor.enqueue(cs, task); executor.enqueue(task);
} }
}) }
} }
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]

View File

@ -2,7 +2,6 @@ use core::ptr;
use core::ptr::NonNull; use core::ptr::NonNull;
use atomic_polyfill::{AtomicPtr, Ordering}; use atomic_polyfill::{AtomicPtr, Ordering};
use critical_section::CriticalSection;
use super::{TaskHeader, TaskRef}; use super::{TaskHeader, TaskRef};
@ -46,11 +45,18 @@ impl RunQueue {
/// ///
/// `item` must NOT be already enqueued in any queue. /// `item` must NOT be already enqueued in any queue.
#[inline(always)] #[inline(always)]
pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
let prev = self.head.load(Ordering::Relaxed); let mut was_empty = false;
task.header().run_queue_item.next.store(prev, Ordering::Relaxed);
self.head.store(task.as_ptr() as _, Ordering::Relaxed); self.head
prev.is_null() .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
was_empty = prev.is_null();
task.header().run_queue_item.next.store(prev, Ordering::Relaxed);
Some(task.as_ptr() as *mut _)
})
.ok();
was_empty
} }
/// Empty the queue, then call `on_task` for each task that was in the queue. /// Empty the queue, then call `on_task` for each task that was in the queue.