From 293f54d13406850d24d1226eb77989f4fa8db9f4 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 27 Apr 2022 03:23:54 +0200 Subject: [PATCH 1/3] executor: add raw::TaskPool. This simplifies the macro code a bit. --- embassy-macros/src/macros/task.rs | 9 ++--- embassy/src/executor/raw/mod.rs | 56 +++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs index 396ce18f..c450982c 100644 --- a/embassy-macros/src/macros/task.rs +++ b/embassy-macros/src/macros/task.rs @@ -76,14 +76,11 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result #embassy_path::executor::SpawnToken { use ::core::future::Future; use #embassy_path::executor::SpawnToken; - use #embassy_path::executor::raw::TaskStorage; + use #embassy_path::executor::raw::TaskPool; type Fut = impl Future + 'static; - #[allow(clippy::declare_interior_mutable_const)] - const NEW_TS: TaskStorage = TaskStorage::new(); - - static POOL: [TaskStorage; #pool_size] = [NEW_TS; #pool_size]; + static POOL: TaskPool = TaskPool::new(); // Opaque type laundering, to obscure its origin! // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope" @@ -92,7 +89,7 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result { raw: TaskHeader, @@ -129,6 +129,9 @@ pub struct TaskStorage { } impl TaskStorage { + #[cfg(feature = "nightly")] + const NEW: Self = Self::new(); + /// Create a new TaskStorage, in not-spawned state. #[cfg(feature = "nightly")] pub const fn new() -> Self { @@ -147,22 +150,6 @@ impl TaskStorage { } } - /// Try to spawn a task in a pool. - /// - /// See [`Self::spawn()`] for details. - /// - /// This will loop over the pool and spawn the task in the first storage that - /// is currently free. If none is free, - pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { - for task in pool { - if task.spawn_allocate() { - return unsafe { task.spawn_initialize(future) }; - } - } - - SpawnToken::new_failed() - } - /// Try to spawn the task. /// /// The `future` closure constructs the future. It's only called if spawning is @@ -222,6 +209,41 @@ impl TaskStorage { unsafe impl Sync for TaskStorage {} +/// Raw storage that can hold up to N tasks of the same type. +/// +/// This is essentially a `[TaskStorage; N]`. +#[cfg(feature = "nightly")] +pub struct TaskPool { + pool: [TaskStorage; N], +} + +#[cfg(feature = "nightly")] +impl TaskPool { + /// Create a new TaskPool, with all tasks in non-spawned state. + pub const fn new() -> Self { + Self { + pool: [TaskStorage::NEW; N], + } + } + + /// Try to spawn a task in the pool. + /// + /// See [`TaskStorage::spawn()`] for details. + /// + /// This will loop over the pool and spawn the task in the first storage that + /// is currently free. If none is free, a "poisoned" SpawnToken is returned, + /// which will cause [`Executor::spawn()`] to return the error. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + for task in &self.pool { + if task.spawn_allocate() { + return unsafe { task.spawn_initialize(future) }; + } + } + + SpawnToken::new_failed() + } +} + /// Raw executor. /// /// This is the core of the Embassy executor. It is low-level, requiring manual From 6f6c16f44924de4d71d0e5e3acc0908f2dd474e6 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 27 Apr 2022 04:27:42 +0200 Subject: [PATCH 2/3] executor: make send-spawning only require the task args to be Send, not the whole future. --- embassy-macros/src/macros/task.rs | 21 +++------------- embassy/src/executor/raw/mod.rs | 41 +++++++++++++++++++++++++------ embassy/src/executor/spawner.rs | 20 +++++++++------ 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs index c450982c..96932d77 100644 --- a/embassy-macros/src/macros/task.rs +++ b/embassy-macros/src/macros/task.rs @@ -73,23 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result #embassy_path::executor::SpawnToken { - use ::core::future::Future; - use #embassy_path::executor::SpawnToken; - use #embassy_path::executor::raw::TaskPool; - - type Fut = impl Future + 'static; - - static POOL: TaskPool = TaskPool::new(); - - // Opaque type laundering, to obscure its origin! - // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope" - // https://github.com/rust-lang/rust/issues/96406 - fn launder_tait(token: SpawnToken) -> SpawnToken { - token - } - - launder_tait(POOL.spawn(move || #task_inner_ident(#(#arg_names,)*))) + #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken { + type Fut = impl ::core::future::Future + 'static; + static POOL: #embassy_path::executor::raw::TaskPool = #embassy_path::executor::raw::TaskPool::new(); + POOL.spawn(move || #task_inner_ident(#(#arg_names,)*)) } }; diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index 5af35d86..6b14b8e8 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs @@ -110,8 +110,8 @@ impl TaskHeader { /// Raw storage in which a task can be spawned. /// /// This struct holds the necessary memory to spawn one task whose future is `F`. -/// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it -/// with [`Task::spawn()`], which will fail if it is already spawned. +/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You +/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. /// /// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished /// running. Hence the relevant methods require `&'static self`. It may be reused, however. @@ -159,11 +159,11 @@ impl TaskStorage { /// /// This function will fail if the task is already spawned and has not finished running. /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will - /// cause [`Executor::spawn()`] to return the error. + /// cause [`Spawner::spawn()`] to return the error. /// /// Once the task has finished running, you may spawn it again. It is allowed to spawn it /// on a different executor. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { if self.spawn_allocate() { unsafe { self.spawn_initialize(future) } } else { @@ -179,12 +179,37 @@ impl TaskStorage { .is_ok() } - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + unsafe fn spawn_initialize(&'static self, future: FutFn) -> SpawnToken + where + FutFn: FnOnce() -> F, + { // Initialize the task self.raw.poll_fn.write(Self::poll); self.future.write(future()); - SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) + // When send-spawning a task, we construct the future in this thread, and effectively + // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, + // send-spawning should require the future `F` to be `Send`. + // + // The problem is this is more restrictive than needed. Once the future is executing, + // it is never sent to another thread. It is only sent when spawning. It should be + // enough for the task's arguments to be Send. (and in practice it's super easy to + // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) + // + // We can do it by sending the task args and constructing the future in the executor thread + // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy + // of the args. + // + // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the + // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. + // + // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, + // but it's possible it'll be guaranteed in the future. See zulip thread: + // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) + // + // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. + // This is why we return `SpawnToken` below. + SpawnToken::::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) } unsafe fn poll(p: NonNull) { @@ -232,8 +257,8 @@ impl TaskPool { /// /// This will loop over the pool and spawn the task in the first storage that /// is currently free. If none is free, a "poisoned" SpawnToken is returned, - /// which will cause [`Executor::spawn()`] to return the error. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + /// which will cause [`Spawner::spawn()`] to return the error. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { for task in &self.pool { if task.spawn_allocate() { return unsafe { task.spawn_initialize(future) }; diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs index e6770e29..73c1f786 100644 --- a/embassy/src/executor/spawner.rs +++ b/embassy/src/executor/spawner.rs @@ -12,17 +12,21 @@ use super::raw; /// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must /// then spawn it into an executor, typically with [`Spawner::spawn()`]. /// +/// The generic parameter `S` determines whether the task can be spawned in executors +/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`]. +/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`]. +/// /// # Panics /// /// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] -pub struct SpawnToken { +pub struct SpawnToken { raw_task: Option>, - phantom: PhantomData<*mut F>, + phantom: PhantomData<*mut S>, } -impl SpawnToken { +impl SpawnToken { pub(crate) unsafe fn new(raw_task: NonNull) -> Self { Self { raw_task: Some(raw_task), @@ -38,7 +42,7 @@ impl SpawnToken { } } -impl Drop for SpawnToken { +impl Drop for SpawnToken { fn drop(&mut self) { // TODO deallocate the task instead. panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") @@ -97,7 +101,7 @@ impl Spawner { /// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { let task = token.raw_task; mem::forget(token); @@ -119,7 +123,7 @@ impl Spawner { /// # Panics /// /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { + pub fn must_spawn(&self, token: SpawnToken) { unwrap!(self.spawn(token)); } @@ -173,7 +177,7 @@ impl SendSpawner { /// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { let header = token.raw_task; mem::forget(token); @@ -191,7 +195,7 @@ impl SendSpawner { /// # Panics /// /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { + pub fn must_spawn(&self, token: SpawnToken) { unwrap!(self.spawn(token)); } } From 1599009a4f5fe1a0f9596b7b27bfd9fd84366377 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 27 Apr 2022 04:45:23 +0200 Subject: [PATCH 3/3] executor: "send-spawn is OK if the args are Send" only holds for async fn futures. The normal `spawn()` methods can be called directly by the user, with arbitrary hand-implemented futures. We can't enforce they're only called with `async fn` futures. Therefore, make these require `F: Send`, and add a "private" one only for use in the macro, which can enforce it. --- embassy-macros/src/macros/task.rs | 2 +- embassy/src/executor/raw/mod.rs | 86 +++++++++++++++++++------------ 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs index 96932d77..e48de3d6 100644 --- a/embassy-macros/src/macros/task.rs +++ b/embassy-macros/src/macros/task.rs @@ -76,7 +76,7 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result #embassy_path::executor::SpawnToken { type Fut = impl ::core::future::Future + 'static; static POOL: #embassy_path::executor::raw::TaskPool = #embassy_path::executor::raw::TaskPool::new(); - POOL.spawn(move || #task_inner_ident(#(#arg_names,)*)) + unsafe { POOL._spawn_async_fn(move || #task_inner_ident(#(#arg_names,)*)) } } }; diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index 6b14b8e8..5cf399cd 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs @@ -165,9 +165,9 @@ impl TaskStorage { /// on a different executor. pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { if self.spawn_allocate() { - unsafe { self.spawn_initialize(future) } + unsafe { SpawnToken::::new(self.spawn_initialize(future)) } } else { - SpawnToken::new_failed() + SpawnToken::::new_failed() } } @@ -179,37 +179,11 @@ impl TaskStorage { .is_ok() } - unsafe fn spawn_initialize(&'static self, future: FutFn) -> SpawnToken - where - FutFn: FnOnce() -> F, - { + unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull { // Initialize the task self.raw.poll_fn.write(Self::poll); self.future.write(future()); - - // When send-spawning a task, we construct the future in this thread, and effectively - // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, - // send-spawning should require the future `F` to be `Send`. - // - // The problem is this is more restrictive than needed. Once the future is executing, - // it is never sent to another thread. It is only sent when spawning. It should be - // enough for the task's arguments to be Send. (and in practice it's super easy to - // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) - // - // We can do it by sending the task args and constructing the future in the executor thread - // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy - // of the args. - // - // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the - // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. - // - // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, - // but it's possible it'll be guaranteed in the future. See zulip thread: - // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) - // - // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. - // This is why we return `SpawnToken` below. - SpawnToken::::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) + NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader) } unsafe fn poll(p: NonNull) { @@ -261,11 +235,59 @@ impl TaskPool { pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { for task in &self.pool { if task.spawn_allocate() { - return unsafe { task.spawn_initialize(future) }; + return unsafe { SpawnToken::::new(task.spawn_initialize(future)) }; } } - SpawnToken::new_failed() + SpawnToken::::new_failed() + } + + /// Like spawn(), but allows the task to be send-spawned if the args are Send even if + /// the future is !Send. + /// + /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used + /// by the Embassy macros ONLY. + /// + /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` + /// is an `async fn`, NOT a hand-written `Future`. + #[doc(hidden)] + pub unsafe fn _spawn_async_fn(&'static self, future: FutFn) -> SpawnToken + where + FutFn: FnOnce() -> F, + { + // When send-spawning a task, we construct the future in this thread, and effectively + // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, + // send-spawning should require the future `F` to be `Send`. + // + // The problem is this is more restrictive than needed. Once the future is executing, + // it is never sent to another thread. It is only sent when spawning. It should be + // enough for the task's arguments to be Send. (and in practice it's super easy to + // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) + // + // We can do it by sending the task args and constructing the future in the executor thread + // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy + // of the args. + // + // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the + // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. + // + // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, + // but it's possible it'll be guaranteed in the future. See zulip thread: + // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) + // + // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. + // This is why we return `SpawnToken` below. + // + // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly + // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken`. + + for task in &self.pool { + if task.spawn_allocate() { + return SpawnToken::::new(task.spawn_initialize(future)); + } + } + + SpawnToken::::new_failed() } }