diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e1258ebb..5bcb1e6e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -354,46 +354,54 @@ impl Executor { /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. pub unsafe fn poll(&'static self) { - #[cfg(feature = "integrated-timers")] - self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); + loop { + #[cfg(feature = "integrated-timers")] + self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); - self.run_queue.dequeue_all(|p| { - let task = p.as_ref(); + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); + + #[cfg(feature = "integrated-timers")] + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + #[cfg(feature = "rtos-trace")] + trace::task_exec_begin(p.as_ptr() as u32); + + // Run the task + task.poll_fn.read()(p as _); + + #[cfg(feature = "rtos-trace")] + trace::task_exec_end(); + + // Enqueue or update into timer_queue + #[cfg(feature = "integrated-timers")] + self.timer_queue.update(p); + }); #[cfg(feature = "integrated-timers")] - task.expires_at.set(Instant::MAX); - - let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - if state & STATE_SPAWNED == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; + { + // If this is already in the past, set_alarm might return false + // In that case do another poll loop iteration. + let next_expiration = self.timer_queue.next_expiration(); + if driver::set_alarm(self.alarm, next_expiration.as_ticks()) { + break; + } } - #[cfg(feature = "rtos-trace")] - trace::task_exec_begin(p.as_ptr() as u32); - - // Run the task - task.poll_fn.read()(p as _); - - #[cfg(feature = "rtos-trace")] - trace::task_exec_end(); - - // Enqueue or update into timer_queue - #[cfg(feature = "integrated-timers")] - self.timer_queue.update(p); - }); - - #[cfg(feature = "integrated-timers")] - { - // If this is already in the past, set_alarm will immediately trigger the alarm. - // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, - // so we immediately do another poll loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - driver::set_alarm(self.alarm, next_expiration.as_ticks()); + #[cfg(not(feature = "integrated-timers"))] + { + break; + } } #[cfg(feature = "rtos-trace")] diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs index c32a4463..0d03ad52 100644 --- a/embassy-nrf/src/time_driver.rs +++ b/embassy-nrf/src/time_driver.rs @@ -243,20 +243,19 @@ impl Driver for RtcDriver { }) } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { critical_section::with(|cs| { + let t = self.now(); + + // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. + if timestamp <= t { + return false; + } + let n = alarm.id() as _; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); - let t = self.now(); - - // If alarm timestamp has passed, trigger it instantly. - if timestamp <= t { - self.trigger_alarm(n, cs); - return; - } - let r = rtc(); // If it hasn't triggered yet, setup it in the compare channel. @@ -287,6 +286,8 @@ impl Driver for RtcDriver { // It will be setup later by `next_period`. r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) }); } + + true }) } } diff --git a/embassy-rp/src/timer.rs b/embassy-rp/src/timer.rs index 5215c0c0..8f280f55 100644 --- a/embassy-rp/src/timer.rs +++ b/embassy-rp/src/timer.rs @@ -68,9 +68,16 @@ impl Driver for TimerDriver { }) } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { let n = alarm.id() as usize; critical_section::with(|cs| { + let now = self.now(); + + // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. + if timestamp <= now { + return false; + } + let alarm = &self.alarms.borrow(cs)[n]; alarm.timestamp.set(timestamp); @@ -80,13 +87,7 @@ impl Driver for TimerDriver { // it is checked if the alarm time has passed. unsafe { pac::TIMER.alarm(n).write_value(timestamp as u32) }; - let now = self.now(); - - // If alarm timestamp has passed, trigger it instantly. - // This disarms it. - if timestamp <= now { - self.trigger_alarm(n, cs); - } + true }) } } diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs index ed3225c5..e4c266e7 100644 --- a/embassy-stm32/src/time_driver.rs +++ b/embassy-stm32/src/time_driver.rs @@ -292,21 +292,21 @@ impl Driver for RtcDriver { }) } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { critical_section::with(|cs| { + let t = self.now(); + + // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. + if timestamp <= t { + return false; + } + let r = T::regs_gp16(); let n = alarm.id() as _; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); - let t = self.now(); - if timestamp <= t { - unsafe { r.dier().modify(|w| w.set_ccie(n + 1, false)) }; - self.trigger_alarm(n, cs); - return; - } - let safe_timestamp = timestamp.max(t + 3); // Write the CCR value regardless of whether we're going to enable it now or not. @@ -317,6 +317,8 @@ impl Driver for RtcDriver { let diff = timestamp - t; // NOTE(unsafe) We're in a critical section unsafe { r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)) }; + + true }) } } diff --git a/embassy-time/src/driver.rs b/embassy-time/src/driver.rs index 79ae14b9..5c2ad3b2 100644 --- a/embassy-time/src/driver.rs +++ b/embassy-time/src/driver.rs @@ -105,20 +105,21 @@ pub trait Driver: Send + Sync + 'static { /// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm /// timestamp, the provided callback function will be called. /// - /// If `timestamp` is already in the past, the alarm callback must be immediately fired. - /// In this case, it is allowed (but not mandatory) to call the alarm callback synchronously from `set_alarm`. + /// The `Driver` implementation should guarantee that the alarm callback is never called synchronously from `set_alarm`. + /// Rather - if `timestamp` is already in the past - `false` should be returned and alarm should not be set, + /// or alternatively, the driver should return `true` and arrange to call the alarm callback as soon as possible, but not synchronously. /// /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. /// /// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any. - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64); + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool; } extern "Rust" { fn _embassy_time_now() -> u64; fn _embassy_time_allocate_alarm() -> Option; fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()); - fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64); + fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool; } /// See [`Driver::now`] @@ -139,7 +140,7 @@ pub fn set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ( } /// See [`Driver::set_alarm`] -pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) { +pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool { unsafe { _embassy_time_set_alarm(alarm, timestamp) } } @@ -167,7 +168,7 @@ macro_rules! time_driver_impl { } #[no_mangle] - fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) { + fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) -> bool { <$t as $crate::driver::Driver>::set_alarm(&$name, alarm, timestamp) } }; diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs index 2ddb2e60..fc7fd197 100644 --- a/embassy-time/src/driver_std.rs +++ b/embassy-time/src/driver_std.rs @@ -127,12 +127,14 @@ impl Driver for TimeDriver { alarm.ctx = ctx; } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { self.init(); let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); let alarm = &mut alarms[alarm.id() as usize]; alarm.timestamp = timestamp; unsafe { self.signaler.as_ref() }.signal(); + + true } } diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs index e4497e6a..d7a6b0d8 100644 --- a/embassy-time/src/driver_wasm.rs +++ b/embassy-time/src/driver_wasm.rs @@ -81,13 +81,15 @@ impl Driver for TimeDriver { } } - fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { + fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) -> bool { self.init(); let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); let alarm = &mut alarms[alarm.id() as usize]; alarm.closure.replace(Closure::new(move || { callback(ctx); })); + + true } fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs index 1c4e5398..83f73484 100644 --- a/embassy-time/src/queue_generic.rs +++ b/embassy-time/src/queue_generic.rs @@ -2,7 +2,6 @@ use core::cell::RefCell; use core::cmp::Ordering; use core::task::Waker; -use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::Mutex; use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; @@ -71,7 +70,7 @@ impl InnerQueue { } } - fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { + fn schedule_wake(&mut self, at: Instant, waker: &Waker) { self.queue .find_mut(|timer| timer.waker.will_wake(waker)) .map(|mut timer| { @@ -98,50 +97,54 @@ impl InnerQueue { // dispatch all timers that are already due // // Then update the alarm if necessary - self.dispatch(alarm_schedule); + self.dispatch(); } - fn dispatch(&mut self, alarm_schedule: &AtomicU64) { - let now = Instant::now(); + fn dispatch(&mut self) { + loop { + let now = Instant::now(); - while self.queue.peek().filter(|timer| timer.at <= now).is_some() { - self.queue.pop().unwrap().waker.wake(); + while self.queue.peek().filter(|timer| timer.at <= now).is_some() { + self.queue.pop().unwrap().waker.wake(); + } + + if self.update_alarm() { + break; + } } - - self.update_alarm(alarm_schedule); } - fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { + fn update_alarm(&mut self) -> bool { if let Some(timer) = self.queue.peek() { let new_at = timer.at; if self.alarm_at != new_at { self.alarm_at = new_at; - alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); + + return set_alarm(self.alarm.unwrap(), self.alarm_at.as_ticks()); } } else { self.alarm_at = Instant::MAX; - alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); } + + true } - fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { + fn handle_alarm(&mut self) { self.alarm_at = Instant::MAX; - self.dispatch(alarm_schedule); + self.dispatch(); } } struct Queue { inner: Mutex>, - alarm_schedule: AtomicU64, } impl Queue { const fn new() -> Self { Self { inner: Mutex::new(RefCell::new(InnerQueue::new())), - alarm_schedule: AtomicU64::new(u64::MAX), } } @@ -156,28 +159,12 @@ impl Queue { set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); } - inner.schedule_wake(at, waker, &self.alarm_schedule) + inner.schedule_wake(at, waker) }); - - self.update_alarm(); - } - - fn update_alarm(&self) { - // Need to set the alarm when we are *not* holding the mutex on the inner queue - // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately - // call us back if the timestamp is in the past. - let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); - - if alarm_at < u64::MAX { - set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at); - } } fn handle_alarm(&self) { - self.inner - .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); - - self.update_alarm(); + self.inner.lock(|inner| inner.borrow_mut().handle_alarm()); } fn handle_alarm_callback(ctx: *mut ()) { @@ -196,7 +183,6 @@ crate::timer_queue_impl!(static QUEUE: Queue = Queue::new()); #[cfg(test)] mod tests { use core::cell::Cell; - use core::sync::atomic::Ordering; use core::task::{RawWaker, RawWakerVTable, Waker}; use std::rc::Rc; use std::sync::Mutex; @@ -282,20 +268,14 @@ mod tests { inner.ctx = ctx; } - fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { - let notify = { - let mut inner = self.0.lock().unwrap(); + fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool { + let mut inner = self.0.lock().unwrap(); - if timestamp <= inner.now { - Some((inner.callback, inner.ctx)) - } else { - inner.alarm = timestamp; - None - } - }; - - if let Some((callback, ctx)) = notify { - (callback)(ctx); + if timestamp <= inner.now { + false + } else { + inner.alarm = timestamp; + true } } } @@ -344,7 +324,6 @@ mod tests { fn setup() { DRIVER.reset(); - QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); QUEUE.inner.lock(|inner| { *inner.borrow_mut() = InnerQueue::new(); });