diff --git a/.vscode/settings.json b/.vscode/settings.json index 8c53d209..a6d083ad 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,5 @@ { + "rust-analyzer.assist.importMergeBehavior": "last", "editor.formatOnSave": true, "rust-analyzer.cargo.allFeatures": false, "rust-analyzer.checkOnSave.allFeatures": false, @@ -8,4 +9,4 @@ "**/.git/subtree-cache/**": true, "**/target/**": true } -} +} \ No newline at end of file diff --git a/embassy-macros/src/lib.rs b/embassy-macros/src/lib.rs index cb16f65a..23f1cda9 100644 --- a/embassy-macros/src/lib.rs +++ b/embassy-macros/src/lib.rs @@ -11,21 +11,23 @@ use syn::spanned::Spanned; struct MacroArgs { #[darling(default)] pool_size: Option, + #[darling(default)] + send: bool, } #[proc_macro_attribute] pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { - let args = syn::parse_macro_input!(args as syn::AttributeArgs); + let macro_args = syn::parse_macro_input!(args as syn::AttributeArgs); let mut task_fn = syn::parse_macro_input!(item as syn::ItemFn); - let args = match MacroArgs::from_list(&args) { + let macro_args = match MacroArgs::from_list(¯o_args) { Ok(v) => v, Err(e) => { return TokenStream::from(e.write_errors()); } }; - let pool_size: usize = args.pool_size.unwrap_or(1); + let pool_size: usize = macro_args.pool_size.unwrap_or(1); let mut fail = false; if task_fn.sig.asyncness.is_none() { @@ -90,11 +92,16 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { let visibility = &task_fn.vis; task_fn.sig.ident = format_ident!("task"); + let impl_ty = if macro_args.send { + quote!(impl ::core::future::Future + Send + 'static) + } else { + quote!(impl ::core::future::Future + 'static) + }; let result = quote! { - #visibility fn #name(#args) -> ::embassy::executor::SpawnToken { + #visibility fn #name(#args) -> ::embassy::executor::SpawnToken<#impl_ty> { #task_fn - type F = impl ::core::future::Future + 'static; + type F = #impl_ty; static POOL: [::embassy::executor::Task; #pool_size] = [::embassy::executor::Task::new(); #pool_size]; unsafe { ::embassy::executor::Task::spawn(&POOL, move || task(#arg_names)) } } @@ -119,6 +126,9 @@ pub fn interrupt_declare(item: TokenStream) -> TokenStream { let irq = Interrupt::#name; irq.nr() as u8 } + unsafe fn steal() -> Self { + Self(()) + } unsafe fn __handler(&self) -> &'static ::embassy::interrupt::Handler { #[export_name = #name_handler] static HANDLER: ::embassy::interrupt::Handler = ::embassy::interrupt::Handler::new(); diff --git a/embassy-nrf-examples/src/bin/buffered_uart.rs b/embassy-nrf-examples/src/bin/buffered_uart.rs index 57c6b4cf..7c7283fc 100644 --- a/embassy-nrf-examples/src/bin/buffered_uart.rs +++ b/embassy-nrf-examples/src/bin/buffered_uart.rs @@ -83,11 +83,8 @@ static EXECUTOR: Forever = Forever::new(); fn main() -> ! { info!("Hello World!"); - let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); - unwrap!(executor.spawn(run())); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + unwrap!(spawner.spawn(run())); + }); } diff --git a/embassy-nrf-examples/src/bin/executor_fairness_test.rs b/embassy-nrf-examples/src/bin/executor_fairness_test.rs index 9b2c1bd2..1b995573 100644 --- a/embassy-nrf-examples/src/bin/executor_fairness_test.rs +++ b/embassy-nrf-examples/src/bin/executor_fairness_test.rs @@ -61,14 +61,11 @@ fn main() -> ! { unsafe { embassy::time::set_clock(rtc) }; let alarm = ALARM.put(rtc.alarm0()); - let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); - - unwrap!(executor.spawn(run1())); - unwrap!(executor.spawn(run2())); - unwrap!(executor.spawn(run3())); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let executor = EXECUTOR.put(Executor::new()); + executor.set_alarm(alarm); + executor.run(|spawner| { + unwrap!(spawner.spawn(run1())); + unwrap!(spawner.spawn(run2())); + unwrap!(spawner.spawn(run3())); + }); } diff --git a/embassy-nrf-examples/src/bin/gpiote.rs b/embassy-nrf-examples/src/bin/gpiote.rs index afa1b85d..f5315d6a 100644 --- a/embassy-nrf-examples/src/bin/gpiote.rs +++ b/embassy-nrf-examples/src/bin/gpiote.rs @@ -73,11 +73,8 @@ static EXECUTOR: Forever = Forever::new(); fn main() -> ! { info!("Hello World!"); - let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); - unwrap!(executor.spawn(run())); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + unwrap!(spawner.spawn(run())); + }); } diff --git a/embassy-nrf-examples/src/bin/gpiote_port.rs b/embassy-nrf-examples/src/bin/gpiote_port.rs index f5aa8132..833096f3 100644 --- a/embassy-nrf-examples/src/bin/gpiote_port.rs +++ b/embassy-nrf-examples/src/bin/gpiote_port.rs @@ -52,11 +52,8 @@ static EXECUTOR: Forever = Forever::new(); fn main() -> ! { info!("Hello World!"); - let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); - unwrap!(executor.spawn(run())); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + unwrap!(spawner.spawn(run())); + }); } diff --git a/embassy-nrf-examples/src/bin/multiprio.rs b/embassy-nrf-examples/src/bin/multiprio.rs index c821e3db..8c2ec19a 100644 --- a/embassy-nrf-examples/src/bin/multiprio.rs +++ b/embassy-nrf-examples/src/bin/multiprio.rs @@ -66,9 +66,10 @@ use cortex_m_rt::entry; use defmt::panic; use nrf52840_hal::clocks; -use embassy::executor::{task, Executor}; +use embassy::executor::{task, Executor, IrqExecutor}; use embassy::time::{Duration, Instant, Timer}; use embassy::util::Forever; +use embassy_nrf::interrupt::OwnedInterrupt; use embassy_nrf::{interrupt, pac, rtc}; #[task] @@ -114,12 +115,12 @@ async fn run_low() { } static RTC: Forever> = Forever::new(); +static ALARM_HIGH: Forever> = Forever::new(); +static EXECUTOR_HIGH: Forever> = Forever::new(); +static ALARM_MED: Forever> = Forever::new(); +static EXECUTOR_MED: Forever> = Forever::new(); static ALARM_LOW: Forever> = Forever::new(); static EXECUTOR_LOW: Forever = Forever::new(); -static ALARM_MED: Forever> = Forever::new(); -static EXECUTOR_MED: Forever = Forever::new(); -static ALARM_HIGH: Forever> = Forever::new(); -static EXECUTOR_HIGH: Forever = Forever::new(); #[entry] fn main() -> ! { @@ -136,41 +137,31 @@ fn main() -> ! { rtc.start(); unsafe { embassy::time::set_clock(rtc) }; - let alarm_low = ALARM_LOW.put(rtc.alarm0()); - let executor_low = EXECUTOR_LOW.put(Executor::new_with_alarm(alarm_low, cortex_m::asm::sev)); - let alarm_med = ALARM_MED.put(rtc.alarm1()); - let executor_med = EXECUTOR_MED.put(Executor::new_with_alarm(alarm_med, || { - NVIC::pend(interrupt::SWI0_EGU0) - })); - let alarm_high = ALARM_HIGH.put(rtc.alarm2()); - let executor_high = EXECUTOR_HIGH.put(Executor::new_with_alarm(alarm_high, || { - NVIC::pend(interrupt::SWI1_EGU1) - })); + // High-priority executor: SWI1_EGU1, priority level 6 + let irq = interrupt::take!(SWI1_EGU1); + irq.set_priority(interrupt::Priority::Level6); + let alarm = ALARM_HIGH.put(rtc.alarm2()); + let executor = EXECUTOR_HIGH.put(IrqExecutor::new(irq)); + executor.set_alarm(alarm); + executor.start(|spawner| { + unwrap!(spawner.spawn(run_high())); + }); - unsafe { - let mut nvic: NVIC = core::mem::transmute(()); - nvic.set_priority(interrupt::SWI0_EGU0, 7 << 5); - nvic.set_priority(interrupt::SWI1_EGU1, 6 << 5); - NVIC::unmask(interrupt::SWI0_EGU0); - NVIC::unmask(interrupt::SWI1_EGU1); - } + // Medium-priority executor: SWI0_EGU0, priority level 7 + let irq = interrupt::take!(SWI0_EGU0); + irq.set_priority(interrupt::Priority::Level7); + let alarm = ALARM_MED.put(rtc.alarm1()); + let executor = EXECUTOR_MED.put(IrqExecutor::new(irq)); + executor.set_alarm(alarm); + executor.start(|spawner| { + unwrap!(spawner.spawn(run_med())); + }); - unwrap!(executor_low.spawn(run_low())); - unwrap!(executor_med.spawn(run_med())); - unwrap!(executor_high.spawn(run_high())); - - loop { - executor_low.run(); - cortex_m::asm::wfe(); - } -} - -#[interrupt] -unsafe fn SWI0_EGU0() { - EXECUTOR_MED.steal().run() -} - -#[interrupt] -unsafe fn SWI1_EGU1() { - EXECUTOR_HIGH.steal().run() + // Low priority executor: runs in thread mode, using WFE/SEV + let alarm = ALARM_LOW.put(rtc.alarm0()); + let executor = EXECUTOR_LOW.put(Executor::new()); + executor.set_alarm(alarm); + executor.run(|spawner| { + unwrap!(spawner.spawn(run_low())); + }); } diff --git a/embassy-nrf-examples/src/bin/qspi.rs b/embassy-nrf-examples/src/bin/qspi.rs index a7d47f79..4edbd3f9 100644 --- a/embassy-nrf-examples/src/bin/qspi.rs +++ b/embassy-nrf-examples/src/bin/qspi.rs @@ -124,11 +124,8 @@ static EXECUTOR: Forever = Forever::new(); fn main() -> ! { info!("Hello World!"); - let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); - unwrap!(executor.spawn(run())); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + unwrap!(spawner.spawn(run())); + }); } diff --git a/embassy-nrf-examples/src/bin/rtc_async.rs b/embassy-nrf-examples/src/bin/rtc_async.rs index dcdeb704..5260c69a 100644 --- a/embassy-nrf-examples/src/bin/rtc_async.rs +++ b/embassy-nrf-examples/src/bin/rtc_async.rs @@ -53,13 +53,10 @@ fn main() -> ! { unsafe { embassy::time::set_clock(rtc) }; let alarm = ALARM.put(rtc.alarm0()); - let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); - - unwrap!(executor.spawn(run1())); - unwrap!(executor.spawn(run2())); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let executor = EXECUTOR.put(Executor::new()); + executor.set_alarm(alarm); + executor.run(|spawner| { + unwrap!(spawner.spawn(run1())); + unwrap!(spawner.spawn(run2())); + }); } diff --git a/embassy-nrf-examples/src/bin/rtc_raw.rs b/embassy-nrf-examples/src/bin/rtc_raw.rs index 43858546..7c60bb56 100644 --- a/embassy-nrf-examples/src/bin/rtc_raw.rs +++ b/embassy-nrf-examples/src/bin/rtc_raw.rs @@ -38,7 +38,7 @@ fn main() -> ! { rtc.start(); - alarm.set_callback(|| info!("ALARM TRIGGERED")); + alarm.set_callback(|_| info!("ALARM TRIGGERED"), core::ptr::null_mut()); alarm.set(53719); info!("initialized!"); diff --git a/embassy-nrf-examples/src/bin/uart.rs b/embassy-nrf-examples/src/bin/uart.rs index cb38e8fc..c5468d32 100644 --- a/embassy-nrf-examples/src/bin/uart.rs +++ b/embassy-nrf-examples/src/bin/uart.rs @@ -18,7 +18,31 @@ use nrf52840_hal::clocks; use nrf52840_hal::gpio; #[task] -async fn run(mut uart: uarte::Uarte) { +async fn run(uart: pac::UARTE0, port: pac::P0) { + // Init UART + let port0 = gpio::p0::Parts::new(port); + + let pins = uarte::Pins { + rxd: port0.p0_08.into_floating_input().degrade(), + txd: port0 + .p0_06 + .into_push_pull_output(gpio::Level::Low) + .degrade(), + cts: None, + rts: None, + }; + + // NOTE(unsafe): Safe becasue we do not use `mem::forget` anywhere. + let mut uart = unsafe { + uarte::Uarte::new( + uart, + interrupt::take!(UARTE0_UART0), + pins, + uarte::Parity::EXCLUDED, + uarte::Baudrate::BAUD115200, + ) + }; + info!("uarte initialized!"); // Message must be in SRAM @@ -81,36 +105,12 @@ fn main() -> ! { unsafe { embassy::time::set_clock(rtc) }; let alarm = ALARM.put(rtc.alarm0()); - let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); + let executor = EXECUTOR.put(Executor::new()); + executor.set_alarm(alarm); - // Init UART - let port0 = gpio::p0::Parts::new(p.P0); - - let pins = uarte::Pins { - rxd: port0.p0_08.into_floating_input().degrade(), - txd: port0 - .p0_06 - .into_push_pull_output(gpio::Level::Low) - .degrade(), - cts: None, - rts: None, - }; - - // NOTE(unsafe): Safe becasue we do not use `mem::forget` anywhere. - let uart = unsafe { - uarte::Uarte::new( - p.UARTE0, - interrupt::take!(UARTE0_UART0), - pins, - uarte::Parity::EXCLUDED, - uarte::Baudrate::BAUD115200, - ) - }; - - unwrap!(executor.spawn(run(uart))); - - loop { - executor.run(); - cortex_m::asm::wfe(); - } + let uarte0 = p.UARTE0; + let p0 = p.P0; + executor.run(|spawner| { + unwrap!(spawner.spawn(run(uarte0, p0))); + }); } diff --git a/embassy-nrf/src/rtc.rs b/embassy-nrf/src/rtc.rs index 01558394..dde0fd4c 100644 --- a/embassy-nrf/src/rtc.rs +++ b/embassy-nrf/src/rtc.rs @@ -40,7 +40,7 @@ mod test { struct AlarmState { timestamp: Cell, - callback: Cell>, + callback: Cell>, } impl AlarmState { @@ -159,13 +159,13 @@ impl RTC { alarm.timestamp.set(u64::MAX); // Call after clearing alarm, so the callback can set another alarm. - alarm.callback.get().map(|f| f()); + alarm.callback.get().map(|(f, ctx)| f(ctx)); } - fn set_alarm_callback(&self, n: usize, callback: fn()) { + fn set_alarm_callback(&self, n: usize, callback: fn(*mut ()), ctx: *mut ()) { interrupt::free(|cs| { let alarm = &self.alarms.borrow(cs)[n]; - alarm.callback.set(Some(callback)); + alarm.callback.set(Some((callback, ctx))); }) } @@ -220,8 +220,8 @@ pub struct Alarm { } impl embassy::time::Alarm for Alarm { - fn set_callback(&self, callback: fn()) { - self.rtc.set_alarm_callback(self.n, callback); + fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ()) { + self.rtc.set_alarm_callback(self.n, callback, ctx); } fn set(&self, timestamp: u64) { diff --git a/embassy-stm32f4-examples/src/bin/exti.rs b/embassy-stm32f4-examples/src/bin/exti.rs index 879d0fa2..ec4490b1 100644 --- a/embassy-stm32f4-examples/src/bin/exti.rs +++ b/embassy-stm32f4-examples/src/bin/exti.rs @@ -49,11 +49,8 @@ fn main() -> ! { let dp = stm32::Peripherals::take().unwrap(); let cp = cortex_m::peripheral::Peripherals::take().unwrap(); - let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); - executor.spawn(run(dp, cp)).unwrap(); - - loop { - executor.run(); - //cortex_m::asm::wfe(); // wfe causes RTT to stop working on stm32 - } + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + unwrap!(spawner.spawn(run(dp, cp))); + }); } diff --git a/embassy-stm32f4-examples/src/bin/serial.rs b/embassy-stm32f4-examples/src/bin/serial.rs index 93c32b3f..7338d4fe 100644 --- a/embassy-stm32f4-examples/src/bin/serial.rs +++ b/embassy-stm32f4-examples/src/bin/serial.rs @@ -59,11 +59,8 @@ fn main() -> ! { let dp = stm32::Peripherals::take().unwrap(); let cp = cortex_m::peripheral::Peripherals::take().unwrap(); - let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); - executor.spawn(run(dp, cp)).unwrap(); - - loop { - executor.run(); - //cortex_m::asm::wfe(); // wfe causes RTT to stop working on stm32 - } + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + unwrap!(spawner.spawn(run(dp, cp))); + }); } diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs index 922b0fe0..7c74fa58 100644 --- a/embassy/src/executor/mod.rs +++ b/embassy/src/executor/mod.rs @@ -2,129 +2,68 @@ pub use embassy_macros::task; use core::future::Future; use core::marker::PhantomData; -use core::mem; use core::pin::Pin; -use core::ptr; use core::ptr::NonNull; -use core::sync::atomic::{AtomicU32, Ordering}; -use core::task::{Context, Poll, Waker}; -use core::{ - cell::{Cell, UnsafeCell}, - cmp::min, -}; +use core::sync::atomic::Ordering; +use core::task::{Context, Poll}; +use core::{mem, ptr}; +pub mod raw; mod run_queue; pub(crate) mod timer; mod timer_queue; mod util; mod waker; -use self::run_queue::{RunQueue, RunQueueItem}; -use self::timer_queue::{TimerQueue, TimerQueueItem}; use self::util::UninitCell; -use crate::{ - fmt::{panic, *}, - time::{Alarm, Instant}, -}; +use crate::fmt::{panic, *}; +use crate::interrupt::OwnedInterrupt; +use crate::time::Alarm; -/// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; -/// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; - -pub(crate) struct TaskHeader { - state: AtomicU32, - run_queue_item: RunQueueItem, - expires_at: Cell, - timer_queue_item: TimerQueueItem, - executor: Cell<*const Executor>, // Valid if state != 0 - poll_fn: UninitCell, // Valid if STATE_SPAWNED -} - -impl TaskHeader { - const fn new() -> Self { - Self { - state: AtomicU32::new(0), - expires_at: Cell::new(Instant::from_ticks(0)), - run_queue_item: RunQueueItem::new(), - timer_queue_item: TimerQueueItem::new(), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - } - } - - pub(crate) unsafe fn enqueue(&self) { - let mut current = self.state.load(Ordering::Acquire); - loop { - // If already scheduled, or if not started, - if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) { - return; - } - - // Mark it as scheduled - let new = current | STATE_RUN_QUEUED; - - match self.state.compare_exchange_weak( - current, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(next_current) => current = next_current, - } - } - - // We have just marked the task as scheduled, so enqueue it. - let executor = &*self.executor.get(); - executor.enqueue(self as *const TaskHeader as *mut TaskHeader); - } -} - -// repr(C) is needed to guarantee that header is located at offset 0 -// This makes it safe to cast between Header and Task pointers. +// repr(C) is needed to guarantee that the raw::Task is located at offset 0 +// This makes it safe to cast between raw::Task and Task pointers. #[repr(C)] pub struct Task { - header: TaskHeader, + raw: raw::Task, future: UninitCell, // Valid if STATE_SPAWNED } impl Task { pub const fn new() -> Self { Self { - header: TaskHeader::new(), + raw: raw::Task::new(), future: UninitCell::uninit(), } } - pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { + pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { for task in pool { - let state = STATE_SPAWNED | STATE_RUN_QUEUED; + let state = raw::STATE_SPAWNED | raw::STATE_RUN_QUEUED; if task - .header + .raw .state .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) .is_ok() { // Initialize the task - task.header.poll_fn.write(Self::poll); + task.raw.poll_fn.write(Self::poll); task.future.write(future()); return SpawnToken { - header: Some(NonNull::new_unchecked( - &task.header as *const TaskHeader as _, - )), + raw_task: Some(NonNull::new_unchecked(&task.raw as *const raw::Task as _)), + phantom: PhantomData, }; } } - return SpawnToken { header: None }; + return SpawnToken { + raw_task: None, + phantom: PhantomData, + }; } - unsafe fn poll(p: *mut TaskHeader) { - let this = &*(p as *const Task); + unsafe fn poll(p: NonNull) { + let this = &*(p.as_ptr() as *const Task); let future = Pin::new_unchecked(this.future.as_mut()); let waker = waker::from_task(p); @@ -132,9 +71,9 @@ impl Task { match future.poll(&mut cx) { Poll::Ready(_) => { this.future.drop_in_place(); - this.header + this.raw .state - .fetch_and(!STATE_SPAWNED, Ordering::AcqRel); + .fetch_and(!raw::STATE_SPAWNED, Ordering::AcqRel); } Poll::Pending => {} } @@ -144,11 +83,12 @@ impl Task { unsafe impl Sync for Task {} #[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] -pub struct SpawnToken { - header: Option>, +pub struct SpawnToken { + raw_task: Option>, + phantom: PhantomData<*mut F>, } -impl Drop for SpawnToken { +impl Drop for SpawnToken { fn drop(&mut self) { // TODO deallocate the task instead. panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") @@ -161,116 +101,167 @@ pub enum SpawnError { Busy, } -pub struct Executor { - alarm: Option<&'static dyn Alarm>, - run_queue: RunQueue, - timer_queue: TimerQueue, - signal_fn: fn(), +/// Handle to spawn tasks into an executor. +/// +/// This Spawner can spawn any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +pub struct Spawner { + executor: &'static raw::Executor, not_send: PhantomData<*mut ()>, } -impl Executor { - pub const fn new(signal_fn: fn()) -> Self { +impl Spawner { + fn new(executor: &'static raw::Executor) -> Self { Self { - alarm: None, - run_queue: RunQueue::new(), - timer_queue: TimerQueue::new(), - signal_fn: signal_fn, - not_send: PhantomData, - } - } - pub const fn new_with_alarm(alarm: &'static dyn Alarm, signal_fn: fn()) -> Self { - Self { - alarm: Some(alarm), - run_queue: RunQueue::new(), - timer_queue: TimerQueue::new(), - signal_fn: signal_fn, + executor, not_send: PhantomData, } } - unsafe fn enqueue(&self, item: *mut TaskHeader) { - if self.run_queue.enqueue(item) { - (self.signal_fn)() - } - } - - /// Spawn a future on this executor. - pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { - let header = token.header; + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; mem::forget(token); - match header { - Some(header) => unsafe { - let header = header.as_ref(); - header.executor.set(self); - self.enqueue(header as *const _ as _); + match task { + Some(task) => { + unsafe { self.executor.spawn(task) }; Ok(()) - }, + } None => Err(SpawnError::Busy), } } - /// Runs the executor until the queue is empty. - pub fn run(&self) { - unsafe { - if self.alarm.is_some() { - self.timer_queue.dequeue_expired(Instant::now(), |p| { - let header = &*p; - header.enqueue(); - }); - } - - self.run_queue.dequeue_all(|p| { - let header = &*p; - header.expires_at.set(Instant::MAX); - - let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - if state & STATE_SPAWNED == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } - - // Run the task - header.poll_fn.read()(p as _); - - // Enqueue or update into timer_queue - self.timer_queue.update(p); - }); - - // If this is in the past, set_alarm will immediately trigger the alarm, - // which will make the wfe immediately return so we do another loop iteration. - if let Some(alarm) = self.alarm { - let next_expiration = self.timer_queue.next_expiration(); - alarm.set_callback(self.signal_fn); - alarm.set(next_expiration.as_ticks()); - } + /// Convert this Spawner to a SendSpawner. This allows you to send the + /// spawner to other threads, but the spawner loses the ability to spawn + /// non-Send tasks. + pub fn make_send(&self) -> SendSpawner { + SendSpawner { + executor: self.executor, + not_send: PhantomData, } } } -pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) { - let p = waker::task_from_waker(waker); - let header = &*p; - let expires_at = header.expires_at.get(); - header.expires_at.set(min(expires_at, at)); +/// Handle to spawn tasks into an executor from any thread. +/// +/// This Spawner can be used from any thread (it implements Send and Sync, so after any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +pub struct SendSpawner { + executor: &'static raw::Executor, + not_send: PhantomData<*mut ()>, } -pub mod raw { - use super::waker; - use core::ptr::NonNull; - use core::task::Waker; +unsafe impl Send for SendSpawner {} +unsafe impl Sync for SendSpawner {} - pub fn task_from_waker(waker: &Waker) -> NonNull<()> { - unsafe { NonNull::new_unchecked(waker::task_from_waker(waker) as *mut ()) } +/// Handle to spawn tasks to an executor. +/// +/// This Spawner can spawn any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +impl SendSpawner { + fn new(executor: &'static raw::Executor) -> Self { + Self { + executor, + not_send: PhantomData, + } } - pub unsafe fn wake_task(task: NonNull<()>) { - let header = &*waker::task_from_ptr(task.as_ptr()); - header.enqueue(); + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + let header = token.raw_task; + mem::forget(token); + + match header { + Some(header) => { + unsafe { self.executor.spawn(header) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } +} + +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + pub const fn new() -> Self { + Self { + inner: raw::Executor::new(|_| cortex_m::asm::sev(), ptr::null_mut()), + not_send: PhantomData, + } + } + + pub fn set_alarm(&mut self, alarm: &'static dyn Alarm) { + self.inner.set_alarm(alarm); + } + + /// Runs the executor. + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(Spawner::new(&self.inner)); + + loop { + unsafe { self.inner.run_queued() }; + cortex_m::asm::wfe(); + } + } +} + +fn pend_by_number(n: u8) { + struct N(u8); + unsafe impl cortex_m::interrupt::Nr for N { + fn nr(&self) -> u8 { + self.0 + } + } + cortex_m::peripheral::NVIC::pend(N(n)) +} + +pub struct IrqExecutor { + irq: I, + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl IrqExecutor { + pub fn new(irq: I) -> Self { + let ctx = irq.number() as *mut (); + Self { + irq, + inner: raw::Executor::new(|ctx| pend_by_number(ctx as u8), ctx), + not_send: PhantomData, + } + } + + pub fn set_alarm(&mut self, alarm: &'static dyn Alarm) { + self.inner.set_alarm(alarm); + } + + /// Start the executor. + /// + /// `init` is called in the interrupt context, then the interrupt is + /// configured to run the executor. + pub fn start(&'static mut self, init: impl FnOnce(Spawner) + Send) { + self.irq.disable(); + + init(Spawner::new(&self.inner)); + + self.irq.set_handler( + |ctx| unsafe { + let executor = &*(ctx as *const raw::Executor); + executor.run_queued(); + }, + &self.inner as *const _ as _, + ); + self.irq.enable(); } } diff --git a/embassy/src/executor/raw.rs b/embassy/src/executor/raw.rs new file mode 100644 index 00000000..927b6a42 --- /dev/null +++ b/embassy/src/executor/raw.rs @@ -0,0 +1,154 @@ +use core::cell::Cell; +use core::cmp::min; +use core::ptr; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicU32, Ordering}; +use core::task::Waker; + +use super::run_queue::{RunQueue, RunQueueItem}; +use super::timer_queue::{TimerQueue, TimerQueueItem}; +use super::util::UninitCell; +use super::waker; +use crate::time::{Alarm, Instant}; + +/// Task is spawned (has a future) +pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +/// Task is in the executor run queue +pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; + +pub struct Task { + pub(crate) state: AtomicU32, + pub(crate) run_queue_item: RunQueueItem, + pub(crate) expires_at: Cell, + pub(crate) timer_queue_item: TimerQueueItem, + pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + pub(crate) poll_fn: UninitCell)>, // Valid if STATE_SPAWNED +} + +impl Task { + pub(crate) const fn new() -> Self { + Self { + state: AtomicU32::new(0), + expires_at: Cell::new(Instant::from_ticks(0)), + run_queue_item: RunQueueItem::new(), + timer_queue_item: TimerQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + } + } + + pub(crate) unsafe fn enqueue(&self) { + let mut current = self.state.load(Ordering::Acquire); + loop { + // If already scheduled, or if not started, + if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) { + return; + } + + // Mark it as scheduled + let new = current | STATE_RUN_QUEUED; + + match self.state.compare_exchange_weak( + current, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(next_current) => current = next_current, + } + } + + // We have just marked the task as scheduled, so enqueue it. + let executor = &*self.executor.get(); + executor.enqueue(self as *const Task as *mut Task); + } +} + +pub(crate) struct Executor { + run_queue: RunQueue, + timer_queue: TimerQueue, + signal_fn: fn(*mut ()), + signal_ctx: *mut (), + alarm: Option<&'static dyn Alarm>, +} + +impl Executor { + pub(crate) const fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + Self { + run_queue: RunQueue::new(), + timer_queue: TimerQueue::new(), + signal_fn, + signal_ctx, + alarm: None, + } + } + + pub(crate) fn set_alarm(&mut self, alarm: &'static dyn Alarm) { + self.alarm = Some(alarm); + } + + unsafe fn enqueue(&self, item: *mut Task) { + if self.run_queue.enqueue(item) { + (self.signal_fn)(self.signal_ctx) + } + } + + pub(crate) unsafe fn spawn(&'static self, task: NonNull) { + let task = task.as_ref(); + task.executor.set(self); + self.enqueue(task as *const _ as _); + } + + pub(crate) unsafe fn run_queued(&self) { + if self.alarm.is_some() { + self.timer_queue.dequeue_expired(Instant::now(), |p| { + p.as_ref().enqueue(); + }); + } + + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + // Run the task + task.poll_fn.read()(p as _); + + // Enqueue or update into timer_queue + self.timer_queue.update(p); + }); + + // If this is in the past, set_alarm will immediately trigger the alarm, + // which will make the wfe immediately return so we do another loop iteration. + if let Some(alarm) = self.alarm { + let next_expiration = self.timer_queue.next_expiration(); + alarm.set_callback(self.signal_fn, self.signal_ctx); + alarm.set(next_expiration.as_ticks()); + } + } +} + +pub use super::waker::task_from_waker; + +pub unsafe fn wake_task(task: NonNull) { + task.as_ref().enqueue(); +} + +pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) { + let task = waker::task_from_waker(waker); + let task = task.as_ref(); + let expires_at = task.expires_at.get(); + task.expires_at.set(min(expires_at, at)); +} diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs index 1cdecee3..397d7122 100644 --- a/embassy/src/executor/run_queue.rs +++ b/embassy/src/executor/run_queue.rs @@ -1,10 +1,11 @@ use core::ptr; +use core::ptr::NonNull; use core::sync::atomic::{AtomicPtr, Ordering}; -use super::TaskHeader; +use super::raw::Task; pub(crate) struct RunQueueItem { - next: AtomicPtr, + next: AtomicPtr, } impl RunQueueItem { @@ -27,7 +28,7 @@ impl RunQueueItem { /// current batch is completely processed, so even if a task enqueues itself instantly (for example /// by waking its own waker) can't prevent other tasks from running. pub(crate) struct RunQueue { - head: AtomicPtr, + head: AtomicPtr, } impl RunQueue { @@ -38,7 +39,7 @@ impl RunQueue { } /// Enqueues an item. Returns true if the queue was empty. - pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool { + pub(crate) unsafe fn enqueue(&self, item: *mut Task) -> bool { let mut prev = self.head.load(Ordering::Acquire); loop { (*item).run_queue_item.next.store(prev, Ordering::Relaxed); @@ -54,7 +55,7 @@ impl RunQueue { prev.is_null() } - pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) { + pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull)) { let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); while !task.is_null() { @@ -62,7 +63,7 @@ impl RunQueue { // Therefore, first read the next pointer, and only then process the task. let next = (*task).run_queue_item.next.load(Ordering::Relaxed); - on_task(task); + on_task(NonNull::new_unchecked(task)); task = next } diff --git a/embassy/src/executor/timer.rs b/embassy/src/executor/timer.rs index 56236a05..9bd98925 100644 --- a/embassy/src/executor/timer.rs +++ b/embassy/src/executor/timer.rs @@ -3,6 +3,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use futures::Stream; +use super::raw; use crate::time::{Duration, Instant}; pub struct Timer { @@ -34,7 +35,7 @@ impl Future for Timer { if self.yielded_once && self.expires_at <= Instant::now() { Poll::Ready(()) } else { - unsafe { super::register_timer(self.expires_at, cx.waker()) }; + unsafe { raw::register_timer(self.expires_at, cx.waker()) }; self.yielded_once = true; Poll::Pending } @@ -66,7 +67,7 @@ impl Stream for Ticker { self.expires_at += dur; Poll::Ready(Some(())) } else { - unsafe { super::register_timer(self.expires_at, cx.waker()) }; + unsafe { raw::register_timer(self.expires_at, cx.waker()) }; Poll::Pending } } diff --git a/embassy/src/executor/timer_queue.rs b/embassy/src/executor/timer_queue.rs index 428b6cf6..c722ae00 100644 --- a/embassy/src/executor/timer_queue.rs +++ b/embassy/src/executor/timer_queue.rs @@ -1,13 +1,14 @@ use core::cell::Cell; +use core::cmp::min; +use core::ptr; +use core::ptr::NonNull; use core::sync::atomic::{AtomicPtr, Ordering}; -use core::{cmp::min, ptr}; +use super::raw::{Task, STATE_TIMER_QUEUED}; use crate::time::Instant; -use super::{TaskHeader, STATE_TIMER_QUEUED}; - pub(crate) struct TimerQueueItem { - next: Cell<*mut TaskHeader>, + next: Cell<*mut Task>, } impl TimerQueueItem { @@ -19,7 +20,7 @@ impl TimerQueueItem { } pub(crate) struct TimerQueue { - head: Cell<*mut TaskHeader>, + head: Cell<*mut Task>, } impl TimerQueue { @@ -29,15 +30,15 @@ impl TimerQueue { } } - pub(crate) unsafe fn update(&self, p: *mut TaskHeader) { - let header = &*p; - if header.expires_at.get() != Instant::MAX { - let old_state = header.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); + pub(crate) unsafe fn update(&self, p: NonNull) { + let task = p.as_ref(); + if task.expires_at.get() != Instant::MAX { + let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); let is_new = old_state & STATE_TIMER_QUEUED == 0; if is_new { - header.timer_queue_item.next.set(self.head.get()); - self.head.set(p); + task.timer_queue_item.next.set(self.head.get()); + self.head.set(p.as_ptr()); } } } @@ -45,18 +46,18 @@ impl TimerQueue { pub(crate) unsafe fn next_expiration(&self) -> Instant { let mut res = Instant::MAX; self.retain(|p| { - let header = &*p; - let expires = header.expires_at.get(); + let task = p.as_ref(); + let expires = task.expires_at.get(); res = min(res, expires); expires != Instant::MAX }); res } - pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(*mut TaskHeader)) { + pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull)) { self.retain(|p| { - let header = &*p; - if header.expires_at.get() <= now { + let task = p.as_ref(); + if task.expires_at.get() <= now { on_task(p); false } else { @@ -65,20 +66,18 @@ impl TimerQueue { }); } - pub(crate) unsafe fn retain(&self, mut f: impl FnMut(*mut TaskHeader) -> bool) { + pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull) -> bool) { let mut prev = &self.head; while !prev.get().is_null() { - let p = prev.get(); - let header = &*p; + let p = NonNull::new_unchecked(prev.get()); + let task = &*p.as_ptr(); if f(p) { // Skip to next - prev = &header.timer_queue_item.next; + prev = &task.timer_queue_item.next; } else { // Remove it - prev.set(header.timer_queue_item.next.get()); - header - .state - .fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); + prev.set(task.timer_queue_item.next.get()); + task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); } } } diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs index 5a604d86..bc02c51d 100644 --- a/embassy/src/executor/waker.rs +++ b/embassy/src/executor/waker.rs @@ -1,7 +1,8 @@ use core::mem; +use core::ptr::NonNull; use core::task::{RawWaker, RawWakerVTable, Waker}; -use super::TaskHeader; +use super::raw::Task; const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); @@ -10,26 +11,21 @@ unsafe fn clone(p: *const ()) -> RawWaker { } unsafe fn wake(p: *const ()) { - let header = &*task_from_ptr(p); - header.enqueue(); + (*(p as *mut Task)).enqueue() } unsafe fn drop(_: *const ()) { // nop } -pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker { - Waker::from_raw(RawWaker::new(p as _, &VTABLE)) +pub(crate) unsafe fn from_task(p: NonNull) -> Waker { + Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) } -pub(crate) unsafe fn task_from_ptr(p: *const ()) -> *mut TaskHeader { - p as *mut TaskHeader -} - -pub(crate) unsafe fn task_from_waker(w: &Waker) -> *mut TaskHeader { - let w: &WakerHack = mem::transmute(w); - assert_eq!(w.vtable, &VTABLE); - task_from_ptr(w.data) +pub unsafe fn task_from_waker(waker: &Waker) -> NonNull { + let hack: &WakerHack = mem::transmute(waker); + assert_eq!(hack.vtable, &VTABLE); + NonNull::new_unchecked(hack.data as *mut Task) } struct WakerHack { diff --git a/embassy/src/interrupt.rs b/embassy/src/interrupt.rs index 7690bea0..45676b31 100644 --- a/embassy/src/interrupt.rs +++ b/embassy/src/interrupt.rs @@ -32,6 +32,7 @@ unsafe impl cortex_m::interrupt::Nr for NrWrap { pub unsafe trait OwnedInterrupt { type Priority: From + Into + Copy; fn number(&self) -> u8; + unsafe fn steal() -> Self; /// Implementation detail, do not use outside embassy crates. #[doc(hidden)] diff --git a/embassy/src/lib.rs b/embassy/src/lib.rs index 74c69b54..baa449db 100644 --- a/embassy/src/lib.rs +++ b/embassy/src/lib.rs @@ -2,7 +2,6 @@ #![feature(generic_associated_types)] #![feature(const_fn)] #![feature(const_fn_fn_ptr_basics)] -#![feature(const_in_array_repeat_expressions)] #![feature(const_option)] // This mod MUST go first, so that the others see its macros. diff --git a/embassy/src/time/traits.rs b/embassy/src/time/traits.rs index 7faa27cd..2c97b13a 100644 --- a/embassy/src/time/traits.rs +++ b/embassy/src/time/traits.rs @@ -16,7 +16,7 @@ impl Clock for &T { pub trait Alarm { /// Sets the callback function to be called when the alarm triggers. /// The callback may be called from any context (interrupt or thread mode). - fn set_callback(&self, callback: fn()); + fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ()); /// Sets an alarm at the given timestamp. When the clock reaches that /// timestamp, the provided callback funcion will be called. @@ -32,8 +32,8 @@ pub trait Alarm { } impl Alarm for &T { - fn set_callback(&self, callback: fn()) { - T::set_callback(self, callback); + fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ()) { + T::set_callback(self, callback, ctx); } fn set(&self, timestamp: u64) { T::set(self, timestamp); diff --git a/embassy/src/util/signal.rs b/embassy/src/util/signal.rs index 2c9c52f1..c8b7a61a 100644 --- a/embassy/src/util/signal.rs +++ b/embassy/src/util/signal.rs @@ -110,7 +110,7 @@ impl<'a, I: OwnedInterrupt> InterruptFuture<'a, I> { }; if ctx as *const _ != ptr::null() { - executor::raw::wake_task(ptr::NonNull::new_unchecked(ctx)); + executor::raw::wake_task(ptr::NonNull::new_unchecked(ctx as _)); } NVIC::mask(NrWrap(irq)); @@ -124,10 +124,8 @@ impl<'a, I: OwnedInterrupt> Future for InterruptFuture<'a, I> { fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let s = unsafe { self.get_unchecked_mut() }; - s.interrupt.set_handler( - Self::interrupt_handler, - executor::raw::task_from_waker(&cx.waker()).cast().as_ptr(), - ); + let ctx = unsafe { executor::raw::task_from_waker(&cx.waker()).cast().as_ptr() }; + s.interrupt.set_handler(Self::interrupt_handler, ctx); if s.interrupt.is_enabled() { Poll::Pending } else { diff --git a/test-build.sh b/test-build.sh index 04e1a95e..92945457 100755 --- a/test-build.sh +++ b/test-build.sh @@ -3,7 +3,7 @@ set -euxo pipefail # embassy std -(cd embassy; cargo build --features log,std) +#(cd embassy; cargo build --features log,std) # embassy embedded (cd embassy; cargo build --target thumbv7em-none-eabi)