executor: make send-spawning only require the task args to be Send, not the whole future.
This commit is contained in:
parent
293f54d134
commit
6f6c16f449
@ -73,23 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result<TokenStream, Toke
|
|||||||
// in the user's code.
|
// in the user's code.
|
||||||
#task_inner
|
#task_inner
|
||||||
|
|
||||||
#visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl ::core::future::Future + 'static> {
|
#visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl Sized> {
|
||||||
use ::core::future::Future;
|
type Fut = impl ::core::future::Future + 'static;
|
||||||
use #embassy_path::executor::SpawnToken;
|
static POOL: #embassy_path::executor::raw::TaskPool<Fut, #pool_size> = #embassy_path::executor::raw::TaskPool::new();
|
||||||
use #embassy_path::executor::raw::TaskPool;
|
POOL.spawn(move || #task_inner_ident(#(#arg_names,)*))
|
||||||
|
|
||||||
type Fut = impl Future + 'static;
|
|
||||||
|
|
||||||
static POOL: TaskPool<Fut, #pool_size> = TaskPool::new();
|
|
||||||
|
|
||||||
// Opaque type laundering, to obscure its origin!
|
|
||||||
// Workaround for "opaque type's hidden type cannot be another opaque type from the same scope"
|
|
||||||
// https://github.com/rust-lang/rust/issues/96406
|
|
||||||
fn launder_tait(token: SpawnToken<impl Future+'static>) -> SpawnToken<impl Future+'static> {
|
|
||||||
token
|
|
||||||
}
|
|
||||||
|
|
||||||
launder_tait(POOL.spawn(move || #task_inner_ident(#(#arg_names,)*)))
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -110,8 +110,8 @@ impl TaskHeader {
|
|||||||
/// Raw storage in which a task can be spawned.
|
/// Raw storage in which a task can be spawned.
|
||||||
///
|
///
|
||||||
/// This struct holds the necessary memory to spawn one task whose future is `F`.
|
/// This struct holds the necessary memory to spawn one task whose future is `F`.
|
||||||
/// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it
|
/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
|
||||||
/// with [`Task::spawn()`], which will fail if it is already spawned.
|
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
|
||||||
///
|
///
|
||||||
/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
|
/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
|
||||||
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
|
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
|
||||||
@ -159,11 +159,11 @@ impl<F: Future + 'static> TaskStorage<F> {
|
|||||||
///
|
///
|
||||||
/// This function will fail if the task is already spawned and has not finished running.
|
/// This function will fail if the task is already spawned and has not finished running.
|
||||||
/// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
|
/// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
|
||||||
/// cause [`Executor::spawn()`] to return the error.
|
/// cause [`Spawner::spawn()`] to return the error.
|
||||||
///
|
///
|
||||||
/// Once the task has finished running, you may spawn it again. It is allowed to spawn it
|
/// Once the task has finished running, you may spawn it again. It is allowed to spawn it
|
||||||
/// on a different executor.
|
/// on a different executor.
|
||||||
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> {
|
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
|
||||||
if self.spawn_allocate() {
|
if self.spawn_allocate() {
|
||||||
unsafe { self.spawn_initialize(future) }
|
unsafe { self.spawn_initialize(future) }
|
||||||
} else {
|
} else {
|
||||||
@ -179,12 +179,37 @@ impl<F: Future + 'static> TaskStorage<F> {
|
|||||||
.is_ok()
|
.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> {
|
unsafe fn spawn_initialize<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
|
||||||
|
where
|
||||||
|
FutFn: FnOnce() -> F,
|
||||||
|
{
|
||||||
// Initialize the task
|
// Initialize the task
|
||||||
self.raw.poll_fn.write(Self::poll);
|
self.raw.poll_fn.write(Self::poll);
|
||||||
self.future.write(future());
|
self.future.write(future());
|
||||||
|
|
||||||
SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _))
|
// When send-spawning a task, we construct the future in this thread, and effectively
|
||||||
|
// "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
|
||||||
|
// send-spawning should require the future `F` to be `Send`.
|
||||||
|
//
|
||||||
|
// The problem is this is more restrictive than needed. Once the future is executing,
|
||||||
|
// it is never sent to another thread. It is only sent when spawning. It should be
|
||||||
|
// enough for the task's arguments to be Send. (and in practice it's super easy to
|
||||||
|
// accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
|
||||||
|
//
|
||||||
|
// We can do it by sending the task args and constructing the future in the executor thread
|
||||||
|
// on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
|
||||||
|
// of the args.
|
||||||
|
//
|
||||||
|
// Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
|
||||||
|
// args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
|
||||||
|
//
|
||||||
|
// (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
|
||||||
|
// but it's possible it'll be guaranteed in the future. See zulip thread:
|
||||||
|
// https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
|
||||||
|
//
|
||||||
|
// The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
|
||||||
|
// This is why we return `SpawnToken<FutFn>` below.
|
||||||
|
SpawnToken::<FutFn>::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _))
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn poll(p: NonNull<TaskHeader>) {
|
unsafe fn poll(p: NonNull<TaskHeader>) {
|
||||||
@ -232,8 +257,8 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
|
|||||||
///
|
///
|
||||||
/// This will loop over the pool and spawn the task in the first storage that
|
/// This will loop over the pool and spawn the task in the first storage that
|
||||||
/// is currently free. If none is free, a "poisoned" SpawnToken is returned,
|
/// is currently free. If none is free, a "poisoned" SpawnToken is returned,
|
||||||
/// which will cause [`Executor::spawn()`] to return the error.
|
/// which will cause [`Spawner::spawn()`] to return the error.
|
||||||
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> {
|
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
|
||||||
for task in &self.pool {
|
for task in &self.pool {
|
||||||
if task.spawn_allocate() {
|
if task.spawn_allocate() {
|
||||||
return unsafe { task.spawn_initialize(future) };
|
return unsafe { task.spawn_initialize(future) };
|
||||||
|
@ -12,17 +12,21 @@ use super::raw;
|
|||||||
/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
|
/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
|
||||||
/// then spawn it into an executor, typically with [`Spawner::spawn()`].
|
/// then spawn it into an executor, typically with [`Spawner::spawn()`].
|
||||||
///
|
///
|
||||||
|
/// The generic parameter `S` determines whether the task can be spawned in executors
|
||||||
|
/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`].
|
||||||
|
/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`].
|
||||||
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
|
/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
|
||||||
/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
|
/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
|
||||||
#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
|
#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
|
||||||
pub struct SpawnToken<F> {
|
pub struct SpawnToken<S> {
|
||||||
raw_task: Option<NonNull<raw::TaskHeader>>,
|
raw_task: Option<NonNull<raw::TaskHeader>>,
|
||||||
phantom: PhantomData<*mut F>,
|
phantom: PhantomData<*mut S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> SpawnToken<F> {
|
impl<S> SpawnToken<S> {
|
||||||
pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
|
pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
raw_task: Some(raw_task),
|
raw_task: Some(raw_task),
|
||||||
@ -38,7 +42,7 @@ impl<F> SpawnToken<F> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> Drop for SpawnToken<F> {
|
impl<S> Drop for SpawnToken<S> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// TODO deallocate the task instead.
|
// TODO deallocate the task instead.
|
||||||
panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
|
panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
|
||||||
@ -97,7 +101,7 @@ impl Spawner {
|
|||||||
/// Spawn a task into an executor.
|
/// Spawn a task into an executor.
|
||||||
///
|
///
|
||||||
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
|
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
|
||||||
pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
|
pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
|
||||||
let task = token.raw_task;
|
let task = token.raw_task;
|
||||||
mem::forget(token);
|
mem::forget(token);
|
||||||
|
|
||||||
@ -119,7 +123,7 @@ impl Spawner {
|
|||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// Panics if the spawning fails.
|
/// Panics if the spawning fails.
|
||||||
pub fn must_spawn<F>(&self, token: SpawnToken<F>) {
|
pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
|
||||||
unwrap!(self.spawn(token));
|
unwrap!(self.spawn(token));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -173,7 +177,7 @@ impl SendSpawner {
|
|||||||
/// Spawn a task into an executor.
|
/// Spawn a task into an executor.
|
||||||
///
|
///
|
||||||
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
|
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
|
||||||
pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
|
pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
|
||||||
let header = token.raw_task;
|
let header = token.raw_task;
|
||||||
mem::forget(token);
|
mem::forget(token);
|
||||||
|
|
||||||
@ -191,7 +195,7 @@ impl SendSpawner {
|
|||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// Panics if the spawning fails.
|
/// Panics if the spawning fails.
|
||||||
pub fn must_spawn<F: Send>(&self, token: SpawnToken<F>) {
|
pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
|
||||||
unwrap!(self.spawn(token));
|
unwrap!(self.spawn(token));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user