diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs index 396ce18f..e48de3d6 100644 --- a/embassy-macros/src/macros/task.rs +++ b/embassy-macros/src/macros/task.rs @@ -73,26 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result #embassy_path::executor::SpawnToken { - use ::core::future::Future; - use #embassy_path::executor::SpawnToken; - use #embassy_path::executor::raw::TaskStorage; - - type Fut = impl Future + 'static; - - #[allow(clippy::declare_interior_mutable_const)] - const NEW_TS: TaskStorage = TaskStorage::new(); - - static POOL: [TaskStorage; #pool_size] = [NEW_TS; #pool_size]; - - // Opaque type laundering, to obscure its origin! - // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope" - // https://github.com/rust-lang/rust/issues/96406 - fn launder_tait(token: SpawnToken) -> SpawnToken { - token - } - - launder_tait(unsafe { TaskStorage::spawn_pool(&POOL, move || #task_inner_ident(#(#arg_names,)*)) }) + #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken { + type Fut = impl ::core::future::Future + 'static; + static POOL: #embassy_path::executor::raw::TaskPool = #embassy_path::executor::raw::TaskPool::new(); + unsafe { POOL._spawn_async_fn(move || #task_inner_ident(#(#arg_names,)*)) } } }; diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index 3ae52ae3..5cf399cd 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs @@ -110,8 +110,8 @@ impl TaskHeader { /// Raw storage in which a task can be spawned. /// /// This struct holds the necessary memory to spawn one task whose future is `F`. -/// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it -/// with [`Task::spawn()`], which will fail if it is already spawned. +/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You +/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. /// /// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished /// running. Hence the relevant methods require `&'static self`. It may be reused, however. @@ -121,7 +121,7 @@ impl TaskHeader { /// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. // repr(C) is needed to guarantee that the Task is located at offset 0 -// This makes it safe to cast between Task and Task pointers. +// This makes it safe to cast between TaskHeader and TaskStorage pointers. #[repr(C)] pub struct TaskStorage { raw: TaskHeader, @@ -129,6 +129,9 @@ pub struct TaskStorage { } impl TaskStorage { + #[cfg(feature = "nightly")] + const NEW: Self = Self::new(); + /// Create a new TaskStorage, in not-spawned state. #[cfg(feature = "nightly")] pub const fn new() -> Self { @@ -147,22 +150,6 @@ impl TaskStorage { } } - /// Try to spawn a task in a pool. - /// - /// See [`Self::spawn()`] for details. - /// - /// This will loop over the pool and spawn the task in the first storage that - /// is currently free. If none is free, - pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { - for task in pool { - if task.spawn_allocate() { - return unsafe { task.spawn_initialize(future) }; - } - } - - SpawnToken::new_failed() - } - /// Try to spawn the task. /// /// The `future` closure constructs the future. It's only called if spawning is @@ -172,15 +159,15 @@ impl TaskStorage { /// /// This function will fail if the task is already spawned and has not finished running. /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will - /// cause [`Executor::spawn()`] to return the error. + /// cause [`Spawner::spawn()`] to return the error. /// /// Once the task has finished running, you may spawn it again. It is allowed to spawn it /// on a different executor. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { if self.spawn_allocate() { - unsafe { self.spawn_initialize(future) } + unsafe { SpawnToken::::new(self.spawn_initialize(future)) } } else { - SpawnToken::new_failed() + SpawnToken::::new_failed() } } @@ -192,12 +179,11 @@ impl TaskStorage { .is_ok() } - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull { // Initialize the task self.raw.poll_fn.write(Self::poll); self.future.write(future()); - - SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) + NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader) } unsafe fn poll(p: NonNull) { @@ -222,6 +208,89 @@ impl TaskStorage { unsafe impl Sync for TaskStorage {} +/// Raw storage that can hold up to N tasks of the same type. +/// +/// This is essentially a `[TaskStorage; N]`. +#[cfg(feature = "nightly")] +pub struct TaskPool { + pool: [TaskStorage; N], +} + +#[cfg(feature = "nightly")] +impl TaskPool { + /// Create a new TaskPool, with all tasks in non-spawned state. + pub const fn new() -> Self { + Self { + pool: [TaskStorage::NEW; N], + } + } + + /// Try to spawn a task in the pool. + /// + /// See [`TaskStorage::spawn()`] for details. + /// + /// This will loop over the pool and spawn the task in the first storage that + /// is currently free. If none is free, a "poisoned" SpawnToken is returned, + /// which will cause [`Spawner::spawn()`] to return the error. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + for task in &self.pool { + if task.spawn_allocate() { + return unsafe { SpawnToken::::new(task.spawn_initialize(future)) }; + } + } + + SpawnToken::::new_failed() + } + + /// Like spawn(), but allows the task to be send-spawned if the args are Send even if + /// the future is !Send. + /// + /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used + /// by the Embassy macros ONLY. + /// + /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` + /// is an `async fn`, NOT a hand-written `Future`. + #[doc(hidden)] + pub unsafe fn _spawn_async_fn(&'static self, future: FutFn) -> SpawnToken + where + FutFn: FnOnce() -> F, + { + // When send-spawning a task, we construct the future in this thread, and effectively + // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, + // send-spawning should require the future `F` to be `Send`. + // + // The problem is this is more restrictive than needed. Once the future is executing, + // it is never sent to another thread. It is only sent when spawning. It should be + // enough for the task's arguments to be Send. (and in practice it's super easy to + // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) + // + // We can do it by sending the task args and constructing the future in the executor thread + // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy + // of the args. + // + // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the + // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. + // + // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, + // but it's possible it'll be guaranteed in the future. See zulip thread: + // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) + // + // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. + // This is why we return `SpawnToken` below. + // + // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly + // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken`. + + for task in &self.pool { + if task.spawn_allocate() { + return SpawnToken::::new(task.spawn_initialize(future)); + } + } + + SpawnToken::::new_failed() + } +} + /// Raw executor. /// /// This is the core of the Embassy executor. It is low-level, requiring manual diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs index e6770e29..73c1f786 100644 --- a/embassy/src/executor/spawner.rs +++ b/embassy/src/executor/spawner.rs @@ -12,17 +12,21 @@ use super::raw; /// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must /// then spawn it into an executor, typically with [`Spawner::spawn()`]. /// +/// The generic parameter `S` determines whether the task can be spawned in executors +/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`]. +/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`]. +/// /// # Panics /// /// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] -pub struct SpawnToken { +pub struct SpawnToken { raw_task: Option>, - phantom: PhantomData<*mut F>, + phantom: PhantomData<*mut S>, } -impl SpawnToken { +impl SpawnToken { pub(crate) unsafe fn new(raw_task: NonNull) -> Self { Self { raw_task: Some(raw_task), @@ -38,7 +42,7 @@ impl SpawnToken { } } -impl Drop for SpawnToken { +impl Drop for SpawnToken { fn drop(&mut self) { // TODO deallocate the task instead. panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") @@ -97,7 +101,7 @@ impl Spawner { /// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { let task = token.raw_task; mem::forget(token); @@ -119,7 +123,7 @@ impl Spawner { /// # Panics /// /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { + pub fn must_spawn(&self, token: SpawnToken) { unwrap!(self.spawn(token)); } @@ -173,7 +177,7 @@ impl SendSpawner { /// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { let header = token.raw_task; mem::forget(token); @@ -191,7 +195,7 @@ impl SendSpawner { /// # Panics /// /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { + pub fn must_spawn(&self, token: SpawnToken) { unwrap!(self.spawn(token)); } }