Executor API V2.

- It's no longer possible to call run() reentrantly from within a task (soundness issue)
- it's now possible to spawn Send tasks across threads (SendSpawner, #37)
This commit is contained in:
Dario Nieuwenhuis
2021-02-02 05:14:52 +01:00
parent d098952077
commit aeaa34d7a1
25 changed files with 495 additions and 377 deletions

View File

@ -2,129 +2,68 @@ pub use embassy_macros::task;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU32, Ordering};
use core::task::{Context, Poll, Waker};
use core::{
cell::{Cell, UnsafeCell},
cmp::min,
};
use core::sync::atomic::Ordering;
use core::task::{Context, Poll};
use core::{mem, ptr};
pub mod raw;
mod run_queue;
pub(crate) mod timer;
mod timer_queue;
mod util;
mod waker;
use self::run_queue::{RunQueue, RunQueueItem};
use self::timer_queue::{TimerQueue, TimerQueueItem};
use self::util::UninitCell;
use crate::{
fmt::{panic, *},
time::{Alarm, Instant},
};
use crate::fmt::{panic, *};
use crate::interrupt::OwnedInterrupt;
use crate::time::Alarm;
/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
pub(crate) struct TaskHeader {
state: AtomicU32,
run_queue_item: RunQueueItem,
expires_at: Cell<Instant>,
timer_queue_item: TimerQueueItem,
executor: Cell<*const Executor>, // Valid if state != 0
poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED
}
impl TaskHeader {
const fn new() -> Self {
Self {
state: AtomicU32::new(0),
expires_at: Cell::new(Instant::from_ticks(0)),
run_queue_item: RunQueueItem::new(),
timer_queue_item: TimerQueueItem::new(),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
}
}
pub(crate) unsafe fn enqueue(&self) {
let mut current = self.state.load(Ordering::Acquire);
loop {
// If already scheduled, or if not started,
if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
return;
}
// Mark it as scheduled
let new = current | STATE_RUN_QUEUED;
match self.state.compare_exchange_weak(
current,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(next_current) => current = next_current,
}
}
// We have just marked the task as scheduled, so enqueue it.
let executor = &*self.executor.get();
executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
}
}
// repr(C) is needed to guarantee that header is located at offset 0
// This makes it safe to cast between Header and Task pointers.
// repr(C) is needed to guarantee that the raw::Task is located at offset 0
// This makes it safe to cast between raw::Task and Task pointers.
#[repr(C)]
pub struct Task<F: Future + 'static> {
header: TaskHeader,
raw: raw::Task,
future: UninitCell<F>, // Valid if STATE_SPAWNED
}
impl<F: Future + 'static> Task<F> {
pub const fn new() -> Self {
Self {
header: TaskHeader::new(),
raw: raw::Task::new(),
future: UninitCell::uninit(),
}
}
pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
for task in pool {
let state = STATE_SPAWNED | STATE_RUN_QUEUED;
let state = raw::STATE_SPAWNED | raw::STATE_RUN_QUEUED;
if task
.header
.raw
.state
.compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
// Initialize the task
task.header.poll_fn.write(Self::poll);
task.raw.poll_fn.write(Self::poll);
task.future.write(future());
return SpawnToken {
header: Some(NonNull::new_unchecked(
&task.header as *const TaskHeader as _,
)),
raw_task: Some(NonNull::new_unchecked(&task.raw as *const raw::Task as _)),
phantom: PhantomData,
};
}
}
return SpawnToken { header: None };
return SpawnToken {
raw_task: None,
phantom: PhantomData,
};
}
unsafe fn poll(p: *mut TaskHeader) {
let this = &*(p as *const Task<F>);
unsafe fn poll(p: NonNull<raw::Task>) {
let this = &*(p.as_ptr() as *const Task<F>);
let future = Pin::new_unchecked(this.future.as_mut());
let waker = waker::from_task(p);
@ -132,9 +71,9 @@ impl<F: Future + 'static> Task<F> {
match future.poll(&mut cx) {
Poll::Ready(_) => {
this.future.drop_in_place();
this.header
this.raw
.state
.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
.fetch_and(!raw::STATE_SPAWNED, Ordering::AcqRel);
}
Poll::Pending => {}
}
@ -144,11 +83,12 @@ impl<F: Future + 'static> Task<F> {
unsafe impl<F: Future + 'static> Sync for Task<F> {}
#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
pub struct SpawnToken {
header: Option<NonNull<TaskHeader>>,
pub struct SpawnToken<F> {
raw_task: Option<NonNull<raw::Task>>,
phantom: PhantomData<*mut F>,
}
impl Drop for SpawnToken {
impl<F> Drop for SpawnToken<F> {
fn drop(&mut self) {
// TODO deallocate the task instead.
panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()")
@ -161,116 +101,167 @@ pub enum SpawnError {
Busy,
}
pub struct Executor {
alarm: Option<&'static dyn Alarm>,
run_queue: RunQueue,
timer_queue: TimerQueue,
signal_fn: fn(),
/// Handle to spawn tasks into an executor.
///
/// This Spawner can spawn any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
pub struct Spawner {
executor: &'static raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Executor {
pub const fn new(signal_fn: fn()) -> Self {
impl Spawner {
fn new(executor: &'static raw::Executor) -> Self {
Self {
alarm: None,
run_queue: RunQueue::new(),
timer_queue: TimerQueue::new(),
signal_fn: signal_fn,
not_send: PhantomData,
}
}
pub const fn new_with_alarm(alarm: &'static dyn Alarm, signal_fn: fn()) -> Self {
Self {
alarm: Some(alarm),
run_queue: RunQueue::new(),
timer_queue: TimerQueue::new(),
signal_fn: signal_fn,
executor,
not_send: PhantomData,
}
}
unsafe fn enqueue(&self, item: *mut TaskHeader) {
if self.run_queue.enqueue(item) {
(self.signal_fn)()
}
}
/// Spawn a future on this executor.
pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
let header = token.header;
pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
let task = token.raw_task;
mem::forget(token);
match header {
Some(header) => unsafe {
let header = header.as_ref();
header.executor.set(self);
self.enqueue(header as *const _ as _);
match task {
Some(task) => {
unsafe { self.executor.spawn(task) };
Ok(())
},
}
None => Err(SpawnError::Busy),
}
}
/// Runs the executor until the queue is empty.
pub fn run(&self) {
unsafe {
if self.alarm.is_some() {
self.timer_queue.dequeue_expired(Instant::now(), |p| {
let header = &*p;
header.enqueue();
});
}
self.run_queue.dequeue_all(|p| {
let header = &*p;
header.expires_at.set(Instant::MAX);
let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
if state & STATE_SPAWNED == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}
// Run the task
header.poll_fn.read()(p as _);
// Enqueue or update into timer_queue
self.timer_queue.update(p);
});
// If this is in the past, set_alarm will immediately trigger the alarm,
// which will make the wfe immediately return so we do another loop iteration.
if let Some(alarm) = self.alarm {
let next_expiration = self.timer_queue.next_expiration();
alarm.set_callback(self.signal_fn);
alarm.set(next_expiration.as_ticks());
}
/// Convert this Spawner to a SendSpawner. This allows you to send the
/// spawner to other threads, but the spawner loses the ability to spawn
/// non-Send tasks.
pub fn make_send(&self) -> SendSpawner {
SendSpawner {
executor: self.executor,
not_send: PhantomData,
}
}
}
pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) {
let p = waker::task_from_waker(waker);
let header = &*p;
let expires_at = header.expires_at.get();
header.expires_at.set(min(expires_at, at));
/// Handle to spawn tasks into an executor from any thread.
///
/// This Spawner can be used from any thread (it implements Send and Sync, so after any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
pub struct SendSpawner {
executor: &'static raw::Executor,
not_send: PhantomData<*mut ()>,
}
pub mod raw {
use super::waker;
use core::ptr::NonNull;
use core::task::Waker;
unsafe impl Send for SendSpawner {}
unsafe impl Sync for SendSpawner {}
pub fn task_from_waker(waker: &Waker) -> NonNull<()> {
unsafe { NonNull::new_unchecked(waker::task_from_waker(waker) as *mut ()) }
/// Handle to spawn tasks to an executor.
///
/// This Spawner can spawn any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
impl SendSpawner {
fn new(executor: &'static raw::Executor) -> Self {
Self {
executor,
not_send: PhantomData,
}
}
pub unsafe fn wake_task(task: NonNull<()>) {
let header = &*waker::task_from_ptr(task.as_ptr());
header.enqueue();
pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
let header = token.raw_task;
mem::forget(token);
match header {
Some(header) => {
unsafe { self.executor.spawn(header) };
Ok(())
}
None => Err(SpawnError::Busy),
}
}
}
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Executor {
pub const fn new() -> Self {
Self {
inner: raw::Executor::new(|_| cortex_m::asm::sev(), ptr::null_mut()),
not_send: PhantomData,
}
}
pub fn set_alarm(&mut self, alarm: &'static dyn Alarm) {
self.inner.set_alarm(alarm);
}
/// Runs the executor.
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
init(Spawner::new(&self.inner));
loop {
unsafe { self.inner.run_queued() };
cortex_m::asm::wfe();
}
}
}
fn pend_by_number(n: u8) {
struct N(u8);
unsafe impl cortex_m::interrupt::Nr for N {
fn nr(&self) -> u8 {
self.0
}
}
cortex_m::peripheral::NVIC::pend(N(n))
}
pub struct IrqExecutor<I: OwnedInterrupt> {
irq: I,
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl<I: OwnedInterrupt> IrqExecutor<I> {
pub fn new(irq: I) -> Self {
let ctx = irq.number() as *mut ();
Self {
irq,
inner: raw::Executor::new(|ctx| pend_by_number(ctx as u8), ctx),
not_send: PhantomData,
}
}
pub fn set_alarm(&mut self, alarm: &'static dyn Alarm) {
self.inner.set_alarm(alarm);
}
/// Start the executor.
///
/// `init` is called in the interrupt context, then the interrupt is
/// configured to run the executor.
pub fn start(&'static mut self, init: impl FnOnce(Spawner) + Send) {
self.irq.disable();
init(Spawner::new(&self.inner));
self.irq.set_handler(
|ctx| unsafe {
let executor = &*(ctx as *const raw::Executor);
executor.run_queued();
},
&self.inner as *const _ as _,
);
self.irq.enable();
}
}

154
embassy/src/executor/raw.rs Normal file
View File

@ -0,0 +1,154 @@
use core::cell::Cell;
use core::cmp::min;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU32, Ordering};
use core::task::Waker;
use super::run_queue::{RunQueue, RunQueueItem};
use super::timer_queue::{TimerQueue, TimerQueueItem};
use super::util::UninitCell;
use super::waker;
use crate::time::{Alarm, Instant};
/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
pub struct Task {
pub(crate) state: AtomicU32,
pub(crate) run_queue_item: RunQueueItem,
pub(crate) expires_at: Cell<Instant>,
pub(crate) timer_queue_item: TimerQueueItem,
pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<Task>)>, // Valid if STATE_SPAWNED
}
impl Task {
pub(crate) const fn new() -> Self {
Self {
state: AtomicU32::new(0),
expires_at: Cell::new(Instant::from_ticks(0)),
run_queue_item: RunQueueItem::new(),
timer_queue_item: TimerQueueItem::new(),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
}
}
pub(crate) unsafe fn enqueue(&self) {
let mut current = self.state.load(Ordering::Acquire);
loop {
// If already scheduled, or if not started,
if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
return;
}
// Mark it as scheduled
let new = current | STATE_RUN_QUEUED;
match self.state.compare_exchange_weak(
current,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(next_current) => current = next_current,
}
}
// We have just marked the task as scheduled, so enqueue it.
let executor = &*self.executor.get();
executor.enqueue(self as *const Task as *mut Task);
}
}
pub(crate) struct Executor {
run_queue: RunQueue,
timer_queue: TimerQueue,
signal_fn: fn(*mut ()),
signal_ctx: *mut (),
alarm: Option<&'static dyn Alarm>,
}
impl Executor {
pub(crate) const fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
Self {
run_queue: RunQueue::new(),
timer_queue: TimerQueue::new(),
signal_fn,
signal_ctx,
alarm: None,
}
}
pub(crate) fn set_alarm(&mut self, alarm: &'static dyn Alarm) {
self.alarm = Some(alarm);
}
unsafe fn enqueue(&self, item: *mut Task) {
if self.run_queue.enqueue(item) {
(self.signal_fn)(self.signal_ctx)
}
}
pub(crate) unsafe fn spawn(&'static self, task: NonNull<Task>) {
let task = task.as_ref();
task.executor.set(self);
self.enqueue(task as *const _ as _);
}
pub(crate) unsafe fn run_queued(&self) {
if self.alarm.is_some() {
self.timer_queue.dequeue_expired(Instant::now(), |p| {
p.as_ref().enqueue();
});
}
self.run_queue.dequeue_all(|p| {
let task = p.as_ref();
task.expires_at.set(Instant::MAX);
let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
if state & STATE_SPAWNED == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}
// Run the task
task.poll_fn.read()(p as _);
// Enqueue or update into timer_queue
self.timer_queue.update(p);
});
// If this is in the past, set_alarm will immediately trigger the alarm,
// which will make the wfe immediately return so we do another loop iteration.
if let Some(alarm) = self.alarm {
let next_expiration = self.timer_queue.next_expiration();
alarm.set_callback(self.signal_fn, self.signal_ctx);
alarm.set(next_expiration.as_ticks());
}
}
}
pub use super::waker::task_from_waker;
pub unsafe fn wake_task(task: NonNull<Task>) {
task.as_ref().enqueue();
}
pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) {
let task = waker::task_from_waker(waker);
let task = task.as_ref();
let expires_at = task.expires_at.get();
task.expires_at.set(min(expires_at, at));
}

View File

@ -1,10 +1,11 @@
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, Ordering};
use super::TaskHeader;
use super::raw::Task;
pub(crate) struct RunQueueItem {
next: AtomicPtr<TaskHeader>,
next: AtomicPtr<Task>,
}
impl RunQueueItem {
@ -27,7 +28,7 @@ impl RunQueueItem {
/// current batch is completely processed, so even if a task enqueues itself instantly (for example
/// by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
head: AtomicPtr<TaskHeader>,
head: AtomicPtr<Task>,
}
impl RunQueue {
@ -38,7 +39,7 @@ impl RunQueue {
}
/// Enqueues an item. Returns true if the queue was empty.
pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool {
pub(crate) unsafe fn enqueue(&self, item: *mut Task) -> bool {
let mut prev = self.head.load(Ordering::Acquire);
loop {
(*item).run_queue_item.next.store(prev, Ordering::Relaxed);
@ -54,7 +55,7 @@ impl RunQueue {
prev.is_null()
}
pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) {
pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull<Task>)) {
let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
while !task.is_null() {
@ -62,7 +63,7 @@ impl RunQueue {
// Therefore, first read the next pointer, and only then process the task.
let next = (*task).run_queue_item.next.load(Ordering::Relaxed);
on_task(task);
on_task(NonNull::new_unchecked(task));
task = next
}

View File

@ -3,6 +3,7 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use futures::Stream;
use super::raw;
use crate::time::{Duration, Instant};
pub struct Timer {
@ -34,7 +35,7 @@ impl Future for Timer {
if self.yielded_once && self.expires_at <= Instant::now() {
Poll::Ready(())
} else {
unsafe { super::register_timer(self.expires_at, cx.waker()) };
unsafe { raw::register_timer(self.expires_at, cx.waker()) };
self.yielded_once = true;
Poll::Pending
}
@ -66,7 +67,7 @@ impl Stream for Ticker {
self.expires_at += dur;
Poll::Ready(Some(()))
} else {
unsafe { super::register_timer(self.expires_at, cx.waker()) };
unsafe { raw::register_timer(self.expires_at, cx.waker()) };
Poll::Pending
}
}

View File

@ -1,13 +1,14 @@
use core::cell::Cell;
use core::cmp::min;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, Ordering};
use core::{cmp::min, ptr};
use super::raw::{Task, STATE_TIMER_QUEUED};
use crate::time::Instant;
use super::{TaskHeader, STATE_TIMER_QUEUED};
pub(crate) struct TimerQueueItem {
next: Cell<*mut TaskHeader>,
next: Cell<*mut Task>,
}
impl TimerQueueItem {
@ -19,7 +20,7 @@ impl TimerQueueItem {
}
pub(crate) struct TimerQueue {
head: Cell<*mut TaskHeader>,
head: Cell<*mut Task>,
}
impl TimerQueue {
@ -29,15 +30,15 @@ impl TimerQueue {
}
}
pub(crate) unsafe fn update(&self, p: *mut TaskHeader) {
let header = &*p;
if header.expires_at.get() != Instant::MAX {
let old_state = header.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
pub(crate) unsafe fn update(&self, p: NonNull<Task>) {
let task = p.as_ref();
if task.expires_at.get() != Instant::MAX {
let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
let is_new = old_state & STATE_TIMER_QUEUED == 0;
if is_new {
header.timer_queue_item.next.set(self.head.get());
self.head.set(p);
task.timer_queue_item.next.set(self.head.get());
self.head.set(p.as_ptr());
}
}
}
@ -45,18 +46,18 @@ impl TimerQueue {
pub(crate) unsafe fn next_expiration(&self) -> Instant {
let mut res = Instant::MAX;
self.retain(|p| {
let header = &*p;
let expires = header.expires_at.get();
let task = p.as_ref();
let expires = task.expires_at.get();
res = min(res, expires);
expires != Instant::MAX
});
res
}
pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(*mut TaskHeader)) {
pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<Task>)) {
self.retain(|p| {
let header = &*p;
if header.expires_at.get() <= now {
let task = p.as_ref();
if task.expires_at.get() <= now {
on_task(p);
false
} else {
@ -65,20 +66,18 @@ impl TimerQueue {
});
}
pub(crate) unsafe fn retain(&self, mut f: impl FnMut(*mut TaskHeader) -> bool) {
pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<Task>) -> bool) {
let mut prev = &self.head;
while !prev.get().is_null() {
let p = prev.get();
let header = &*p;
let p = NonNull::new_unchecked(prev.get());
let task = &*p.as_ptr();
if f(p) {
// Skip to next
prev = &header.timer_queue_item.next;
prev = &task.timer_queue_item.next;
} else {
// Remove it
prev.set(header.timer_queue_item.next.get());
header
.state
.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
prev.set(task.timer_queue_item.next.get());
task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
}
}
}

View File

@ -1,7 +1,8 @@
use core::mem;
use core::ptr::NonNull;
use core::task::{RawWaker, RawWakerVTable, Waker};
use super::TaskHeader;
use super::raw::Task;
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
@ -10,26 +11,21 @@ unsafe fn clone(p: *const ()) -> RawWaker {
}
unsafe fn wake(p: *const ()) {
let header = &*task_from_ptr(p);
header.enqueue();
(*(p as *mut Task)).enqueue()
}
unsafe fn drop(_: *const ()) {
// nop
}
pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker {
Waker::from_raw(RawWaker::new(p as _, &VTABLE))
pub(crate) unsafe fn from_task(p: NonNull<Task>) -> Waker {
Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
}
pub(crate) unsafe fn task_from_ptr(p: *const ()) -> *mut TaskHeader {
p as *mut TaskHeader
}
pub(crate) unsafe fn task_from_waker(w: &Waker) -> *mut TaskHeader {
let w: &WakerHack = mem::transmute(w);
assert_eq!(w.vtable, &VTABLE);
task_from_ptr(w.data)
pub unsafe fn task_from_waker(waker: &Waker) -> NonNull<Task> {
let hack: &WakerHack = mem::transmute(waker);
assert_eq!(hack.vtable, &VTABLE);
NonNull::new_unchecked(hack.data as *mut Task)
}
struct WakerHack {

View File

@ -32,6 +32,7 @@ unsafe impl cortex_m::interrupt::Nr for NrWrap {
pub unsafe trait OwnedInterrupt {
type Priority: From<u8> + Into<u8> + Copy;
fn number(&self) -> u8;
unsafe fn steal() -> Self;
/// Implementation detail, do not use outside embassy crates.
#[doc(hidden)]

View File

@ -2,7 +2,6 @@
#![feature(generic_associated_types)]
#![feature(const_fn)]
#![feature(const_fn_fn_ptr_basics)]
#![feature(const_in_array_repeat_expressions)]
#![feature(const_option)]
// This mod MUST go first, so that the others see its macros.

View File

@ -16,7 +16,7 @@ impl<T: Clock + ?Sized> Clock for &T {
pub trait Alarm {
/// Sets the callback function to be called when the alarm triggers.
/// The callback may be called from any context (interrupt or thread mode).
fn set_callback(&self, callback: fn());
fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ());
/// Sets an alarm at the given timestamp. When the clock reaches that
/// timestamp, the provided callback funcion will be called.
@ -32,8 +32,8 @@ pub trait Alarm {
}
impl<T: Alarm + ?Sized> Alarm for &T {
fn set_callback(&self, callback: fn()) {
T::set_callback(self, callback);
fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ()) {
T::set_callback(self, callback, ctx);
}
fn set(&self, timestamp: u64) {
T::set(self, timestamp);

View File

@ -110,7 +110,7 @@ impl<'a, I: OwnedInterrupt> InterruptFuture<'a, I> {
};
if ctx as *const _ != ptr::null() {
executor::raw::wake_task(ptr::NonNull::new_unchecked(ctx));
executor::raw::wake_task(ptr::NonNull::new_unchecked(ctx as _));
}
NVIC::mask(NrWrap(irq));
@ -124,10 +124,8 @@ impl<'a, I: OwnedInterrupt> Future for InterruptFuture<'a, I> {
fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let s = unsafe { self.get_unchecked_mut() };
s.interrupt.set_handler(
Self::interrupt_handler,
executor::raw::task_from_waker(&cx.waker()).cast().as_ptr(),
);
let ctx = unsafe { executor::raw::task_from_waker(&cx.waker()).cast().as_ptr() };
s.interrupt.set_handler(Self::interrupt_handler, ctx);
if s.interrupt.is_enabled() {
Poll::Pending
} else {