Merge #1291
1291: executor: Allow TaskStorage to auto-implement `Sync` r=Dirbaio a=GrantM11235 Co-authored-by: Grant Miller <GrantM11235@gmail.com> Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
commit
8a3a7c65a8
@ -13,11 +13,12 @@ mod timer_queue;
|
||||
pub(crate) mod util;
|
||||
mod waker;
|
||||
|
||||
use core::cell::Cell;
|
||||
use core::future::Future;
|
||||
use core::marker::PhantomData;
|
||||
use core::mem;
|
||||
use core::pin::Pin;
|
||||
use core::ptr::NonNull;
|
||||
use core::sync::atomic::AtomicPtr;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
use atomic_polyfill::{AtomicU32, Ordering};
|
||||
@ -30,7 +31,7 @@ use embassy_time::Instant;
|
||||
use rtos_trace::trace;
|
||||
|
||||
use self::run_queue::{RunQueue, RunQueueItem};
|
||||
use self::util::UninitCell;
|
||||
use self::util::{SyncUnsafeCell, UninitCell};
|
||||
pub use self::waker::task_from_waker;
|
||||
use super::SpawnToken;
|
||||
|
||||
@ -46,11 +47,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
|
||||
pub(crate) struct TaskHeader {
|
||||
pub(crate) state: AtomicU32,
|
||||
pub(crate) run_queue_item: RunQueueItem,
|
||||
pub(crate) executor: Cell<Option<&'static Executor>>,
|
||||
poll_fn: Cell<Option<unsafe fn(TaskRef)>>,
|
||||
pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
|
||||
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
pub(crate) expires_at: Cell<Instant>,
|
||||
pub(crate) expires_at: SyncUnsafeCell<Instant>,
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
|
||||
}
|
||||
@ -61,6 +62,9 @@ pub struct TaskRef {
|
||||
ptr: NonNull<TaskHeader>,
|
||||
}
|
||||
|
||||
unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
|
||||
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
|
||||
|
||||
impl TaskRef {
|
||||
fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
|
||||
Self {
|
||||
@ -115,12 +119,12 @@ impl<F: Future + 'static> TaskStorage<F> {
|
||||
raw: TaskHeader {
|
||||
state: AtomicU32::new(0),
|
||||
run_queue_item: RunQueueItem::new(),
|
||||
executor: Cell::new(None),
|
||||
executor: SyncUnsafeCell::new(None),
|
||||
// Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
|
||||
poll_fn: Cell::new(None),
|
||||
poll_fn: SyncUnsafeCell::new(None),
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
expires_at: Cell::new(Instant::from_ticks(0)),
|
||||
expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
timer_queue_item: timer_queue::TimerQueueItem::new(),
|
||||
},
|
||||
@ -170,9 +174,15 @@ impl<F: Future + 'static> TaskStorage<F> {
|
||||
// it's a noop for our waker.
|
||||
mem::forget(waker);
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
|
||||
#[doc(hidden)]
|
||||
#[allow(dead_code)]
|
||||
fn _assert_sync(self) {
|
||||
fn assert_sync<T: Sync>(_: T) {}
|
||||
|
||||
assert_sync(self)
|
||||
}
|
||||
}
|
||||
|
||||
struct AvailableTask<F: Future + 'static> {
|
||||
task: &'static TaskStorage<F>,
|
||||
@ -279,29 +289,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Raw executor.
|
||||
///
|
||||
/// This is the core of the Embassy executor. It is low-level, requiring manual
|
||||
/// handling of wakeups and task polling. If you can, prefer using one of the
|
||||
/// [higher level executors](crate::Executor).
|
||||
///
|
||||
/// The raw executor leaves it up to you to handle wakeups and scheduling:
|
||||
///
|
||||
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
|
||||
/// that "want to run").
|
||||
/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
|
||||
/// to do. You must arrange for `poll()` to be called as soon as possible.
|
||||
///
|
||||
/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
|
||||
/// level, etc. It may be called synchronously from any `Executor` method call as well.
|
||||
/// You must deal with this correctly.
|
||||
///
|
||||
/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
|
||||
/// the requirement for `poll` to not be called reentrantly.
|
||||
pub struct Executor {
|
||||
pub(crate) struct SyncExecutor {
|
||||
run_queue: RunQueue,
|
||||
signal_fn: fn(*mut ()),
|
||||
signal_ctx: *mut (),
|
||||
signal_ctx: AtomicPtr<()>,
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
pub(crate) timer_queue: timer_queue::TimerQueue,
|
||||
@ -309,14 +300,8 @@ pub struct Executor {
|
||||
alarm: AlarmHandle,
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
/// Create a new executor.
|
||||
///
|
||||
/// When the executor has work to do, it will call `signal_fn` with
|
||||
/// `signal_ctx` as argument.
|
||||
///
|
||||
/// See [`Executor`] docs for details on `signal_fn`.
|
||||
pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
|
||||
impl SyncExecutor {
|
||||
pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
@ -325,7 +310,7 @@ impl Executor {
|
||||
Self {
|
||||
run_queue: RunQueue::new(),
|
||||
signal_fn,
|
||||
signal_ctx,
|
||||
signal_ctx: AtomicPtr::new(signal_ctx),
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
timer_queue: timer_queue::TimerQueue::new(),
|
||||
@ -346,19 +331,10 @@ impl Executor {
|
||||
trace::task_ready_begin(task.as_ptr() as u32);
|
||||
|
||||
if self.run_queue.enqueue(cs, task) {
|
||||
(self.signal_fn)(self.signal_ctx)
|
||||
(self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed))
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a task in this executor.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// `task` must be a valid pointer to an initialized but not-already-spawned task.
|
||||
///
|
||||
/// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
|
||||
/// In this case, the task's Future must be Send. This is because this is effectively
|
||||
/// sending the task to the executor thread.
|
||||
pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
|
||||
task.header().executor.set(Some(self));
|
||||
|
||||
@ -370,24 +346,11 @@ impl Executor {
|
||||
})
|
||||
}
|
||||
|
||||
/// Poll all queued tasks in this executor.
|
||||
///
|
||||
/// This loops over all tasks that are queued to be polled (i.e. they're
|
||||
/// freshly spawned or they've been woken). Other tasks are not polled.
|
||||
///
|
||||
/// You must call `poll` after receiving a call to `signal_fn`. It is OK
|
||||
/// to call `poll` even when not requested by `signal_fn`, but it wastes
|
||||
/// energy.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// You must NOT call `poll` reentrantly on the same executor.
|
||||
///
|
||||
/// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
|
||||
/// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
|
||||
/// somehow schedule for `poll()` to be called later, at a time you know for sure there's
|
||||
/// no `poll()` already running.
|
||||
pub unsafe fn poll(&'static self) {
|
||||
/// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
|
||||
pub(crate) unsafe fn poll(&'static self) {
|
||||
#[allow(clippy::never_loop)]
|
||||
loop {
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
|
||||
@ -441,6 +404,84 @@ impl Executor {
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::system_idle();
|
||||
}
|
||||
}
|
||||
|
||||
/// Raw executor.
|
||||
///
|
||||
/// This is the core of the Embassy executor. It is low-level, requiring manual
|
||||
/// handling of wakeups and task polling. If you can, prefer using one of the
|
||||
/// [higher level executors](crate::Executor).
|
||||
///
|
||||
/// The raw executor leaves it up to you to handle wakeups and scheduling:
|
||||
///
|
||||
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
|
||||
/// that "want to run").
|
||||
/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
|
||||
/// to do. You must arrange for `poll()` to be called as soon as possible.
|
||||
///
|
||||
/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
|
||||
/// level, etc. It may be called synchronously from any `Executor` method call as well.
|
||||
/// You must deal with this correctly.
|
||||
///
|
||||
/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
|
||||
/// the requirement for `poll` to not be called reentrantly.
|
||||
#[repr(transparent)]
|
||||
pub struct Executor {
|
||||
pub(crate) inner: SyncExecutor,
|
||||
|
||||
_not_sync: PhantomData<*mut ()>,
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
|
||||
mem::transmute(inner)
|
||||
}
|
||||
/// Create a new executor.
|
||||
///
|
||||
/// When the executor has work to do, it will call `signal_fn` with
|
||||
/// `signal_ctx` as argument.
|
||||
///
|
||||
/// See [`Executor`] docs for details on `signal_fn`.
|
||||
pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
|
||||
Self {
|
||||
inner: SyncExecutor::new(signal_fn, signal_ctx),
|
||||
_not_sync: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a task in this executor.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// `task` must be a valid pointer to an initialized but not-already-spawned task.
|
||||
///
|
||||
/// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
|
||||
/// In this case, the task's Future must be Send. This is because this is effectively
|
||||
/// sending the task to the executor thread.
|
||||
pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
|
||||
self.inner.spawn(task)
|
||||
}
|
||||
|
||||
/// Poll all queued tasks in this executor.
|
||||
///
|
||||
/// This loops over all tasks that are queued to be polled (i.e. they're
|
||||
/// freshly spawned or they've been woken). Other tasks are not polled.
|
||||
///
|
||||
/// You must call `poll` after receiving a call to `signal_fn`. It is OK
|
||||
/// to call `poll` even when not requested by `signal_fn`, but it wastes
|
||||
/// energy.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// You must NOT call `poll` reentrantly on the same executor.
|
||||
///
|
||||
/// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
|
||||
/// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
|
||||
/// somehow schedule for `poll()` to be called later, at a time you know for sure there's
|
||||
/// no `poll()` already running.
|
||||
pub unsafe fn poll(&'static self) {
|
||||
self.inner.poll()
|
||||
}
|
||||
|
||||
/// Get a spawner that spawns tasks in this executor.
|
||||
///
|
||||
@ -483,10 +524,12 @@ impl embassy_time::queue::TimerQueue for TimerQueue {
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
|
||||
let task = waker::task_from_waker(waker);
|
||||
let task = task.header();
|
||||
unsafe {
|
||||
let expires_at = task.expires_at.get();
|
||||
task.expires_at.set(expires_at.min(at));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
|
||||
|
@ -1,28 +1,32 @@
|
||||
use core::cell::Cell;
|
||||
use core::cmp::min;
|
||||
|
||||
use atomic_polyfill::Ordering;
|
||||
use embassy_time::Instant;
|
||||
|
||||
use super::{TaskRef, STATE_TIMER_QUEUED};
|
||||
use crate::raw::util::SyncUnsafeCell;
|
||||
|
||||
pub(crate) struct TimerQueueItem {
|
||||
next: Cell<Option<TaskRef>>,
|
||||
next: SyncUnsafeCell<Option<TaskRef>>,
|
||||
}
|
||||
|
||||
impl TimerQueueItem {
|
||||
pub const fn new() -> Self {
|
||||
Self { next: Cell::new(None) }
|
||||
Self {
|
||||
next: SyncUnsafeCell::new(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TimerQueue {
|
||||
head: Cell<Option<TaskRef>>,
|
||||
head: SyncUnsafeCell<Option<TaskRef>>,
|
||||
}
|
||||
|
||||
impl TimerQueue {
|
||||
pub const fn new() -> Self {
|
||||
Self { head: Cell::new(None) }
|
||||
Self {
|
||||
head: SyncUnsafeCell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) unsafe fn update(&self, p: TaskRef) {
|
||||
|
@ -25,3 +25,32 @@ impl<T> UninitCell<T> {
|
||||
ptr::drop_in_place(self.as_mut_ptr())
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<T> Sync for UninitCell<T> {}
|
||||
|
||||
#[repr(transparent)]
|
||||
pub struct SyncUnsafeCell<T> {
|
||||
value: UnsafeCell<T>,
|
||||
}
|
||||
|
||||
unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}
|
||||
|
||||
impl<T> SyncUnsafeCell<T> {
|
||||
#[inline]
|
||||
pub const fn new(value: T) -> Self {
|
||||
Self {
|
||||
value: UnsafeCell::new(value),
|
||||
}
|
||||
}
|
||||
|
||||
pub unsafe fn set(&self, value: T) {
|
||||
*self.value.get() = value;
|
||||
}
|
||||
|
||||
pub unsafe fn get(&self) -> T
|
||||
where
|
||||
T: Copy,
|
||||
{
|
||||
*self.value.get()
|
||||
}
|
||||
}
|
||||
|
@ -92,6 +92,7 @@ impl Spawner {
|
||||
poll_fn(|cx| {
|
||||
let task = raw::task_from_waker(cx.waker());
|
||||
let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
|
||||
let executor = unsafe { raw::Executor::wrap(executor) };
|
||||
Poll::Ready(Self::new(executor))
|
||||
})
|
||||
.await
|
||||
@ -130,9 +131,7 @@ impl Spawner {
|
||||
/// spawner to other threads, but the spawner loses the ability to spawn
|
||||
/// non-Send tasks.
|
||||
pub fn make_send(&self) -> SendSpawner {
|
||||
SendSpawner {
|
||||
executor: self.executor,
|
||||
}
|
||||
SendSpawner::new(&self.executor.inner)
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,14 +144,11 @@ impl Spawner {
|
||||
/// If you want to spawn non-Send tasks, use [Spawner].
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct SendSpawner {
|
||||
executor: &'static raw::Executor,
|
||||
executor: &'static raw::SyncExecutor,
|
||||
}
|
||||
|
||||
unsafe impl Send for SendSpawner {}
|
||||
unsafe impl Sync for SendSpawner {}
|
||||
|
||||
impl SendSpawner {
|
||||
pub(crate) fn new(executor: &'static raw::Executor) -> Self {
|
||||
pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self {
|
||||
Self { executor }
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user