Merge #1332
1332: executor: Replace unsound critical sections with atomics r=Dirbaio a=GrantM11235 I couldn't figure out the correct orderings, so I just left them as SeqCst for now. Co-authored-by: Grant Miller <GrantM11235@gmail.com>
This commit is contained in:
		@@ -22,7 +22,6 @@ use core::ptr::NonNull;
 | 
			
		||||
use core::task::{Context, Poll};
 | 
			
		||||
 | 
			
		||||
use atomic_polyfill::{AtomicU32, Ordering};
 | 
			
		||||
use critical_section::CriticalSection;
 | 
			
		||||
#[cfg(feature = "integrated-timers")]
 | 
			
		||||
use embassy_time::driver::{self, AlarmHandle};
 | 
			
		||||
#[cfg(feature = "integrated-timers")]
 | 
			
		||||
@@ -373,11 +372,11 @@ impl SyncExecutor {
 | 
			
		||||
    /// - `task` must be set up to run in this executor.
 | 
			
		||||
    /// - `task` must NOT be already enqueued (in this executor or another one).
 | 
			
		||||
    #[inline(always)]
 | 
			
		||||
    unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) {
 | 
			
		||||
    unsafe fn enqueue(&self, task: TaskRef) {
 | 
			
		||||
        #[cfg(feature = "rtos-trace")]
 | 
			
		||||
        trace::task_ready_begin(task.as_ptr() as u32);
 | 
			
		||||
 | 
			
		||||
        if self.run_queue.enqueue(cs, task) {
 | 
			
		||||
        if self.run_queue.enqueue(task) {
 | 
			
		||||
            self.pender.pend();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -394,9 +393,7 @@ impl SyncExecutor {
 | 
			
		||||
        #[cfg(feature = "rtos-trace")]
 | 
			
		||||
        trace::task_new(task.as_ptr() as u32);
 | 
			
		||||
 | 
			
		||||
        critical_section::with(|cs| {
 | 
			
		||||
            self.enqueue(cs, task);
 | 
			
		||||
        })
 | 
			
		||||
        self.enqueue(task);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// # Safety
 | 
			
		||||
@@ -552,24 +549,25 @@ impl Executor {
 | 
			
		||||
///
 | 
			
		||||
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 | 
			
		||||
pub fn wake_task(task: TaskRef) {
 | 
			
		||||
    critical_section::with(|cs| {
 | 
			
		||||
        let header = task.header();
 | 
			
		||||
        let state = header.state.load(Ordering::Relaxed);
 | 
			
		||||
    let header = task.header();
 | 
			
		||||
 | 
			
		||||
    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
 | 
			
		||||
        // If already scheduled, or if not started,
 | 
			
		||||
        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
 | 
			
		||||
            return;
 | 
			
		||||
            None
 | 
			
		||||
        } else {
 | 
			
		||||
            // Mark it as scheduled
 | 
			
		||||
            Some(state | STATE_RUN_QUEUED)
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
        // Mark it as scheduled
 | 
			
		||||
        header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
 | 
			
		||||
 | 
			
		||||
    if res.is_ok() {
 | 
			
		||||
        // We have just marked the task as scheduled, so enqueue it.
 | 
			
		||||
        unsafe {
 | 
			
		||||
            let executor = header.executor.get().unwrap_unchecked();
 | 
			
		||||
            executor.enqueue(cs, task);
 | 
			
		||||
            executor.enqueue(task);
 | 
			
		||||
        }
 | 
			
		||||
    })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "integrated-timers")]
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,6 @@ use core::ptr;
 | 
			
		||||
use core::ptr::NonNull;
 | 
			
		||||
 | 
			
		||||
use atomic_polyfill::{AtomicPtr, Ordering};
 | 
			
		||||
use critical_section::CriticalSection;
 | 
			
		||||
 | 
			
		||||
use super::{TaskHeader, TaskRef};
 | 
			
		||||
 | 
			
		||||
@@ -46,11 +45,18 @@ impl RunQueue {
 | 
			
		||||
    ///
 | 
			
		||||
    /// `item` must NOT be already enqueued in any queue.
 | 
			
		||||
    #[inline(always)]
 | 
			
		||||
    pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool {
 | 
			
		||||
        let prev = self.head.load(Ordering::Relaxed);
 | 
			
		||||
        task.header().run_queue_item.next.store(prev, Ordering::Relaxed);
 | 
			
		||||
        self.head.store(task.as_ptr() as _, Ordering::Relaxed);
 | 
			
		||||
        prev.is_null()
 | 
			
		||||
    pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
 | 
			
		||||
        let mut was_empty = false;
 | 
			
		||||
 | 
			
		||||
        self.head
 | 
			
		||||
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
 | 
			
		||||
                was_empty = prev.is_null();
 | 
			
		||||
                task.header().run_queue_item.next.store(prev, Ordering::Relaxed);
 | 
			
		||||
                Some(task.as_ptr() as *mut _)
 | 
			
		||||
            })
 | 
			
		||||
            .ok();
 | 
			
		||||
 | 
			
		||||
        was_empty
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Empty the queue, then call `on_task` for each task that was in the queue.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user