Merge #959
959: Generic, executor-agnostic queue implementation r=ivmarkov a=ivmarkov Hopefully relatively well documented. Implementation relies on a fixed-size `SortedLinkedList` from `heapless`. (By default, for up to 128 timer schedules, but we can lower this number to - say - 64.) As discussed earlier, on queue overflow, the `WakerRegistration` approach is utilized, whereas the waker that is ordered first in the queue is awoken to make room for the incoming one (which might be the waker that would be awoken after all!). Wakers are compared with `Waker::will_wake`, so the queue should actually not fill up that easily, if at all. I've left provisions for the user to manually instantiate the queue using a dedicated macro - `generic_queue!` so that users willing to adjust the queue size, or users (like me) who have to use the queue in a complex "on-top-of-RTOS-but-the-timer-driver-calling-back-from-ISR" scenario can customize the mutex that protects the queue. The one thing I'm not completely happy with is the need to call `{ embassy_time::queue::initialize() }` early on before any futures using embassy-time are polled, which is currently on the shoulders of the user. I'm open to any ideas where we can get rid of this and do it on the first call to `_embassy_time_schedule_wake`, without introducing very complex combinations of critical sections, atomics and whatnot. Co-authored-by: ivmarkov <ivan.markov@gmail.com> Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
commit
e5097a8866
@ -354,46 +354,54 @@ impl Executor {
|
||||
/// somehow schedule for `poll()` to be called later, at a time you know for sure there's
|
||||
/// no `poll()` already running.
|
||||
pub unsafe fn poll(&'static self) {
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
|
||||
loop {
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
|
||||
|
||||
self.run_queue.dequeue_all(|p| {
|
||||
let task = p.as_ref();
|
||||
self.run_queue.dequeue_all(|p| {
|
||||
let task = p.as_ref();
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
task.expires_at.set(Instant::MAX);
|
||||
|
||||
let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
|
||||
if state & STATE_SPAWNED == 0 {
|
||||
// If task is not running, ignore it. This can happen in the following scenario:
|
||||
// - Task gets dequeued, poll starts
|
||||
// - While task is being polled, it gets woken. It gets placed in the queue.
|
||||
// - Task poll finishes, returning done=true
|
||||
// - RUNNING bit is cleared, but the task is already in the queue.
|
||||
return;
|
||||
}
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_exec_begin(p.as_ptr() as u32);
|
||||
|
||||
// Run the task
|
||||
task.poll_fn.read()(p as _);
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_exec_end();
|
||||
|
||||
// Enqueue or update into timer_queue
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
self.timer_queue.update(p);
|
||||
});
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
task.expires_at.set(Instant::MAX);
|
||||
|
||||
let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
|
||||
if state & STATE_SPAWNED == 0 {
|
||||
// If task is not running, ignore it. This can happen in the following scenario:
|
||||
// - Task gets dequeued, poll starts
|
||||
// - While task is being polled, it gets woken. It gets placed in the queue.
|
||||
// - Task poll finishes, returning done=true
|
||||
// - RUNNING bit is cleared, but the task is already in the queue.
|
||||
return;
|
||||
{
|
||||
// If this is already in the past, set_alarm might return false
|
||||
// In that case do another poll loop iteration.
|
||||
let next_expiration = self.timer_queue.next_expiration();
|
||||
if driver::set_alarm(self.alarm, next_expiration.as_ticks()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_exec_begin(p.as_ptr() as u32);
|
||||
|
||||
// Run the task
|
||||
task.poll_fn.read()(p as _);
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
trace::task_exec_end();
|
||||
|
||||
// Enqueue or update into timer_queue
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
self.timer_queue.update(p);
|
||||
});
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
{
|
||||
// If this is already in the past, set_alarm will immediately trigger the alarm.
|
||||
// This will cause `signal_fn` to be called, which will cause `poll()` to be called again,
|
||||
// so we immediately do another poll loop iteration.
|
||||
let next_expiration = self.timer_queue.next_expiration();
|
||||
driver::set_alarm(self.alarm, next_expiration.as_ticks());
|
||||
#[cfg(not(feature = "integrated-timers"))]
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
@ -436,14 +444,21 @@ pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
|
||||
}
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
#[no_mangle]
|
||||
unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) {
|
||||
let task = waker::task_from_waker(waker);
|
||||
let task = task.as_ref();
|
||||
let expires_at = task.expires_at.get();
|
||||
task.expires_at.set(expires_at.min(at));
|
||||
struct TimerQueue;
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
impl embassy_time::queue::TimerQueue for TimerQueue {
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
|
||||
let task = waker::task_from_waker(waker);
|
||||
let task = unsafe { task.as_ref() };
|
||||
let expires_at = task.expires_at.get();
|
||||
task.expires_at.set(expires_at.min(at));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "integrated-timers")]
|
||||
embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
|
||||
|
||||
#[cfg(feature = "rtos-trace")]
|
||||
impl rtos_trace::RtosTraceOSCallbacks for Executor {
|
||||
fn task_list() {
|
||||
|
@ -243,22 +243,25 @@ impl Driver for RtcDriver {
|
||||
})
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
critical_section::with(|cs| {
|
||||
let n = alarm.id() as _;
|
||||
let alarm = self.get_alarm(cs, alarm);
|
||||
alarm.timestamp.set(timestamp);
|
||||
|
||||
let t = self.now();
|
||||
|
||||
// If alarm timestamp has passed, trigger it instantly.
|
||||
if timestamp <= t {
|
||||
self.trigger_alarm(n, cs);
|
||||
return;
|
||||
}
|
||||
|
||||
let r = rtc();
|
||||
|
||||
let t = self.now();
|
||||
if timestamp <= t {
|
||||
// If alarm timestamp has passed the alarm will not fire.
|
||||
// Disarm the alarm and return `false` to indicate that.
|
||||
r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) });
|
||||
|
||||
alarm.timestamp.set(u64::MAX);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// If it hasn't triggered yet, setup it in the compare channel.
|
||||
|
||||
// Write the CC value regardless of whether we're going to enable it now or not.
|
||||
@ -287,6 +290,8 @@ impl Driver for RtcDriver {
|
||||
// It will be setup later by `next_period`.
|
||||
r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) });
|
||||
}
|
||||
|
||||
true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ impl Driver for TimerDriver {
|
||||
})
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
let n = alarm.id() as usize;
|
||||
critical_section::with(|cs| {
|
||||
let alarm = &self.alarms.borrow(cs)[n];
|
||||
@ -81,11 +81,16 @@ impl Driver for TimerDriver {
|
||||
unsafe { pac::TIMER.alarm(n).write_value(timestamp as u32) };
|
||||
|
||||
let now = self.now();
|
||||
|
||||
// If alarm timestamp has passed, trigger it instantly.
|
||||
// This disarms it.
|
||||
if timestamp <= now {
|
||||
self.trigger_alarm(n, cs);
|
||||
// If alarm timestamp has passed the alarm will not fire.
|
||||
// Disarm the alarm and return `false` to indicate that.
|
||||
unsafe { pac::TIMER.armed().write(|w| w.set_armed(1 << n)) }
|
||||
|
||||
alarm.timestamp.set(u64::MAX);
|
||||
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -292,19 +292,23 @@ impl Driver for RtcDriver {
|
||||
})
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
critical_section::with(|cs| {
|
||||
let r = T::regs_gp16();
|
||||
|
||||
let n = alarm.id() as _;
|
||||
let n = alarm.id() as usize;
|
||||
let alarm = self.get_alarm(cs, alarm);
|
||||
alarm.timestamp.set(timestamp);
|
||||
|
||||
let t = self.now();
|
||||
if timestamp <= t {
|
||||
// If alarm timestamp has passed the alarm will not fire.
|
||||
// Disarm the alarm and return `false` to indicate that.
|
||||
unsafe { r.dier().modify(|w| w.set_ccie(n + 1, false)) };
|
||||
self.trigger_alarm(n, cs);
|
||||
return;
|
||||
|
||||
alarm.timestamp.set(u64::MAX);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
let safe_timestamp = timestamp.max(t + 3);
|
||||
@ -317,6 +321,8 @@ impl Driver for RtcDriver {
|
||||
let diff = timestamp - t;
|
||||
// NOTE(unsafe) We're in a critical section
|
||||
unsafe { r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)) };
|
||||
|
||||
true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,22 @@ unstable-traits = ["embedded-hal-1"]
|
||||
# To use this you must have a time driver provided.
|
||||
defmt-timestamp-uptime = ["defmt"]
|
||||
|
||||
# Create a global, generic queue that can be used with any executor
|
||||
# To use this you must have a time driver provided.
|
||||
generic-queue = []
|
||||
|
||||
# Set the number of timers for the generic queue.
|
||||
#
|
||||
# At most 1 `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used.
|
||||
#
|
||||
# When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the
|
||||
# end user to pick.
|
||||
generic-queue-8 = ["generic-queue"]
|
||||
generic-queue-16 = ["generic-queue"]
|
||||
generic-queue-32 = ["generic-queue"]
|
||||
generic-queue-64 = ["generic-queue"]
|
||||
generic-queue-128 = ["generic-queue"]
|
||||
|
||||
# Set the `embassy_time` tick rate.
|
||||
#
|
||||
# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
|
||||
@ -111,11 +127,18 @@ embedded-hal-async = { version = "=0.1.0-alpha.3", optional = true}
|
||||
|
||||
futures-util = { version = "0.3.17", default-features = false }
|
||||
embassy-macros = { version = "0.1.0", path = "../embassy-macros"}
|
||||
embassy-sync = { version = "0.1", path = "../embassy-sync" }
|
||||
atomic-polyfill = "1.0.1"
|
||||
critical-section = "1.1"
|
||||
cfg-if = "1.0.0"
|
||||
heapless = "0.7"
|
||||
|
||||
# WASM dependencies
|
||||
wasm-bindgen = { version = "0.2.81", optional = true }
|
||||
js-sys = { version = "0.3", optional = true }
|
||||
wasm-timer = { version = "0.2.5", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
serial_test = "0.9"
|
||||
critical-section = { version = "1.1", features = ["std"] }
|
||||
|
||||
|
@ -105,20 +105,21 @@ pub trait Driver: Send + Sync + 'static {
|
||||
/// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm
|
||||
/// timestamp, the provided callback function will be called.
|
||||
///
|
||||
/// If `timestamp` is already in the past, the alarm callback must be immediately fired.
|
||||
/// In this case, it is allowed (but not mandatory) to call the alarm callback synchronously from `set_alarm`.
|
||||
/// The `Driver` implementation should guarantee that the alarm callback is never called synchronously from `set_alarm`.
|
||||
/// Rather - if `timestamp` is already in the past - `false` should be returned and alarm should not be set,
|
||||
/// or alternatively, the driver should return `true` and arrange to call the alarm callback as soon as possible, but not synchronously.
|
||||
///
|
||||
/// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp.
|
||||
///
|
||||
/// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any.
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64);
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool;
|
||||
}
|
||||
|
||||
extern "Rust" {
|
||||
fn _embassy_time_now() -> u64;
|
||||
fn _embassy_time_allocate_alarm() -> Option<AlarmHandle>;
|
||||
fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ());
|
||||
fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64);
|
||||
fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool;
|
||||
}
|
||||
|
||||
/// See [`Driver::now`]
|
||||
@ -139,7 +140,7 @@ pub fn set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut (
|
||||
}
|
||||
|
||||
/// See [`Driver::set_alarm`]
|
||||
pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) {
|
||||
pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
unsafe { _embassy_time_set_alarm(alarm, timestamp) }
|
||||
}
|
||||
|
||||
@ -167,7 +168,7 @@ macro_rules! time_driver_impl {
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) {
|
||||
fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) -> bool {
|
||||
<$t as $crate::driver::Driver>::set_alarm(&$name, alarm, timestamp)
|
||||
}
|
||||
};
|
||||
|
@ -127,12 +127,14 @@ impl Driver for TimeDriver {
|
||||
alarm.ctx = ctx;
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
self.init();
|
||||
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
|
||||
let alarm = &mut alarms[alarm.id() as usize];
|
||||
alarm.timestamp = timestamp;
|
||||
unsafe { self.signaler.as_ref() }.signal();
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,15 +90,23 @@ impl Driver for TimeDriver {
|
||||
}));
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
self.init();
|
||||
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
|
||||
let alarm = &mut alarms[alarm.id() as usize];
|
||||
let timeout = (timestamp - self.now()) as u32;
|
||||
if let Some(token) = alarm.token {
|
||||
clearTimeout(token);
|
||||
}
|
||||
alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000));
|
||||
|
||||
let now = self.now();
|
||||
if timestamp <= now {
|
||||
false
|
||||
} else {
|
||||
let timeout = (timestamp - now) as u32;
|
||||
alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000));
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
|
||||
#![cfg_attr(not(any(feature = "std", feature = "wasm", test)), no_std)]
|
||||
#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
|
||||
#![doc = include_str!("../README.md")]
|
||||
#![allow(clippy::new_without_default)]
|
||||
@ -11,6 +11,7 @@ mod delay;
|
||||
pub mod driver;
|
||||
mod duration;
|
||||
mod instant;
|
||||
pub mod queue;
|
||||
mod tick;
|
||||
mod timer;
|
||||
|
||||
@ -18,6 +19,8 @@ mod timer;
|
||||
mod driver_std;
|
||||
#[cfg(feature = "wasm")]
|
||||
mod driver_wasm;
|
||||
#[cfg(feature = "generic-queue")]
|
||||
mod queue_generic;
|
||||
|
||||
pub use delay::{block_for, Delay};
|
||||
pub use duration::Duration;
|
||||
|
58
embassy-time/src/queue.rs
Normal file
58
embassy-time/src/queue.rs
Normal file
@ -0,0 +1,58 @@
|
||||
//! Timer queue implementation
|
||||
//!
|
||||
//! This module defines the interface a timer queue needs to implement to power the `embassy_time` module.
|
||||
//!
|
||||
//! # Implementing a timer queue
|
||||
//!
|
||||
//! - Define a struct `MyTimerQueue`
|
||||
//! - Implement [`TimerQueue`] for it
|
||||
//! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl).
|
||||
//!
|
||||
//! # Linkage details
|
||||
//!
|
||||
//! Check the documentation of the [`driver`](crate::driver) module for more information.
|
||||
//!
|
||||
//! Similarly to driver, if there is none or multiple timer queues in the crate tree, linking will fail.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```
|
||||
//! use core::task::Waker;
|
||||
//!
|
||||
//! use embassy_time::Instant;
|
||||
//! use embassy_time::queue::{TimerQueue};
|
||||
//!
|
||||
//! struct MyTimerQueue{}; // not public!
|
||||
//! embassy_time::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
|
||||
//!
|
||||
//! impl TimerQueue for MyTimerQueue {
|
||||
//! fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||
//! todo!()
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
use core::task::Waker;
|
||||
|
||||
use crate::Instant;
|
||||
|
||||
/// Timer queue
|
||||
pub trait TimerQueue {
|
||||
/// Schedules a waker in the queue to be awoken at moment `at`.
|
||||
/// If this moment is in the past, the waker might be awoken immediately.
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &Waker);
|
||||
}
|
||||
|
||||
/// Set the TimerQueue implementation.
|
||||
///
|
||||
/// See the module documentation for an example.
|
||||
#[macro_export]
|
||||
macro_rules! timer_queue_impl {
|
||||
(static $name:ident: $t: ty = $val:expr) => {
|
||||
static $name: $t = $val;
|
||||
|
||||
#[no_mangle]
|
||||
fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) {
|
||||
<$t as $crate::queue::TimerQueue>::schedule_wake(&$name, at, waker);
|
||||
}
|
||||
};
|
||||
}
|
449
embassy-time/src/queue_generic.rs
Normal file
449
embassy-time/src/queue_generic.rs
Normal file
@ -0,0 +1,449 @@
|
||||
use core::cell::RefCell;
|
||||
use core::cmp::{min, Ordering};
|
||||
use core::task::Waker;
|
||||
|
||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||
use embassy_sync::blocking_mutex::Mutex;
|
||||
use heapless::Vec;
|
||||
|
||||
use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
|
||||
use crate::queue::TimerQueue;
|
||||
use crate::Instant;
|
||||
|
||||
#[cfg(feature = "generic-queue-8")]
|
||||
const QUEUE_SIZE: usize = 8;
|
||||
#[cfg(feature = "generic-queue-16")]
|
||||
const QUEUE_SIZE: usize = 16;
|
||||
#[cfg(feature = "generic-queue-32")]
|
||||
const QUEUE_SIZE: usize = 32;
|
||||
#[cfg(feature = "generic-queue-64")]
|
||||
const QUEUE_SIZE: usize = 32;
|
||||
#[cfg(feature = "generic-queue-128")]
|
||||
const QUEUE_SIZE: usize = 128;
|
||||
#[cfg(not(any(
|
||||
feature = "generic-queue-8",
|
||||
feature = "generic-queue-16",
|
||||
feature = "generic-queue-32",
|
||||
feature = "generic-queue-64",
|
||||
feature = "generic-queue-128"
|
||||
)))]
|
||||
const QUEUE_SIZE: usize = 64;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Timer {
|
||||
at: Instant,
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
impl PartialEq for Timer {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.at == other.at
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Timer {}
|
||||
|
||||
impl PartialOrd for Timer {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
self.at.partial_cmp(&other.at)
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for Timer {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.at.cmp(&other.at)
|
||||
}
|
||||
}
|
||||
|
||||
struct InnerQueue {
|
||||
queue: Vec<Timer, QUEUE_SIZE>,
|
||||
alarm: AlarmHandle,
|
||||
}
|
||||
|
||||
impl InnerQueue {
|
||||
fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
|
||||
self.queue
|
||||
.iter_mut()
|
||||
.find(|timer| timer.waker.will_wake(waker))
|
||||
.map(|mut timer| {
|
||||
timer.at = min(timer.at, at);
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
let mut timer = Timer {
|
||||
waker: waker.clone(),
|
||||
at,
|
||||
};
|
||||
|
||||
loop {
|
||||
match self.queue.push(timer) {
|
||||
Ok(()) => break,
|
||||
Err(e) => timer = e,
|
||||
}
|
||||
|
||||
self.queue.pop().unwrap().waker.wake();
|
||||
}
|
||||
});
|
||||
|
||||
// Don't wait for the alarm callback to trigger and directly
|
||||
// dispatch all timers that are already due
|
||||
//
|
||||
// Then update the alarm if necessary
|
||||
self.dispatch();
|
||||
}
|
||||
|
||||
fn dispatch(&mut self) {
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut next_alarm = Instant::MAX;
|
||||
|
||||
let mut i = 0;
|
||||
while i < self.queue.len() {
|
||||
let timer = &self.queue[i];
|
||||
if timer.at <= now {
|
||||
let timer = self.queue.swap_remove(i);
|
||||
timer.waker.wake();
|
||||
} else {
|
||||
next_alarm = min(next_alarm, timer.at);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if self.update_alarm(next_alarm) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_alarm(&mut self, next_alarm: Instant) -> bool {
|
||||
if next_alarm == Instant::MAX {
|
||||
true
|
||||
} else {
|
||||
set_alarm(self.alarm, next_alarm.as_ticks())
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_alarm(&mut self) {
|
||||
self.dispatch();
|
||||
}
|
||||
}
|
||||
|
||||
struct Queue {
|
||||
inner: Mutex<CriticalSectionRawMutex, RefCell<Option<InnerQueue>>>,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(RefCell::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||
self.inner.lock(|inner| {
|
||||
let mut inner = inner.borrow_mut();
|
||||
|
||||
if inner.is_none() {}
|
||||
|
||||
inner
|
||||
.get_or_insert_with(|| {
|
||||
let handle = unsafe { allocate_alarm() }.unwrap();
|
||||
set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
|
||||
InnerQueue {
|
||||
queue: Vec::new(),
|
||||
alarm: handle,
|
||||
}
|
||||
})
|
||||
.schedule_wake(at, waker)
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_alarm(&self) {
|
||||
self.inner
|
||||
.lock(|inner| inner.borrow_mut().as_mut().unwrap().handle_alarm());
|
||||
}
|
||||
|
||||
fn handle_alarm_callback(ctx: *mut ()) {
|
||||
unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
|
||||
}
|
||||
}
|
||||
|
||||
impl TimerQueue for Queue {
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||
Queue::schedule_wake(self, at, waker);
|
||||
}
|
||||
}
|
||||
|
||||
crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::cell::Cell;
|
||||
use core::task::{RawWaker, RawWakerVTable, Waker};
|
||||
use std::rc::Rc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use serial_test::serial;
|
||||
|
||||
use super::InnerQueue;
|
||||
use crate::driver::{AlarmHandle, Driver};
|
||||
use crate::queue_generic::QUEUE;
|
||||
use crate::Instant;
|
||||
|
||||
struct InnerTestDriver {
|
||||
now: u64,
|
||||
alarm: u64,
|
||||
callback: fn(*mut ()),
|
||||
ctx: *mut (),
|
||||
}
|
||||
|
||||
impl InnerTestDriver {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
now: 0,
|
||||
alarm: u64::MAX,
|
||||
callback: Self::noop,
|
||||
ctx: core::ptr::null_mut(),
|
||||
}
|
||||
}
|
||||
|
||||
fn noop(_ctx: *mut ()) {}
|
||||
}
|
||||
|
||||
unsafe impl Send for InnerTestDriver {}
|
||||
|
||||
struct TestDriver(Mutex<InnerTestDriver>);
|
||||
|
||||
impl TestDriver {
|
||||
const fn new() -> Self {
|
||||
Self(Mutex::new(InnerTestDriver::new()))
|
||||
}
|
||||
|
||||
fn reset(&self) {
|
||||
*self.0.lock().unwrap() = InnerTestDriver::new();
|
||||
}
|
||||
|
||||
fn set_now(&self, now: u64) {
|
||||
let notify = {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
|
||||
if inner.now < now {
|
||||
inner.now = now;
|
||||
|
||||
if inner.alarm <= now {
|
||||
inner.alarm = u64::MAX;
|
||||
|
||||
Some((inner.callback, inner.ctx))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
panic!("Going back in time?");
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((callback, ctx)) = notify {
|
||||
(callback)(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Driver for TestDriver {
|
||||
fn now(&self) -> u64 {
|
||||
self.0.lock().unwrap().now
|
||||
}
|
||||
|
||||
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
|
||||
Some(AlarmHandle::new(0))
|
||||
}
|
||||
|
||||
fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
|
||||
inner.callback = callback;
|
||||
inner.ctx = ctx;
|
||||
}
|
||||
|
||||
fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
|
||||
if timestamp <= inner.now {
|
||||
false
|
||||
} else {
|
||||
inner.alarm = timestamp;
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct TestWaker {
|
||||
pub awoken: Rc<Cell<bool>>,
|
||||
pub waker: Waker,
|
||||
}
|
||||
|
||||
impl TestWaker {
|
||||
fn new() -> Self {
|
||||
let flag = Rc::new(Cell::new(false));
|
||||
|
||||
const VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
|data: *const ()| {
|
||||
unsafe {
|
||||
Rc::increment_strong_count(data as *const Cell<bool>);
|
||||
}
|
||||
|
||||
RawWaker::new(data as _, &VTABLE)
|
||||
},
|
||||
|data: *const ()| unsafe {
|
||||
let data = data as *const Cell<bool>;
|
||||
data.as_ref().unwrap().set(true);
|
||||
Rc::decrement_strong_count(data);
|
||||
},
|
||||
|data: *const ()| unsafe {
|
||||
(data as *const Cell<bool>).as_ref().unwrap().set(true);
|
||||
},
|
||||
|data: *const ()| unsafe {
|
||||
Rc::decrement_strong_count(data);
|
||||
},
|
||||
);
|
||||
|
||||
let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
|
||||
|
||||
Self {
|
||||
awoken: flag.clone(),
|
||||
waker: unsafe { Waker::from_raw(raw) },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
|
||||
|
||||
fn setup() {
|
||||
DRIVER.reset();
|
||||
|
||||
QUEUE.inner.lock(|inner| {
|
||||
*inner.borrow_mut() = InnerQueue::new();
|
||||
});
|
||||
}
|
||||
|
||||
fn queue_len() -> usize {
|
||||
QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_schedule() {
|
||||
setup();
|
||||
|
||||
assert_eq!(queue_len(), 0);
|
||||
|
||||
let waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
|
||||
|
||||
assert!(!waker.awoken.get());
|
||||
assert_eq!(queue_len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_schedule_same() {
|
||||
setup();
|
||||
|
||||
let waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
let waker2 = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
|
||||
|
||||
assert_eq!(queue_len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_trigger() {
|
||||
setup();
|
||||
|
||||
let waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
|
||||
|
||||
assert!(!waker.awoken.get());
|
||||
|
||||
DRIVER.set_now(Instant::from_secs(99).as_ticks());
|
||||
|
||||
assert!(!waker.awoken.get());
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
DRIVER.set_now(Instant::from_secs(100).as_ticks());
|
||||
|
||||
assert!(waker.awoken.get());
|
||||
|
||||
assert_eq!(queue_len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_immediate_trigger() {
|
||||
setup();
|
||||
|
||||
let waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
|
||||
|
||||
DRIVER.set_now(Instant::from_secs(50).as_ticks());
|
||||
|
||||
let waker2 = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
|
||||
|
||||
assert!(!waker.awoken.get());
|
||||
assert!(waker2.awoken.get());
|
||||
assert_eq!(queue_len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_queue_overflow() {
|
||||
setup();
|
||||
|
||||
for i in 1..super::QUEUE_SIZE {
|
||||
let waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
|
||||
|
||||
assert_eq!(queue_len(), i);
|
||||
assert!(!waker.awoken.get());
|
||||
}
|
||||
|
||||
let first_waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
|
||||
|
||||
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||
assert!(!first_waker.awoken.get());
|
||||
|
||||
let second_waker = TestWaker::new();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
|
||||
|
||||
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||
assert!(first_waker.awoken.get());
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
|
||||
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||
assert!(second_waker.awoken.get());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user