Split waker to separate file.

Dario Nieuwenhuis 2020-12-26 17:22:36 +01:00
parent 3df66c44e3
commit 8b7a42a4f9
2 changed files with 66 additions and 64 deletions


@@ -8,16 +8,17 @@ use core::pin::Pin;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU32, Ordering};
-use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+use core::task::{Context, Poll, RawWaker, Waker};
mod run_queue;
mod util;
+mod waker;
use self::run_queue::{RunQueue, RunQueueItem};
use self::util::UninitCell;
-/// Task is spawned and future hasn't finished running yet.
-const STATE_RUNNING: u32 = 1 << 0;
+/// Task is spawned (has a future)
+const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
@@ -27,7 +28,36 @@ pub(crate) struct TaskHeader {
state: AtomicU32,
run_queue_item: RunQueueItem,
executor: Cell<*const Executor>, // Valid if state != 0
-poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING
+poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED
}
+impl TaskHeader {
+pub(crate) unsafe fn enqueue(&self) {
+let mut current = self.state.load(Ordering::Acquire);
+loop {
+// If already scheduled, or if not started,
+if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
+return;
+}
+// Mark it as scheduled
+let new = current | STATE_RUN_QUEUED;
+match self.state.compare_exchange_weak(
+current,
+new,
+Ordering::AcqRel,
+Ordering::Acquire,
+) {
+Ok(_) => break,
+Err(next_current) => current = next_current,
+}
+}
+// We have just marked the task as scheduled, so enqueue it.
+let executor = &*self.executor.get();
+executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
+}
+}
// repr(C) is needed to guarantee that header is located at offset 0
@@ -35,59 +65,9 @@ pub(crate) struct TaskHeader {
#[repr(C)]
pub struct Task<F: Future + 'static> {
header: TaskHeader,
-future: UninitCell<F>, // Valid if STATE_RUNNING
+future: UninitCell<F>, // Valid if STATE_SPAWNED
}
-#[derive(Copy, Clone, Debug)]
-#[cfg_attr(feature = "defmt", derive(defmt::Format))]
-pub enum SpawnError {
-Busy,
-}
-//=============
-// Waker
-static WAKER_VTABLE: RawWakerVTable =
-RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
-unsafe fn waker_clone(p: *const ()) -> RawWaker {
-RawWaker::new(p, &WAKER_VTABLE)
-}
-unsafe fn waker_wake(p: *const ()) {
-let header = &*(p as *const TaskHeader);
-let mut current = header.state.load(Ordering::Acquire);
-loop {
-// If already scheduled, or if not started,
-if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) {
-return;
-}
-// Mark it as scheduled
-let new = current | STATE_RUN_QUEUED;
-match header
-.state
-.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
-{
-Ok(_) => break,
-Err(next_current) => current = next_current,
-}
-}
-// We have just marked the task as scheduled, so enqueue it.
-let executor = &*header.executor.get();
-executor.enqueue(p as *mut TaskHeader);
-}
-unsafe fn waker_drop(_: *const ()) {
-// nop
-}
-//=============
-// Task
impl<F: Future + 'static> Task<F> {
pub const fn new() -> Self {
Self {
@@ -103,7 +83,7 @@ impl<F: Future + 'static> Task<F> {
pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
for task in pool {
-let state = STATE_RUNNING | STATE_RUN_QUEUED;
+let state = STATE_SPAWNED | STATE_RUN_QUEUED;
if task
.header
.state
@@ -129,14 +109,14 @@ impl<F: Future + 'static> Task<F> {
let this = &*(p as *const Task<F>);
let future = Pin::new_unchecked(this.future.as_mut());
-let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
+let waker = waker::from_task(p);
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(_) => {
this.future.drop_in_place();
this.header
.state
-.fetch_and(!STATE_RUNNING, Ordering::AcqRel);
+.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
}
Poll::Pending => {}
}
@@ -145,9 +125,6 @@ impl<F: Future + 'static> Task<F> {
unsafe impl<F: Future + 'static> Sync for Task<F> {}
-//=============
-// Spawn token
#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
pub struct SpawnToken {
header: Option<NonNull<TaskHeader>>,
@@ -160,8 +137,11 @@ impl Drop for SpawnToken {
}
}
-//=============
-// Executor
+#[derive(Copy, Clone, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum SpawnError {
+Busy,
+}
pub struct Executor {
run_queue: RunQueue,
@@ -207,7 +187,7 @@ impl Executor {
let header = &*p;
let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
-if state & STATE_RUNNING == 0 {
+if state & STATE_SPAWNED == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
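Aside: a minimal, self-contained sketch of the wake-coalescing pattern used by the new TaskHeader::enqueue above (and by waker_wake before the move). This is not embassy code: the name try_mark_run_queued is made up, and it uses std instead of core so it can run as a plain test. The idea is that a wake only enqueues the task when it is spawned and not already queued, so concurrent wakes collapse into a single enqueue.

use std::sync::atomic::{AtomicU32, Ordering};

const STATE_SPAWNED: u32 = 1 << 0;
const STATE_RUN_QUEUED: u32 = 1 << 1;

// Set RUN_QUEUED if (and only if) the task is SPAWNED and not already
// RUN_QUEUED. Returns true when this caller won the race and should push
// the task onto the run queue.
fn try_mark_run_queued(state: &AtomicU32) -> bool {
    let mut current = state.load(Ordering::Acquire);
    loop {
        if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
            return false; // already queued, or not spawned: nothing to do
        }
        match state.compare_exchange_weak(
            current,
            current | STATE_RUN_QUEUED,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return true,            // we set the bit; enqueue now
            Err(actual) => current = actual, // lost a race; retry with fresh state
        }
    }
}

fn main() {
    let state = AtomicU32::new(STATE_SPAWNED);
    assert!(try_mark_run_queued(&state));  // first wake wins and enqueues
    assert!(!try_mark_run_queued(&state)); // later wakes are coalesced
    assert!(!try_mark_run_queued(&AtomicU32::new(0))); // not spawned: ignored
}

In the real code above, a "won the race" outcome corresponds to falling out of the loop and calling executor.enqueue(...) right after it.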


@@ -0,0 +1,22 @@
+use core::task::{RawWaker, RawWakerVTable, Waker};
+use super::TaskHeader;
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
+unsafe fn clone(p: *const ()) -> RawWaker {
+RawWaker::new(p, &VTABLE)
+}
+unsafe fn wake(p: *const ()) {
+let header = &*(p as *const TaskHeader);
+header.enqueue();
+}
+unsafe fn drop(_: *const ()) {
+// nop
+}
+pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker {
+Waker::from_raw(RawWaker::new(p as _, &VTABLE))
+}
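Aside: the new waker module above is a thin RawWakerVTable wrapper around a raw TaskHeader pointer. Below is a minimal, self-contained sketch of the same pattern, not embassy code: FakeHeader, from_header, and the wake counter are illustration-only stand-ins for TaskHeader, waker::from_task, and TaskHeader::enqueue, and it uses std instead of core. Clone just builds another RawWaker over the same pointer and drop is a no-op, which is sound here because the pointed-to storage is static and outlives every waker.

use std::sync::atomic::{AtomicU32, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

// Illustration-only stand-in for TaskHeader: static storage plus a wake counter.
struct FakeHeader {
    wakes: AtomicU32,
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop_waker);

unsafe fn clone(p: *const ()) -> RawWaker {
    // A "clone" is just another RawWaker over the same pointer: no reference
    // counting is needed because the header outlives all wakers.
    RawWaker::new(p, &VTABLE)
}

unsafe fn wake(p: *const ()) {
    // Waking forwards to the header (embassy forwards to TaskHeader::enqueue).
    let header = &*(p as *const FakeHeader);
    header.wakes.fetch_add(1, Ordering::Relaxed);
}

unsafe fn drop_waker(_: *const ()) {
    // No-op: the waker owns nothing.
}

fn from_header(header: &'static FakeHeader) -> Waker {
    let p = header as *const FakeHeader as *const ();
    unsafe { Waker::from_raw(RawWaker::new(p, &VTABLE)) }
}

fn main() {
    static HEADER: FakeHeader = FakeHeader { wakes: AtomicU32::new(0) };
    let waker = from_header(&HEADER);
    let waker2 = waker.clone(); // goes through `clone` above
    waker.wake_by_ref();        // vtable wake slot
    waker2.wake();              // consumes waker2; `drop_waker` stays a no-op
    assert_eq!(HEADER.wakes.load(Ordering::Relaxed), 2);
}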