executor: Replace NonNull<TaskHeader>
with TaskRef
This commit is contained in:
@ -43,14 +43,11 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
|
||||
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
|
||||
|
||||
/// Raw task header for use in task pointers.
|
||||
///
|
||||
/// This is an opaque struct, used for raw pointers to tasks, for use
|
||||
/// with funtions like [`wake_task`] and [`task_from_waker`].
|
||||
pub struct TaskHeader {
|
||||
pub(crate) struct TaskHeader {
|
||||
pub(crate) state: AtomicU32,
|
||||
pub(crate) run_queue_item: RunQueueItem,
|
||||
pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
|
||||
pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED
|
||||
pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
|
||||
pub(crate) poll_fn: UninitCell<unsafe fn(TaskRef)>, // Valid if STATE_SPAWNED
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
pub(crate) expires_at: Cell<Instant>,
|
||||
@ -59,7 +56,7 @@ pub struct TaskHeader {
|
||||
}
|
||||
|
||||
impl TaskHeader {
|
||||
pub(crate) const fn new() -> Self {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
state: AtomicU32::new(0),
|
||||
run_queue_item: RunQueueItem::new(),
|
||||
@ -74,6 +71,36 @@ impl TaskHeader {
|
||||
}
|
||||
}
|
||||
|
||||
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct TaskRef {
|
||||
ptr: NonNull<TaskHeader>,
|
||||
}
|
||||
|
||||
impl TaskRef {
|
||||
fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
|
||||
Self {
|
||||
ptr: NonNull::from(task).cast(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Safety: The pointer must have been obtained with `Task::as_ptr`
|
||||
pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
|
||||
Self {
|
||||
ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn header(self) -> &'static TaskHeader {
|
||||
unsafe { self.ptr.as_ref() }
|
||||
}
|
||||
|
||||
/// The returned pointer is valid for the entire TaskStorage.
|
||||
pub(crate) fn as_ptr(self) -> *const TaskHeader {
|
||||
self.ptr.as_ptr()
|
||||
}
|
||||
}
|
||||
|
||||
/// Raw storage in which a task can be spawned.
|
||||
///
|
||||
/// This struct holds the necessary memory to spawn one task whose future is `F`.
|
||||
@ -135,14 +162,14 @@ impl<F: Future + 'static> TaskStorage<F> {
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
|
||||
unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef {
|
||||
// Initialize the task
|
||||
self.raw.poll_fn.write(Self::poll);
|
||||
self.future.write(future());
|
||||
NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
|
||||
TaskRef::new(self)
|
||||
}
|
||||
|
||||
unsafe fn poll(p: NonNull<TaskHeader>) {
|
||||
unsafe fn poll(p: TaskRef) {
|
||||
let this = &*(p.as_ptr() as *const TaskStorage<F>);
|
||||
|
||||
let future = Pin::new_unchecked(this.future.as_mut());
|
||||
@ -307,7 +334,7 @@ impl Executor {
|
||||
/// - `task` must be set up to run in this executor.
|
||||
/// - `task` must NOT be already enqueued (in this executor or another one).
|
||||
#[inline(always)]
|
||||
unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) {
|
||||
unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) {
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_ready_begin(task.as_ptr() as u32);
|
||||
|
||||
@ -325,8 +352,8 @@ impl Executor {
|
||||
/// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
|
||||
/// In this case, the task's Future must be Send. This is because this is effectively
|
||||
/// sending the task to the executor thread.
|
||||
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
|
||||
task.as_ref().executor.set(self);
|
||||
pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
|
||||
task.header().executor.set(self);
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_new(task.as_ptr() as u32);
|
||||
@ -359,7 +386,7 @@ impl Executor {
|
||||
self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
|
||||
|
||||
self.run_queue.dequeue_all(|p| {
|
||||
let task = p.as_ref();
|
||||
let task = p.header();
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
task.expires_at.set(Instant::MAX);
|
||||
@ -378,7 +405,7 @@ impl Executor {
|
||||
trace::task_exec_begin(p.as_ptr() as u32);
|
||||
|
||||
// Run the task
|
||||
task.poll_fn.read()(p as _);
|
||||
task.poll_fn.read()(p);
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_exec_end();
|
||||
@ -424,9 +451,9 @@ impl Executor {
|
||||
/// # Safety
|
||||
///
|
||||
/// `task` must be a valid task pointer obtained from [`task_from_waker`].
|
||||
pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
|
||||
pub unsafe fn wake_task(task: TaskRef) {
|
||||
critical_section::with(|cs| {
|
||||
let header = task.as_ref();
|
||||
let header = task.header();
|
||||
let state = header.state.load(Ordering::Relaxed);
|
||||
|
||||
// If already scheduled, or if not started,
|
||||
@ -450,7 +477,7 @@ struct TimerQueue;
|
||||
impl embassy_time::queue::TimerQueue for TimerQueue {
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
|
||||
let task = waker::task_from_waker(waker);
|
||||
let task = unsafe { task.as_ref() };
|
||||
let task = task.header();
|
||||
let expires_at = task.expires_at.get();
|
||||
task.expires_at.set(expires_at.min(at));
|
||||
}
|
||||
|
Reference in New Issue
Block a user