diff --git a/Cargo.toml b/Cargo.toml index abce83ec..17fbf0a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,6 @@ exclude = [ panic-probe = { git = "https://github.com/knurling-rs/probe-run", branch="main" } defmt-rtt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" } defmt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" } -static-executor = { git = "https://github.com/Dirbaio/static-executor", branch="multi"} -futures-intrusive = { git = "https://github.com/Dirbaio/futures-intrusive", branch="master"} [profile.dev] codegen-units = 1 diff --git a/embassy-macros/Cargo.toml b/embassy-macros/Cargo.toml new file mode 100644 index 00000000..1f3e20aa --- /dev/null +++ b/embassy-macros/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "embassy-macros" +version = "0.1.0" +authors = ["Dario Nieuwenhuis "] +edition = "2018" + +[dependencies] +syn = { version = "1.0.39", features = ["full", "extra-traits"] } +quote = "1.0.7" +darling = "0.10.2" + +[lib] +proc-macro = true diff --git a/embassy-macros/src/lib.rs b/embassy-macros/src/lib.rs new file mode 100644 index 00000000..1745311b --- /dev/null +++ b/embassy-macros/src/lib.rs @@ -0,0 +1,114 @@ +#![feature(proc_macro_diagnostic)] + +extern crate proc_macro; + +use darling::FromMeta; +use proc_macro::{Diagnostic, Level, Span, TokenStream}; +use quote::{format_ident, quote}; +use syn::spanned::Spanned; + +#[derive(Debug, FromMeta)] +struct MacroArgs { + #[darling(default)] + pool_size: Option, +} + +#[proc_macro_attribute] +pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { + let args = syn::parse_macro_input!(args as syn::AttributeArgs); + let mut task_fn = syn::parse_macro_input!(item as syn::ItemFn); + + let args = match MacroArgs::from_list(&args) { + Ok(v) => v, + Err(e) => { + return TokenStream::from(e.write_errors()); + } + }; + + let pool_size: usize = args.pool_size.unwrap_or(1); + + let mut fail = false; + if task_fn.sig.asyncness.is_none() { + task_fn + .sig + .span() + .unwrap() + .error("task functions must be async") + .emit(); + fail = true; + } + if task_fn.sig.generics.params.len() != 0 { + task_fn + .sig + .span() + .unwrap() + .error("task functions must not be generic") + .emit(); + fail = true; + } + if pool_size < 1 { + Span::call_site() + .error("pool_size must be 1 or greater") + .emit(); + fail = true + } + + let mut arg_names: syn::punctuated::Punctuated = + syn::punctuated::Punctuated::new(); + let args = &task_fn.sig.inputs; + + for arg in args.iter() { + match arg { + syn::FnArg::Receiver(_) => { + arg.span() + .unwrap() + .error("task functions must not have receiver arguments") + .emit(); + fail = true; + } + syn::FnArg::Typed(t) => match t.pat.as_ref() { + syn::Pat::Ident(i) => arg_names.push(i.ident.clone()), + _ => { + arg.span() + .unwrap() + .error("pattern matching in task arguments is not yet supporteds") + .emit(); + fail = true; + } + }, + } + } + + if fail { + return TokenStream::new(); + } + + let name = task_fn.sig.ident.clone(); + + let type_name = format_ident!("__embassy_executor_type_{}", name); + let pool_name = format_ident!("__embassy_executor_pool_{}", name); + let task_fn_name = format_ident!("__embassy_executor_task_{}", name); + let create_fn_name = format_ident!("__embassy_executor_create_{}", name); + + let visibility = &task_fn.vis; + + task_fn.sig.ident = task_fn_name.clone(); + + let result = quote! { + #task_fn + #[allow(non_camel_case_types)] + type #type_name = impl ::core::future::Future + 'static; + + fn #create_fn_name(#args) -> #type_name { + #task_fn_name(#arg_names) + } + + #[allow(non_upper_case_globals)] + static #pool_name: [::embassy::executor::Task<#type_name>; #pool_size] = [::embassy::executor::Task::new(); #pool_size]; + + #visibility fn #name(#args) -> ::embassy::executor::SpawnToken { + unsafe { ::embassy::executor::Task::spawn(&#pool_name, || #create_fn_name(#arg_names)) } + } + }; + result.into() +} diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index 7b8e6cfa..4abcda99 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml @@ -13,4 +13,4 @@ cortex-m = "0.6.3" futures = { version = "0.3.5", default-features = false, features = [ "async-await" ] } pin-project = { version = "0.4.23", default-features = false } futures-intrusive = { version = "0.3.1", default-features = false } -static-executor = { version = "0.1.0", features=["defmt"]} +embassy-macros = { version = "0.1.0", path = "../embassy-macros"} diff --git a/embassy/src/executor.rs b/embassy/src/executor.rs deleted file mode 100644 index d42a19b9..00000000 --- a/embassy/src/executor.rs +++ /dev/null @@ -1,48 +0,0 @@ -use core::marker::PhantomData; -use static_executor as se; - -use crate::time; -use crate::time::Alarm; - -pub use se::{task, SpawnError, SpawnToken}; - -pub struct Executor { - inner: se::Executor, - alarm: A, - timer: time::TimerService, -} - -impl Executor { - pub fn new(alarm: A, signal_fn: fn()) -> Self { - alarm.set_callback(signal_fn); - Self { - inner: se::Executor::new(signal_fn), - alarm, - timer: time::TimerService::new(time::IntrusiveClock), - } - } - - /// Spawn a future on this executor. - /// - /// safety: can only be called from the executor thread - pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { - self.inner.spawn(token) - } - - /// Runs the executor until the queue is empty. - /// - /// safety: can only be called from the executor thread - pub unsafe fn run(&'static self) { - time::with_timer_service(&self.timer, || { - self.timer.check_expirations(); - self.inner.run(); - - match self.timer.next_expiration() { - // If this is in the past, set_alarm will immediately trigger the alarm, - // which will make the wfe immediately return so we do another loop iteration. - Some(at) => self.alarm.set(at), - None => self.alarm.clear(), - } - }) - } -} diff --git a/embassy/src/executor/executor.rs b/embassy/src/executor/executor.rs new file mode 100644 index 00000000..7ef9230e --- /dev/null +++ b/embassy/src/executor/executor.rs @@ -0,0 +1,305 @@ +#![no_std] +#![feature(const_fn)] + +use core::cell::Cell; +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem; +use core::mem::MaybeUninit; +use core::pin::Pin; +use core::ptr; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering}; +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +//============= +// UninitCell + +struct UninitCell(MaybeUninit>); +impl UninitCell { + const fn uninit() -> Self { + Self(MaybeUninit::uninit()) + } + + unsafe fn as_mut_ptr(&self) -> *mut T { + (*self.0.as_ptr()).get() + } + + unsafe fn as_mut(&self) -> &mut T { + &mut *self.as_mut_ptr() + } + + unsafe fn write(&self, val: T) { + ptr::write(self.as_mut_ptr(), val) + } + + unsafe fn drop_in_place(&self) { + ptr::drop_in_place(self.as_mut_ptr()) + } +} + +impl UninitCell { + unsafe fn read(&self) -> T { + ptr::read(self.as_mut_ptr()) + } +} + +//============= +// Data structures + +const STATE_RUNNING: u32 = 1 << 0; +const STATE_QUEUED: u32 = 1 << 1; + +struct Header { + state: AtomicU32, + next: AtomicPtr
, + executor: Cell<*const Executor>, + poll_fn: UninitCell, // Valid if STATE_RUNNING +} + +// repr(C) is needed to guarantee that header is located at offset 0 +// This makes it safe to cast between Header and Task pointers. +#[repr(C)] +pub struct Task { + header: Header, + future: UninitCell, // Valid if STATE_RUNNING +} + +#[derive(Copy, Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum SpawnError { + Busy, +} + +//============= +// Atomic task queue using a very, very simple lock-free linked-list queue: +// +// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +// +// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +// null. Then the batch is iterated following the next pointers until null is reached. +// +// Note that batches will be iterated in the opposite order as they were enqueued. This should +// be OK for our use case. Hopefully it doesn't create executor fairness problems. + +struct Queue { + head: AtomicPtr
, +} + +impl Queue { + const fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + unsafe fn enqueue(&self, item: *mut Header) -> bool { + let mut prev = self.head.load(Ordering::Acquire); + loop { + (*item).next.store(prev, Ordering::Relaxed); + match self + .head + .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(next_prev) => prev = next_prev, + } + } + + prev.is_null() + } + + unsafe fn dequeue_all(&self, on_task: impl Fn(*mut Header)) { + loop { + let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + + if task.is_null() { + // Queue is empty, we're done + return; + } + + while !task.is_null() { + on_task(task); + task = (*task).next.load(Ordering::Relaxed); + } + } + } +} + +//============= +// Waker + +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); + +unsafe fn waker_clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &WAKER_VTABLE) +} + +unsafe fn waker_wake(p: *const ()) { + let header = &*(p as *const Header); + + let mut current = header.state.load(Ordering::Acquire); + loop { + // If already scheduled, or if not started, + if (current & STATE_QUEUED != 0) || (current & STATE_RUNNING == 0) { + return; + } + + // Mark it as scheduled + let new = current | STATE_QUEUED; + + match header + .state + .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(next_current) => current = next_current, + } + } + + // We have just marked the task as scheduled, so enqueue it. + let executor = &*header.executor.get(); + executor.enqueue(p as *mut Header); +} + +unsafe fn waker_drop(_: *const ()) { + // nop +} + +//============= +// Task + +impl Task { + pub const fn new() -> Self { + Self { + header: Header { + state: AtomicU32::new(0), + next: AtomicPtr::new(ptr::null_mut()), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + }, + future: UninitCell::uninit(), + } + } + + pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { + for task in pool { + let state = STATE_RUNNING | STATE_QUEUED; + if task + .header + .state + .compare_and_swap(0, state, Ordering::AcqRel) + == 0 + { + // Initialize the task + task.header.poll_fn.write(Self::poll); + task.future.write(future()); + + return SpawnToken { + header: Some(NonNull::new_unchecked(&task.header as *const Header as _)), + }; + } + } + + return SpawnToken { header: None }; + } + + unsafe fn poll(p: *mut Header) { + let this = &*(p as *const Task); + + let future = Pin::new_unchecked(this.future.as_mut()); + let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); + let mut cx = Context::from_waker(&waker); + match future.poll(&mut cx) { + Poll::Ready(_) => { + this.future.drop_in_place(); + this.header + .state + .fetch_and(!STATE_RUNNING, Ordering::AcqRel); + } + Poll::Pending => {} + } + } +} + +unsafe impl Sync for Task {} + +//============= +// Spawn token + +#[must_use = "Calling a task function does nothing on its own. To spawn a task, pass the result to Executor::spawn()"] +pub struct SpawnToken { + header: Option>, +} + +impl Drop for SpawnToken { + fn drop(&mut self) { + // TODO maybe we can deallocate the task instead. + panic!("Please do not drop SpawnToken instances") + } +} + +//============= +// Executor + +pub struct Executor { + queue: Queue, + signal_fn: fn(), +} + +impl Executor { + pub const fn new(signal_fn: fn()) -> Self { + Self { + queue: Queue::new(), + signal_fn: signal_fn, + } + } + + unsafe fn enqueue(&self, item: *mut Header) { + if self.queue.enqueue(item) { + (self.signal_fn)() + } + } + + /// Spawn a future on this executor. + /// + /// safety: can only be called from the executor thread + pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { + let header = token.header; + mem::forget(token); + + match header { + Some(header) => { + let header = header.as_ref(); + header.executor.set(self); + self.enqueue(header as *const _ as _); + Ok(()) + } + None => Err(SpawnError::Busy), + } + } + + /// Runs the executor until the queue is empty. + /// + /// safety: can only be called from the executor thread + pub unsafe fn run(&self) { + self.queue.dequeue_all(|p| { + let header = &*p; + + let state = header.state.fetch_and(!STATE_QUEUED, Ordering::AcqRel); + if state & STATE_RUNNING == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + // Run the task + header.poll_fn.read()(p as _); + }); + } +} diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs new file mode 100644 index 00000000..1a68bdfd --- /dev/null +++ b/embassy/src/executor/mod.rs @@ -0,0 +1,9 @@ +mod executor; +mod timer_executor; + +// for time::Timer +pub(crate) use timer_executor::current_timer_queue; + +pub use embassy_macros::task; +pub use executor::{Executor, SpawnError, SpawnToken, Task}; +pub use timer_executor::TimerExecutor; diff --git a/embassy/src/executor/timer_executor.rs b/embassy/src/executor/timer_executor.rs new file mode 100644 index 00000000..21a81383 --- /dev/null +++ b/embassy/src/executor/timer_executor.rs @@ -0,0 +1,77 @@ +use super::executor::{Executor, SpawnError, SpawnToken}; +use core::ptr; +use core::sync::atomic::{AtomicPtr, Ordering}; +use futures_intrusive::timer as fi; + +use crate::time::Alarm; + +pub(crate) struct IntrusiveClock; + +impl fi::Clock for IntrusiveClock { + fn now(&self) -> u64 { + crate::time::now() + } +} + +pub(crate) type TimerQueue = fi::LocalTimerService; + +pub struct TimerExecutor { + inner: Executor, + alarm: A, + timer_queue: TimerQueue, +} + +impl TimerExecutor { + pub fn new(alarm: A, signal_fn: fn()) -> Self { + alarm.set_callback(signal_fn); + Self { + inner: Executor::new(signal_fn), + alarm, + timer_queue: TimerQueue::new(&IntrusiveClock), + } + } + + /// Spawn a future on this executor. + /// + /// safety: can only be called from the executor thread + pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { + self.inner.spawn(token) + } + + /// Runs the executor until the queue is empty. + /// + /// safety: can only be called from the executor thread + pub unsafe fn run(&'static self) { + with_timer_queue(&self.timer_queue, || { + self.timer_queue.check_expirations(); + self.inner.run(); + + match self.timer_queue.next_expiration() { + // If this is in the past, set_alarm will immediately trigger the alarm, + // which will make the wfe immediately return so we do another loop iteration. + Some(at) => self.alarm.set(at), + None => self.alarm.clear(), + } + }) + } +} + +static CURRENT_TIMER_QUEUE: AtomicPtr = AtomicPtr::new(ptr::null_mut()); + +fn with_timer_queue(svc: &'static TimerQueue, f: impl FnOnce() -> R) -> R { + let svc = svc as *const _ as *mut _; + let prev_svc = CURRENT_TIMER_QUEUE.swap(svc, Ordering::Relaxed); + let r = f(); + let svc2 = CURRENT_TIMER_QUEUE.swap(prev_svc, Ordering::Relaxed); + assert_eq!(svc, svc2); + r +} + +pub(crate) fn current_timer_queue() -> &'static TimerQueue { + unsafe { + CURRENT_TIMER_QUEUE + .load(Ordering::Relaxed) + .as_ref() + .unwrap() + } +} diff --git a/embassy/src/time.rs b/embassy/src/time.rs deleted file mode 100644 index 532a53a3..00000000 --- a/embassy/src/time.rs +++ /dev/null @@ -1,331 +0,0 @@ -use core::cell::Cell; -use core::convert::TryInto; -use core::future::Future; -use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}; -use core::pin::Pin; -use core::ptr; -use core::sync::atomic::{AtomicPtr, Ordering}; -use core::task::{Context, Poll}; - -use crate::util::*; -use fi::LocalTimer; -use futures_intrusive::timer as fi; -static mut CLOCK: Option<&'static dyn Clock> = None; - -pub unsafe fn set_clock(clock: &'static dyn Clock) { - CLOCK = Some(clock); -} - -fn now() -> u64 { - unsafe { CLOCK.dexpect(defmt::intern!("No clock set")).now() } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct Instant { - ticks: u64, -} - -// TODO allow customizing, probably via Cargo features `tick-hz-32768` or something. -pub const TICKS_PER_SECOND: u32 = 32768; - -impl Instant { - pub fn now() -> Instant { - Instant { ticks: now() } - } - - pub fn into_ticks(&self) -> u64 { - self.ticks - } - - pub fn duration_since(&self, earlier: Instant) -> Duration { - Duration { - ticks: (self.ticks - earlier.ticks).try_into().unwrap(), - } - } - - pub fn checked_duration_since(&self, earlier: Instant) -> Option { - if self.ticks < earlier.ticks { - None - } else { - Some(Duration { - ticks: (self.ticks - earlier.ticks).try_into().unwrap(), - }) - } - } - - pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { - Duration { - ticks: if self.ticks < earlier.ticks { - 0 - } else { - (self.ticks - earlier.ticks).try_into().unwrap() - }, - } - } - - pub fn elapsed(&self) -> Duration { - Instant::now() - *self - } - - pub fn checked_add(&self, duration: Duration) -> Option { - self.ticks - .checked_add(duration.ticks.into()) - .map(|ticks| Instant { ticks }) - } - pub fn checked_sub(&self, duration: Duration) -> Option { - self.ticks - .checked_sub(duration.ticks.into()) - .map(|ticks| Instant { ticks }) - } -} - -impl Add for Instant { - type Output = Instant; - - fn add(self, other: Duration) -> Instant { - self.checked_add(other) - .expect("overflow when adding duration to instant") - } -} - -impl AddAssign for Instant { - fn add_assign(&mut self, other: Duration) { - *self = *self + other; - } -} - -impl Sub for Instant { - type Output = Instant; - - fn sub(self, other: Duration) -> Instant { - self.checked_sub(other) - .expect("overflow when subtracting duration from instant") - } -} - -impl SubAssign for Instant { - fn sub_assign(&mut self, other: Duration) { - *self = *self - other; - } -} - -impl Sub for Instant { - type Output = Duration; - - fn sub(self, other: Instant) -> Duration { - self.duration_since(other) - } -} - -#[derive(defmt::Format, Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct Duration { - ticks: u32, -} - -impl Duration { - pub fn into_ticks(&self) -> u32 { - self.ticks - } - - pub const fn from_ticks(ticks: u32) -> Duration { - Duration { ticks } - } - - pub const fn from_secs(secs: u32) -> Duration { - Duration { - ticks: secs * TICKS_PER_SECOND, - } - } - - pub const fn from_millis(millis: u32) -> Duration { - Duration { - ticks: millis * TICKS_PER_SECOND / 1000, - } - } - - pub fn checked_add(self, rhs: Duration) -> Option { - self.ticks - .checked_add(rhs.ticks) - .map(|ticks| Duration { ticks }) - } - - pub fn checked_sub(self, rhs: Duration) -> Option { - self.ticks - .checked_sub(rhs.ticks) - .map(|ticks| Duration { ticks }) - } - - pub fn checked_mul(self, rhs: u32) -> Option { - self.ticks.checked_mul(rhs).map(|ticks| Duration { ticks }) - } - - pub fn checked_div(self, rhs: u32) -> Option { - self.ticks.checked_div(rhs).map(|ticks| Duration { ticks }) - } -} - -impl Add for Duration { - type Output = Duration; - - fn add(self, rhs: Duration) -> Duration { - self.checked_add(rhs) - .expect("overflow when adding durations") - } -} - -impl AddAssign for Duration { - fn add_assign(&mut self, rhs: Duration) { - *self = *self + rhs; - } -} - -impl Sub for Duration { - type Output = Duration; - - fn sub(self, rhs: Duration) -> Duration { - self.checked_sub(rhs) - .expect("overflow when subtracting durations") - } -} - -impl SubAssign for Duration { - fn sub_assign(&mut self, rhs: Duration) { - *self = *self - rhs; - } -} - -impl Mul for Duration { - type Output = Duration; - - fn mul(self, rhs: u32) -> Duration { - self.checked_mul(rhs) - .expect("overflow when multiplying duration by scalar") - } -} - -impl Mul for u32 { - type Output = Duration; - - fn mul(self, rhs: Duration) -> Duration { - rhs * self - } -} - -impl MulAssign for Duration { - fn mul_assign(&mut self, rhs: u32) { - *self = *self * rhs; - } -} - -impl Div for Duration { - type Output = Duration; - - fn div(self, rhs: u32) -> Duration { - self.checked_div(rhs) - .expect("divide by zero error when dividing duration by scalar") - } -} - -impl DivAssign for Duration { - fn div_assign(&mut self, rhs: u32) { - *self = *self / rhs; - } -} - -pub(crate) struct IntrusiveClock; - -impl fi::Clock for IntrusiveClock { - fn now(&self) -> u64 { - now() - } -} - -pub(crate) type TimerService = fi::LocalTimerService; - -static CURRENT_TIMER_SERVICE: AtomicPtr = AtomicPtr::new(ptr::null_mut()); - -pub(crate) fn with_timer_service(svc: &'static TimerService, f: impl FnOnce() -> R) -> R { - let svc = svc as *const _ as *mut _; - let prev_svc = CURRENT_TIMER_SERVICE.swap(svc, Ordering::Relaxed); - let r = f(); - let svc2 = CURRENT_TIMER_SERVICE.swap(prev_svc, Ordering::Relaxed); - assert_eq!(svc, svc2); - r -} - -fn current_timer_service() -> &'static TimerService { - unsafe { - CURRENT_TIMER_SERVICE - .load(Ordering::Relaxed) - .as_ref() - .unwrap() - } -} - -pub struct Timer { - inner: fi::LocalTimerFuture<'static>, -} - -impl Timer { - pub fn at(when: Instant) -> Self { - let svc: &TimerService = current_timer_service(); - Self { - inner: svc.deadline(when.into_ticks()), - } - } - - pub fn after(dur: Duration) -> Self { - Self::at(Instant::now() + dur) - } -} - -impl Future for Timer { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx) - } -} -/// Monotonic clock -pub trait Clock { - /// Return the current timestamp in ticks. - /// This is guaranteed to be monotonic, i.e. a call to now() will always return - /// a greater or equal value than earler calls. - fn now(&self) -> u64; -} - -impl Clock for &T { - fn now(&self) -> u64 { - T::now(self) - } -} - -/// Trait to register a callback at a given timestamp. -pub trait Alarm { - /// Sets the callback function to be called when the alarm triggers. - /// The callback may be called from any context (interrupt or thread mode). - fn set_callback(&self, callback: fn()); - - /// Sets an alarm at the given timestamp. When the clock reaches that - /// timestamp, the provided callback funcion will be called. - /// - /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. - /// - /// Only one alarm can be active at a time. This overwrites any previously-set alarm if any. - fn set(&self, timestamp: u64); - - /// Clears the previously-set alarm. - /// If no alarm was set, this is a noop. - fn clear(&self); -} - -impl Alarm for &T { - fn set_callback(&self, callback: fn()) { - T::set_callback(self, callback); - } - fn set(&self, timestamp: u64) { - T::set(self, timestamp); - } - fn clear(&self) { - T::clear(self) - } -} diff --git a/embassy/src/time/duration.rs b/embassy/src/time/duration.rs new file mode 100644 index 00000000..1d5ad754 --- /dev/null +++ b/embassy/src/time/duration.rs @@ -0,0 +1,118 @@ +use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}; + +use super::TICKS_PER_SECOND; + +#[derive(defmt::Format, Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Duration { + pub(crate) ticks: u32, +} + +impl Duration { + pub fn into_ticks(&self) -> u32 { + self.ticks + } + + pub const fn from_ticks(ticks: u32) -> Duration { + Duration { ticks } + } + + pub const fn from_secs(secs: u32) -> Duration { + Duration { + ticks: secs * TICKS_PER_SECOND, + } + } + + pub const fn from_millis(millis: u32) -> Duration { + Duration { + ticks: millis * TICKS_PER_SECOND / 1000, + } + } + + pub fn checked_add(self, rhs: Duration) -> Option { + self.ticks + .checked_add(rhs.ticks) + .map(|ticks| Duration { ticks }) + } + + pub fn checked_sub(self, rhs: Duration) -> Option { + self.ticks + .checked_sub(rhs.ticks) + .map(|ticks| Duration { ticks }) + } + + pub fn checked_mul(self, rhs: u32) -> Option { + self.ticks.checked_mul(rhs).map(|ticks| Duration { ticks }) + } + + pub fn checked_div(self, rhs: u32) -> Option { + self.ticks.checked_div(rhs).map(|ticks| Duration { ticks }) + } +} + +impl Add for Duration { + type Output = Duration; + + fn add(self, rhs: Duration) -> Duration { + self.checked_add(rhs) + .expect("overflow when adding durations") + } +} + +impl AddAssign for Duration { + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl Sub for Duration { + type Output = Duration; + + fn sub(self, rhs: Duration) -> Duration { + self.checked_sub(rhs) + .expect("overflow when subtracting durations") + } +} + +impl SubAssign for Duration { + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl Mul for Duration { + type Output = Duration; + + fn mul(self, rhs: u32) -> Duration { + self.checked_mul(rhs) + .expect("overflow when multiplying duration by scalar") + } +} + +impl Mul for u32 { + type Output = Duration; + + fn mul(self, rhs: Duration) -> Duration { + rhs * self + } +} + +impl MulAssign for Duration { + fn mul_assign(&mut self, rhs: u32) { + *self = *self * rhs; + } +} + +impl Div for Duration { + type Output = Duration; + + fn div(self, rhs: u32) -> Duration { + self.checked_div(rhs) + .expect("divide by zero error when dividing duration by scalar") + } +} + +impl DivAssign for Duration { + fn div_assign(&mut self, rhs: u32) { + *self = *self / rhs; + } +} diff --git a/embassy/src/time/instant.rs b/embassy/src/time/instant.rs new file mode 100644 index 00000000..9c23d088 --- /dev/null +++ b/embassy/src/time/instant.rs @@ -0,0 +1,102 @@ +use core::convert::TryInto; +use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}; +use core::pin::Pin; +use core::ptr; +use core::sync::atomic::{AtomicPtr, Ordering}; +use core::task::{Context, Poll}; + +use super::{now, Duration}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Instant { + ticks: u64, +} + +impl Instant { + pub fn now() -> Instant { + Instant { ticks: now() } + } + + pub fn into_ticks(&self) -> u64 { + self.ticks + } + + pub fn duration_since(&self, earlier: Instant) -> Duration { + Duration { + ticks: (self.ticks - earlier.ticks).try_into().unwrap(), + } + } + + pub fn checked_duration_since(&self, earlier: Instant) -> Option { + if self.ticks < earlier.ticks { + None + } else { + Some(Duration { + ticks: (self.ticks - earlier.ticks).try_into().unwrap(), + }) + } + } + + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + Duration { + ticks: if self.ticks < earlier.ticks { + 0 + } else { + (self.ticks - earlier.ticks).try_into().unwrap() + }, + } + } + + pub fn elapsed(&self) -> Duration { + Instant::now() - *self + } + + pub fn checked_add(&self, duration: Duration) -> Option { + self.ticks + .checked_add(duration.ticks.into()) + .map(|ticks| Instant { ticks }) + } + pub fn checked_sub(&self, duration: Duration) -> Option { + self.ticks + .checked_sub(duration.ticks.into()) + .map(|ticks| Instant { ticks }) + } +} + +impl Add for Instant { + type Output = Instant; + + fn add(self, other: Duration) -> Instant { + self.checked_add(other) + .expect("overflow when adding duration to instant") + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, other: Duration) { + *self = *self + other; + } +} + +impl Sub for Instant { + type Output = Instant; + + fn sub(self, other: Duration) -> Instant { + self.checked_sub(other) + .expect("overflow when subtracting duration from instant") + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, other: Duration) { + *self = *self - other; + } +} + +impl Sub for Instant { + type Output = Duration; + + fn sub(self, other: Instant) -> Duration { + self.duration_since(other) + } +} diff --git a/embassy/src/time/mod.rs b/embassy/src/time/mod.rs new file mode 100644 index 00000000..b3ae10e7 --- /dev/null +++ b/embassy/src/time/mod.rs @@ -0,0 +1,24 @@ +mod duration; +mod instant; +mod timer; +mod traits; + +pub use duration::Duration; +pub use instant::Instant; +pub use timer::Timer; +pub use traits::*; + +use crate::util::Dewrap; + +// TODO allow customizing, probably via Cargo features `tick-hz-32768` or something. +pub const TICKS_PER_SECOND: u32 = 32768; + +static mut CLOCK: Option<&'static dyn Clock> = None; + +pub unsafe fn set_clock(clock: &'static dyn Clock) { + CLOCK = Some(clock); +} + +pub(crate) fn now() -> u64 { + unsafe { CLOCK.dexpect(defmt::intern!("No clock set")).now() } +} diff --git a/embassy/src/time/timer.rs b/embassy/src/time/timer.rs new file mode 100644 index 00000000..0315d9fb --- /dev/null +++ b/embassy/src/time/timer.rs @@ -0,0 +1,30 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures_intrusive::timer::{LocalTimer, LocalTimerFuture}; + +use super::{Duration, Instant}; +use crate::executor::current_timer_queue; + +pub struct Timer { + inner: LocalTimerFuture<'static>, +} + +impl Timer { + pub fn at(when: Instant) -> Self { + Self { + inner: current_timer_queue().deadline(when.into_ticks()), + } + } + + pub fn after(dur: Duration) -> Self { + Self::at(Instant::now() + dur) + } +} + +impl Future for Timer { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx) + } +} diff --git a/embassy/src/time/traits.rs b/embassy/src/time/traits.rs new file mode 100644 index 00000000..7faa27cd --- /dev/null +++ b/embassy/src/time/traits.rs @@ -0,0 +1,44 @@ +/// Monotonic clock +pub trait Clock { + /// Return the current timestamp in ticks. + /// This is guaranteed to be monotonic, i.e. a call to now() will always return + /// a greater or equal value than earler calls. + fn now(&self) -> u64; +} + +impl Clock for &T { + fn now(&self) -> u64 { + T::now(self) + } +} + +/// Trait to register a callback at a given timestamp. +pub trait Alarm { + /// Sets the callback function to be called when the alarm triggers. + /// The callback may be called from any context (interrupt or thread mode). + fn set_callback(&self, callback: fn()); + + /// Sets an alarm at the given timestamp. When the clock reaches that + /// timestamp, the provided callback funcion will be called. + /// + /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. + /// + /// Only one alarm can be active at a time. This overwrites any previously-set alarm if any. + fn set(&self, timestamp: u64); + + /// Clears the previously-set alarm. + /// If no alarm was set, this is a noop. + fn clear(&self); +} + +impl Alarm for &T { + fn set_callback(&self, callback: fn()) { + T::set_callback(self, callback); + } + fn set(&self, timestamp: u64) { + T::set(self, timestamp); + } + fn clear(&self) { + T::clear(self) + } +} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7b49ffb1..7c44b07a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -26,5 +26,5 @@ panic-probe = "0.1.0" nrf52840-hal = { version = "0.11.0" } embassy = { version = "0.1.0", path = "../embassy" } embassy-nrf = { version = "0.1.0", path = "../embassy-nrf", features = ["defmt-trace", "52840"] } -static-executor = { version = "0.1.0", features=["defmt"]} -futures = { version = "0.3.5", default-features = false } \ No newline at end of file +futures = { version = "0.3.5", default-features = false } +cortex-m-rtic = { git = "https://github.com/rtic-rs/cortex-m-rtic", branch = "master"} \ No newline at end of file diff --git a/examples/src/bin/gpiote.rs b/examples/src/bin/gpiote.rs index 5578062f..5a6ae933 100644 --- a/examples/src/bin/gpiote.rs +++ b/examples/src/bin/gpiote.rs @@ -13,7 +13,7 @@ use embassy_nrf::gpiote; use futures::pin_mut; use nrf52840_hal::gpio; -use static_executor::{task, Executor}; +use embassy::executor::{task, Executor}; static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev()); #[task] diff --git a/examples/src/bin/qspi.rs b/examples/src/bin/qspi.rs index 4e6ee53e..c60a666e 100644 --- a/examples/src/bin/qspi.rs +++ b/examples/src/bin/qspi.rs @@ -11,7 +11,7 @@ use embassy::flash::Flash; use embassy_nrf::qspi; use nrf52840_hal::gpio; -use static_executor::{task, Executor}; +use embassy::executor::{task, Executor}; static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev()); const PAGE_SIZE: usize = 4096; diff --git a/examples/src/bin/uart.rs b/examples/src/bin/uart.rs index 6b9df380..0eec2cd8 100644 --- a/examples/src/bin/uart.rs +++ b/examples/src/bin/uart.rs @@ -12,7 +12,7 @@ use embassy_nrf::uarte; use futures::pin_mut; use nrf52840_hal::gpio; -use static_executor::{task, Executor}; +use embassy::executor::{task, Executor}; static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev()); #[task]