Split embassy crate into embassy-executor, embassy-util.

This commit is contained in:
Dario Nieuwenhuis
2022-07-29 21:58:35 +02:00
parent 8745d646f0
commit a0f1b0ee01
319 changed files with 1159 additions and 998 deletions

View File

@@ -0,0 +1,70 @@
[package]
name = "embassy-executor"
version = "0.1.0"
edition = "2021"
[package.metadata.embassy_docs]
src_base = "https://github.com/embassy-rs/embassy/blob/embassy-executor-v$VERSION/embassy-executor/src/"
src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-executor/src/"
features = ["nightly", "defmt", "unstable-traits", "time", "time-tick-1mhz"]
flavors = [
{ name = "std", target = "x86_64-unknown-linux-gnu", features = ["std"] },
{ name = "wasm", target = "wasm32-unknown-unknown", features = ["wasm"] },
{ name = "thumbv6m-none-eabi", target = "thumbv6m-none-eabi", features = [] },
{ name = "thumbv7m-none-eabi", target = "thumbv7m-none-eabi", features = [] },
{ name = "thumbv7em-none-eabi", target = "thumbv7em-none-eabi", features = [] },
{ name = "thumbv7em-none-eabihf", target = "thumbv7em-none-eabihf", features = [] },
{ name = "thumbv8m.base-none-eabi", target = "thumbv8m.base-none-eabi", features = [] },
{ name = "thumbv8m.main-none-eabi", target = "thumbv8m.main-none-eabi", features = [] },
{ name = "thumbv8m.main-none-eabihf", target = "thumbv8m.main-none-eabihf", features = [] },
]
[features]
default = []
std = ["time", "time-tick-1mhz", "embassy-macros/std"]
wasm = ["wasm-bindgen", "js-sys", "embassy-macros/wasm", "wasm-timer", "time", "time-tick-1mhz"]
# Enable nightly-only features
nightly = ["embedded-hal-async"]
# Implement embedded-hal 1.0 alpha traits.
# Implement embedded-hal-async traits as well if `nightly` is also enabled.
unstable-traits = ["embedded-hal-1"]
# Display a timestamp of the number of seconds since startup next to defmt log messages
# To use this you must have a time driver provided.
defmt-timestamp-uptime = ["defmt"]
# Enable `embassy_executor::time` module.
# NOTE: This feature is only intended to be enabled by crates providing the time driver implementation.
# Enabling it directly without supplying a time driver will fail to link.
time = []
# Set the `embassy_executor::time` tick rate.
# NOTE: This feature is only intended to be enabled by crates providing the time driver implementation.
# If you're not writing your own driver, check the driver documentation to customize the tick rate.
# If you're writing a driver and your tick rate is not listed here, please add it and send a PR!
time-tick-32768hz = ["time"]
time-tick-1000hz = ["time"]
time-tick-1mhz = ["time"]
time-tick-16mhz = ["time"]
[dependencies]
defmt = { version = "0.3", optional = true }
log = { version = "0.4.14", optional = true }
embedded-hal-02 = { package = "embedded-hal", version = "0.2.6" }
embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.8", optional = true}
embedded-hal-async = { version = "0.1.0-alpha.1", optional = true}
futures-util = { version = "0.3.17", default-features = false }
embassy-macros = { version = "0.1.0", path = "../embassy-macros"}
atomic-polyfill = "0.1.5"
critical-section = "0.2.5"
cfg-if = "1.0.0"
# WASM dependencies
wasm-bindgen = { version = "0.2.76", features = ["nightly"], optional = true }
js-sys = { version = "0.3", optional = true }
wasm-timer = { version = "0.2.5", optional = true }

29 embassy-executor/build.rs Normal file
View File

@@ -0,0 +1,29 @@
use std::env;
fn main() {
let target = env::var("TARGET").unwrap();
if target.starts_with("thumbv6m-") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv6m");
} else if target.starts_with("thumbv7m-") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv7m");
} else if target.starts_with("thumbv7em-") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv7m");
println!("cargo:rustc-cfg=armv7em"); // (not currently used)
} else if target.starts_with("thumbv8m.base") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv8m");
println!("cargo:rustc-cfg=armv8m_base");
} else if target.starts_with("thumbv8m.main") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv8m");
println!("cargo:rustc-cfg=armv8m_main");
}
if target.ends_with("-eabihf") {
println!("cargo:rustc-cfg=has_fpu");
}
}
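For illustration only, here is a hypothetical snippet showing how such cfg flags can be consumed; note that they are visible only inside embassy-executor itself, since build-script cfgs do not propagate to dependent crates:

// Hypothetical illustration (not actual crate code) of gating on the cfgs emitted above.
#[cfg(all(cortex_m, has_fpu))]
fn uses_hardware_fpu() {
    // Compiled only for the *-eabihf Cortex-M targets handled in build.rs.
}

#[cfg(armv6m)]
fn armv6m_only() {
    // Compiled only for thumbv6m targets, which lack CAS atomics (hence the atomic-polyfill dependency).
}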

View File

@@ -0,0 +1,59 @@
use core::arch::asm;
use core::marker::PhantomData;
use core::ptr;
use super::{raw, Spawner};
/// Thread mode executor, using WFE/SEV.
///
/// This is the simplest and most common kind of executor. It runs in
/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
/// is executed, to make the `WFE` exit from sleep and poll the task.
///
/// This executor allows for ultra low power consumption for chips where `WFE`
/// triggers low-power sleep without extra steps. If your chip requires extra steps,
/// you may use [`raw::Executor`] directly to program custom behavior.
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
Self {
inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()),
not_send: PhantomData,
}
}
/// Run the executor.
///
/// The `init` closure is called with a [`Spawner`] that spawns tasks on
/// this executor. Use it to spawn the initial task(s). After `init` returns,
/// the executor starts running the tasks.
///
/// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
/// for example by passing it as an argument to the initial tasks.
///
/// This function requires `&'static mut self`. This means you have to store the
/// Executor instance in a place that lives forever and grants you mutable
/// access. There are a few ways to do this:
///
/// - a [Forever](crate::util::Forever) (safe)
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
init(self.inner.spawner());
loop {
unsafe {
self.inner.poll();
asm!("wfe");
};
}
}
}
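A minimal, hypothetical usage sketch for this thread-mode executor; it assumes `cortex-m-rt`, the `nightly` feature for the task macro, and a `Forever` cell as provided by the new `embassy-util` crate (exact paths may differ):

#![no_std]
#![no_main]
#![feature(type_alias_impl_trait)]

use embassy_executor::executor::Executor;
use embassy_util::Forever; // assumed home of Forever after this split
use panic_halt as _;

static EXECUTOR: Forever<Executor> = Forever::new();

#[embassy_executor::task]
async fn app() {
    // ... application logic ...
}

#[cortex_m_rt::entry]
fn main() -> ! {
    // Forever::put returns `&'static mut Executor`, which is what `run` requires.
    let executor = EXECUTOR.put(Executor::new());
    executor.run(|spawner| {
        spawner.spawn(app()).unwrap();
    })
}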

View File

@@ -0,0 +1,74 @@
use core::marker::PhantomData;
use core::ptr;
use atomic_polyfill::{AtomicBool, Ordering};
use super::{raw, Spawner};
/// Global atomic used to keep track of whether there is work to do, since `sev()` is not available on RISC-V.
static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
/// RISCV32 Executor
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
Self {
// use SIGNAL_WORK_THREAD_MODE as a substitute for a local interrupt register
inner: raw::Executor::new(
|_| {
SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
},
ptr::null_mut(),
),
not_send: PhantomData,
}
}
/// Run the executor.
///
/// The `init` closure is called with a [`Spawner`] that spawns tasks on
/// this executor. Use it to spawn the initial task(s). After `init` returns,
/// the executor starts running the tasks.
///
/// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
/// for example by passing it as an argument to the initial tasks.
///
/// This function requires `&'static mut self`. This means you have to store the
/// Executor instance in a place that lives forever and grants you mutable
/// access. There are a few ways to do this:
///
/// - a [Forever](crate::util::Forever) (safe)
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
init(self.inner.spawner());
loop {
unsafe {
self.inner.poll();
// We do not care about race conditions between the load and store operations;
// interrupts will only set this value to true.
critical_section::with(|_| {
// if there is work to do, loop back to polling
// TODO can we relax this?
if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
}
// if not, wait for interrupt
else {
core::arch::asm!("wfi");
}
});
// if an interrupt occurred while waiting, it will be serviced here
}
}
}
}

View File

@@ -0,0 +1,84 @@
use std::marker::PhantomData;
use std::sync::{Condvar, Mutex};
use super::{raw, Spawner};
/// Single-threaded std-based executor.
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
signaler: &'static Signaler,
}
impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
let signaler = &*Box::leak(Box::new(Signaler::new()));
Self {
inner: raw::Executor::new(
|p| unsafe {
let s = &*(p as *const () as *const Signaler);
s.signal()
},
signaler as *const _ as _,
),
not_send: PhantomData,
signaler,
}
}
/// Run the executor.
///
/// The `init` closure is called with a [`Spawner`] that spawns tasks on
/// this executor. Use it to spawn the initial task(s). After `init` returns,
/// the executor starts running the tasks.
///
/// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
/// for example by passing it as an argument to the initial tasks.
///
/// This function requires `&'static mut self`. This means you have to store the
/// Executor instance in a place that lives forever and grants you mutable
/// access. There are a few ways to do this:
///
/// - a [Forever](crate::util::Forever) (safe)
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
init(self.inner.spawner());
loop {
unsafe { self.inner.poll() };
self.signaler.wait()
}
}
}
struct Signaler {
mutex: Mutex<bool>,
condvar: Condvar,
}
impl Signaler {
fn new() -> Self {
Self {
mutex: Mutex::new(false),
condvar: Condvar::new(),
}
}
fn wait(&self) {
let mut signaled = self.mutex.lock().unwrap();
while !*signaled {
signaled = self.condvar.wait(signaled).unwrap();
}
*signaled = false;
}
fn signal(&self) {
let mut signaled = self.mutex.lock().unwrap();
*signaled = true;
self.condvar.notify_one();
}
}
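A minimal, hypothetical usage sketch for the std executor, assuming the `std` and `nightly` features; leaking a `Box` is one simple way to satisfy the `&'static mut self` requirement of `run`:

#![feature(type_alias_impl_trait)]

use embassy_executor::executor::Executor;

#[embassy_executor::task]
async fn app() {
    // ... async application logic ...
}

fn main() {
    // Leak a Box to obtain `&'static mut Executor`.
    let executor = Box::leak(Box::new(Executor::new()));
    executor.run(|spawner| {
        spawner.spawn(app()).unwrap();
    });
}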

View File

@@ -0,0 +1,74 @@
use core::marker::PhantomData;
use js_sys::Promise;
use wasm_bindgen::prelude::*;
use super::raw::util::UninitCell;
use super::raw::{self};
use super::Spawner;
/// WASM executor, using wasm_bindgen to schedule tasks on the JS event loop.
pub struct Executor {
inner: raw::Executor,
ctx: &'static WasmContext,
not_send: PhantomData<*mut ()>,
}
pub(crate) struct WasmContext {
promise: Promise,
closure: UninitCell<Closure<dyn FnMut(JsValue)>>,
}
impl WasmContext {
pub fn new() -> Self {
Self {
promise: Promise::resolve(&JsValue::undefined()),
closure: UninitCell::uninit(),
}
}
}
impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
let ctx = &*Box::leak(Box::new(WasmContext::new()));
let inner = raw::Executor::new(
|p| unsafe {
let ctx = &*(p as *const () as *const WasmContext);
let _ = ctx.promise.then(ctx.closure.as_mut());
},
ctx as *const _ as _,
);
Self {
inner,
not_send: PhantomData,
ctx,
}
}
/// Run the executor.
///
/// The `init` closure is called with a [`Spawner`] that spawns tasks on
/// this executor. Use it to spawn the initial task(s). After `init` returns,
/// the executor starts running the tasks.
///
/// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
/// for example by passing it as an argument to the initial tasks.
///
/// This function requires `&'static mut self`. This means you have to store the
/// Executor instance in a place that lives forever and grants you mutable
/// access. There are a few ways to do this:
///
/// - a [Forever](crate::util::Forever) (safe)
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
pub fn start(&'static mut self, init: impl FnOnce(Spawner)) {
unsafe {
let executor = &self.inner;
self.ctx.closure.write(Closure::new(move |_| {
executor.poll();
}));
init(self.inner.spawner());
}
}
}
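A hypothetical entry-point sketch for the WASM executor, assuming the `wasm` and `nightly` features; the names `run_app` and `ticker` are invented for illustration:

#![feature(type_alias_impl_trait)]

use embassy_executor::executor::Executor;
use wasm_bindgen::prelude::*;

#[embassy_executor::task]
async fn ticker() {
    // ... drive the application from here ...
}

#[wasm_bindgen(start)]
pub fn run_app() {
    // Leak a Box to obtain the required `&'static mut Executor`.
    let executor = Box::leak(Box::new(Executor::new()));
    executor.start(|spawner| {
        spawner.spawn(ticker()).unwrap();
    });
    // `start` returns immediately; tasks are then polled from the JS microtask queue.
}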

View File

@@ -0,0 +1,75 @@
use core::marker::PhantomData;
use core::ptr;
use atomic_polyfill::{AtomicBool, Ordering};
use super::{raw, Spawner};
/// Global atomic used to keep track of whether there is work to do, since `sev()` is not available on Xtensa.
static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
/// Xtensa Executor
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Executor {
/// Create a new Executor.
pub fn new() -> Self {
Self {
// use SIGNAL_WORK_THREAD_MODE as a substitute for a local interrupt register
inner: raw::Executor::new(
|_| {
SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
},
ptr::null_mut(),
),
not_send: PhantomData,
}
}
/// Run the executor.
///
/// The `init` closure is called with a [`Spawner`] that spawns tasks on
/// this executor. Use it to spawn the initial task(s). After `init` returns,
/// the executor starts running the tasks.
///
/// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
/// for example by passing it as an argument to the initial tasks.
///
/// This function requires `&'static mut self`. This means you have to store the
/// Executor instance in a place that lives forever and grants you mutable
/// access. There are a few ways to do this:
///
/// - a [Forever](crate::util::Forever) (safe)
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
init(self.inner.spawner());
loop {
unsafe {
self.inner.poll();
// We do not care about race conditions between the load and store operations;
// interrupts will only set this value to true.
// if there is work to do, loop back to polling
// TODO can we relax this?
critical_section::with(|_| {
if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
} else {
// `waiti` sets PS.INTLEVEL when slipping into sleep.
// Because critical sections on Xtensa are implemented by raising
// PS.INTLEVEL, the critical section effectively ends here.
// Take care not to add code after `waiti` if it needs to be inside the CS.
core::arch::asm!("waiti 0"); // critical section ends here
}
});
}
}
}
}

View File

@@ -0,0 +1,44 @@
//! Async task executor.
//!
//! This module provides an async/await executor designed for embedded usage.
//!
//! - No `alloc`, no heap needed. Task futures are statically allocated.
//! - No "fixed capacity" data structures, executor works with 1 or 1000 tasks without needing config/tuning.
//! - Integrated timer queue: sleeping is easy, just do `Timer::after(Duration::from_secs(1)).await;`.
//! - No busy-loop polling: CPU sleeps when there's no work to do, using interrupts or `WFE/SEV`.
//! - Efficient polling: a wake will only poll the woken task, not all of them.
//! - Fair: a task can't monopolize CPU time even if it's constantly being woken. All other tasks get a chance to run before a given task gets polled for the second time.
//! - Creating multiple executor instances is supported, to run tasks with multiple priority levels. This allows higher-priority tasks to preempt lower-priority tasks.
cfg_if::cfg_if! {
if #[cfg(cortex_m)] {
#[path="arch/cortex_m.rs"]
mod arch;
pub use arch::*;
}
else if #[cfg(target_arch="riscv32")] {
#[path="arch/riscv32.rs"]
mod arch;
pub use arch::*;
}
else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] {
#[path="arch/xtensa.rs"]
mod arch;
pub use arch::*;
}
else if #[cfg(feature="wasm")] {
#[path="arch/wasm.rs"]
mod arch;
pub use arch::*;
}
else if #[cfg(feature="std")] {
#[path="arch/std.rs"]
mod arch;
pub use arch::*;
}
}
pub mod raw;
mod spawner;
pub use spawner::*;
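For illustration, the timer-queue bullet above translates into code like this hypothetical task (assuming the `time` feature, the nightly task macro, and that `Timer`/`Duration` are re-exported from the `time` module):

use embassy_executor::time::{Duration, Timer};

#[embassy_executor::task]
async fn heartbeat() {
    loop {
        // No busy-loop polling: the executor sleeps until the timer-queue alarm fires.
        Timer::after(Duration::from_secs(1)).await;
        // ... periodic work ...
    }
}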

View File

@@ -0,0 +1,433 @@
//! Raw executor.
//!
//! This module exposes "raw" Executor and Task structs for more low level control.
//!
//! ## WARNING: here be dragons!
//!
//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
//! executor wrappers in [`executor`](crate::executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
mod run_queue;
#[cfg(feature = "time")]
mod timer_queue;
pub(crate) mod util;
mod waker;
use core::cell::Cell;
use core::future::Future;
use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll};
use core::{mem, ptr};
use atomic_polyfill::{AtomicU32, Ordering};
use critical_section::CriticalSection;
use self::run_queue::{RunQueue, RunQueueItem};
use self::util::UninitCell;
pub use self::waker::task_from_waker;
use super::SpawnToken;
#[cfg(feature = "time")]
use crate::time::driver::{self, AlarmHandle};
#[cfg(feature = "time")]
use crate::time::Instant;
/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
#[cfg(feature = "time")]
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
/// Raw task header for use in task pointers.
///
/// This is an opaque struct, used for raw pointers to tasks, for use
/// with functions like [`wake_task`] and [`task_from_waker`].
pub struct TaskHeader {
pub(crate) state: AtomicU32,
pub(crate) run_queue_item: RunQueueItem,
pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED
#[cfg(feature = "time")]
pub(crate) expires_at: Cell<Instant>,
#[cfg(feature = "time")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}
impl TaskHeader {
pub(crate) const fn new() -> Self {
Self {
state: AtomicU32::new(0),
run_queue_item: RunQueueItem::new(),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
#[cfg(feature = "time")]
expires_at: Cell::new(Instant::from_ticks(0)),
#[cfg(feature = "time")]
timer_queue_item: timer_queue::TimerQueueItem::new(),
}
}
pub(crate) unsafe fn enqueue(&self) {
critical_section::with(|cs| {
let state = self.state.load(Ordering::Relaxed);
// If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
return;
}
// Mark it as scheduled
self.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
// We have just marked the task as scheduled, so enqueue it.
let executor = &*self.executor.get();
executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader);
})
}
}
/// Raw storage in which a task can be spawned.
///
/// This struct holds the necessary memory to spawn one task whose future is `F`.
/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
///
/// A `TaskStorage` must live forever; it may not be deallocated even after the task has finished
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
///
/// Internally, the [embassy_executor::task](embassy_macros::task) macro allocates an array of `TaskStorage`s
/// in a `static`. The most common reason to use the raw `TaskStorage` is to have control of where
/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
// repr(C) is needed to guarantee that the Task is located at offset 0
// This makes it safe to cast between TaskHeader and TaskStorage pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
raw: TaskHeader,
future: UninitCell<F>, // Valid if STATE_SPAWNED
}
impl<F: Future + 'static> TaskStorage<F> {
const NEW: Self = Self::new();
/// Create a new TaskStorage, in not-spawned state.
pub const fn new() -> Self {
Self {
raw: TaskHeader::new(),
future: UninitCell::uninit(),
}
}
/// Try to spawn the task.
///
/// The `future` closure constructs the future. It's only called if spawning is
/// actually possible. It is a closure instead of a simple `future: F` param to ensure
/// the future is constructed in-place, avoiding a temporary copy on the stack thanks to
/// NRVO optimizations.
///
/// This function will fail if the task is already spawned and has not finished running.
/// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
/// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
///
/// Once the task has finished running, you may spawn it again. It is allowed to spawn it
/// on a different executor.
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
if self.spawn_mark_used() {
return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) };
}
SpawnToken::<F>::new_failed()
}
fn spawn_mark_used(&'static self) -> bool {
let state = STATE_SPAWNED | STATE_RUN_QUEUED;
self.raw
.state
.compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
// Initialize the task
self.raw.poll_fn.write(Self::poll);
self.future.write(future());
NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader)
}
unsafe fn poll(p: NonNull<TaskHeader>) {
let this = &*(p.as_ptr() as *const TaskStorage<F>);
let future = Pin::new_unchecked(this.future.as_mut());
let waker = waker::from_task(p);
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(_) => {
this.future.drop_in_place();
this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
}
Poll::Pending => {}
}
// the compiler is emitting a virtual call for waker drop, but we know
// it's a noop for our waker.
mem::forget(waker);
}
}
unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
pub struct TaskPool<F: Future + 'static, const N: usize> {
pool: [TaskStorage<F>; N],
}
impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
/// Create a new TaskPool, with all tasks in non-spawned state.
pub const fn new() -> Self {
Self {
pool: [TaskStorage::NEW; N],
}
}
/// Try to spawn a task in the pool.
///
/// See [`TaskStorage::spawn()`] for details.
///
/// This will loop over the pool and spawn the task in the first storage that
/// is currently free. If none is free, a "poisoned" SpawnToken is returned,
/// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
for task in &self.pool {
if task.spawn_mark_used() {
return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
}
}
SpawnToken::<F>::new_failed()
}
/// Like spawn(), but allows the task to be send-spawned if the args are Send even if
/// the future is !Send.
///
/// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
/// by the Embassy macros ONLY.
///
/// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
/// is an `async fn`, NOT a hand-written `Future`.
#[doc(hidden)]
pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
where
FutFn: FnOnce() -> F,
{
// When send-spawning a task, we construct the future in this thread, and effectively
// "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
// send-spawning should require the future `F` to be `Send`.
//
// The problem is this is more restrictive than needed. Once the future is executing,
// it is never sent to another thread. It is only sent when spawning. It should be
// enough for the task's arguments to be Send. (and in practice it's super easy to
// accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
//
// We can do it by sending the task args and constructing the future in the executor thread
// on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
// of the args.
//
// Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
// args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
//
// (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
// but it's possible it'll be guaranteed in the future. See zulip thread:
// https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
//
// The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
// This is why we return `SpawnToken<FutFn>` below.
//
// This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
// by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
for task in &self.pool {
if task.spawn_mark_used() {
return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
}
}
SpawnToken::<FutFn>::new_failed()
}
}
/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the
/// higher level executors in [`crate::executor`].
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
/// that "want to run").
/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
/// to do. You must arrange for `poll()` to be called as soon as possible.
///
/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
/// the requirement for `poll` to not be called reentrantly.
pub struct Executor {
run_queue: RunQueue,
signal_fn: fn(*mut ()),
signal_ctx: *mut (),
#[cfg(feature = "time")]
pub(crate) timer_queue: timer_queue::TimerQueue,
#[cfg(feature = "time")]
alarm: AlarmHandle,
}
impl Executor {
/// Create a new executor.
///
/// When the executor has work to do, it will call `signal_fn` with
/// `signal_ctx` as argument.
///
/// See [`Executor`] docs for details on `signal_fn`.
pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
#[cfg(feature = "time")]
let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
#[cfg(feature = "time")]
driver::set_alarm_callback(alarm, signal_fn, signal_ctx);
Self {
run_queue: RunQueue::new(),
signal_fn,
signal_ctx,
#[cfg(feature = "time")]
timer_queue: timer_queue::TimerQueue::new(),
#[cfg(feature = "time")]
alarm,
}
}
/// Enqueue a task in the task queue
///
/// # Safety
/// - `task` must be a valid pointer to a spawned task.
/// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)]
unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) {
if self.run_queue.enqueue(cs, task) {
(self.signal_fn)(self.signal_ctx)
}
}
/// Spawn a task in this executor.
///
/// # Safety
///
/// `task` must be a valid pointer to an initialized but not-already-spawned task.
///
/// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
/// In this case, the task's Future must be Send. This is because this is effectively
/// sending the task to the executor thread.
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
let task = task.as_ref();
task.executor.set(self);
critical_section::with(|cs| {
self.enqueue(cs, task as *const _ as _);
})
}
/// Poll all queued tasks in this executor.
///
/// This loops over all tasks that are queued to be polled (i.e. they're
/// freshly spawned or they've been woken). Other tasks are not polled.
///
/// You must call `poll` after receiving a call to `signal_fn`. It is OK
/// to call `poll` even when not requested by `signal_fn`, but it wastes
/// energy.
///
/// # Safety
///
/// You must NOT call `poll` reentrantly on the same executor.
///
/// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
/// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
/// somehow schedule for `poll()` to be called later, at a time you know for sure there's
/// no `poll()` already running.
pub unsafe fn poll(&'static self) {
#[cfg(feature = "time")]
self.timer_queue.dequeue_expired(Instant::now(), |p| {
p.as_ref().enqueue();
});
self.run_queue.dequeue_all(|p| {
let task = p.as_ref();
#[cfg(feature = "time")]
task.expires_at.set(Instant::MAX);
let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
if state & STATE_SPAWNED == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, with the future having completed.
// - The SPAWNED bit is cleared, but the task is already in the queue.
return;
}
// Run the task
task.poll_fn.read()(p as _);
// Enqueue or update into timer_queue
#[cfg(feature = "time")]
self.timer_queue.update(p);
});
#[cfg(feature = "time")]
{
// If this is already in the past, set_alarm will immediately trigger the alarm.
// This will cause `signal_fn` to be called, which will cause `poll()` to be called again,
// so we immediately do another poll loop iteration.
let next_expiration = self.timer_queue.next_expiration();
driver::set_alarm(self.alarm, next_expiration.as_ticks());
}
}
/// Get a spawner that spawns tasks in this executor.
///
/// It is OK to call this method multiple times to obtain multiple
/// `Spawner`s. You may also copy `Spawner`s.
pub fn spawner(&'static self) -> super::Spawner {
super::Spawner::new(self)
}
}
/// Wake a task by raw pointer.
///
/// You can obtain task pointers from `Waker`s using [`task_from_waker`].
///
/// # Safety
///
/// `task` must be a valid task pointer obtained from [`task_from_waker`].
pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
task.as_ref().enqueue();
}
#[cfg(feature = "time")]
pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker);
let task = task.as_ref();
let expires_at = task.expires_at.get();
task.expires_at.set(expires_at.min(at));
}
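A hypothetical sketch of driving `raw::Executor` by hand on a hosted (std) target, with the `time` feature disabled so no time driver needs to be linked. `POLL_NEEDED`, `Tick` and `run_forever` are invented names; the contracts documented above still apply: `signal_fn` may fire from any context, and `poll` must never be called reentrantly:

use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::{Context, Poll};

use embassy_executor::executor::raw::{Executor, TaskStorage};

static POLL_NEEDED: AtomicBool = AtomicBool::new(false);

/// A trivial hand-written future, so that `TaskStorage<F>` has a nameable type.
struct Tick;
impl Future for Tick {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Ready(())
    }
}

// TaskStorage::new() is const, so the storage can live in a static.
static TASK: TaskStorage<Tick> = TaskStorage::new();

fn run_forever() -> ! {
    // signal_fn only records that there is work; the loop below does the polling.
    let executor: &'static Executor = Box::leak(Box::new(Executor::new(
        |_ctx| POLL_NEEDED.store(true, Ordering::Release),
        core::ptr::null_mut(),
    )));

    executor.spawner().spawn(TASK.spawn(|| Tick)).unwrap();

    loop {
        if POLL_NEEDED.swap(false, Ordering::Acquire) {
            // Safety: this is the only call site of poll(), so it is never reentrant.
            unsafe { executor.poll() };
        }
        std::thread::yield_now(); // a real integration would sleep or park here
    }
}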

View File

@@ -0,0 +1,74 @@
use core::ptr;
use core::ptr::NonNull;
use atomic_polyfill::{AtomicPtr, Ordering};
use critical_section::CriticalSection;
use super::TaskHeader;
pub(crate) struct RunQueueItem {
next: AtomicPtr<TaskHeader>,
}
impl RunQueueItem {
pub const fn new() -> Self {
Self {
next: AtomicPtr::new(ptr::null_mut()),
}
}
}
/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches are iterated in the reverse of the order they were enqueued in. This is OK
/// for our purposes: it can't create fairness problems, since the next batch won't run until the
/// current batch is completely processed. So even a task that enqueues itself instantly (for example
/// by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
head: AtomicPtr<TaskHeader>,
}
impl RunQueue {
pub const fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
}
}
/// Enqueues an item. Returns true if the queue was empty.
///
/// # Safety
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool {
let prev = self.head.load(Ordering::Relaxed);
(*task).run_queue_item.next.store(prev, Ordering::Relaxed);
self.head.store(task, Ordering::Relaxed);
prev.is_null()
}
/// Empty the queue, then call `on_task` for each task that was in the queue.
/// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) {
// Atomically empty the queue.
let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
// Iterate the linked list of tasks that were previously in the queue.
while let Some(task) = NonNull::new(ptr) {
// If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task.
let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed);
on_task(task);
ptr = next
}
}
}

View File

@@ -0,0 +1,85 @@
use core::cell::Cell;
use core::cmp::min;
use core::ptr;
use core::ptr::NonNull;
use atomic_polyfill::Ordering;
use super::{TaskHeader, STATE_TIMER_QUEUED};
use crate::time::Instant;
pub(crate) struct TimerQueueItem {
next: Cell<*mut TaskHeader>,
}
impl TimerQueueItem {
pub const fn new() -> Self {
Self {
next: Cell::new(ptr::null_mut()),
}
}
}
pub(crate) struct TimerQueue {
head: Cell<*mut TaskHeader>,
}
impl TimerQueue {
pub const fn new() -> Self {
Self {
head: Cell::new(ptr::null_mut()),
}
}
pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) {
let task = p.as_ref();
if task.expires_at.get() != Instant::MAX {
let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
let is_new = old_state & STATE_TIMER_QUEUED == 0;
if is_new {
task.timer_queue_item.next.set(self.head.get());
self.head.set(p.as_ptr());
}
}
}
pub(crate) unsafe fn next_expiration(&self) -> Instant {
let mut res = Instant::MAX;
self.retain(|p| {
let task = p.as_ref();
let expires = task.expires_at.get();
res = min(res, expires);
expires != Instant::MAX
});
res
}
pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) {
self.retain(|p| {
let task = p.as_ref();
if task.expires_at.get() <= now {
on_task(p);
false
} else {
true
}
});
}
pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) {
let mut prev = &self.head;
while !prev.get().is_null() {
let p = NonNull::new_unchecked(prev.get());
let task = &*p.as_ptr();
if f(p) {
// Skip to next
prev = &task.timer_queue_item.next;
} else {
// Remove it
prev.set(task.timer_queue_item.next.get());
task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
}
}
}
}

View File

@@ -0,0 +1,33 @@
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::ptr;
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
impl<T> UninitCell<T> {
pub const fn uninit() -> Self {
Self(MaybeUninit::uninit())
}
pub unsafe fn as_mut_ptr(&self) -> *mut T {
(*self.0.as_ptr()).get()
}
#[allow(clippy::mut_from_ref)]
pub unsafe fn as_mut(&self) -> &mut T {
&mut *self.as_mut_ptr()
}
pub unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
}
pub unsafe fn drop_in_place(&self) {
ptr::drop_in_place(self.as_mut_ptr())
}
}
impl<T: Copy> UninitCell<T> {
pub unsafe fn read(&self) -> T {
ptr::read(self.as_mut_ptr())
}
}

View File

@@ -0,0 +1,53 @@
use core::mem;
use core::ptr::NonNull;
use core::task::{RawWaker, RawWakerVTable, Waker};
use super::TaskHeader;
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
unsafe fn clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &VTABLE)
}
unsafe fn wake(p: *const ()) {
(*(p as *mut TaskHeader)).enqueue()
}
unsafe fn drop(_: *const ()) {
// nop
}
pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker {
Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
}
/// Get a task pointer from a waker.
///
/// This can be used as an optimization in wait queues to store task pointers
/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
/// avoid dynamic dispatch.
///
/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task).
///
/// # Panics
///
/// Panics if the waker is not created by the Embassy executor.
pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> {
// safety: OK because WakerHack has the same layout as Waker.
// This is not really guaranteed because the structs are `repr(Rust)`, but it is
// indeed the case in the current implementation.
// TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
let hack: &WakerHack = unsafe { mem::transmute(waker) };
if hack.vtable != &VTABLE {
panic!("Found waker not created by the Embassy executor. `embassy_executor::time::Timer` only works with the Embassy executor.")
}
// safety: we never create a waker with a null data pointer.
unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) }
}
struct WakerHack {
data: *const (),
vtable: &'static RawWakerVTable,
}
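A hypothetical sketch of the wait-queue optimization described above: store only the raw task pointer (one word) instead of a full `Waker`, then wake it with `wake_task`. `WakerCell` is an invented name; it is not `Sync` and is meant for use from a single executor:

use core::cell::Cell;
use core::ptr::NonNull;
use core::task::Waker;

use embassy_executor::executor::raw::{task_from_waker, wake_task, TaskHeader};

struct WakerCell {
    task: Cell<Option<NonNull<TaskHeader>>>,
}

impl WakerCell {
    const fn new() -> Self {
        Self { task: Cell::new(None) }
    }

    /// Remember the task behind `waker` (must be a waker created by the Embassy executor).
    fn register(&self, waker: &Waker) {
        self.task.set(Some(task_from_waker(waker)));
    }

    /// Wake the remembered task, if any.
    fn wake(&self) {
        if let Some(task) = self.task.take() {
            // Safety: the pointer came from `task_from_waker`, as `wake_task` requires.
            unsafe { wake_task(task) };
        }
    }
}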

View File

@@ -0,0 +1,202 @@
use core::marker::PhantomData;
use core::mem;
use core::ptr::NonNull;
use core::task::Poll;
use futures_util::future::poll_fn;
use super::raw;
/// Token to spawn a newly-created task in an executor.
///
/// When calling a task function (like `#[embassy_executor::task] async fn my_task() { ... }`), the returned
/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
/// then spawn it into an executor, typically with [`Spawner::spawn()`].
///
/// The generic parameter `S` determines whether the task can be spawned in executors
/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`].
/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`].
///
/// # Panics
///
/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
pub struct SpawnToken<S> {
raw_task: Option<NonNull<raw::TaskHeader>>,
phantom: PhantomData<*mut S>,
}
impl<S> SpawnToken<S> {
pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
Self {
raw_task: Some(raw_task),
phantom: PhantomData,
}
}
pub(crate) fn new_failed() -> Self {
Self {
raw_task: None,
phantom: PhantomData,
}
}
}
impl<S> Drop for SpawnToken<S> {
fn drop(&mut self) {
// TODO deallocate the task instead.
panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
}
}
/// Error returned when spawning a task.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SpawnError {
/// Too many instances of this task are already running.
///
/// By default, a task marked with `#[embassy_executor::task]` can only have one instance
/// running at a time. You may allow multiple instances to run in parallel with
/// `#[embassy_executor::task(pool_size = 4)]`, at the cost of higher RAM usage.
Busy,
}
/// Handle to spawn tasks into an executor.
///
/// This Spawner can spawn any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
#[derive(Copy, Clone)]
pub struct Spawner {
executor: &'static raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Spawner {
pub(crate) fn new(executor: &'static raw::Executor) -> Self {
Self {
executor,
not_send: PhantomData,
}
}
/// Get a Spawner for the current executor.
///
/// This function is `async` just to get access to the current async
/// context. It returns instantly; it does not block or yield.
///
/// # Panics
///
/// Panics if the current executor is not an Embassy executor.
pub async fn for_current_executor() -> Self {
poll_fn(|cx| unsafe {
let task = raw::task_from_waker(cx.waker());
let executor = (*task.as_ptr()).executor.get();
Poll::Ready(Self::new(&*executor))
})
.await
}
/// Spawn a task into an executor.
///
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
let task = token.raw_task;
mem::forget(token);
match task {
Some(task) => {
unsafe { self.executor.spawn(task) };
Ok(())
}
None => Err(SpawnError::Busy),
}
}
// Used by the `embassy_macros::main!` macro to throw an error when spawn
// fails. This is here to allow conditional use of `defmt::unwrap!`
// without introducing a `defmt` feature in the `embassy_macros` package,
// which would require use of `-Z namespaced-features`.
/// Spawn a task into an executor, panicking on failure.
///
/// # Panics
///
/// Panics if the spawning fails.
pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
unwrap!(self.spawn(token));
}
/// Convert this Spawner to a SendSpawner. This allows you to send the
/// spawner to other threads, but the spawner loses the ability to spawn
/// non-Send tasks.
pub fn make_send(&self) -> SendSpawner {
SendSpawner {
executor: self.executor,
}
}
}
/// Handle to spawn tasks into an executor from any thread.
///
/// This Spawner can be used from any thread (it is Send), but it can
/// only spawn Send tasks. The reason for this is that spawning is effectively
/// "sending" the task to the executor thread.
///
/// If you want to spawn non-Send tasks, use [Spawner].
#[derive(Copy, Clone)]
pub struct SendSpawner {
executor: &'static raw::Executor,
}
unsafe impl Send for SendSpawner {}
unsafe impl Sync for SendSpawner {}
impl SendSpawner {
pub(crate) fn new(executor: &'static raw::Executor) -> Self {
Self { executor }
}
/// Get a Spawner for the current executor.
///
/// This function is `async` just to get access to the current async
/// context. It returns instantly; it does not block or yield.
///
/// # Panics
///
/// Panics if the current executor is not an Embassy executor.
pub async fn for_current_executor() -> Self {
poll_fn(|cx| unsafe {
let task = raw::task_from_waker(cx.waker());
let executor = (*task.as_ptr()).executor.get();
Poll::Ready(Self::new(&*executor))
})
.await
}
/// Spawn a task into an executor.
///
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
let header = token.raw_task;
mem::forget(token);
match header {
Some(header) => {
unsafe { self.executor.spawn(header) };
Ok(())
}
None => Err(SpawnError::Busy),
}
}
/// Spawn a task into an executor, panicking on failure.
///
/// # Panics
///
/// Panics if the spawning fails.
pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
unwrap!(self.spawn(token));
}
}
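A hypothetical sketch contrasting `Spawner` and `SendSpawner`, assuming the `std` and `nightly` features; the task names are invented:

#![feature(type_alias_impl_trait)]

use embassy_executor::executor::Executor;

#[embassy_executor::task]
async fn local_task() {
    // May hold non-Send state; spawnable only via `Spawner` on the executor thread.
}

#[embassy_executor::task]
async fn remote_task() {
    // Spawnable through a `SendSpawner` from another thread.
}

fn main() {
    let executor = Box::leak(Box::new(Executor::new()));
    executor.run(|spawner| {
        spawner.spawn(local_task()).unwrap();

        // make_send trades the ability to spawn non-Send tasks for being Send itself.
        let send_spawner = spawner.make_send();
        std::thread::spawn(move || {
            send_spawner.spawn(remote_task()).unwrap();
        });
    });
}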

228 embassy-executor/src/fmt.rs Normal file
View File

@@ -0,0 +1,228 @@
#![macro_use]
#![allow(unused_macros)]
#[cfg(all(feature = "defmt", feature = "log"))]
compile_error!("You may not enable both `defmt` and `log` features.");
macro_rules! assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert!($($x)*);
}
};
}
macro_rules! assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_eq!($($x)*);
}
};
}
macro_rules! assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_ne!($($x)*);
}
};
}
macro_rules! debug_assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert!($($x)*);
}
};
}
macro_rules! debug_assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_eq!($($x)*);
}
};
}
macro_rules! debug_assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_ne!($($x)*);
}
};
}
macro_rules! todo {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::todo!($($x)*);
#[cfg(feature = "defmt")]
::defmt::todo!($($x)*);
}
};
}
macro_rules! unreachable {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::unreachable!($($x)*);
#[cfg(feature = "defmt")]
::defmt::unreachable!($($x)*);
}
};
}
macro_rules! panic {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::panic!($($x)*);
#[cfg(feature = "defmt")]
::defmt::panic!($($x)*);
}
};
}
macro_rules! trace {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::trace!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::trace!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! debug {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::debug!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::debug!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! info {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::info!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::info!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! warn {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::warn!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::warn!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! error {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::error!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::error!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[cfg(feature = "defmt")]
macro_rules! unwrap {
($($x:tt)*) => {
::defmt::unwrap!($($x)*)
};
}
#[cfg(not(feature = "defmt"))]
macro_rules! unwrap {
($arg:expr) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
}
}
};
($arg:expr, $($msg:expr),+ $(,)? ) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
}
}
}
}
#[cfg(feature = "defmt-timestamp-uptime")]
defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct NoneError;
pub trait Try {
type Ok;
type Error;
fn into_result(self) -> Result<Self::Ok, Self::Error>;
}
impl<T> Try for Option<T> {
type Ok = T;
type Error = NoneError;
#[inline]
fn into_result(self) -> Result<T, NoneError> {
self.ok_or(NoneError)
}
}
impl<T, E> Try for Result<T, E> {
type Ok = T;
type Error = E;
#[inline]
fn into_result(self) -> Self {
self
}
}

View File

@@ -0,0 +1,22 @@
#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))]
#![allow(clippy::new_without_default)]
#![doc = include_str!("../../README.md")]
#![warn(missing_docs)]
// This mod MUST go first, so that the others see its macros.
pub(crate) mod fmt;
pub mod executor;
#[cfg(feature = "time")]
pub mod time;
#[cfg(feature = "nightly")]
pub use embassy_macros::{main, task};
#[doc(hidden)]
/// Implementation details for embassy macros. DO NOT USE.
pub mod export {
pub use atomic_polyfill as atomic;
}

View File

@@ -0,0 +1,98 @@
use super::{Duration, Instant};
/// Blocks for at least `duration`.
pub fn block_for(duration: Duration) {
let expires_at = Instant::now() + duration;
while Instant::now() < expires_at {}
}
/// Type implementing async delays and blocking `embedded-hal` delays.
///
/// The delays are implemented in a "best-effort" way, meaning that the CPU will block for at least
/// the duration provided, but accuracy can be affected by many factors, including interrupt usage.
/// Make sure to use a suitable tick rate for your use case. The tick rate is defined by the currently
/// active driver.
pub struct Delay;
#[cfg(feature = "unstable-traits")]
mod eh1 {
use super::*;
impl embedded_hal_1::delay::blocking::DelayUs for Delay {
type Error = core::convert::Infallible;
fn delay_us(&mut self, us: u32) -> Result<(), Self::Error> {
Ok(block_for(Duration::from_micros(us as u64)))
}
fn delay_ms(&mut self, ms: u32) -> Result<(), Self::Error> {
Ok(block_for(Duration::from_millis(ms as u64)))
}
}
}
cfg_if::cfg_if! {
if #[cfg(all(feature = "unstable-traits", feature = "nightly"))] {
use crate::time::Timer;
use core::future::Future;
use futures_util::FutureExt;
impl embedded_hal_async::delay::DelayUs for Delay {
type Error = core::convert::Infallible;
type DelayUsFuture<'a> = impl Future<Output = Result<(), Self::Error>> + 'a where Self: 'a;
fn delay_us(&mut self, micros: u32) -> Self::DelayUsFuture<'_> {
Timer::after(Duration::from_micros(micros as _)).map(Ok)
}
type DelayMsFuture<'a> = impl Future<Output = Result<(), Self::Error>> + 'a where Self: 'a;
fn delay_ms(&mut self, millis: u32) -> Self::DelayMsFuture<'_> {
Timer::after(Duration::from_millis(millis as _)).map(Ok)
}
}
}
}
mod eh02 {
use embedded_hal_02::blocking::delay::{DelayMs, DelayUs};
use super::*;
impl DelayMs<u8> for Delay {
fn delay_ms(&mut self, ms: u8) {
block_for(Duration::from_millis(ms as u64))
}
}
impl DelayMs<u16> for Delay {
fn delay_ms(&mut self, ms: u16) {
block_for(Duration::from_millis(ms as u64))
}
}
impl DelayMs<u32> for Delay {
fn delay_ms(&mut self, ms: u32) {
block_for(Duration::from_millis(ms as u64))
}
}
impl DelayUs<u8> for Delay {
fn delay_us(&mut self, us: u8) {
block_for(Duration::from_micros(us as u64))
}
}
impl DelayUs<u16> for Delay {
fn delay_us(&mut self, us: u16) {
block_for(Duration::from_micros(us as u64))
}
}
impl DelayUs<u32> for Delay {
fn delay_us(&mut self, us: u32) {
block_for(Duration::from_micros(us as u64))
}
}
}
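A hypothetical sketch of handing `Delay` to code written against the embedded-hal 0.2 blocking delay traits; `spin_wait` is an invented helper and the import paths assume the usual `embedded-hal` 0.2 crate:

use embassy_executor::time::Delay;
use embedded_hal::blocking::delay::DelayMs;

/// Blocks for at least 10 ms; accuracy depends on the active time driver's tick rate.
fn spin_wait<D: DelayMs<u32>>(delay: &mut D) {
    delay.delay_ms(10);
}

fn example() {
    spin_wait(&mut Delay);
}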

View File

@@ -0,0 +1,170 @@
//! Time driver interface
//!
//! This module defines the interface a driver needs to implement to power the `embassy_executor::time` module.
//!
//! # Implementing a driver
//!
//! - Define a struct `MyDriver`
//! - Implement [`Driver`] for it
//! - Register it as the global driver with [`time_driver_impl`].
//! - Enable the Cargo features `embassy-executor/time` and one of `embassy-executor/time-tick-*` corresponding to the
//! tick rate of your driver.
//!
//! If you wish to make the tick rate configurable by the end user, you should do so by exposing your own
//! Cargo features and having each enable the corresponding `embassy-executor/time-tick-*`.
//!
//! # Linkage details
//!
//! Instead of the usual "trait + generic params" approach, calls from embassy to the driver are done via `extern` functions.
//!
//! `embassy` internally defines the driver functions as `extern "Rust" { fn _embassy_time_now() -> u64; }` and calls them.
//! The driver crate defines the functions as `#[no_mangle] fn _embassy_time_now() -> u64`. The linker will resolve the
//! calls from the `embassy` crate to call into the driver crate.
//!
//! If there are zero or multiple drivers in the crate tree, linking will fail.
//!
//! This method has a few key advantages for something as foundational as timekeeping:
//!
//! - The time driver is available everywhere easily, without having to thread the implementation
//! through generic parameters. This is especially helpful for libraries.
//! - It means comparing `Instant`s will always make sense: if there were multiple drivers
//! active, one could compare an `Instant` from driver A to an `Instant` from driver B, which
//! would yield incorrect results.
//!
//! # Example
//!
//! ```
//! use embassy_executor::time::driver::{Driver, AlarmHandle};
//!
//! struct MyDriver{}; // not public!
//! embassy_executor::time_driver_impl!(static DRIVER: MyDriver = MyDriver{});
//!
//! impl Driver for MyDriver {
//! fn now(&self) -> u64 {
//! todo!()
//! }
//! unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
//! todo!()
//! }
//! fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
//! todo!()
//! }
//! fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
//! todo!()
//! }
//! }
//! ```
/// Alarm handle, assigned by the driver.
#[derive(Clone, Copy)]
pub struct AlarmHandle {
id: u8,
}
impl AlarmHandle {
/// Create an AlarmHandle
///
/// Safety: May only be called by the current global Driver impl.
/// The impl is allowed to rely on the fact that all `AlarmHandle` instances
/// are created by itself in unsafe code (e.g. indexing operations)
pub unsafe fn new(id: u8) -> Self {
Self { id }
}
/// Get the ID of the AlarmHandle.
pub fn id(&self) -> u8 {
self.id
}
}
/// Time driver
pub trait Driver: Send + Sync + 'static {
/// Return the current timestamp in ticks.
///
/// Implementations MUST ensure that:
/// - This is guaranteed to be monotonic, i.e. a call to now() will always return
/// a value greater than or equal to that of earlier calls. Time can't "roll backwards".
/// - It "never" overflows. It must not overflow in a sufficiently long time frame, say
/// in 10_000 years (Human civilization is likely to already have self-destructed
/// 10_000 years from now.). This means that if your hardware only has 16-bit/32-bit timers,
/// you MUST extend them to 64-bit, for example by counting overflows in software,
/// or chaining multiple timers together.
fn now(&self) -> u64;
/// Try allocating an alarm handle. Returns None if no alarms left.
/// Initially the alarm has no callback set, and a null `ctx` pointer.
///
/// # Safety
/// It is UB to make the alarm fire before setting a callback.
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle>;
/// Sets the callback function to be called when the alarm triggers.
/// The callback may be called from any context (interrupt or thread mode).
fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ());
/// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm
/// timestamp, the provided callback function will be called.
///
/// If `timestamp` is already in the past, the alarm callback must be immediately fired.
/// In this case, it is allowed (but not mandatory) to call the alarm callback synchronously from `set_alarm`.
///
/// When the callback is called, it is guaranteed that now() will return a value greater than or equal to the timestamp.
///
/// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any.
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64);
}
extern "Rust" {
fn _embassy_time_now() -> u64;
fn _embassy_time_allocate_alarm() -> Option<AlarmHandle>;
fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ());
fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64);
}
pub(crate) fn now() -> u64 {
unsafe { _embassy_time_now() }
}
/// Safety: it is UB to make the alarm fire before setting a callback.
pub(crate) unsafe fn allocate_alarm() -> Option<AlarmHandle> {
_embassy_time_allocate_alarm()
}
pub(crate) fn set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
unsafe { _embassy_time_set_alarm_callback(alarm, callback, ctx) }
}
pub(crate) fn set_alarm(alarm: AlarmHandle, timestamp: u64) {
unsafe { _embassy_time_set_alarm(alarm, timestamp) }
}
/// Set the time Driver implementation.
///
/// See the module documentation for an example.
#[macro_export]
macro_rules! time_driver_impl {
(static $name:ident: $t: ty = $val:expr) => {
static $name: $t = $val;
#[no_mangle]
fn _embassy_time_now() -> u64 {
<$t as $crate::time::driver::Driver>::now(&$name)
}
#[no_mangle]
unsafe fn _embassy_time_allocate_alarm() -> Option<$crate::time::driver::AlarmHandle> {
<$t as $crate::time::driver::Driver>::allocate_alarm(&$name)
}
#[no_mangle]
fn _embassy_time_set_alarm_callback(
alarm: $crate::time::driver::AlarmHandle,
callback: fn(*mut ()),
ctx: *mut (),
) {
<$t as $crate::time::driver::Driver>::set_alarm_callback(&$name, alarm, callback, ctx)
}
#[no_mangle]
fn _embassy_time_set_alarm(alarm: $crate::time::driver::AlarmHandle, timestamp: u64) {
<$t as $crate::time::driver::Driver>::set_alarm(&$name, alarm, timestamp)
}
};
}

View File

@@ -0,0 +1,208 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::{Condvar, Mutex, Once};
use std::time::{Duration as StdDuration, Instant as StdInstant};
use std::{mem, ptr, thread};
use atomic_polyfill::{AtomicU8, Ordering};
use crate::time::driver::{AlarmHandle, Driver};
const ALARM_COUNT: usize = 4;
struct AlarmState {
timestamp: u64,
// This is really an Option<(fn(*mut ()), *mut ())>
// but fn pointers aren't allowed in const yet
callback: *const (),
ctx: *mut (),
}
unsafe impl Send for AlarmState {}
impl AlarmState {
const fn new() -> Self {
Self {
timestamp: u64::MAX,
callback: ptr::null(),
ctx: ptr::null_mut(),
}
}
}
struct TimeDriver {
alarm_count: AtomicU8,
once: Once,
alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
zero_instant: UninitCell<StdInstant>,
signaler: UninitCell<Signaler>,
}
const ALARM_NEW: AlarmState = AlarmState::new();
crate::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
alarm_count: AtomicU8::new(0),
once: Once::new(),
alarms: UninitCell::uninit(),
zero_instant: UninitCell::uninit(),
signaler: UninitCell::uninit(),
});
impl TimeDriver {
fn init(&self) {
self.once.call_once(|| unsafe {
self.alarms.write(Mutex::new([ALARM_NEW; ALARM_COUNT]));
self.zero_instant.write(StdInstant::now());
self.signaler.write(Signaler::new());
thread::spawn(Self::alarm_thread);
});
}
fn alarm_thread() {
let zero = unsafe { DRIVER.zero_instant.read() };
loop {
let now = DRIVER.now();
let mut next_alarm = u64::MAX;
{
let alarms = &mut *unsafe { DRIVER.alarms.as_ref() }.lock().unwrap();
for alarm in alarms {
if alarm.timestamp <= now {
alarm.timestamp = u64::MAX;
// Call after clearing alarm, so the callback can set another alarm.
// safety:
// - we can ignore the possibility of `f` being unset (null) because of the safety contract of `allocate_alarm`.
// - other than that we only store valid function pointers into alarm.callback
let f: fn(*mut ()) = unsafe { mem::transmute(alarm.callback) };
f(alarm.ctx);
} else {
next_alarm = next_alarm.min(alarm.timestamp);
}
}
}
// Ensure we don't overflow
let until = zero
.checked_add(StdDuration::from_micros(next_alarm))
.unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
unsafe { DRIVER.signaler.as_ref() }.wait_until(until);
}
}
}
impl Driver for TimeDriver {
fn now(&self) -> u64 {
self.init();
let zero = unsafe { self.zero_instant.read() };
StdInstant::now().duration_since(zero).as_micros() as u64
}
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
if x < ALARM_COUNT as u8 {
Some(x + 1)
} else {
None
}
});
match id {
Ok(id) => Some(AlarmHandle::new(id)),
Err(_) => None,
}
}
fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
self.init();
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
let alarm = &mut alarms[alarm.id() as usize];
alarm.callback = callback as *const ();
alarm.ctx = ctx;
}
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
self.init();
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
let alarm = &mut alarms[alarm.id() as usize];
alarm.timestamp = timestamp;
unsafe { self.signaler.as_ref() }.signal();
}
}
struct Signaler {
mutex: Mutex<bool>,
condvar: Condvar,
}
impl Signaler {
fn new() -> Self {
Self {
mutex: Mutex::new(false),
condvar: Condvar::new(),
}
}
fn wait_until(&self, until: StdInstant) {
let mut signaled = self.mutex.lock().unwrap();
while !*signaled {
let now = StdInstant::now();
if now >= until {
break;
}
let dur = until - now;
let (signaled2, timeout) = self.condvar.wait_timeout(signaled, dur).unwrap();
signaled = signaled2;
if timeout.timed_out() {
break;
}
}
*signaled = false;
}
fn signal(&self) {
let mut signaled = self.mutex.lock().unwrap();
*signaled = true;
self.condvar.notify_one();
}
}
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
unsafe impl<T> Send for UninitCell<T> {}
unsafe impl<T> Sync for UninitCell<T> {}
impl<T> UninitCell<T> {
pub const fn uninit() -> Self {
Self(MaybeUninit::uninit())
}
pub unsafe fn as_ptr(&self) -> *const T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_mut_ptr(&self) -> *mut T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_ref(&self) -> &T {
&*self.as_ptr()
}
pub unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
}
}
impl<T: Copy> UninitCell<T> {
pub unsafe fn read(&self) -> T {
ptr::read(self.as_mut_ptr())
}
}

View File

@ -0,0 +1,134 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::{Mutex, Once};
use atomic_polyfill::{AtomicU8, Ordering};
use wasm_bindgen::prelude::*;
use wasm_timer::Instant as StdInstant;
use crate::time::driver::{AlarmHandle, Driver};
const ALARM_COUNT: usize = 4;
struct AlarmState {
token: Option<f64>,
closure: Option<Closure<dyn FnMut() + 'static>>,
}
unsafe impl Send for AlarmState {}
impl AlarmState {
const fn new() -> Self {
Self {
token: None,
closure: None,
}
}
}
#[wasm_bindgen]
extern "C" {
fn setTimeout(closure: &Closure<dyn FnMut()>, millis: u32) -> f64;
fn clearTimeout(token: f64);
}
struct TimeDriver {
alarm_count: AtomicU8,
once: Once,
alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
zero_instant: UninitCell<StdInstant>,
}
const ALARM_NEW: AlarmState = AlarmState::new();
crate::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
alarm_count: AtomicU8::new(0),
once: Once::new(),
alarms: UninitCell::uninit(),
zero_instant: UninitCell::uninit(),
});
impl TimeDriver {
fn init(&self) {
self.once.call_once(|| unsafe {
self.alarms.write(Mutex::new([ALARM_NEW; ALARM_COUNT]));
self.zero_instant.write(StdInstant::now());
});
}
}
impl Driver for TimeDriver {
fn now(&self) -> u64 {
self.init();
let zero = unsafe { self.zero_instant.read() };
StdInstant::now().duration_since(zero).as_micros() as u64
}
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
if x < ALARM_COUNT as u8 {
Some(x + 1)
} else {
None
}
});
match id {
Ok(id) => Some(AlarmHandle::new(id)),
Err(_) => None,
}
}
fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
self.init();
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
let alarm = &mut alarms[alarm.id() as usize];
alarm.closure.replace(Closure::new(move || {
callback(ctx);
}));
}
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
self.init();
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
let alarm = &mut alarms[alarm.id() as usize];
let timeout = (timestamp - self.now()) as u32;
if let Some(token) = alarm.token {
clearTimeout(token);
}
alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000));
}
}
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
unsafe impl<T> Send for UninitCell<T> {}
unsafe impl<T> Sync for UninitCell<T> {}
impl<T> UninitCell<T> {
pub const fn uninit() -> Self {
Self(MaybeUninit::uninit())
}
unsafe fn as_ptr(&self) -> *const T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_mut_ptr(&self) -> *mut T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_ref(&self) -> &T {
&*self.as_ptr()
}
pub unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
}
}
impl<T: Copy> UninitCell<T> {
pub unsafe fn read(&self) -> T {
ptr::read(self.as_mut_ptr())
}
}

View File

@ -0,0 +1,184 @@
use core::fmt;
use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign};
use super::{GCD_1K, GCD_1M, TICKS_PER_SECOND};
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
/// Represents the difference between two [Instant](struct.Instant.html)s
pub struct Duration {
pub(crate) ticks: u64,
}
impl Duration {
/// The smallest value that can be represented by the `Duration` type.
pub const MIN: Duration = Duration { ticks: u64::MIN };
/// The largest value that can be represented by the `Duration` type.
pub const MAX: Duration = Duration { ticks: u64::MAX };
/// Tick count of the `Duration`.
pub const fn as_ticks(&self) -> u64 {
self.ticks
}
/// Convert the `Duration` to seconds, rounding down.
pub const fn as_secs(&self) -> u64 {
self.ticks / TICKS_PER_SECOND
}
/// Convert the `Duration` to milliseconds, rounding down.
pub const fn as_millis(&self) -> u64 {
self.ticks * (1000 / GCD_1K) / (TICKS_PER_SECOND / GCD_1K)
}
/// Convert the `Duration` to microseconds, rounding down.
pub const fn as_micros(&self) -> u64 {
self.ticks * (1_000_000 / GCD_1M) / (TICKS_PER_SECOND / GCD_1M)
}
/// Creates a duration from the specified number of clock ticks
pub const fn from_ticks(ticks: u64) -> Duration {
Duration { ticks }
}
/// Creates a duration from the specified number of seconds, rounding up.
pub const fn from_secs(secs: u64) -> Duration {
Duration {
ticks: secs * TICKS_PER_SECOND,
}
}
/// Creates a duration from the specified number of milliseconds, rounding up.
pub const fn from_millis(millis: u64) -> Duration {
Duration {
ticks: div_ceil(millis * (TICKS_PER_SECOND / GCD_1K), 1000 / GCD_1K),
}
}
/// Creates a duration from the specified number of microseconds, rounding up.
/// NOTE: Delays this small may be inaccurate.
pub const fn from_micros(micros: u64) -> Duration {
Duration {
ticks: div_ceil(micros * (TICKS_PER_SECOND / GCD_1M), 1_000_000 / GCD_1M),
}
}
/// Creates a duration from the specified number of seconds, rounding down.
pub const fn from_secs_floor(secs: u64) -> Duration {
Duration {
ticks: secs * TICKS_PER_SECOND,
}
}
/// Creates a duration from the specified number of milliseconds, rounding down.
pub const fn from_millis_floor(millis: u64) -> Duration {
Duration {
ticks: millis * (TICKS_PER_SECOND / GCD_1K) / (1000 / GCD_1K),
}
}
/// Creates a duration from the specified number of microseconds, rounding down.
/// NOTE: Delays this small may be inaccurate.
pub const fn from_micros_floor(micros: u64) -> Duration {
Duration {
ticks: micros * (TICKS_PER_SECOND / GCD_1M) / (1_000_000 / GCD_1M),
}
}
/// Adds one Duration to another, returning a new Duration or None in the event of an overflow.
pub fn checked_add(self, rhs: Duration) -> Option<Duration> {
self.ticks.checked_add(rhs.ticks).map(|ticks| Duration { ticks })
}
/// Subtracts one Duration from another, returning a new Duration or None in the event of an underflow.
pub fn checked_sub(self, rhs: Duration) -> Option<Duration> {
self.ticks.checked_sub(rhs.ticks).map(|ticks| Duration { ticks })
}
/// Multiplies one Duration by a scalar u32, returning a new Duration or None in the event of an overflow.
pub fn checked_mul(self, rhs: u32) -> Option<Duration> {
self.ticks.checked_mul(rhs as _).map(|ticks| Duration { ticks })
}
/// Divides one Duration by a scalar u32, returning a new Duration or None if `rhs` is zero.
pub fn checked_div(self, rhs: u32) -> Option<Duration> {
self.ticks.checked_div(rhs as _).map(|ticks| Duration { ticks })
}
}
impl Add for Duration {
type Output = Duration;
fn add(self, rhs: Duration) -> Duration {
self.checked_add(rhs).expect("overflow when adding durations")
}
}
impl AddAssign for Duration {
fn add_assign(&mut self, rhs: Duration) {
*self = *self + rhs;
}
}
impl Sub for Duration {
type Output = Duration;
fn sub(self, rhs: Duration) -> Duration {
self.checked_sub(rhs).expect("overflow when subtracting durations")
}
}
impl SubAssign for Duration {
fn sub_assign(&mut self, rhs: Duration) {
*self = *self - rhs;
}
}
impl Mul<u32> for Duration {
type Output = Duration;
fn mul(self, rhs: u32) -> Duration {
self.checked_mul(rhs)
.expect("overflow when multiplying duration by scalar")
}
}
impl Mul<Duration> for u32 {
type Output = Duration;
fn mul(self, rhs: Duration) -> Duration {
rhs * self
}
}
impl MulAssign<u32> for Duration {
fn mul_assign(&mut self, rhs: u32) {
*self = *self * rhs;
}
}
impl Div<u32> for Duration {
type Output = Duration;
fn div(self, rhs: u32) -> Duration {
self.checked_div(rhs)
.expect("divide by zero error when dividing duration by scalar")
}
}
impl DivAssign<u32> for Duration {
fn div_assign(&mut self, rhs: u32) {
*self = *self / rhs;
}
}
impl fmt::Display for Duration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} ticks", self.ticks)
}
}
#[inline]
const fn div_ceil(num: u64, den: u64) -> u64 {
(num + den - 1) / den
}
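// Worked example, not part of this commit, of why the conversion constants above are
// pre-divided by their GCD: it shrinks the multiplier so the intermediate product
// `ticks * factor` stays further from u64 overflow. The 32768 Hz tick rate below is
// only an assumption for illustration (the `time-tick-32768hz` feature).

const TICKS_PER_SECOND_32K: u64 = 32_768;
const GCD_1K_32K: u64 = 8; // gcd(32_768, 1_000)

// Mirrors `Duration::as_millis`: multiply by 125 instead of 1_000 and divide by
// 4_096 instead of 32_768, so the intermediate product is 8x smaller.
const fn as_millis_32k(ticks: u64) -> u64 {
    ticks * (1_000 / GCD_1K_32K) / (TICKS_PER_SECOND_32K / GCD_1K_32K)
}

fn main() {
    assert_eq!(as_millis_32k(32_768), 1_000); // one second worth of ticks -> 1000 ms
    assert_eq!(as_millis_32k(16_384), 500); // half a second -> 500 ms
}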

View File

@ -0,0 +1,159 @@
use core::fmt;
use core::ops::{Add, AddAssign, Sub, SubAssign};
use super::{driver, Duration, GCD_1K, GCD_1M, TICKS_PER_SECOND};
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
/// An Instant in time, based on the MCU's clock ticks since startup.
pub struct Instant {
ticks: u64,
}
impl Instant {
/// The smallest (earliest) value that can be represented by the `Instant` type.
pub const MIN: Instant = Instant { ticks: u64::MIN };
/// The largest (latest) value that can be represented by the `Instant` type.
pub const MAX: Instant = Instant { ticks: u64::MAX };
/// Returns an Instant representing the current time.
pub fn now() -> Instant {
Instant { ticks: driver::now() }
}
/// Create an Instant from a tick count since system boot.
pub const fn from_ticks(ticks: u64) -> Self {
Self { ticks }
}
/// Create an Instant from a microsecond count since system boot.
pub const fn from_micros(micros: u64) -> Self {
Self {
ticks: micros * (TICKS_PER_SECOND / GCD_1M) / (1_000_000 / GCD_1M),
}
}
/// Create an Instant from a millisecond count since system boot.
pub const fn from_millis(millis: u64) -> Self {
Self {
ticks: millis * (TICKS_PER_SECOND / GCD_1K) / (1000 / GCD_1K),
}
}
/// Create an Instant from a second count since system boot.
pub const fn from_secs(seconds: u64) -> Self {
Self {
ticks: seconds * TICKS_PER_SECOND,
}
}
/// Tick count since system boot.
pub const fn as_ticks(&self) -> u64 {
self.ticks
}
/// Seconds since system boot.
pub const fn as_secs(&self) -> u64 {
self.ticks / TICKS_PER_SECOND
}
/// Milliseconds since system boot.
pub const fn as_millis(&self) -> u64 {
self.ticks * (1000 / GCD_1K) / (TICKS_PER_SECOND / GCD_1K)
}
/// Microseconds since system boot.
pub const fn as_micros(&self) -> u64 {
self.ticks * (1_000_000 / GCD_1M) / (TICKS_PER_SECOND / GCD_1M)
}
/// Duration between this Instant and an earlier Instant.
/// Panics if `earlier` is later than `self` (underflow).
pub fn duration_since(&self, earlier: Instant) -> Duration {
Duration {
ticks: self.ticks.checked_sub(earlier.ticks).unwrap(),
}
}
/// Duration between this Instant and an earlier Instant, or None if `earlier` is later than `self`.
pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> {
if self.ticks < earlier.ticks {
None
} else {
Some(Duration {
ticks: self.ticks - earlier.ticks,
})
}
}
/// Returns the duration since the "earlier" Instant.
/// If the "earlier" instant is in the future, the duration is set to zero.
pub fn saturating_duration_since(&self, earlier: Instant) -> Duration {
Duration {
ticks: if self.ticks < earlier.ticks {
0
} else {
self.ticks - earlier.ticks
},
}
}
/// Duration elapsed since this Instant.
pub fn elapsed(&self) -> Duration {
Instant::now() - *self
}
/// Adds one Duration to self, returning a new `Instant` or None in the event of an overflow.
pub fn checked_add(&self, duration: Duration) -> Option<Instant> {
self.ticks.checked_add(duration.ticks).map(|ticks| Instant { ticks })
}
/// Subtracts one Duration from self, returning a new `Instant` or None in the event of an underflow.
pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
self.ticks.checked_sub(duration.ticks).map(|ticks| Instant { ticks })
}
}
impl Add<Duration> for Instant {
type Output = Instant;
fn add(self, other: Duration) -> Instant {
self.checked_add(other)
.expect("overflow when adding duration to instant")
}
}
impl AddAssign<Duration> for Instant {
fn add_assign(&mut self, other: Duration) {
*self = *self + other;
}
}
impl Sub<Duration> for Instant {
type Output = Instant;
fn sub(self, other: Duration) -> Instant {
self.checked_sub(other)
.expect("overflow when subtracting duration from instant")
}
}
impl SubAssign<Duration> for Instant {
fn sub_assign(&mut self, other: Duration) {
*self = *self - other;
}
}
impl Sub<Instant> for Instant {
type Output = Duration;
fn sub(self, other: Instant) -> Duration {
self.duration_since(other)
}
}
impl fmt::Display for Instant {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} ticks", self.ticks)
}
}
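// Brief usage sketch, not part of this commit: measuring elapsed time with `Instant`.
// `do_work` is a hypothetical placeholder.

use embassy_executor::time::{Duration, Instant};

fn do_work() {
    // placeholder for some blocking work
}

fn measure() -> Duration {
    let start = Instant::now();
    do_work();
    // `elapsed()` is `Instant::now() - start`; it can only panic if time went
    // backwards, which the driver contract rules out.
    start.elapsed()
}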

View File

@ -0,0 +1,91 @@
//! Timekeeping, delays and timeouts.
//!
//! Timekeeping is done with elapsed time since system boot. Time is represented in
//! ticks, where the tick rate is defined by the current driver, usually to match
//! the tick rate of the hardware.
//!
//! Tick counts are 64 bits. Even at the highest supported tick rate of 16 MHz this allows
//! representing time spans of more than 36,000 years, which is big enough for all practical
//! purposes and means overflows never need to be worried about.
//!
//! [`Instant`] represents a given instant of time (relative to system boot), and [`Duration`]
//! represents the duration of a span of time. They implement the math operations you'd expect,
//! like addition and subtraction.
//!
//! # Delays and timeouts
//!
//! [`Timer`] allows performing async delays. [`Ticker`] allows periodic delays without drifting over time.
//!
//! An implementation of the `embedded-hal` delay traits is provided by [`Delay`], for compatibility
//! with libraries from the ecosystem.
//!
//! # Wall-clock time
//!
//! The `time` module deals exclusively with a monotonically increasing tick count.
//! Therefore it has no direct support for wall-clock time ("real life" datetimes
//! like `2021-08-24 13:33:21`).
//!
//! If persistence across reboots is not needed, support can be built on top of
//! `embassy_executor::time` by storing the offset between "seconds elapsed since boot"
//! and "seconds since the unix epoch" (see the sketch at the end of this file).
//!
//! # Time driver
//!
//! The `time` module is backed by a global "time driver" specified at build time.
//! Only one driver can be active in a program.
//!
//! All methods and structs transparently call into the active driver. This makes it
//! possible for libraries to use `embassy_executor::time` in a driver-agnostic way without
//! requiring generic parameters.
//!
//! For more details, check the [`driver`] module.
#![deny(missing_docs)]
mod delay;
pub mod driver;
mod duration;
mod instant;
mod timer;
#[cfg(feature = "std")]
mod driver_std;
#[cfg(feature = "wasm")]
mod driver_wasm;
pub use delay::{block_for, Delay};
pub use duration::Duration;
pub use instant::Instant;
pub use timer::{with_timeout, Ticker, TimeoutError, Timer};
#[cfg(feature = "time-tick-1000hz")]
const TPS: u64 = 1_000;
#[cfg(feature = "time-tick-32768hz")]
const TPS: u64 = 32_768;
#[cfg(feature = "time-tick-1mhz")]
const TPS: u64 = 1_000_000;
#[cfg(feature = "time-tick-16mhz")]
const TPS: u64 = 16_000_000;
/// Ticks per second of the global timebase.
///
/// This value is specified by the `time-tick-*` Cargo features, which
/// should be set by the time driver. Some drivers support a fixed tick rate, others
/// allow you to choose a tick rate with Cargo features of their own. You should not
/// set the `time-tick-*` features for embassy yourself as an end user.
pub const TICKS_PER_SECOND: u64 = TPS;
const fn gcd(a: u64, b: u64) -> u64 {
if b == 0 {
a
} else {
gcd(b, a % b)
}
}
pub(crate) const GCD_1K: u64 = gcd(TICKS_PER_SECOND, 1_000);
pub(crate) const GCD_1M: u64 = gcd(TICKS_PER_SECOND, 1_000_000);
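// Minimal sketch, not part of this commit, of the wall-clock approach described in the
// module docs above: store the offset between "seconds since boot" and "seconds since
// the unix epoch" once it is learned (from an RTC, NTP, ...). All names here are
// hypothetical, and `AtomicU64` is used for brevity; a target without 64-bit atomics
// would protect the offset with a critical section instead.

use core::sync::atomic::{AtomicU64, Ordering};

use embassy_executor::time::Instant;

static UNIX_OFFSET_SECS: AtomicU64 = AtomicU64::new(0);

/// Record that the current moment corresponds to `unix_now` seconds since the epoch.
pub fn set_wall_clock(unix_now: u64) {
    UNIX_OFFSET_SECS.store(unix_now - Instant::now().as_secs(), Ordering::Relaxed);
}

/// Best-effort wall-clock time in seconds since the unix epoch (reads as seconds since boot until set).
pub fn unix_now_secs() -> u64 {
    UNIX_OFFSET_SECS.load(Ordering::Relaxed) + Instant::now().as_secs()
}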

View File

@ -0,0 +1,151 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::future::{select, Either};
use futures_util::{pin_mut, Stream};
use crate::executor::raw;
use crate::time::{Duration, Instant};
/// Error returned by [`with_timeout`] on timeout.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct TimeoutError;
/// Runs a given future with a timeout.
///
/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
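///
/// A usage sketch (`read_sensor` here is a hypothetical async fn, not part of this crate):
/// ``` no_run
/// # #![feature(type_alias_impl_trait)]
/// #
/// use embassy_executor::time::{with_timeout, Duration, TimeoutError};
/// # async fn read_sensor() -> u32 { 42 }
///
/// #[embassy_executor::task]
/// async fn demo_with_timeout() {
///     match with_timeout(Duration::from_millis(100), read_sensor()).await {
///         Ok(value) => { let _ = value; /* completed in time */ }
///         Err(TimeoutError) => { /* timed out, the sensor future was dropped */ }
///     }
/// }
/// ```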
pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> {
let timeout_fut = Timer::after(timeout);
pin_mut!(fut);
match select(fut, timeout_fut).await {
Either::Left((r, _)) => Ok(r),
Either::Right(_) => Err(TimeoutError),
}
}
/// A future that completes at a specified [Instant](struct.Instant.html).
pub struct Timer {
expires_at: Instant,
yielded_once: bool,
}
impl Timer {
/// Expire at specified [Instant](struct.Instant.html)
pub fn at(expires_at: Instant) -> Self {
Self {
expires_at,
yielded_once: false,
}
}
/// Expire after specified [Duration](struct.Duration.html).
/// This can be used as a `sleep` abstraction.
///
/// Example:
/// ``` no_run
/// # #![feature(type_alias_impl_trait)]
/// #
/// # fn foo() {}
/// use embassy_executor::time::{Duration, Timer};
///
/// #[embassy_executor::task]
/// async fn demo_sleep_seconds() {
/// // suspend this task for one second.
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// ```
pub fn after(duration: Duration) -> Self {
Self {
expires_at: Instant::now() + duration,
yielded_once: false,
}
}
}
impl Unpin for Timer {}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.yielded_once && self.expires_at <= Instant::now() {
Poll::Ready(())
} else {
unsafe { raw::register_timer(self.expires_at, cx.waker()) };
self.yielded_once = true;
Poll::Pending
}
}
}
/// Asynchronous stream that yields every Duration, indefinitely.
///
/// This stream will tick at uniform intervals, even if blocking work is performed between ticks.
///
/// For instance, consider the following code fragment.
/// ``` no_run
/// # #![feature(type_alias_impl_trait)]
/// #
/// use embassy_executor::time::{Duration, Timer};
/// # fn foo() {}
///
/// #[embassy_executor::task]
/// async fn ticker_example_0() {
/// loop {
/// foo();
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// }
/// ```
///
/// This fragment will not call `foo` every second.
/// Instead, it will call it once every second plus however long the previous call to `foo` took.
///
/// Example using `Ticker`, which will consistently call `foo` once a second.
///
/// ``` no_run
/// # #![feature(type_alias_impl_trait)]
/// #
/// use embassy_executor::time::{Duration, Ticker};
/// use futures::StreamExt;
/// # fn foo(){}
///
/// #[embassy_executor::task]
/// async fn ticker_example_1() {
/// let mut ticker = Ticker::every(Duration::from_secs(1));
/// loop {
/// foo();
/// ticker.next().await;
/// }
/// }
/// ```
pub struct Ticker {
expires_at: Instant,
duration: Duration,
}
impl Ticker {
/// Creates a new ticker that ticks at the specified duration interval.
pub fn every(duration: Duration) -> Self {
let expires_at = Instant::now() + duration;
Self { expires_at, duration }
}
}
impl Unpin for Ticker {}
impl Stream for Ticker {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.expires_at <= Instant::now() {
let dur = self.duration;
self.expires_at += dur;
Poll::Ready(Some(()))
} else {
unsafe { raw::register_timer(self.expires_at, cx.waker()) };
Poll::Pending
}
}
}