From 4333105341072bc5391809906c5c68fee1a31a8e Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 25 Sep 2020 03:25:06 +0200 Subject: [PATCH] Add Executor with timer queue, Timer, Instant, Duration, Alarm. --- .gitignore | 1 + Cargo.toml | 5 + embassy-nrf/src/rtc.rs | 37 +++-- embassy/Cargo.toml | 1 + embassy/src/clock.rs | 21 --- embassy/src/executor.rs | 72 +++++++++ embassy/src/lib.rs | 3 +- embassy/src/time.rs | 287 ++++++++++++++++++++++++++++++++++ examples/Cargo.toml | 3 +- examples/src/bin/rtc_async.rs | 65 +++----- examples/src/bin/rtc_raw.rs | 7 +- 11 files changed, 416 insertions(+), 86 deletions(-) delete mode 100644 embassy/src/clock.rs create mode 100644 embassy/src/executor.rs create mode 100644 embassy/src/time.rs diff --git a/.gitignore b/.gitignore index 96ef6c0b..25d18d5a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target Cargo.lock +third_party diff --git a/Cargo.toml b/Cargo.toml index 25e30eff..abce83ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,11 +6,16 @@ members = [ "examples", ] +exclude = [ + "third_party" +] + [patch.crates-io] panic-probe = { git = "https://github.com/knurling-rs/probe-run", branch="main" } defmt-rtt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" } defmt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" } static-executor = { git = "https://github.com/Dirbaio/static-executor", branch="multi"} +futures-intrusive = { git = "https://github.com/Dirbaio/futures-intrusive", branch="master"} [profile.dev] codegen-units = 1 diff --git a/embassy-nrf/src/rtc.rs b/embassy-nrf/src/rtc.rs index 55869c6e..ddb4fd36 100644 --- a/embassy-nrf/src/rtc.rs +++ b/embassy-nrf/src/rtc.rs @@ -1,8 +1,6 @@ use core::cell::Cell; use core::ops::Deref; use core::sync::atomic::{AtomicU32, Ordering}; -use defmt::trace; -use embassy::clock::Monotonic; use crate::interrupt; use crate::interrupt::Mutex; @@ -86,22 +84,25 @@ impl RTC { interrupt::enable(T::INTERRUPT); } + pub fn now(&self) -> u64 { + let counter = self.rtc.counter.read().bits(); + let period = self.period.load(Ordering::Relaxed); + calc_now(period, counter) + } + fn on_interrupt(&self) { if self.rtc.events_ovrflw.read().bits() == 1 { self.rtc.events_ovrflw.write(|w| w); - trace!("rtc overflow"); self.next_period(); } if self.rtc.events_compare[0].read().bits() == 1 { self.rtc.events_compare[0].write(|w| w); - trace!("rtc compare0"); self.next_period(); } if self.rtc.events_compare[1].read().bits() == 1 { self.rtc.events_compare[1].write(|w| w); - trace!("rtc compare1"); self.trigger_alarm(); } } @@ -162,26 +163,28 @@ impl RTC { } }) } + + pub fn alarm0(&'static self) -> Alarm { + Alarm { rtc: self } + } } -impl Monotonic for RTC { - fn now(&self) -> u64 { - let counter = self.rtc.counter.read().bits(); - let period = self.period.load(Ordering::Relaxed); - calc_now(period, counter) +pub struct Alarm { + rtc: &'static RTC, +} + +impl embassy::time::Alarm for Alarm { + fn set(&self, timestamp: u64, callback: fn()) { + self.rtc.do_set_alarm(timestamp, Some(callback)); } - fn set_alarm(&self, timestamp: u64, callback: fn()) { - self.do_set_alarm(timestamp, Some(callback)); - } - - fn clear_alarm(&self) { - self.do_set_alarm(u64::MAX, None); + fn clear(&self) { + self.rtc.do_set_alarm(u64::MAX, None); } } /// Implemented by all RTC instances. -pub trait Instance: Deref + Sized { +pub trait Instance: Deref + Sized + 'static { /// The interrupt associated with this RTC instance. const INTERRUPT: Interrupt; diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index 5826e8ba..60428f10 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml @@ -13,3 +13,4 @@ cortex-m = "0.6.3" futures = { version = "0.3.5", default-features = false, features = [ "async-await" ] } pin-project = { version = "0.4.23", default-features = false } futures-intrusive = { version = "0.3.1", default-features = false } +static-executor = { version = "0.1.0", features=["defmt"]} diff --git a/embassy/src/clock.rs b/embassy/src/clock.rs deleted file mode 100644 index ca5f26a0..00000000 --- a/embassy/src/clock.rs +++ /dev/null @@ -1,21 +0,0 @@ -/// Monotonic clock with support for setting an alarm. -/// -/// The clock uses a "tick" time unit, whose length is an implementation-dependent constant. -pub trait Monotonic { - /// Returns the current timestamp in ticks. - /// This is guaranteed to be monotonic, i.e. a call to now() will always return - /// a greater or equal value than earler calls. - fn now(&self) -> u64; - - /// Sets an alarm at the given timestamp. When the clock reaches that - /// timestamp, the provided callback funcion will be called. - /// - /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. - /// - /// Only one alarm can be active at a time. This overwrites any previously-set alarm if any. - fn set_alarm(&self, timestamp: u64, callback: fn()); - - /// Clears the previously-set alarm. - /// If no alarm was set, this is a noop. - fn clear_alarm(&self); -} diff --git a/embassy/src/executor.rs b/embassy/src/executor.rs new file mode 100644 index 00000000..c5d024f7 --- /dev/null +++ b/embassy/src/executor.rs @@ -0,0 +1,72 @@ +use core::marker::PhantomData; +use static_executor as se; + +use crate::time; +use crate::time::Alarm; + +pub use se::{task, SpawnError, SpawnToken}; + +pub trait Model { + fn signal(); +} + +pub struct WfeModel; + +impl Model for WfeModel { + fn signal() { + cortex_m::asm::sev() + } +} + +pub struct Executor { + inner: se::Executor, + alarm: A, + timer: time::TimerService, + _phantom: PhantomData, +} + +impl Executor { + pub fn new(alarm: A) -> Self { + Self { + inner: se::Executor::new(M::signal), + alarm, + timer: time::TimerService::new(time::IntrusiveClock), + _phantom: PhantomData, + } + } + + /// Spawn a future on this executor. + /// + /// safety: can only be called from the executor thread + pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { + self.inner.spawn(token) + } + + /// Runs the executor until the queue is empty. + /// + /// safety: can only be called from the executor thread + pub unsafe fn run_once(&'static self) { + time::with_timer_service(&self.timer, || { + self.timer.check_expirations(); + self.inner.run(); + + match self.timer.next_expiration() { + // If this is in the past, set_alarm will immediately trigger the alarm, + // which will make the wfe immediately return so we do another loop iteration. + Some(at) => self.alarm.set(at, M::signal), + None => self.alarm.clear(), + } + }) + } +} + +impl Executor { + /// Runs the executor forever + /// safety: can only be called from the executor thread + pub unsafe fn run(&'static self) -> ! { + loop { + self.run_once(); + cortex_m::asm::wfe() + } + } +} diff --git a/embassy/src/lib.rs b/embassy/src/lib.rs index 89eef3f4..92ad7742 100644 --- a/embassy/src/lib.rs +++ b/embassy/src/lib.rs @@ -3,7 +3,8 @@ #![feature(generic_associated_types)] #![feature(const_fn)] -pub mod clock; +pub mod executor; pub mod flash; pub mod io; +pub mod time; pub mod util; diff --git a/embassy/src/time.rs b/embassy/src/time.rs new file mode 100644 index 00000000..30fd1b68 --- /dev/null +++ b/embassy/src/time.rs @@ -0,0 +1,287 @@ +use core::cell::Cell; +use core::convert::TryInto; +use core::future::Future; +use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}; +use core::pin::Pin; +use core::ptr; +use core::sync::atomic::{AtomicPtr, Ordering}; +use core::task::{Context, Poll}; + +use fi::LocalTimer; +use futures_intrusive::timer as fi; + +static mut CLOCK: fn() -> u64 = clock_not_set; + +fn clock_not_set() -> u64 { + panic!("No clock set. You must call embassy::time::set_clock() before trying to use the clock") +} + +pub unsafe fn set_clock(clock: fn() -> u64) { + CLOCK = clock; +} + +fn now() -> u64 { + unsafe { CLOCK() } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Instant { + ticks: u64, +} + +impl Instant { + pub fn now() -> Instant { + Instant { ticks: now() } + } + + pub fn into_ticks(&self) -> u64 { + self.ticks + } + + pub fn duration_since(&self, earlier: Instant) -> Duration { + Duration { + ticks: (self.ticks - earlier.ticks).try_into().unwrap(), + } + } + + pub fn checked_duration_since(&self, earlier: Instant) -> Option { + if self.ticks < earlier.ticks { + None + } else { + Some(Duration { + ticks: (self.ticks - earlier.ticks).try_into().unwrap(), + }) + } + } + + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + Duration { + ticks: if self.ticks < earlier.ticks { + 0 + } else { + (self.ticks - earlier.ticks).try_into().unwrap() + }, + } + } + + pub fn elapsed(&self) -> Duration { + Instant::now() - *self + } + + pub fn checked_add(&self, duration: Duration) -> Option { + self.ticks + .checked_add(duration.ticks.into()) + .map(|ticks| Instant { ticks }) + } + pub fn checked_sub(&self, duration: Duration) -> Option { + self.ticks + .checked_sub(duration.ticks.into()) + .map(|ticks| Instant { ticks }) + } +} + +impl Add for Instant { + type Output = Instant; + + fn add(self, other: Duration) -> Instant { + self.checked_add(other) + .expect("overflow when adding duration to instant") + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, other: Duration) { + *self = *self + other; + } +} + +impl Sub for Instant { + type Output = Instant; + + fn sub(self, other: Duration) -> Instant { + self.checked_sub(other) + .expect("overflow when subtracting duration from instant") + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, other: Duration) { + *self = *self - other; + } +} + +impl Sub for Instant { + type Output = Duration; + + fn sub(self, other: Instant) -> Duration { + self.duration_since(other) + } +} + +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Duration { + ticks: u32, +} + +impl Duration { + pub const fn from_ticks(ticks: u32) -> Duration { + Duration { ticks } + } + + pub fn checked_add(self, rhs: Duration) -> Option { + self.ticks + .checked_add(rhs.ticks) + .map(|ticks| Duration { ticks }) + } + + pub fn checked_sub(self, rhs: Duration) -> Option { + self.ticks + .checked_sub(rhs.ticks) + .map(|ticks| Duration { ticks }) + } + + pub fn checked_mul(self, rhs: u32) -> Option { + self.ticks.checked_mul(rhs).map(|ticks| Duration { ticks }) + } + + pub fn checked_div(self, rhs: u32) -> Option { + self.ticks.checked_div(rhs).map(|ticks| Duration { ticks }) + } +} + +impl Add for Duration { + type Output = Duration; + + fn add(self, rhs: Duration) -> Duration { + self.checked_add(rhs) + .expect("overflow when adding durations") + } +} + +impl AddAssign for Duration { + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl Sub for Duration { + type Output = Duration; + + fn sub(self, rhs: Duration) -> Duration { + self.checked_sub(rhs) + .expect("overflow when subtracting durations") + } +} + +impl SubAssign for Duration { + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl Mul for Duration { + type Output = Duration; + + fn mul(self, rhs: u32) -> Duration { + self.checked_mul(rhs) + .expect("overflow when multiplying duration by scalar") + } +} + +impl Mul for u32 { + type Output = Duration; + + fn mul(self, rhs: Duration) -> Duration { + rhs * self + } +} + +impl MulAssign for Duration { + fn mul_assign(&mut self, rhs: u32) { + *self = *self * rhs; + } +} + +impl Div for Duration { + type Output = Duration; + + fn div(self, rhs: u32) -> Duration { + self.checked_div(rhs) + .expect("divide by zero error when dividing duration by scalar") + } +} + +impl DivAssign for Duration { + fn div_assign(&mut self, rhs: u32) { + *self = *self / rhs; + } +} + +pub(crate) struct IntrusiveClock; + +impl fi::Clock for IntrusiveClock { + fn now(&self) -> u64 { + now() + } +} + +pub(crate) type TimerService = fi::LocalTimerService; + +static CURRENT_TIMER_SERVICE: AtomicPtr = AtomicPtr::new(ptr::null_mut()); + +pub(crate) fn with_timer_service(svc: &'static TimerService, f: impl FnOnce() -> R) -> R { + let svc = svc as *const _ as *mut _; + let prev_svc = CURRENT_TIMER_SERVICE.swap(svc, Ordering::Relaxed); + let r = f(); + let svc2 = CURRENT_TIMER_SERVICE.swap(prev_svc, Ordering::Relaxed); + assert_eq!(svc, svc2); + r +} + +fn current_timer_service() -> &'static TimerService { + unsafe { + CURRENT_TIMER_SERVICE + .load(Ordering::Relaxed) + .as_ref() + .unwrap() + } +} + +pub struct Timer { + inner: fi::LocalTimerFuture<'static>, +} + +impl Timer { + pub fn at(when: Instant) -> Self { + let svc: &TimerService = current_timer_service(); + Self { + inner: svc.deadline(when.into_ticks()), + } + } + + pub fn after(dur: Duration) -> Self { + Self::at(Instant::now() + dur) + } +} + +impl Future for Timer { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx) + } +} + +/// Trait to register a callback at a given timestamp. +pub trait Alarm { + /// Sets an alarm at the given timestamp. When the clock reaches that + /// timestamp, the provided callback funcion will be called. + /// + /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. + /// + /// Only one alarm can be active at a time. This overwrites any previously-set alarm if any. + fn set(&self, timestamp: u64, callback: fn()); + + /// Clears the previously-set alarm. + /// If no alarm was set, this is a noop. + fn clear(&self); +} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 4e81d1da..7b49ffb1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -27,5 +27,4 @@ nrf52840-hal = { version = "0.11.0" } embassy = { version = "0.1.0", path = "../embassy" } embassy-nrf = { version = "0.1.0", path = "../embassy-nrf", features = ["defmt-trace", "52840"] } static-executor = { version = "0.1.0", features=["defmt"]} -futures = { version = "0.3.5", default-features = false } -futures-intrusive = { version = "0.3.1", default-features = false } \ No newline at end of file +futures = { version = "0.3.5", default-features = false } \ No newline at end of file diff --git a/examples/src/bin/rtc_async.rs b/examples/src/bin/rtc_async.rs index 977621d4..3bffabf0 100644 --- a/examples/src/bin/rtc_async.rs +++ b/examples/src/bin/rtc_async.rs @@ -8,39 +8,30 @@ use example_common::*; use core::mem::MaybeUninit; use cortex_m_rt::entry; -use embassy::clock::Monotonic; +use embassy::executor::{task, Executor, WfeModel}; +use embassy::time::{Duration, Instant, Timer}; +use embassy_nrf::pac; use embassy_nrf::rtc; -use futures_intrusive::timer::{Clock, LocalTimer, LocalTimerService}; use nrf52840_hal::clocks; -use static_executor::{task, Executor}; -struct RtcClock(rtc::RTC); - -impl Clock for RtcClock { - fn now(&self) -> u64 { - self.0.now() +#[task] +async fn run1() { + loop { + info!("BIG INFREQUENT TICK"); + Timer::after(Duration::from_ticks(64000)).await; } } #[task] -async fn run1(rtc: &'static rtc::RTC, timer: &'static LocalTimerService) { +async fn run2() { loop { - info!("tick 1"); - timer.deadline(rtc.now() + 64000).await; + info!("tick"); + Timer::after(Duration::from_ticks(13000)).await; } } -#[task] -async fn run2(rtc: &'static rtc::RTC, timer: &'static LocalTimerService) { - loop { - info!("tick 2"); - timer.deadline(rtc.now() + 23000).await; - } -} - -static EXECUTOR: Executor = Executor::new(cortex_m::asm::sev); -static mut RTC: MaybeUninit> = MaybeUninit::uninit(); -static mut TIMER: MaybeUninit = MaybeUninit::uninit(); +static mut RTC: MaybeUninit> = MaybeUninit::uninit(); +static mut EXECUTOR: MaybeUninit>> = MaybeUninit::uninit(); #[entry] fn main() -> ! { @@ -55,35 +46,23 @@ fn main() -> ! { let rtc: &'static _ = unsafe { let ptr = RTC.as_mut_ptr(); - ptr.write(RtcClock(rtc::RTC::new(p.RTC1))); + ptr.write(rtc::RTC::new(p.RTC1)); &*ptr }; - rtc.0.start(); + rtc.start(); + unsafe { embassy::time::set_clock(|| RTC.as_ptr().as_ref().unwrap().now()) }; - let timer: &'static _ = unsafe { - let ptr = TIMER.as_mut_ptr(); - ptr.write(LocalTimerService::new(rtc)); + let executor: &'static _ = unsafe { + let ptr = EXECUTOR.as_mut_ptr(); + ptr.write(Executor::new(rtc.alarm0())); &*ptr }; unsafe { - EXECUTOR.spawn(run1(&rtc.0, timer)).dewrap(); - EXECUTOR.spawn(run2(&rtc.0, timer)).dewrap(); + executor.spawn(run1()).dewrap(); + executor.spawn(run2()).dewrap(); - loop { - timer.check_expirations(); - - EXECUTOR.run(); - - match timer.next_expiration() { - // If this is in the past, set_alarm will immediately trigger the alarm, - // which will make the wfe immediately return so we do another loop iteration. - Some(at) => rtc.0.set_alarm(at, cortex_m::asm::sev), - None => rtc.0.clear_alarm(), - } - - cortex_m::asm::wfe(); - } + executor.run() } } diff --git a/examples/src/bin/rtc_raw.rs b/examples/src/bin/rtc_raw.rs index aa98409c..2e4b2865 100644 --- a/examples/src/bin/rtc_raw.rs +++ b/examples/src/bin/rtc_raw.rs @@ -8,7 +8,7 @@ use example_common::*; use core::mem::MaybeUninit; use cortex_m_rt::entry; -use embassy::clock::Monotonic; +use embassy::time::Alarm; use embassy_nrf::rtc; use nrf52840_hal::clocks; @@ -31,8 +31,11 @@ fn main() -> ! { &*ptr }; + let alarm = rtc.alarm0(); + rtc.start(); - rtc.set_alarm(53719, || info!("ALARM TRIGGERED")); + + alarm.set(53719, || info!("ALARM TRIGGERED")); info!("initialized!");