Address feedback after code review
This commit is contained in:
parent
ba6e452cc5
commit
53608a87ac
@ -26,10 +26,22 @@ unstable-traits = ["embedded-hal-1"]
|
|||||||
# To use this you must have a time driver provided.
|
# To use this you must have a time driver provided.
|
||||||
defmt-timestamp-uptime = ["defmt"]
|
defmt-timestamp-uptime = ["defmt"]
|
||||||
|
|
||||||
# Create a global queue that can be used with any executor
|
# Create a global, generic queue that can be used with any executor
|
||||||
# To use this you must have a time driver provided.
|
# To use this you must have a time driver provided.
|
||||||
generic-queue = []
|
generic-queue = []
|
||||||
|
|
||||||
|
# Set the number of timers for the generic queue.
|
||||||
|
#
|
||||||
|
# At most 1 `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used.
|
||||||
|
#
|
||||||
|
# When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the
|
||||||
|
# end user to pick.
|
||||||
|
generic-queue-8 = ["generic-queue"]
|
||||||
|
generic-queue-16 = ["generic-queue"]
|
||||||
|
generic-queue-32 = ["generic-queue"]
|
||||||
|
generic-queue-64 = ["generic-queue"]
|
||||||
|
generic-queue-128 = ["generic-queue"]
|
||||||
|
|
||||||
# Set the `embassy_time` tick rate.
|
# Set the `embassy_time` tick rate.
|
||||||
#
|
#
|
||||||
# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
|
# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
|
||||||
@ -128,3 +140,5 @@ wasm-timer = { version = "0.2.5", optional = true }
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = "0.9"
|
serial_test = "0.9"
|
||||||
|
critical-section = { version = "1.1", features = ["std"] }
|
||||||
|
|
||||||
|
@ -19,6 +19,8 @@ mod timer;
|
|||||||
mod driver_std;
|
mod driver_std;
|
||||||
#[cfg(feature = "wasm")]
|
#[cfg(feature = "wasm")]
|
||||||
mod driver_wasm;
|
mod driver_wasm;
|
||||||
|
#[cfg(feature = "generic-queue")]
|
||||||
|
mod queue_generic;
|
||||||
|
|
||||||
pub use delay::{block_for, Delay};
|
pub use delay::{block_for, Delay};
|
||||||
pub use duration::Duration;
|
pub use duration::Duration;
|
||||||
|
@ -1,617 +1,58 @@
|
|||||||
//! Generic timer queue implementation
|
//! Timer queue implementation
|
||||||
//!
|
//!
|
||||||
//! This module provides a timer queue that works with any executor.
|
//! This module defines the interface a timer queue needs to implement to power the `embassy_time` module.
|
||||||
//!
|
//!
|
||||||
//! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues,
|
//! # Implementing a timer queue
|
||||||
//! like the one provided with e.g. the `embassy-executor` crate.
|
|
||||||
//!
|
//!
|
||||||
//! # Enabling the queue
|
//! - Define a struct `MyTimerQueue`
|
||||||
//! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue.
|
//! - Implement [`TimerQueue`] for it
|
||||||
|
//! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl).
|
||||||
//!
|
//!
|
||||||
//! # Initializing the queue
|
//! # Linkage details
|
||||||
//! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled.
|
|
||||||
//!
|
//!
|
||||||
//! # Customizing the queue
|
//! Check the documentation of the [`driver`](crate::driver) module for more information.
|
||||||
//! - It is possible to customize two aspects of the queue:
|
|
||||||
//! - Queue size:
|
|
||||||
//! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added,
|
|
||||||
//! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker.
|
|
||||||
//! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue:
|
|
||||||
//! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex)
|
|
||||||
//! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on
|
|
||||||
//! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the
|
|
||||||
//! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead
|
|
||||||
//! configure the queue with a "disable-all-interrupts" style of mutex.
|
|
||||||
//! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue)
|
|
||||||
//! macro, as per the example below.
|
|
||||||
//!
|
//!
|
||||||
|
//! Similarly to driver, if there is none or multiple timer queues in the crate tree, linking will fail.
|
||||||
//!
|
//!
|
||||||
//! # Example
|
//! # Example
|
||||||
//!
|
//!
|
||||||
//! ```ignore
|
//! ```
|
||||||
//! use embassy_time::queue::Queue;
|
//! use core::task::Waker;
|
||||||
//!
|
//!
|
||||||
//! // You only need to invoke this macro in case you need to customize the queue.
|
//! use embassy_time::Instant;
|
||||||
//! //
|
//! use embassy_time::queue::{TimerQueue};
|
||||||
//! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled,
|
|
||||||
//! // and the queue instantiation will be done for you behind the scenes.
|
|
||||||
//! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new());
|
|
||||||
//!
|
//!
|
||||||
//! fn main() {
|
//! struct MyTimerQueue{}; // not public!
|
||||||
//! unsafe {
|
//! embassy_time::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
|
||||||
//! embassy_time::queue::initialize();
|
//!
|
||||||
|
//! impl TimerQueue for MyTimerQueue {
|
||||||
|
//! fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||||
|
//! todo!()
|
||||||
//! }
|
//! }
|
||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
use core::cell::{Cell, RefCell};
|
|
||||||
use core::cmp::Ordering;
|
|
||||||
use core::task::Waker;
|
use core::task::Waker;
|
||||||
|
|
||||||
use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering};
|
|
||||||
use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
|
|
||||||
use embassy_sync::blocking_mutex::Mutex;
|
|
||||||
use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
|
|
||||||
|
|
||||||
use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
|
|
||||||
use crate::Instant;
|
use crate::Instant;
|
||||||
|
|
||||||
#[derive(Debug)]
|
/// Timer queue
|
||||||
struct Timer {
|
pub trait TimerQueue {
|
||||||
at: Instant,
|
/// Schedules a waker in the queue to be awoken at moment `at`.
|
||||||
waker: Waker,
|
/// If this moment is in the past, the waker might be awoken immediately.
|
||||||
|
fn schedule_wake(&'static self, at: Instant, waker: &Waker);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartialEq for Timer {
|
/// Set the TimerQueue implementation.
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.at == other.at
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Eq for Timer {}
|
|
||||||
|
|
||||||
impl PartialOrd for Timer {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
||||||
self.at.partial_cmp(&other.at)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Ord for Timer {
|
|
||||||
fn cmp(&self, other: &Self) -> Ordering {
|
|
||||||
self.at.cmp(&other.at)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct InnerQueue<const N: usize> {
|
|
||||||
queue: SortedLinkedList<Timer, LinkedIndexU8, Min, N>,
|
|
||||||
alarm_at: Instant,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const N: usize> InnerQueue<N> {
|
|
||||||
const fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
queue: SortedLinkedList::new_u8(),
|
|
||||||
alarm_at: Instant::MAX,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) {
|
|
||||||
self.queue
|
|
||||||
.find_mut(|timer| timer.waker.will_wake(waker))
|
|
||||||
.map(|mut timer| {
|
|
||||||
timer.waker = waker.clone();
|
|
||||||
timer.at = at;
|
|
||||||
|
|
||||||
timer.finish();
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|| {
|
|
||||||
let mut timer = Timer {
|
|
||||||
waker: waker.clone(),
|
|
||||||
at,
|
|
||||||
};
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.queue.push(timer) {
|
|
||||||
Ok(()) => break,
|
|
||||||
Err(e) => timer = e,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.queue.pop().unwrap().waker.wake();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Don't wait for the alarm callback to trigger and directly
|
|
||||||
// dispatch all timers that are already due
|
|
||||||
//
|
|
||||||
// Then update the alarm if necessary
|
|
||||||
self.dispatch(alarm_schedule);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn dispatch(&mut self, alarm_schedule: &AtomicU64) {
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
|
|
||||||
self.queue.pop().unwrap().waker.wake();
|
|
||||||
}
|
|
||||||
|
|
||||||
self.update_alarm(alarm_schedule);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update_alarm(&mut self, alarm_schedule: &AtomicU64) {
|
|
||||||
if let Some(timer) = self.queue.peek() {
|
|
||||||
let new_at = timer.at;
|
|
||||||
|
|
||||||
if self.alarm_at != new_at {
|
|
||||||
self.alarm_at = new_at;
|
|
||||||
alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.alarm_at = Instant::MAX;
|
|
||||||
alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) {
|
|
||||||
self.alarm_at = Instant::MAX;
|
|
||||||
|
|
||||||
self.dispatch(alarm_schedule);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The generic queue implementation
|
|
||||||
pub struct Queue<const N: usize = 128, R: RawMutex = CriticalSectionRawMutex> {
|
|
||||||
inner: Mutex<R, RefCell<InnerQueue<N>>>,
|
|
||||||
alarm: Cell<Option<AlarmHandle>>,
|
|
||||||
alarm_schedule: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const N: usize, R: RawMutex + 'static> Queue<N, R> {
|
|
||||||
/// Create a Queue
|
|
||||||
pub const fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
inner: Mutex::new(RefCell::new(InnerQueue::<N>::new())),
|
|
||||||
alarm: Cell::new(None),
|
|
||||||
alarm_schedule: AtomicU64::new(u64::MAX),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Initialize the queue
|
|
||||||
///
|
|
||||||
/// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly.
|
|
||||||
/// Call [`initialize`](crate::queue::initialize) instead.
|
|
||||||
///
|
|
||||||
/// # Safety
|
|
||||||
/// It is UB call this function more than once, or to call it after any of your
|
|
||||||
/// futures that use `embassy-time` are polled already.
|
|
||||||
pub unsafe fn initialize(&'static self) {
|
|
||||||
if self.alarm.get().is_some() {
|
|
||||||
panic!("Queue is already initialized");
|
|
||||||
}
|
|
||||||
|
|
||||||
let handle = allocate_alarm().unwrap();
|
|
||||||
self.alarm.set(Some(handle));
|
|
||||||
|
|
||||||
set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedule a new waker to be awoken at moment `at`
|
|
||||||
///
|
|
||||||
/// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly.
|
|
||||||
pub fn schedule(&'static self, at: Instant, waker: &Waker) {
|
|
||||||
self.check_initialized();
|
|
||||||
|
|
||||||
self.inner
|
|
||||||
.lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule));
|
|
||||||
|
|
||||||
self.update_alarm();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_initialized(&self) {
|
|
||||||
if self.alarm.get().is_none() {
|
|
||||||
panic!("Queue is not initialized yet");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update_alarm(&self) {
|
|
||||||
// Need to set the alarm when we are *not* holding the mutex on the inner queue
|
|
||||||
// because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately
|
|
||||||
// call us back if the timestamp is in the past.
|
|
||||||
let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst);
|
|
||||||
|
|
||||||
if alarm_at < u64::MAX {
|
|
||||||
set_alarm(self.alarm.get().unwrap(), alarm_at);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_alarm(&self) {
|
|
||||||
self.check_initialized();
|
|
||||||
self.inner
|
|
||||||
.lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule));
|
|
||||||
|
|
||||||
self.update_alarm();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_alarm_callback(ctx: *mut ()) {
|
|
||||||
unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<const N: usize, R: RawMutex + 'static> Send for Queue<N, R> {}
|
|
||||||
unsafe impl<const N: usize, R: RawMutex + 'static> Sync for Queue<N, R> {}
|
|
||||||
|
|
||||||
/// Initialize the queue
|
|
||||||
///
|
|
||||||
/// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled.
|
|
||||||
///
|
|
||||||
/// # Safety
|
|
||||||
/// It is UB call this function more than once, or to call it after any of your
|
|
||||||
/// futures that use `embassy-time` are polled already.
|
|
||||||
pub unsafe fn initialize() {
|
|
||||||
extern "Rust" {
|
|
||||||
fn _embassy_time_generic_queue_initialize();
|
|
||||||
}
|
|
||||||
|
|
||||||
_embassy_time_generic_queue_initialize();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Instantiates a global, generic (as in executor-agnostic) timer queue.
|
|
||||||
///
|
|
||||||
/// Unless you plan to customize the queue (size or mutex), prefer
|
|
||||||
/// instantiating the queue via the `generic-queue` feature.
|
|
||||||
///
|
///
|
||||||
/// See the module documentation for an example.
|
/// See the module documentation for an example.
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! generic_queue {
|
macro_rules! timer_queue_impl {
|
||||||
(static $name:ident: $t: ty = $val:expr) => {
|
(static $name:ident: $t: ty = $val:expr) => {
|
||||||
static $name: $t = $val;
|
static $name: $t = $val;
|
||||||
|
|
||||||
#[no_mangle]
|
|
||||||
fn _embassy_time_generic_queue_initialize() {
|
|
||||||
unsafe {
|
|
||||||
$crate::queue::Queue::initialize(&$name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[no_mangle]
|
#[no_mangle]
|
||||||
fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) {
|
fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) {
|
||||||
$crate::queue::Queue::schedule(&$name, at, waker);
|
<$t as $crate::queue::TimerQueue>::schedule_wake(&$name, at, waker);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "generic-queue")]
|
|
||||||
generic_queue!(static QUEUE: Queue = Queue::new());
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use core::cell::Cell;
|
|
||||||
use core::sync::atomic::Ordering;
|
|
||||||
use core::task::{RawWaker, RawWakerVTable, Waker};
|
|
||||||
use std::rc::Rc;
|
|
||||||
use std::sync::Mutex;
|
|
||||||
|
|
||||||
use embassy_sync::blocking_mutex::raw::RawMutex;
|
|
||||||
use serial_test::serial;
|
|
||||||
|
|
||||||
use super::InnerQueue;
|
|
||||||
use crate::driver::{AlarmHandle, Driver};
|
|
||||||
use crate::Instant;
|
|
||||||
|
|
||||||
struct InnerTestDriver {
|
|
||||||
now: u64,
|
|
||||||
alarm: u64,
|
|
||||||
callback: fn(*mut ()),
|
|
||||||
ctx: *mut (),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InnerTestDriver {
|
|
||||||
const fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
now: 0,
|
|
||||||
alarm: u64::MAX,
|
|
||||||
callback: Self::noop,
|
|
||||||
ctx: core::ptr::null_mut(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn noop(_ctx: *mut ()) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl Send for InnerTestDriver {}
|
|
||||||
|
|
||||||
struct TestDriver(Mutex<InnerTestDriver>);
|
|
||||||
|
|
||||||
impl TestDriver {
|
|
||||||
const fn new() -> Self {
|
|
||||||
Self(Mutex::new(InnerTestDriver::new()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reset(&self) {
|
|
||||||
*self.0.lock().unwrap() = InnerTestDriver::new();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_now(&self, now: u64) {
|
|
||||||
let notify = {
|
|
||||||
let mut inner = self.0.lock().unwrap();
|
|
||||||
|
|
||||||
if inner.now < now {
|
|
||||||
inner.now = now;
|
|
||||||
|
|
||||||
if inner.alarm <= now {
|
|
||||||
inner.alarm = u64::MAX;
|
|
||||||
|
|
||||||
Some((inner.callback, inner.ctx))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
panic!("Going back in time?");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some((callback, ctx)) = notify {
|
|
||||||
(callback)(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Driver for TestDriver {
|
|
||||||
fn now(&self) -> u64 {
|
|
||||||
self.0.lock().unwrap().now
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
|
|
||||||
Some(AlarmHandle::new(0))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
|
|
||||||
let mut inner = self.0.lock().unwrap();
|
|
||||||
|
|
||||||
inner.callback = callback;
|
|
||||||
inner.ctx = ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) {
|
|
||||||
let notify = {
|
|
||||||
let mut inner = self.0.lock().unwrap();
|
|
||||||
|
|
||||||
if timestamp <= inner.now {
|
|
||||||
Some((inner.callback, inner.ctx))
|
|
||||||
} else {
|
|
||||||
inner.alarm = timestamp;
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some((callback, ctx)) = notify {
|
|
||||||
(callback)(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TestWaker {
|
|
||||||
pub awoken: Rc<Cell<bool>>,
|
|
||||||
pub waker: Waker,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TestWaker {
|
|
||||||
fn new() -> Self {
|
|
||||||
let flag = Rc::new(Cell::new(false));
|
|
||||||
|
|
||||||
const VTABLE: RawWakerVTable = RawWakerVTable::new(
|
|
||||||
|data: *const ()| {
|
|
||||||
unsafe {
|
|
||||||
Rc::increment_strong_count(data as *const Cell<bool>);
|
|
||||||
}
|
|
||||||
|
|
||||||
RawWaker::new(data as _, &VTABLE)
|
|
||||||
},
|
|
||||||
|data: *const ()| unsafe {
|
|
||||||
let data = data as *const Cell<bool>;
|
|
||||||
data.as_ref().unwrap().set(true);
|
|
||||||
Rc::decrement_strong_count(data);
|
|
||||||
},
|
|
||||||
|data: *const ()| unsafe {
|
|
||||||
(data as *const Cell<bool>).as_ref().unwrap().set(true);
|
|
||||||
},
|
|
||||||
|data: *const ()| unsafe {
|
|
||||||
Rc::decrement_strong_count(data);
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
awoken: flag.clone(),
|
|
||||||
waker: unsafe { Waker::from_raw(raw) },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate
|
|
||||||
pub struct StdRawMutex(std::sync::Mutex<()>);
|
|
||||||
|
|
||||||
unsafe impl RawMutex for StdRawMutex {
|
|
||||||
const INIT: Self = StdRawMutex(std::sync::Mutex::new(()));
|
|
||||||
|
|
||||||
fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
|
|
||||||
let _guard = self.0.lock().unwrap();
|
|
||||||
|
|
||||||
f()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const QUEUE_MAX_LEN: usize = 8;
|
|
||||||
|
|
||||||
crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
|
|
||||||
crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new());
|
|
||||||
|
|
||||||
fn setup() {
|
|
||||||
DRIVER.reset();
|
|
||||||
|
|
||||||
QUEUE.alarm.set(None);
|
|
||||||
QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst);
|
|
||||||
QUEUE.inner.lock(|inner| {
|
|
||||||
*inner.borrow_mut() = InnerQueue::new();
|
|
||||||
});
|
|
||||||
|
|
||||||
unsafe { super::initialize() };
|
|
||||||
}
|
|
||||||
|
|
||||||
fn queue_len() -> usize {
|
|
||||||
QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
#[should_panic(expected = "Queue is not initialized yet")]
|
|
||||||
fn test_not_initialized() {
|
|
||||||
static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
|
|
||||||
|
|
||||||
let waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(1), &waker.waker);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
fn test_initialized() {
|
|
||||||
static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
|
|
||||||
|
|
||||||
assert!(QUEUE.alarm.get().is_none());
|
|
||||||
|
|
||||||
unsafe { QUEUE.initialize() };
|
|
||||||
|
|
||||||
assert!(QUEUE.alarm.get().is_some());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
#[should_panic(expected = "Queue is already initialized")]
|
|
||||||
fn test_already_initialized() {
|
|
||||||
static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
|
|
||||||
|
|
||||||
unsafe { QUEUE.initialize() };
|
|
||||||
|
|
||||||
assert!(QUEUE.alarm.get().is_some());
|
|
||||||
|
|
||||||
unsafe { QUEUE.initialize() };
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
fn test_schedule() {
|
|
||||||
setup();
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 0);
|
|
||||||
|
|
||||||
let waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(1), &waker.waker);
|
|
||||||
|
|
||||||
assert!(!waker.awoken.get());
|
|
||||||
assert_eq!(queue_len(), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
fn test_schedule_same() {
|
|
||||||
setup();
|
|
||||||
|
|
||||||
let waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(1), &waker.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 1);
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(1), &waker.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 1);
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(100), &waker.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 1);
|
|
||||||
|
|
||||||
let waker2 = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(100), &waker2.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
fn test_trigger() {
|
|
||||||
setup();
|
|
||||||
|
|
||||||
let waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(100), &waker.waker);
|
|
||||||
|
|
||||||
assert!(!waker.awoken.get());
|
|
||||||
|
|
||||||
DRIVER.set_now(Instant::from_secs(99).as_ticks());
|
|
||||||
|
|
||||||
assert!(!waker.awoken.get());
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 1);
|
|
||||||
|
|
||||||
DRIVER.set_now(Instant::from_secs(100).as_ticks());
|
|
||||||
|
|
||||||
assert!(waker.awoken.get());
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
fn test_immediate_trigger() {
|
|
||||||
setup();
|
|
||||||
|
|
||||||
let waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(100), &waker.waker);
|
|
||||||
|
|
||||||
DRIVER.set_now(Instant::from_secs(50).as_ticks());
|
|
||||||
|
|
||||||
let waker2 = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(40), &waker2.waker);
|
|
||||||
|
|
||||||
assert!(!waker.awoken.get());
|
|
||||||
assert!(waker2.awoken.get());
|
|
||||||
assert_eq!(queue_len(), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[serial]
|
|
||||||
fn test_queue_overflow() {
|
|
||||||
setup();
|
|
||||||
|
|
||||||
for i in 1..QUEUE_MAX_LEN {
|
|
||||||
let waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(310), &waker.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), i);
|
|
||||||
assert!(!waker.awoken.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
let first_waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(300), &first_waker.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), QUEUE_MAX_LEN);
|
|
||||||
assert!(!first_waker.awoken.get());
|
|
||||||
|
|
||||||
let second_waker = TestWaker::new();
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(305), &second_waker.waker);
|
|
||||||
|
|
||||||
assert_eq!(queue_len(), QUEUE_MAX_LEN);
|
|
||||||
assert!(first_waker.awoken.get());
|
|
||||||
|
|
||||||
QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker);
|
|
||||||
assert_eq!(queue_len(), QUEUE_MAX_LEN);
|
|
||||||
assert!(second_waker.awoken.get());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
474
embassy-time/src/queue_generic.rs
Normal file
474
embassy-time/src/queue_generic.rs
Normal file
@ -0,0 +1,474 @@
|
|||||||
|
use core::cell::RefCell;
|
||||||
|
use core::cmp::Ordering;
|
||||||
|
use core::task::Waker;
|
||||||
|
|
||||||
|
use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering};
|
||||||
|
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
|
use embassy_sync::blocking_mutex::Mutex;
|
||||||
|
use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
|
||||||
|
|
||||||
|
use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
|
||||||
|
use crate::queue::TimerQueue;
|
||||||
|
use crate::Instant;
|
||||||
|
|
||||||
|
#[cfg(feature = "generic-queue-8")]
|
||||||
|
const QUEUE_SIZE: usize = 8;
|
||||||
|
#[cfg(feature = "generic-queue-16")]
|
||||||
|
const QUEUE_SIZE: usize = 16;
|
||||||
|
#[cfg(feature = "generic-queue-32")]
|
||||||
|
const QUEUE_SIZE: usize = 32;
|
||||||
|
#[cfg(feature = "generic-queue-64")]
|
||||||
|
const QUEUE_SIZE: usize = 32;
|
||||||
|
#[cfg(feature = "generic-queue-128")]
|
||||||
|
const QUEUE_SIZE: usize = 128;
|
||||||
|
#[cfg(not(any(
|
||||||
|
feature = "generic-queue-8",
|
||||||
|
feature = "generic-queue-16",
|
||||||
|
feature = "generic-queue-32",
|
||||||
|
feature = "generic-queue-64",
|
||||||
|
feature = "generic-queue-128"
|
||||||
|
)))]
|
||||||
|
const QUEUE_SIZE: usize = 64;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Timer {
|
||||||
|
at: Instant,
|
||||||
|
waker: Waker,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for Timer {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.at == other.at
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eq for Timer {}
|
||||||
|
|
||||||
|
impl PartialOrd for Timer {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||||
|
self.at.partial_cmp(&other.at)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for Timer {
|
||||||
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
|
self.at.cmp(&other.at)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct InnerQueue {
|
||||||
|
queue: SortedLinkedList<Timer, LinkedIndexU8, Min, { QUEUE_SIZE }>,
|
||||||
|
alarm: Option<AlarmHandle>,
|
||||||
|
alarm_at: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InnerQueue {
|
||||||
|
const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
queue: SortedLinkedList::new_u8(),
|
||||||
|
alarm: None,
|
||||||
|
alarm_at: Instant::MAX,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) {
|
||||||
|
self.queue
|
||||||
|
.find_mut(|timer| timer.waker.will_wake(waker))
|
||||||
|
.map(|mut timer| {
|
||||||
|
timer.at = at;
|
||||||
|
timer.finish();
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
let mut timer = Timer {
|
||||||
|
waker: waker.clone(),
|
||||||
|
at,
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.queue.push(timer) {
|
||||||
|
Ok(()) => break,
|
||||||
|
Err(e) => timer = e,
|
||||||
|
}
|
||||||
|
|
||||||
|
self.queue.pop().unwrap().waker.wake();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Don't wait for the alarm callback to trigger and directly
|
||||||
|
// dispatch all timers that are already due
|
||||||
|
//
|
||||||
|
// Then update the alarm if necessary
|
||||||
|
self.dispatch(alarm_schedule);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dispatch(&mut self, alarm_schedule: &AtomicU64) {
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
|
||||||
|
self.queue.pop().unwrap().waker.wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.update_alarm(alarm_schedule);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_alarm(&mut self, alarm_schedule: &AtomicU64) {
|
||||||
|
if let Some(timer) = self.queue.peek() {
|
||||||
|
let new_at = timer.at;
|
||||||
|
|
||||||
|
if self.alarm_at != new_at {
|
||||||
|
self.alarm_at = new_at;
|
||||||
|
alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.alarm_at = Instant::MAX;
|
||||||
|
alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) {
|
||||||
|
self.alarm_at = Instant::MAX;
|
||||||
|
|
||||||
|
self.dispatch(alarm_schedule);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Queue {
|
||||||
|
inner: Mutex<CriticalSectionRawMutex, RefCell<InnerQueue>>,
|
||||||
|
alarm_schedule: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Queue {
|
||||||
|
const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Mutex::new(RefCell::new(InnerQueue::new())),
|
||||||
|
alarm_schedule: AtomicU64::new(u64::MAX),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||||
|
self.inner.lock(|inner| {
|
||||||
|
let mut inner = inner.borrow_mut();
|
||||||
|
|
||||||
|
if inner.alarm.is_none() {
|
||||||
|
let handle = unsafe { allocate_alarm() }.unwrap();
|
||||||
|
inner.alarm = Some(handle);
|
||||||
|
|
||||||
|
set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
|
||||||
|
}
|
||||||
|
|
||||||
|
inner.schedule_wake(at, waker, &self.alarm_schedule)
|
||||||
|
});
|
||||||
|
|
||||||
|
self.update_alarm();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_alarm(&self) {
|
||||||
|
// Need to set the alarm when we are *not* holding the mutex on the inner queue
|
||||||
|
// because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately
|
||||||
|
// call us back if the timestamp is in the past.
|
||||||
|
let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst);
|
||||||
|
|
||||||
|
if alarm_at < u64::MAX {
|
||||||
|
set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_alarm(&self) {
|
||||||
|
self.inner
|
||||||
|
.lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule));
|
||||||
|
|
||||||
|
self.update_alarm();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_alarm_callback(ctx: *mut ()) {
|
||||||
|
unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TimerQueue for Queue {
|
||||||
|
fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||||
|
Queue::schedule_wake(self, at, waker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use core::cell::Cell;
|
||||||
|
use core::sync::atomic::Ordering;
|
||||||
|
use core::task::{RawWaker, RawWakerVTable, Waker};
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use serial_test::serial;
|
||||||
|
|
||||||
|
use super::InnerQueue;
|
||||||
|
use crate::driver::{AlarmHandle, Driver};
|
||||||
|
use crate::queue_generic::QUEUE;
|
||||||
|
use crate::Instant;
|
||||||
|
|
||||||
|
struct InnerTestDriver {
|
||||||
|
now: u64,
|
||||||
|
alarm: u64,
|
||||||
|
callback: fn(*mut ()),
|
||||||
|
ctx: *mut (),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InnerTestDriver {
|
||||||
|
const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
now: 0,
|
||||||
|
alarm: u64::MAX,
|
||||||
|
callback: Self::noop,
|
||||||
|
ctx: core::ptr::null_mut(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn noop(_ctx: *mut ()) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for InnerTestDriver {}
|
||||||
|
|
||||||
|
struct TestDriver(Mutex<InnerTestDriver>);
|
||||||
|
|
||||||
|
impl TestDriver {
|
||||||
|
const fn new() -> Self {
|
||||||
|
Self(Mutex::new(InnerTestDriver::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&self) {
|
||||||
|
*self.0.lock().unwrap() = InnerTestDriver::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_now(&self, now: u64) {
|
||||||
|
let notify = {
|
||||||
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
|
if inner.now < now {
|
||||||
|
inner.now = now;
|
||||||
|
|
||||||
|
if inner.alarm <= now {
|
||||||
|
inner.alarm = u64::MAX;
|
||||||
|
|
||||||
|
Some((inner.callback, inner.ctx))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
panic!("Going back in time?");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some((callback, ctx)) = notify {
|
||||||
|
(callback)(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Driver for TestDriver {
|
||||||
|
fn now(&self) -> u64 {
|
||||||
|
self.0.lock().unwrap().now
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
|
||||||
|
Some(AlarmHandle::new(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
|
||||||
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
|
inner.callback = callback;
|
||||||
|
inner.ctx = ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) {
|
||||||
|
let notify = {
|
||||||
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
|
if timestamp <= inner.now {
|
||||||
|
Some((inner.callback, inner.ctx))
|
||||||
|
} else {
|
||||||
|
inner.alarm = timestamp;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some((callback, ctx)) = notify {
|
||||||
|
(callback)(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TestWaker {
|
||||||
|
pub awoken: Rc<Cell<bool>>,
|
||||||
|
pub waker: Waker,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestWaker {
|
||||||
|
fn new() -> Self {
|
||||||
|
let flag = Rc::new(Cell::new(false));
|
||||||
|
|
||||||
|
const VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||||
|
|data: *const ()| {
|
||||||
|
unsafe {
|
||||||
|
Rc::increment_strong_count(data as *const Cell<bool>);
|
||||||
|
}
|
||||||
|
|
||||||
|
RawWaker::new(data as _, &VTABLE)
|
||||||
|
},
|
||||||
|
|data: *const ()| unsafe {
|
||||||
|
let data = data as *const Cell<bool>;
|
||||||
|
data.as_ref().unwrap().set(true);
|
||||||
|
Rc::decrement_strong_count(data);
|
||||||
|
},
|
||||||
|
|data: *const ()| unsafe {
|
||||||
|
(data as *const Cell<bool>).as_ref().unwrap().set(true);
|
||||||
|
},
|
||||||
|
|data: *const ()| unsafe {
|
||||||
|
Rc::decrement_strong_count(data);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
awoken: flag.clone(),
|
||||||
|
waker: unsafe { Waker::from_raw(raw) },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
|
||||||
|
|
||||||
|
fn setup() {
|
||||||
|
DRIVER.reset();
|
||||||
|
|
||||||
|
QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst);
|
||||||
|
QUEUE.inner.lock(|inner| {
|
||||||
|
*inner.borrow_mut() = InnerQueue::new();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queue_len() -> usize {
|
||||||
|
QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_schedule() {
|
||||||
|
setup();
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 0);
|
||||||
|
|
||||||
|
let waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
|
||||||
|
|
||||||
|
assert!(!waker.awoken.get());
|
||||||
|
assert_eq!(queue_len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_schedule_same() {
|
||||||
|
setup();
|
||||||
|
|
||||||
|
let waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 1);
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 1);
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 1);
|
||||||
|
|
||||||
|
let waker2 = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_trigger() {
|
||||||
|
setup();
|
||||||
|
|
||||||
|
let waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
|
||||||
|
|
||||||
|
assert!(!waker.awoken.get());
|
||||||
|
|
||||||
|
DRIVER.set_now(Instant::from_secs(99).as_ticks());
|
||||||
|
|
||||||
|
assert!(!waker.awoken.get());
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 1);
|
||||||
|
|
||||||
|
DRIVER.set_now(Instant::from_secs(100).as_ticks());
|
||||||
|
|
||||||
|
assert!(waker.awoken.get());
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_immediate_trigger() {
|
||||||
|
setup();
|
||||||
|
|
||||||
|
let waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
|
||||||
|
|
||||||
|
DRIVER.set_now(Instant::from_secs(50).as_ticks());
|
||||||
|
|
||||||
|
let waker2 = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
|
||||||
|
|
||||||
|
assert!(!waker.awoken.get());
|
||||||
|
assert!(waker2.awoken.get());
|
||||||
|
assert_eq!(queue_len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_queue_overflow() {
|
||||||
|
setup();
|
||||||
|
|
||||||
|
for i in 1..super::QUEUE_SIZE {
|
||||||
|
let waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), i);
|
||||||
|
assert!(!waker.awoken.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
let first_waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||||
|
assert!(!first_waker.awoken.get());
|
||||||
|
|
||||||
|
let second_waker = TestWaker::new();
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
|
||||||
|
|
||||||
|
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||||
|
assert!(first_waker.awoken.get());
|
||||||
|
|
||||||
|
QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
|
||||||
|
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||||
|
assert!(second_waker.awoken.get());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user