Executor: Replace unnecessary atomics in runqueue
This commit is contained in:
parent
5a03b2e9e8
commit
6a6c673c5f
@ -4,15 +4,16 @@ use core::ptr::NonNull;
|
|||||||
use atomic_polyfill::{AtomicPtr, Ordering};
|
use atomic_polyfill::{AtomicPtr, Ordering};
|
||||||
|
|
||||||
use super::{TaskHeader, TaskRef};
|
use super::{TaskHeader, TaskRef};
|
||||||
|
use crate::raw::util::SyncUnsafeCell;
|
||||||
|
|
||||||
pub(crate) struct RunQueueItem {
|
pub(crate) struct RunQueueItem {
|
||||||
next: AtomicPtr<TaskHeader>,
|
next: SyncUnsafeCell<Option<TaskRef>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RunQueueItem {
|
impl RunQueueItem {
|
||||||
pub const fn new() -> Self {
|
pub const fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
next: AtomicPtr::new(ptr::null_mut()),
|
next: SyncUnsafeCell::new(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -51,7 +52,12 @@ impl RunQueue {
|
|||||||
self.head
|
self.head
|
||||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
|
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
|
||||||
was_empty = prev.is_null();
|
was_empty = prev.is_null();
|
||||||
task.header().run_queue_item.next.store(prev, Ordering::Relaxed);
|
unsafe {
|
||||||
|
// safety: the pointer is either null or valid
|
||||||
|
let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
|
||||||
|
// safety: there are no concurrent accesses to `next`
|
||||||
|
task.header().run_queue_item.next.set(prev);
|
||||||
|
}
|
||||||
Some(task.as_ptr() as *mut _)
|
Some(task.as_ptr() as *mut _)
|
||||||
})
|
})
|
||||||
.ok();
|
.ok();
|
||||||
@ -64,18 +70,19 @@ impl RunQueue {
|
|||||||
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
|
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
|
||||||
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
|
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
|
||||||
// Atomically empty the queue.
|
// Atomically empty the queue.
|
||||||
let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
|
let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
|
||||||
|
|
||||||
|
// safety: the pointer is either null or valid
|
||||||
|
let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
|
||||||
|
|
||||||
// Iterate the linked list of tasks that were previously in the queue.
|
// Iterate the linked list of tasks that were previously in the queue.
|
||||||
while let Some(task) = NonNull::new(ptr) {
|
while let Some(task) = next {
|
||||||
let task = unsafe { TaskRef::from_ptr(task.as_ptr()) };
|
|
||||||
// If the task re-enqueues itself, the `next` pointer will get overwritten.
|
// If the task re-enqueues itself, the `next` pointer will get overwritten.
|
||||||
// Therefore, first read the next pointer, and only then process the task.
|
// Therefore, first read the next pointer, and only then process the task.
|
||||||
let next = task.header().run_queue_item.next.load(Ordering::Relaxed);
|
// safety: there are no concurrent accesses to `next`
|
||||||
|
next = unsafe { task.header().run_queue_item.next.get() };
|
||||||
|
|
||||||
on_task(task);
|
on_task(task);
|
||||||
|
|
||||||
ptr = next
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user