From ba6e452cc5d6c33029f34d7cfb5cd5ea846979bd Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Tue, 20 Sep 2022 20:23:56 +0300 Subject: [PATCH] Documentation and initial testing framework Add mock waker First simple test Tests & documentation --- embassy-time/Cargo.toml | 6 +- embassy-time/src/lib.rs | 2 +- embassy-time/src/queue.rs | 486 +++++++++++++++++++++++++++++++++++--- 3 files changed, 459 insertions(+), 35 deletions(-) diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index 0e3391d1..4fbf97f0 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml @@ -26,7 +26,8 @@ unstable-traits = ["embedded-hal-1"] # To use this you must have a time driver provided. defmt-timestamp-uptime = ["defmt"] -# TODO: Doc +# Create a global queue that can be used with any executor +# To use this you must have a time driver provided. generic-queue = [] # Set the `embassy_time` tick rate. @@ -124,3 +125,6 @@ heapless = "0.7" wasm-bindgen = { version = "0.2.81", optional = true } js-sys = { version = "0.3", optional = true } wasm-timer = { version = "0.2.5", optional = true } + +[dev-dependencies] +serial_test = "0.9" diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs index 0457a657..50f437ba 100644 --- a/embassy-time/src/lib.rs +++ b/embassy-time/src/lib.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] +#![cfg_attr(not(any(feature = "std", feature = "wasm", test)), no_std)] #![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] #![doc = include_str!("../README.md")] #![allow(clippy::new_without_default)] diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs index 7e84090b..56ad5af8 100644 --- a/embassy-time/src/queue.rs +++ b/embassy-time/src/queue.rs @@ -1,9 +1,53 @@ //! Generic timer queue implementation -use core::cell::RefCell; +//! +//! This module provides a timer queue that works with any executor. +//! +//! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues, +//! like the one provided with e.g. the `embassy-executor` crate. +//! +//! # Enabling the queue +//! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue. +//! +//! # Initializing the queue +//! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled. +//! +//! # Customizing the queue +//! - It is possible to customize two aspects of the queue: +//! - Queue size: +//! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added, +//! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker. +//! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue: +//! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex) +//! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on +//! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the +//! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead +//! configure the queue with a "disable-all-interrupts" style of mutex. +//! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue) +//! macro, as per the example below. +//! +//! +//! # Example +//! +//! ```ignore +//! use embassy_time::queue::Queue; +//! +//! // You only need to invoke this macro in case you need to customize the queue. +//! // +//! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled, +//! // and the queue instantiation will be done for you behind the scenes. +//! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new()); +//! +//! fn main() { +//! unsafe { +//! embassy_time::queue::initialize(); +//! } +//! } +//! ``` +use core::cell::{Cell, RefCell}; use core::cmp::Ordering; use core::task::Waker; -use atomic_polyfill::{AtomicBool, Ordering as AtomicOrdering}; +use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; use embassy_sync::blocking_mutex::Mutex; use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; @@ -40,7 +84,6 @@ impl Ord for Timer { struct InnerQueue { queue: SortedLinkedList, alarm_at: Instant, - alarm: Option, } impl InnerQueue { @@ -48,11 +91,10 @@ impl InnerQueue { Self { queue: SortedLinkedList::new_u8(), alarm_at: Instant::MAX, - alarm: None, } } - fn schedule(&mut self, at: Instant, waker: &Waker) { + fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { self.queue .find_mut(|timer| timer.waker.will_wake(waker)) .map(|mut timer| { @@ -81,90 +123,128 @@ impl InnerQueue { // dispatch all timers that are already due // // Then update the alarm if necessary - self.dispatch(); + self.dispatch(alarm_schedule); } - fn dispatch(&mut self) { + fn dispatch(&mut self, alarm_schedule: &AtomicU64) { let now = Instant::now(); while self.queue.peek().filter(|timer| timer.at <= now).is_some() { self.queue.pop().unwrap().waker.wake(); } - self.update_alarm(); + self.update_alarm(alarm_schedule); } - fn update_alarm(&mut self) { + fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { if let Some(timer) = self.queue.peek() { let new_at = timer.at; if self.alarm_at != new_at { self.alarm_at = new_at; - set_alarm(self.alarm.unwrap(), new_at.as_ticks()); + alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); } } else { self.alarm_at = Instant::MAX; + alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); } } - fn handle_alarm(&mut self) { + fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { self.alarm_at = Instant::MAX; - self.dispatch(); + self.dispatch(alarm_schedule); } } -/// TODO: Doc +/// The generic queue implementation pub struct Queue { - initialized: AtomicBool, inner: Mutex>>, + alarm: Cell>, + alarm_schedule: AtomicU64, } impl Queue { - /// TODO: Doc + /// Create a Queue pub const fn new() -> Self { Self { - initialized: AtomicBool::new(false), inner: Mutex::new(RefCell::new(InnerQueue::::new())), + alarm: Cell::new(None), + alarm_schedule: AtomicU64::new(u64::MAX), } } - /// TODO: Doc + /// Initialize the queue + /// + /// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly. + /// Call [`initialize`](crate::queue::initialize) instead. + /// + /// # Safety + /// It is UB call this function more than once, or to call it after any of your + /// futures that use `embassy-time` are polled already. pub unsafe fn initialize(&'static self) { - if self.initialized.load(AtomicOrdering::SeqCst) { - panic!("Queue already initialized"); + if self.alarm.get().is_some() { + panic!("Queue is already initialized"); } let handle = allocate_alarm().unwrap(); - self.inner.lock(|inner| inner.borrow_mut().alarm = Some(handle)); + self.alarm.set(Some(handle)); - set_alarm_callback(handle, Self::handle_alarm, self as *const _ as _); - - self.initialized.store(true, AtomicOrdering::SeqCst); + set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); } - /// TODO: Doc + /// Schedule a new waker to be awoken at moment `at` + /// + /// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly. pub fn schedule(&'static self, at: Instant, waker: &Waker) { self.check_initialized(); - self.inner.lock(|inner| inner.borrow_mut().schedule(at, waker)); + self.inner + .lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule)); + + self.update_alarm(); } fn check_initialized(&self) { - if !self.initialized.load(AtomicOrdering::SeqCst) { - panic!("Queue is not initialized"); + if self.alarm.get().is_none() { + panic!("Queue is not initialized yet"); } } - fn handle_alarm(ctx: *mut ()) { - let this = unsafe { (ctx as *const Self).as_ref().unwrap() }; + fn update_alarm(&self) { + // Need to set the alarm when we are *not* holding the mutex on the inner queue + // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately + // call us back if the timestamp is in the past. + let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); - this.check_initialized(); - this.inner.lock(|inner| inner.borrow_mut().handle_alarm()); + if alarm_at < u64::MAX { + set_alarm(self.alarm.get().unwrap(), alarm_at); + } + } + + fn handle_alarm(&self) { + self.check_initialized(); + self.inner + .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); + + self.update_alarm(); + } + + fn handle_alarm_callback(ctx: *mut ()) { + unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); } } -/// TODO: Doc +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + +/// Initialize the queue +/// +/// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled. +/// +/// # Safety +/// It is UB call this function more than once, or to call it after any of your +/// futures that use `embassy-time` are polled already. pub unsafe fn initialize() { extern "Rust" { fn _embassy_time_generic_queue_initialize(); @@ -173,7 +253,12 @@ pub unsafe fn initialize() { _embassy_time_generic_queue_initialize(); } -/// TODO: Doc +/// Instantiates a global, generic (as in executor-agnostic) timer queue. +/// +/// Unless you plan to customize the queue (size or mutex), prefer +/// instantiating the queue via the `generic-queue` feature. +/// +/// See the module documentation for an example. #[macro_export] macro_rules! generic_queue { (static $name:ident: $t: ty = $val:expr) => { @@ -195,3 +280,338 @@ macro_rules! generic_queue { #[cfg(feature = "generic-queue")] generic_queue!(static QUEUE: Queue = Queue::new()); + +#[cfg(test)] +mod tests { + use core::cell::Cell; + use core::sync::atomic::Ordering; + use core::task::{RawWaker, RawWakerVTable, Waker}; + use std::rc::Rc; + use std::sync::Mutex; + + use embassy_sync::blocking_mutex::raw::RawMutex; + use serial_test::serial; + + use super::InnerQueue; + use crate::driver::{AlarmHandle, Driver}; + use crate::Instant; + + struct InnerTestDriver { + now: u64, + alarm: u64, + callback: fn(*mut ()), + ctx: *mut (), + } + + impl InnerTestDriver { + const fn new() -> Self { + Self { + now: 0, + alarm: u64::MAX, + callback: Self::noop, + ctx: core::ptr::null_mut(), + } + } + + fn noop(_ctx: *mut ()) {} + } + + unsafe impl Send for InnerTestDriver {} + + struct TestDriver(Mutex); + + impl TestDriver { + const fn new() -> Self { + Self(Mutex::new(InnerTestDriver::new())) + } + + fn reset(&self) { + *self.0.lock().unwrap() = InnerTestDriver::new(); + } + + fn set_now(&self, now: u64) { + let notify = { + let mut inner = self.0.lock().unwrap(); + + if inner.now < now { + inner.now = now; + + if inner.alarm <= now { + inner.alarm = u64::MAX; + + Some((inner.callback, inner.ctx)) + } else { + None + } + } else { + panic!("Going back in time?"); + } + }; + + if let Some((callback, ctx)) = notify { + (callback)(ctx); + } + } + } + + impl Driver for TestDriver { + fn now(&self) -> u64 { + self.0.lock().unwrap().now + } + + unsafe fn allocate_alarm(&self) -> Option { + Some(AlarmHandle::new(0)) + } + + fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { + let mut inner = self.0.lock().unwrap(); + + inner.callback = callback; + inner.ctx = ctx; + } + + fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { + let notify = { + let mut inner = self.0.lock().unwrap(); + + if timestamp <= inner.now { + Some((inner.callback, inner.ctx)) + } else { + inner.alarm = timestamp; + None + } + }; + + if let Some((callback, ctx)) = notify { + (callback)(ctx); + } + } + } + + struct TestWaker { + pub awoken: Rc>, + pub waker: Waker, + } + + impl TestWaker { + fn new() -> Self { + let flag = Rc::new(Cell::new(false)); + + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |data: *const ()| { + unsafe { + Rc::increment_strong_count(data as *const Cell); + } + + RawWaker::new(data as _, &VTABLE) + }, + |data: *const ()| unsafe { + let data = data as *const Cell; + data.as_ref().unwrap().set(true); + Rc::decrement_strong_count(data); + }, + |data: *const ()| unsafe { + (data as *const Cell).as_ref().unwrap().set(true); + }, + |data: *const ()| unsafe { + Rc::decrement_strong_count(data); + }, + ); + + let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); + + Self { + awoken: flag.clone(), + waker: unsafe { Waker::from_raw(raw) }, + } + } + } + + // TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate + pub struct StdRawMutex(std::sync::Mutex<()>); + + unsafe impl RawMutex for StdRawMutex { + const INIT: Self = StdRawMutex(std::sync::Mutex::new(())); + + fn lock(&self, f: impl FnOnce() -> R) -> R { + let _guard = self.0.lock().unwrap(); + + f() + } + } + + const QUEUE_MAX_LEN: usize = 8; + + crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); + crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new()); + + fn setup() { + DRIVER.reset(); + + QUEUE.alarm.set(None); + QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); + QUEUE.inner.lock(|inner| { + *inner.borrow_mut() = InnerQueue::new(); + }); + + unsafe { super::initialize() }; + } + + fn queue_len() -> usize { + QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) + } + + #[test] + #[serial] + #[should_panic(expected = "Queue is not initialized yet")] + fn test_not_initialized() { + static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + } + + #[test] + #[serial] + fn test_initialized() { + static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); + + assert!(QUEUE.alarm.get().is_none()); + + unsafe { QUEUE.initialize() }; + + assert!(QUEUE.alarm.get().is_some()); + } + + #[test] + #[serial] + #[should_panic(expected = "Queue is already initialized")] + fn test_already_initialized() { + static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); + + unsafe { QUEUE.initialize() }; + + assert!(QUEUE.alarm.get().is_some()); + + unsafe { QUEUE.initialize() }; + } + + #[test] + #[serial] + fn test_schedule() { + setup(); + + assert_eq!(queue_len(), 0); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + + assert!(!waker.awoken.get()); + assert_eq!(queue_len(), 1); + } + + #[test] + #[serial] + fn test_schedule_same() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + + assert_eq!(queue_len(), 1); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + + assert_eq!(queue_len(), 1); + + QUEUE.schedule(Instant::from_secs(100), &waker.waker); + + assert_eq!(queue_len(), 1); + + let waker2 = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(100), &waker2.waker); + + assert_eq!(queue_len(), 2); + } + + #[test] + #[serial] + fn test_trigger() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(100), &waker.waker); + + assert!(!waker.awoken.get()); + + DRIVER.set_now(Instant::from_secs(99).as_ticks()); + + assert!(!waker.awoken.get()); + + assert_eq!(queue_len(), 1); + + DRIVER.set_now(Instant::from_secs(100).as_ticks()); + + assert!(waker.awoken.get()); + + assert_eq!(queue_len(), 0); + } + + #[test] + #[serial] + fn test_immediate_trigger() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(100), &waker.waker); + + DRIVER.set_now(Instant::from_secs(50).as_ticks()); + + let waker2 = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(40), &waker2.waker); + + assert!(!waker.awoken.get()); + assert!(waker2.awoken.get()); + assert_eq!(queue_len(), 1); + } + + #[test] + #[serial] + fn test_queue_overflow() { + setup(); + + for i in 1..QUEUE_MAX_LEN { + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(310), &waker.waker); + + assert_eq!(queue_len(), i); + assert!(!waker.awoken.get()); + } + + let first_waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(300), &first_waker.waker); + + assert_eq!(queue_len(), QUEUE_MAX_LEN); + assert!(!first_waker.awoken.get()); + + let second_waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(305), &second_waker.waker); + + assert_eq!(queue_len(), QUEUE_MAX_LEN); + assert!(first_waker.awoken.get()); + + QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker); + assert_eq!(queue_len(), QUEUE_MAX_LEN); + assert!(second_waker.awoken.get()); + } +}