executor: Use critical sections instead of atomic CAS loops
This commit is contained in:
parent
05bc4d198e
commit
39e5677621
@ -20,6 +20,7 @@ use core::pin::Pin;
|
|||||||
use core::ptr::NonNull;
|
use core::ptr::NonNull;
|
||||||
use core::task::{Context, Poll};
|
use core::task::{Context, Poll};
|
||||||
use core::{mem, ptr};
|
use core::{mem, ptr};
|
||||||
|
use critical_section::CriticalSection;
|
||||||
|
|
||||||
use self::run_queue::{RunQueue, RunQueueItem};
|
use self::run_queue::{RunQueue, RunQueueItem};
|
||||||
use self::util::UninitCell;
|
use self::util::UninitCell;
|
||||||
@ -71,30 +72,22 @@ impl TaskHeader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) unsafe fn enqueue(&self) {
|
pub(crate) unsafe fn enqueue(&self) {
|
||||||
let mut current = self.state.load(Ordering::Acquire);
|
critical_section::with(|cs| {
|
||||||
loop {
|
let state = self.state.load(Ordering::Relaxed);
|
||||||
|
|
||||||
// If already scheduled, or if not started,
|
// If already scheduled, or if not started,
|
||||||
if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
|
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark it as scheduled
|
// Mark it as scheduled
|
||||||
let new = current | STATE_RUN_QUEUED;
|
self.state
|
||||||
|
.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
|
||||||
|
|
||||||
match self.state.compare_exchange_weak(
|
// We have just marked the task as scheduled, so enqueue it.
|
||||||
current,
|
let executor = &*self.executor.get();
|
||||||
new,
|
executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader);
|
||||||
Ordering::AcqRel,
|
})
|
||||||
Ordering::Acquire,
|
|
||||||
) {
|
|
||||||
Ok(_) => break,
|
|
||||||
Err(next_current) => current = next_current,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We have just marked the task as scheduled, so enqueue it.
|
|
||||||
let executor = &*self.executor.get();
|
|
||||||
executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,8 +257,8 @@ impl Executor {
|
|||||||
/// - `task` must be a valid pointer to a spawned task.
|
/// - `task` must be a valid pointer to a spawned task.
|
||||||
/// - `task` must be set up to run in this executor.
|
/// - `task` must be set up to run in this executor.
|
||||||
/// - `task` must NOT be already enqueued (in this executor or another one).
|
/// - `task` must NOT be already enqueued (in this executor or another one).
|
||||||
unsafe fn enqueue(&self, task: *mut TaskHeader) {
|
unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) {
|
||||||
if self.run_queue.enqueue(task) {
|
if self.run_queue.enqueue(cs, task) {
|
||||||
(self.signal_fn)(self.signal_ctx)
|
(self.signal_fn)(self.signal_ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -282,7 +275,10 @@ impl Executor {
|
|||||||
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
|
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
|
||||||
let task = task.as_ref();
|
let task = task.as_ref();
|
||||||
task.executor.set(self);
|
task.executor.set(self);
|
||||||
self.enqueue(task as *const _ as _);
|
|
||||||
|
critical_section::with(|cs| {
|
||||||
|
self.enqueue(cs, task as *const _ as _);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Poll all queued tasks in this executor.
|
/// Poll all queued tasks in this executor.
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use atomic_polyfill::{AtomicPtr, Ordering};
|
use atomic_polyfill::{AtomicPtr, Ordering};
|
||||||
use core::ptr;
|
use core::ptr;
|
||||||
use core::ptr::NonNull;
|
use core::ptr::NonNull;
|
||||||
|
use critical_section::CriticalSection;
|
||||||
|
|
||||||
use super::TaskHeader;
|
use super::TaskHeader;
|
||||||
|
|
||||||
@ -43,19 +44,10 @@ impl RunQueue {
|
|||||||
/// # Safety
|
/// # Safety
|
||||||
///
|
///
|
||||||
/// `item` must NOT be already enqueued in any queue.
|
/// `item` must NOT be already enqueued in any queue.
|
||||||
pub(crate) unsafe fn enqueue(&self, task: *mut TaskHeader) -> bool {
|
pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool {
|
||||||
let mut prev = self.head.load(Ordering::Acquire);
|
let prev = self.head.load(Ordering::Relaxed);
|
||||||
loop {
|
(*task).run_queue_item.next.store(prev, Ordering::Relaxed);
|
||||||
(*task).run_queue_item.next.store(prev, Ordering::Relaxed);
|
self.head.store(task, Ordering::Relaxed);
|
||||||
match self
|
|
||||||
.head
|
|
||||||
.compare_exchange_weak(prev, task, Ordering::AcqRel, Ordering::Acquire)
|
|
||||||
{
|
|
||||||
Ok(_) => break,
|
|
||||||
Err(next_prev) => prev = next_prev,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
prev.is_null()
|
prev.is_null()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user