executor: remove atomic-polyfill.

Dario Nieuwenhuis 2023-11-14 22:32:48 +01:00
parent 50a983fd9b
commit bef9b7a853
10 changed files with 280 additions and 67 deletions

embassy-executor/CHANGELOG.md

@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+- Add `main` macro reexport for Xtensa arch.
+- Remove use of `atomic-polyfill`. The executor now has multiple implementations of its internal data structures for cases where the target supports atomics or doesn't.
+
 ## 0.3.2 - 2023-11-06
 
 - Use `atomic-polyfill` for `riscv32`

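The headline change is the second Unreleased entry: rather than having `atomic-polyfill` transparently patch in emulated atomics, the executor now ships two hand-written variants of each internal data structure and selects one per target at compile time. A minimal sketch of the selection mechanism, the same `#[cfg_attr]` pattern this commit applies in `src/raw/mod.rs` below (illustrative, not the full file):

```rust
// Sketch of compile-time implementation selection. Targets with native
// atomic pointer ops compile the lock-free module; all others compile a
// critical-section fallback exposing the identical API, so the rest of
// the crate doesn't need to care which one it got.
#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
mod run_queue;
```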
embassy-executor/Cargo.toml

@@ -34,7 +34,7 @@ _arch = [] # some arch was picked
 arch-std = ["_arch", "critical-section/std"]
 arch-cortex-m = ["_arch", "dep:cortex-m"]
 arch-xtensa = ["_arch"]
-arch-riscv32 = ["_arch"]
+arch-riscv32 = ["_arch", "dep:portable-atomic"]
 arch-wasm = ["_arch", "dep:wasm-bindgen", "dep:js-sys"]
 
 # Enable the thread-mode executor (using WFE/SEV in Cortex-M, WFI in other embedded archs)
@@ -59,9 +59,12 @@ rtos-trace = { version = "0.1.2", optional = true }
 embassy-macros = { version = "0.2.1", path = "../embassy-macros" }
 embassy-time = { version = "0.1.5", path = "../embassy-time", optional = true}
 
-atomic-polyfill = "1.0.1"
 critical-section = "1.1"
 
+# needed for riscv
+# remove when https://github.com/rust-lang/rust/pull/114499 is merged
+portable-atomic = { version = "1.5", optional = true }
+
 # arch-cortex-m dependencies
 cortex-m = { version = "0.7.6", optional = true }

embassy-executor/src/arch/cortex_m.rs

@@ -115,12 +115,12 @@ mod thread {
 pub use interrupt::*;
 #[cfg(feature = "executor-interrupt")]
 mod interrupt {
-    use core::cell::UnsafeCell;
+    use core::cell::{Cell, UnsafeCell};
     use core::mem::MaybeUninit;
 
-    use atomic_polyfill::{AtomicBool, Ordering};
     use cortex_m::interrupt::InterruptNumber;
     use cortex_m::peripheral::NVIC;
+    use critical_section::Mutex;
 
     use crate::raw;
@@ -146,7 +146,7 @@ mod interrupt {
     /// It is somewhat more complex to use; it's recommended to use the thread-mode
     /// [`Executor`] instead if it works for your use case.
     pub struct InterruptExecutor {
-        started: AtomicBool,
+        started: Mutex<Cell<bool>>,
         executor: UnsafeCell<MaybeUninit<raw::Executor>>,
     }
@@ -158,7 +158,7 @@ mod interrupt {
         #[inline]
         pub const fn new() -> Self {
             Self {
-                started: AtomicBool::new(false),
+                started: Mutex::new(Cell::new(false)),
                 executor: UnsafeCell::new(MaybeUninit::uninit()),
             }
         }
@@ -167,7 +167,8 @@ mod interrupt {
         ///
         /// # Safety
         ///
-        /// You MUST call this from the interrupt handler, and from nowhere else.
+        /// - You MUST call this from the interrupt handler, and from nowhere else.
+        /// - You must not call this before calling `start()`.
         pub unsafe fn on_interrupt(&'static self) {
             let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
             executor.poll();
@@ -196,11 +197,7 @@ mod interrupt {
         /// do it after.
         ///
         pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner {
-            if self
-                .started
-                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
-                .is_err()
-            {
+            if critical_section::with(|cs| self.started.borrow(cs).replace(true)) {
                 panic!("InterruptExecutor::start() called multiple times on the same executor.");
             }
@@ -222,10 +219,10 @@ mod interrupt {
         /// This returns a [`SendSpawner`] you can use to spawn tasks on this
         /// executor.
         ///
-        /// This MUST only be called on an executor that has already been spawned.
+        /// This MUST only be called on an executor that has already been started.
        /// The function will panic otherwise.
         pub fn spawner(&'static self) -> crate::SendSpawner {
-            if !self.started.load(Ordering::Acquire) {
+            if !critical_section::with(|cs| self.started.borrow(cs).get()) {
                 panic!("InterruptExecutor::spawner() called on uninitialized executor.");
             }
 
             let executor = unsafe { (&*self.executor.get()).assume_init_ref() };

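The `started` flag rewrite trades an `AtomicBool` compare-exchange for a `Mutex<Cell<bool>>` guarded by a critical section, which also works on cores lacking atomic read-modify-write instructions. A host-runnable sketch of the same set-once pattern; the `StartFlag` name is ours, and it assumes the `critical-section` crate with its `std` feature supplying the lock implementation:

```rust
// Host-runnable with: critical-section = { version = "1.1", features = ["std"] }
use core::cell::Cell;

use critical_section::Mutex;

struct StartFlag {
    started: Mutex<Cell<bool>>,
}

impl StartFlag {
    const fn new() -> Self {
        Self {
            started: Mutex::new(Cell::new(false)),
        }
    }

    /// The critical-section analogue of
    /// `AtomicBool::compare_exchange(false, true, ..)`: `replace(true)`
    /// returns the previous value, so `true` means someone beat us to it.
    fn start(&self) {
        if critical_section::with(|cs| self.started.borrow(cs).replace(true)) {
            panic!("start() called multiple times");
        }
    }

    fn is_started(&self) -> bool {
        critical_section::with(|cs| self.started.borrow(cs).get())
    }
}

static FLAG: StartFlag = StartFlag::new();

fn main() {
    assert!(!FLAG.is_started());
    FLAG.start(); // first call flips the flag
    assert!(FLAG.is_started());
    // FLAG.start(); // a second call would panic, as in `InterruptExecutor::start`
}
```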
embassy-executor/src/arch/riscv32.rs

@@ -7,9 +7,9 @@ pub use thread::*;
 mod thread {
     use core::marker::PhantomData;
 
-    use atomic_polyfill::{AtomicBool, Ordering};
     #[cfg(feature = "nightly")]
     pub use embassy_macros::main_riscv as main;
+    use portable_atomic::{AtomicBool, Ordering};
 
     use crate::{raw, Spawner};

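`portable-atomic` deliberately mirrors the `core::sync::atomic` API, which is why only the import changes in this file. A small sketch of what the crate provides on atomics-less RISC-V; which backend it uses (for example its `critical-section` or `unsafe-assume-single-core` feature) is a per-target choice documented in portable-atomic itself:

```rust
use portable_atomic::{AtomicBool, Ordering};

static SIGNALED: AtomicBool = AtomicBool::new(false);

fn signal() {
    SIGNALED.store(true, Ordering::SeqCst);
}

fn take_signal() -> bool {
    // `swap` is a read-modify-write, exactly the kind of operation a bare
    // riscv32imc core (no A extension) can't do natively; portable-atomic
    // emulates it, and compiles to a plain atomic where one exists.
    SIGNALED.swap(false, Ordering::SeqCst)
}

fn main() {
    signal();
    assert!(take_signal());
    assert!(!take_signal());
}
```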
embassy-executor/src/raw/mod.rs

@@ -7,7 +7,14 @@
 //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
 //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
 
+#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
+#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
 mod run_queue;
+
+#[cfg_attr(target_has_atomic = "8", path = "state_atomics.rs")]
+#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
+mod state;
+
 #[cfg(feature = "integrated-timers")]
 mod timer_queue;
 pub(crate) mod util;
@@ -21,7 +28,6 @@ use core::pin::Pin;
 use core::ptr::NonNull;
 use core::task::{Context, Poll};
 
-use atomic_polyfill::{AtomicU32, Ordering};
 #[cfg(feature = "integrated-timers")]
 use embassy_time::driver::{self, AlarmHandle};
 #[cfg(feature = "integrated-timers")]
@@ -30,21 +36,14 @@ use embassy_time::Instant;
 use rtos_trace::trace;
 
 use self::run_queue::{RunQueue, RunQueueItem};
+use self::state::State;
 use self::util::{SyncUnsafeCell, UninitCell};
 pub use self::waker::task_from_waker;
 use super::SpawnToken;
 
-/// Task is spawned (has a future)
-pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
-/// Task is in the executor run queue
-pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
-/// Task is in the executor timer queue
-#[cfg(feature = "integrated-timers")]
-pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
-
 /// Raw task header for use in task pointers.
 pub(crate) struct TaskHeader {
-    pub(crate) state: AtomicU32,
+    pub(crate) state: State,
     pub(crate) run_queue_item: RunQueueItem,
     pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
     poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
@@ -116,7 +115,7 @@ impl<F: Future + 'static> TaskStorage<F> {
     pub const fn new() -> Self {
         Self {
             raw: TaskHeader {
-                state: AtomicU32::new(0),
+                state: State::new(),
                 run_queue_item: RunQueueItem::new(),
                 executor: SyncUnsafeCell::new(None),
                 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
@@ -161,7 +160,7 @@ impl<F: Future + 'static> TaskStorage<F> {
         match future.poll(&mut cx) {
             Poll::Ready(_) => {
                 this.future.drop_in_place();
-                this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
+                this.raw.state.despawn();
 
                 #[cfg(feature = "integrated-timers")]
                 this.raw.expires_at.set(Instant::MAX);
@@ -193,11 +192,7 @@ impl<F: Future + 'static> AvailableTask<F> {
     ///
     /// This function returns `None` if a task has already been spawned and has not finished running.
     pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
-        task.raw
-            .state
-            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
-            .ok()
-            .map(|_| Self { task })
+        task.raw.state.spawn().then(|| Self { task })
     }
 
     fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
@@ -394,8 +389,7 @@ impl SyncExecutor {
                 #[cfg(feature = "integrated-timers")]
                 task.expires_at.set(Instant::MAX);
 
-                let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
-                if state & STATE_SPAWNED == 0 {
+                if !task.state.run_dequeue() {
                     // If task is not running, ignore it. This can happen in the following scenario:
                     //   - Task gets dequeued, poll starts
                     //   - While task is being polled, it gets woken. It gets placed in the queue.
@@ -546,18 +540,7 @@ impl Executor {
 /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 pub fn wake_task(task: TaskRef) {
     let header = task.header();
-    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
-        // If already scheduled, or if not started,
-        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
-            None
-        } else {
-            // Mark it as scheduled
-            Some(state | STATE_RUN_QUEUED)
-        }
-    });
-
-    if res.is_ok() {
+    if header.state.run_enqueue() {
         // We have just marked the task as scheduled, so enqueue it.
         unsafe {
             let executor = header.executor.get().unwrap_unchecked();
@@ -571,18 +554,7 @@ pub fn wake_task(task: TaskRef) {
 /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 pub fn wake_task_no_pend(task: TaskRef) {
     let header = task.header();
-    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
-        // If already scheduled, or if not started,
-        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
-            None
-        } else {
-            // Mark it as scheduled
-            Some(state | STATE_RUN_QUEUED)
-        }
-    });
-
-    if res.is_ok() {
+    if header.state.run_enqueue() {
         // We have just marked the task as scheduled, so enqueue it.
         unsafe {
             let executor = header.executor.get().unwrap_unchecked();

embassy-executor/src/raw/run_queue_atomics.rs

@@ -1,7 +1,6 @@
 use core::ptr;
 use core::ptr::NonNull;
-
-use atomic_polyfill::{AtomicPtr, Ordering};
+use core::sync::atomic::{AtomicPtr, Ordering};
 
 use super::{TaskHeader, TaskRef};
 use crate::raw::util::SyncUnsafeCell;

embassy-executor/src/raw/run_queue_critical_section.rs

@@ -0,0 +1,75 @@
+use core::cell::Cell;
+
+use critical_section::{CriticalSection, Mutex};
+
+use super::TaskRef;
+
+pub(crate) struct RunQueueItem {
+    next: Mutex<Cell<Option<TaskRef>>>,
+}
+
+impl RunQueueItem {
+    pub const fn new() -> Self {
+        Self {
+            next: Mutex::new(Cell::new(None)),
+        }
+    }
+}
+
+/// Task queue based on a very, very simple linked-list queue, protected by critical sections:
+///
+/// To enqueue a task, `task.next` is set to the old head, and head is atomically set to task.
+///
+/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
+/// null. Then the batch is iterated following the next pointers until null is reached.
+///
+/// Note that batches are iterated in the reverse of the order in which they were enqueued.
+/// This is OK for our purposes: it can't create fairness problems, since the next batch
+/// won't run until the current batch is completely processed, so even a task that enqueues
+/// itself instantly (for example by waking its own waker) can't prevent other tasks from running.
+pub(crate) struct RunQueue {
+    head: Mutex<Cell<Option<TaskRef>>>,
+}
+
+impl RunQueue {
+    pub const fn new() -> Self {
+        Self {
+            head: Mutex::new(Cell::new(None)),
+        }
+    }
+
+    /// Enqueues a task. Returns true if the queue was empty.
+    ///
+    /// # Safety
+    ///
+    /// `task` must NOT already be enqueued in any queue.
+    #[inline(always)]
+    pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
+        critical_section::with(|cs| {
+            let prev = self.head.borrow(cs).replace(Some(task));
+            task.header().run_queue_item.next.borrow(cs).set(prev);
+            prev.is_none()
+        })
+    }
+
+    /// Empty the queue, then call `on_task` for each task that was in the queue.
+    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
+    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
+    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
+        // Atomically empty the queue.
+        let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
+
+        // Iterate the linked list of tasks that were previously in the queue.
+        while let Some(task) = next {
+            // If the task re-enqueues itself, the `next` pointer will get overwritten.
+            // Therefore, first read the next pointer, and only then process the task.
+            //
+            // Safety: while a task is enqueued, nothing else touches its `next` pointer.
+            let cs = unsafe { CriticalSection::new() };
+            next = task.header().run_queue_item.next.borrow(cs).get();
+
+            on_task(task);
+        }
+    }
+}

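The batching argument in the doc comment is easiest to see on a toy model. This is not the real queue (which is an intrusive linked list and never allocates); it is a host-runnable sketch of the two properties that matter: a drained batch is processed newest-first, and tasks enqueued during a drain wait for the next one:

```rust
use std::cell::RefCell;

struct ModelQueue {
    head: RefCell<Vec<u32>>, // newest "task" last, standing in for the list head
}

impl ModelQueue {
    fn enqueue(&self, task: u32) -> bool {
        let mut q = self.head.borrow_mut();
        q.push(task);
        q.len() == 1 // true if the queue was previously empty
    }

    fn dequeue_all(&self, on_task: impl Fn(u32)) {
        // Take the whole batch at once; anything enqueued by `on_task`
        // lands in the fresh queue and waits for the *next* call.
        let batch = self.head.take();
        // The real queue walks the list from the head, i.e. newest first,
        // hence the reverse iteration here.
        for task in batch.into_iter().rev() {
            on_task(task);
        }
    }
}

fn main() {
    let q = ModelQueue { head: RefCell::new(Vec::new()) };
    assert!(q.enqueue(1)); // true: the queue was empty
    assert!(!q.enqueue(2));
    q.dequeue_all(|t| print!("{t} ")); // prints "2 1": newest first
    println!();

    // A task enqueued mid-drain is deferred to the next drain:
    q.enqueue(1);
    q.dequeue_all(|t| {
        q.enqueue(t + 10);
    });
    q.dequeue_all(|t| assert_eq!(t, 11));
}
```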
embassy-executor/src/raw/state_atomics.rs

@@ -0,0 +1,73 @@
+use core::sync::atomic::{AtomicU32, Ordering};
+
+/// Task is spawned (has a future)
+pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
+/// Task is in the executor run queue
+pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
+/// Task is in the executor timer queue
+#[cfg(feature = "integrated-timers")]
+pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
+
+pub(crate) struct State {
+    state: AtomicU32,
+}
+
+impl State {
+    pub const fn new() -> State {
+        Self {
+            state: AtomicU32::new(0),
+        }
+    }
+
+    /// If task is idle, mark it as spawned + run_queued and return true.
+    #[inline(always)]
+    pub fn spawn(&self) -> bool {
+        self.state
+            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
+    }
+
+    /// Unmark the task as spawned.
+    #[inline(always)]
+    pub fn despawn(&self) {
+        self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
+    }
+
+    /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
+    #[inline(always)]
+    pub fn run_enqueue(&self) -> bool {
+        self.state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
+                // If already scheduled, or if not started,
+                if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
+                    None
+                } else {
+                    // Mark it as scheduled
+                    Some(state | STATE_RUN_QUEUED)
+                }
+            })
+            .is_ok()
+    }
+
+    /// Unmark the task as run-queued. Return whether the task is spawned.
+    #[inline(always)]
+    pub fn run_dequeue(&self) -> bool {
+        let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
+        state & STATE_SPAWNED != 0
+    }
+
+    /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_enqueue(&self) -> bool {
+        let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
+        old_state & STATE_TIMER_QUEUED == 0
+    }
+
+    /// Unmark the task as timer-queued.
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_dequeue(&self) {
+        self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
+    }
+}

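The flag bits compose into a small task lifecycle. The walkthrough below uses a condensed, host-runnable copy of the `State` type above (timer bits omitted); the comments describe what the executor does at each step:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

const SPAWNED: u32 = 1 << 0;
const RUN_QUEUED: u32 = 1 << 1;

// Condensed copy of the State type above, so the walkthrough runs on a host.
struct State(AtomicU32);

impl State {
    const fn new() -> Self {
        Self(AtomicU32::new(0))
    }
    fn spawn(&self) -> bool {
        self.0
            .compare_exchange(0, SPAWNED | RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
    fn despawn(&self) {
        self.0.fetch_and(!SPAWNED, Ordering::AcqRel);
    }
    fn run_enqueue(&self) -> bool {
        self.0
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |s| {
                if s & RUN_QUEUED != 0 || s & SPAWNED == 0 {
                    None
                } else {
                    Some(s | RUN_QUEUED)
                }
            })
            .is_ok()
    }
    fn run_dequeue(&self) -> bool {
        self.0.fetch_and(!RUN_QUEUED, Ordering::AcqRel) & SPAWNED != 0
    }
}

fn main() {
    let s = State::new();
    assert!(s.spawn());        // idle -> SPAWNED | RUN_QUEUED: task claimed and queued
    assert!(!s.spawn());       // storage occupied: a second claim fails
    assert!(!s.run_enqueue()); // already run-queued: a wake doesn't enqueue twice
    assert!(s.run_dequeue());  // executor pops it; still spawned, so it gets polled
    assert!(s.run_enqueue());  // a wake while running re-queues it for the next pass
    assert!(s.run_dequeue());
    s.despawn();               // future completed: storage is free again
    assert!(!s.run_enqueue()); // stale wakes after completion are ignored
    assert!(s.spawn());        // and the slot can be claimed anew
}
```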
embassy-executor/src/raw/state_critical_section.rs

@@ -0,0 +1,93 @@
+use core::cell::Cell;
+
+use critical_section::Mutex;
+
+/// Task is spawned (has a future)
+pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
+/// Task is in the executor run queue
+pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
+/// Task is in the executor timer queue
+#[cfg(feature = "integrated-timers")]
+pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
+
+pub(crate) struct State {
+    state: Mutex<Cell<u32>>,
+}
+
+impl State {
+    pub const fn new() -> State {
+        Self {
+            state: Mutex::new(Cell::new(0)),
+        }
+    }
+
+    fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
+        critical_section::with(|cs| {
+            let s = self.state.borrow(cs);
+            let mut val = s.get();
+            let r = f(&mut val);
+            s.set(val);
+            r
+        })
+    }
+
+    /// If task is idle, mark it as spawned + run_queued and return true.
+    #[inline(always)]
+    pub fn spawn(&self) -> bool {
+        self.update(|s| {
+            if *s == 0 {
+                *s = STATE_SPAWNED | STATE_RUN_QUEUED;
+                true
+            } else {
+                false
+            }
+        })
+    }
+
+    /// Unmark the task as spawned.
+    #[inline(always)]
+    pub fn despawn(&self) {
+        self.update(|s| *s &= !STATE_SPAWNED);
+    }
+
+    /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
+    #[inline(always)]
+    pub fn run_enqueue(&self) -> bool {
+        self.update(|s| {
+            if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
+                false
+            } else {
+                *s |= STATE_RUN_QUEUED;
+                true
+            }
+        })
+    }
+
+    /// Unmark the task as run-queued. Return whether the task is spawned.
+    #[inline(always)]
+    pub fn run_dequeue(&self) -> bool {
+        self.update(|s| {
+            let ok = *s & STATE_SPAWNED != 0;
+            *s &= !STATE_RUN_QUEUED;
+            ok
+        })
+    }
+
+    /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_enqueue(&self) -> bool {
+        self.update(|s| {
+            let ok = *s & STATE_TIMER_QUEUED == 0;
+            *s |= STATE_TIMER_QUEUED;
+            ok
+        })
+    }
+
+    /// Unmark the task as timer-queued.
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_dequeue(&self) {
+        self.update(|s| *s &= !STATE_TIMER_QUEUED);
+    }
+}

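The `update` helper is the general recipe behind this whole file: emulate an atomic read-modify-write by performing the read, the modification, and the write inside a single critical section, so no interrupt can interleave and observe a torn update. A free-standing sketch of the same idea; `rmw` and `fetch_or` are hypothetical helpers, not crate API:

```rust
// Host-runnable with: critical-section = { version = "1.1", features = ["std"] }
use core::cell::Cell;

use critical_section::Mutex;

// Generic critical-section read-modify-write, the same shape as
// `State::update` above: read, modify, and write happen with the
// critical section held, so the update is observed atomically.
fn rmw<T: Copy, R>(cell: &Mutex<Cell<T>>, f: impl FnOnce(&mut T) -> R) -> R {
    critical_section::with(|cs| {
        let c = cell.borrow(cs);
        let mut val = c.get(); // read
        let r = f(&mut val);   // modify
        c.set(val);            // write
        r
    })
}

// A `fetch_or` equivalent built from it, like `timer_enqueue` above.
fn fetch_or(cell: &Mutex<Cell<u32>>, bits: u32) -> u32 {
    rmw(cell, |v| {
        let old = *v;
        *v |= bits;
        old
    })
}

fn main() {
    static FLAGS: Mutex<Cell<u32>> = Mutex::new(Cell::new(0));
    assert_eq!(fetch_or(&FLAGS, 0b10), 0b00); // returns the old value
    assert_eq!(fetch_or(&FLAGS, 0b01), 0b10);
}
```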
embassy-executor/src/raw/timer_queue.rs

@@ -1,9 +1,8 @@
 use core::cmp::min;
 
-use atomic_polyfill::Ordering;
 use embassy_time::Instant;
 
-use super::{TaskRef, STATE_TIMER_QUEUED};
+use super::TaskRef;
 use crate::raw::util::SyncUnsafeCell;
 
 pub(crate) struct TimerQueueItem {
@@ -32,10 +31,7 @@ impl TimerQueue {
     pub(crate) unsafe fn update(&self, p: TaskRef) {
         let task = p.header();
         if task.expires_at.get() != Instant::MAX {
-            let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
-            let is_new = old_state & STATE_TIMER_QUEUED == 0;
-            if is_new {
+            if task.state.timer_enqueue() {
                 task.timer_queue_item.next.set(self.head.get());
                 self.head.set(Some(p));
             }
@@ -75,7 +71,7 @@ impl TimerQueue {
             } else {
                 // Remove it
                 prev.set(task.timer_queue_item.next.get());
-                task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
+                task.state.timer_dequeue();
             }
         }
     }
}