integrate static-executor, cleanup time module.

This commit is contained in:
Dario Nieuwenhuis 2020-10-19 21:15:24 +02:00
parent 0e1adc58f4
commit cd9ecaef57
18 changed files with 842 additions and 387 deletions

View File

@ -14,8 +14,6 @@ exclude = [
panic-probe = { git = "https://github.com/knurling-rs/probe-run", branch="main" } panic-probe = { git = "https://github.com/knurling-rs/probe-run", branch="main" }
defmt-rtt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" } defmt-rtt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" }
defmt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" } defmt = { git = "https://github.com/knurling-rs/defmt", branch="cursed-symbol-names-linkers-must-repent-for-their-sins" }
static-executor = { git = "https://github.com/Dirbaio/static-executor", branch="multi"}
futures-intrusive = { git = "https://github.com/Dirbaio/futures-intrusive", branch="master"}
[profile.dev] [profile.dev]
codegen-units = 1 codegen-units = 1

13
embassy-macros/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "embassy-macros"
version = "0.1.0"
authors = ["Dario Nieuwenhuis <dirbaio@dirbaio.net>"]
edition = "2018"
[dependencies]
syn = { version = "1.0.39", features = ["full", "extra-traits"] }
quote = "1.0.7"
darling = "0.10.2"
[lib]
proc-macro = true

114
embassy-macros/src/lib.rs Normal file
View File

@ -0,0 +1,114 @@
#![feature(proc_macro_diagnostic)]
extern crate proc_macro;
use darling::FromMeta;
use proc_macro::{Diagnostic, Level, Span, TokenStream};
use quote::{format_ident, quote};
use syn::spanned::Spanned;
#[derive(Debug, FromMeta)]
struct MacroArgs {
#[darling(default)]
pool_size: Option<usize>,
}
#[proc_macro_attribute]
pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
let mut task_fn = syn::parse_macro_input!(item as syn::ItemFn);
let args = match MacroArgs::from_list(&args) {
Ok(v) => v,
Err(e) => {
return TokenStream::from(e.write_errors());
}
};
let pool_size: usize = args.pool_size.unwrap_or(1);
let mut fail = false;
if task_fn.sig.asyncness.is_none() {
task_fn
.sig
.span()
.unwrap()
.error("task functions must be async")
.emit();
fail = true;
}
if task_fn.sig.generics.params.len() != 0 {
task_fn
.sig
.span()
.unwrap()
.error("task functions must not be generic")
.emit();
fail = true;
}
if pool_size < 1 {
Span::call_site()
.error("pool_size must be 1 or greater")
.emit();
fail = true
}
let mut arg_names: syn::punctuated::Punctuated<syn::Ident, syn::Token![,]> =
syn::punctuated::Punctuated::new();
let args = &task_fn.sig.inputs;
for arg in args.iter() {
match arg {
syn::FnArg::Receiver(_) => {
arg.span()
.unwrap()
.error("task functions must not have receiver arguments")
.emit();
fail = true;
}
syn::FnArg::Typed(t) => match t.pat.as_ref() {
syn::Pat::Ident(i) => arg_names.push(i.ident.clone()),
_ => {
arg.span()
.unwrap()
.error("pattern matching in task arguments is not yet supporteds")
.emit();
fail = true;
}
},
}
}
if fail {
return TokenStream::new();
}
let name = task_fn.sig.ident.clone();
let type_name = format_ident!("__embassy_executor_type_{}", name);
let pool_name = format_ident!("__embassy_executor_pool_{}", name);
let task_fn_name = format_ident!("__embassy_executor_task_{}", name);
let create_fn_name = format_ident!("__embassy_executor_create_{}", name);
let visibility = &task_fn.vis;
task_fn.sig.ident = task_fn_name.clone();
let result = quote! {
#task_fn
#[allow(non_camel_case_types)]
type #type_name = impl ::core::future::Future + 'static;
fn #create_fn_name(#args) -> #type_name {
#task_fn_name(#arg_names)
}
#[allow(non_upper_case_globals)]
static #pool_name: [::embassy::executor::Task<#type_name>; #pool_size] = [::embassy::executor::Task::new(); #pool_size];
#visibility fn #name(#args) -> ::embassy::executor::SpawnToken {
unsafe { ::embassy::executor::Task::spawn(&#pool_name, || #create_fn_name(#arg_names)) }
}
};
result.into()
}

View File

@ -13,4 +13,4 @@ cortex-m = "0.6.3"
futures = { version = "0.3.5", default-features = false, features = [ "async-await" ] } futures = { version = "0.3.5", default-features = false, features = [ "async-await" ] }
pin-project = { version = "0.4.23", default-features = false } pin-project = { version = "0.4.23", default-features = false }
futures-intrusive = { version = "0.3.1", default-features = false } futures-intrusive = { version = "0.3.1", default-features = false }
static-executor = { version = "0.1.0", features=["defmt"]} embassy-macros = { version = "0.1.0", path = "../embassy-macros"}

View File

@ -1,48 +0,0 @@
use core::marker::PhantomData;
use static_executor as se;
use crate::time;
use crate::time::Alarm;
pub use se::{task, SpawnError, SpawnToken};
pub struct Executor<A: Alarm> {
inner: se::Executor,
alarm: A,
timer: time::TimerService,
}
impl<A: Alarm> Executor<A> {
pub fn new(alarm: A, signal_fn: fn()) -> Self {
alarm.set_callback(signal_fn);
Self {
inner: se::Executor::new(signal_fn),
alarm,
timer: time::TimerService::new(time::IntrusiveClock),
}
}
/// Spawn a future on this executor.
///
/// safety: can only be called from the executor thread
pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
self.inner.spawn(token)
}
/// Runs the executor until the queue is empty.
///
/// safety: can only be called from the executor thread
pub unsafe fn run(&'static self) {
time::with_timer_service(&self.timer, || {
self.timer.check_expirations();
self.inner.run();
match self.timer.next_expiration() {
// If this is in the past, set_alarm will immediately trigger the alarm,
// which will make the wfe immediately return so we do another loop iteration.
Some(at) => self.alarm.set(at),
None => self.alarm.clear(),
}
})
}
}

View File

@ -0,0 +1,305 @@
#![no_std]
#![feature(const_fn)]
use core::cell::Cell;
use core::cell::UnsafeCell;
use core::future::Future;
use core::mem;
use core::mem::MaybeUninit;
use core::pin::Pin;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
//=============
// UninitCell
struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
impl<T> UninitCell<T> {
const fn uninit() -> Self {
Self(MaybeUninit::uninit())
}
unsafe fn as_mut_ptr(&self) -> *mut T {
(*self.0.as_ptr()).get()
}
unsafe fn as_mut(&self) -> &mut T {
&mut *self.as_mut_ptr()
}
unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
}
unsafe fn drop_in_place(&self) {
ptr::drop_in_place(self.as_mut_ptr())
}
}
impl<T: Copy> UninitCell<T> {
unsafe fn read(&self) -> T {
ptr::read(self.as_mut_ptr())
}
}
//=============
// Data structures
const STATE_RUNNING: u32 = 1 << 0;
const STATE_QUEUED: u32 = 1 << 1;
struct Header {
state: AtomicU32,
next: AtomicPtr<Header>,
executor: Cell<*const Executor>,
poll_fn: UninitCell<unsafe fn(*mut Header)>, // Valid if STATE_RUNNING
}
// repr(C) is needed to guarantee that header is located at offset 0
// This makes it safe to cast between Header and Task pointers.
#[repr(C)]
pub struct Task<F: Future + 'static> {
header: Header,
future: UninitCell<F>, // Valid if STATE_RUNNING
}
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SpawnError {
Busy,
}
//=============
// Atomic task queue using a very, very simple lock-free linked-list queue:
//
// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
//
// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
// null. Then the batch is iterated following the next pointers until null is reached.
//
// Note that batches will be iterated in the opposite order as they were enqueued. This should
// be OK for our use case. Hopefully it doesn't create executor fairness problems.
struct Queue {
head: AtomicPtr<Header>,
}
impl Queue {
const fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
}
}
/// Enqueues an item. Returns true if the queue was empty.
unsafe fn enqueue(&self, item: *mut Header) -> bool {
let mut prev = self.head.load(Ordering::Acquire);
loop {
(*item).next.store(prev, Ordering::Relaxed);
match self
.head
.compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => break,
Err(next_prev) => prev = next_prev,
}
}
prev.is_null()
}
unsafe fn dequeue_all(&self, on_task: impl Fn(*mut Header)) {
loop {
let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
if task.is_null() {
// Queue is empty, we're done
return;
}
while !task.is_null() {
on_task(task);
task = (*task).next.load(Ordering::Relaxed);
}
}
}
}
//=============
// Waker
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &WAKER_VTABLE)
}
unsafe fn waker_wake(p: *const ()) {
let header = &*(p as *const Header);
let mut current = header.state.load(Ordering::Acquire);
loop {
// If already scheduled, or if not started,
if (current & STATE_QUEUED != 0) || (current & STATE_RUNNING == 0) {
return;
}
// Mark it as scheduled
let new = current | STATE_QUEUED;
match header
.state
.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => break,
Err(next_current) => current = next_current,
}
}
// We have just marked the task as scheduled, so enqueue it.
let executor = &*header.executor.get();
executor.enqueue(p as *mut Header);
}
unsafe fn waker_drop(_: *const ()) {
// nop
}
//=============
// Task
impl<F: Future + 'static> Task<F> {
pub const fn new() -> Self {
Self {
header: Header {
state: AtomicU32::new(0),
next: AtomicPtr::new(ptr::null_mut()),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
},
future: UninitCell::uninit(),
}
}
pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
for task in pool {
let state = STATE_RUNNING | STATE_QUEUED;
if task
.header
.state
.compare_and_swap(0, state, Ordering::AcqRel)
== 0
{
// Initialize the task
task.header.poll_fn.write(Self::poll);
task.future.write(future());
return SpawnToken {
header: Some(NonNull::new_unchecked(&task.header as *const Header as _)),
};
}
}
return SpawnToken { header: None };
}
unsafe fn poll(p: *mut Header) {
let this = &*(p as *const Task<F>);
let future = Pin::new_unchecked(this.future.as_mut());
let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(_) => {
this.future.drop_in_place();
this.header
.state
.fetch_and(!STATE_RUNNING, Ordering::AcqRel);
}
Poll::Pending => {}
}
}
}
unsafe impl<F: Future + 'static> Sync for Task<F> {}
//=============
// Spawn token
#[must_use = "Calling a task function does nothing on its own. To spawn a task, pass the result to Executor::spawn()"]
pub struct SpawnToken {
header: Option<NonNull<Header>>,
}
impl Drop for SpawnToken {
fn drop(&mut self) {
// TODO maybe we can deallocate the task instead.
panic!("Please do not drop SpawnToken instances")
}
}
//=============
// Executor
pub struct Executor {
queue: Queue,
signal_fn: fn(),
}
impl Executor {
pub const fn new(signal_fn: fn()) -> Self {
Self {
queue: Queue::new(),
signal_fn: signal_fn,
}
}
unsafe fn enqueue(&self, item: *mut Header) {
if self.queue.enqueue(item) {
(self.signal_fn)()
}
}
/// Spawn a future on this executor.
///
/// safety: can only be called from the executor thread
pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
let header = token.header;
mem::forget(token);
match header {
Some(header) => {
let header = header.as_ref();
header.executor.set(self);
self.enqueue(header as *const _ as _);
Ok(())
}
None => Err(SpawnError::Busy),
}
}
/// Runs the executor until the queue is empty.
///
/// safety: can only be called from the executor thread
pub unsafe fn run(&self) {
self.queue.dequeue_all(|p| {
let header = &*p;
let state = header.state.fetch_and(!STATE_QUEUED, Ordering::AcqRel);
if state & STATE_RUNNING == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}
// Run the task
header.poll_fn.read()(p as _);
});
}
}

View File

@ -0,0 +1,9 @@
mod executor;
mod timer_executor;
// for time::Timer
pub(crate) use timer_executor::current_timer_queue;
pub use embassy_macros::task;
pub use executor::{Executor, SpawnError, SpawnToken, Task};
pub use timer_executor::TimerExecutor;

View File

@ -0,0 +1,77 @@
use super::executor::{Executor, SpawnError, SpawnToken};
use core::ptr;
use core::sync::atomic::{AtomicPtr, Ordering};
use futures_intrusive::timer as fi;
use crate::time::Alarm;
pub(crate) struct IntrusiveClock;
impl fi::Clock for IntrusiveClock {
fn now(&self) -> u64 {
crate::time::now()
}
}
pub(crate) type TimerQueue = fi::LocalTimerService;
pub struct TimerExecutor<A: Alarm> {
inner: Executor,
alarm: A,
timer_queue: TimerQueue,
}
impl<A: Alarm> TimerExecutor<A> {
pub fn new(alarm: A, signal_fn: fn()) -> Self {
alarm.set_callback(signal_fn);
Self {
inner: Executor::new(signal_fn),
alarm,
timer_queue: TimerQueue::new(&IntrusiveClock),
}
}
/// Spawn a future on this executor.
///
/// safety: can only be called from the executor thread
pub unsafe fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
self.inner.spawn(token)
}
/// Runs the executor until the queue is empty.
///
/// safety: can only be called from the executor thread
pub unsafe fn run(&'static self) {
with_timer_queue(&self.timer_queue, || {
self.timer_queue.check_expirations();
self.inner.run();
match self.timer_queue.next_expiration() {
// If this is in the past, set_alarm will immediately trigger the alarm,
// which will make the wfe immediately return so we do another loop iteration.
Some(at) => self.alarm.set(at),
None => self.alarm.clear(),
}
})
}
}
static CURRENT_TIMER_QUEUE: AtomicPtr<TimerQueue> = AtomicPtr::new(ptr::null_mut());
fn with_timer_queue<R>(svc: &'static TimerQueue, f: impl FnOnce() -> R) -> R {
let svc = svc as *const _ as *mut _;
let prev_svc = CURRENT_TIMER_QUEUE.swap(svc, Ordering::Relaxed);
let r = f();
let svc2 = CURRENT_TIMER_QUEUE.swap(prev_svc, Ordering::Relaxed);
assert_eq!(svc, svc2);
r
}
pub(crate) fn current_timer_queue() -> &'static TimerQueue {
unsafe {
CURRENT_TIMER_QUEUE
.load(Ordering::Relaxed)
.as_ref()
.unwrap()
}
}

View File

@ -1,331 +0,0 @@
use core::cell::Cell;
use core::convert::TryInto;
use core::future::Future;
use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign};
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::{AtomicPtr, Ordering};
use core::task::{Context, Poll};
use crate::util::*;
use fi::LocalTimer;
use futures_intrusive::timer as fi;
static mut CLOCK: Option<&'static dyn Clock> = None;
pub unsafe fn set_clock(clock: &'static dyn Clock) {
CLOCK = Some(clock);
}
fn now() -> u64 {
unsafe { CLOCK.dexpect(defmt::intern!("No clock set")).now() }
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Instant {
ticks: u64,
}
// TODO allow customizing, probably via Cargo features `tick-hz-32768` or something.
pub const TICKS_PER_SECOND: u32 = 32768;
impl Instant {
pub fn now() -> Instant {
Instant { ticks: now() }
}
pub fn into_ticks(&self) -> u64 {
self.ticks
}
pub fn duration_since(&self, earlier: Instant) -> Duration {
Duration {
ticks: (self.ticks - earlier.ticks).try_into().unwrap(),
}
}
pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> {
if self.ticks < earlier.ticks {
None
} else {
Some(Duration {
ticks: (self.ticks - earlier.ticks).try_into().unwrap(),
})
}
}
pub fn saturating_duration_since(&self, earlier: Instant) -> Duration {
Duration {
ticks: if self.ticks < earlier.ticks {
0
} else {
(self.ticks - earlier.ticks).try_into().unwrap()
},
}
}
pub fn elapsed(&self) -> Duration {
Instant::now() - *self
}
pub fn checked_add(&self, duration: Duration) -> Option<Instant> {
self.ticks
.checked_add(duration.ticks.into())
.map(|ticks| Instant { ticks })
}
pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
self.ticks
.checked_sub(duration.ticks.into())
.map(|ticks| Instant { ticks })
}
}
impl Add<Duration> for Instant {
type Output = Instant;
fn add(self, other: Duration) -> Instant {
self.checked_add(other)
.expect("overflow when adding duration to instant")
}
}
impl AddAssign<Duration> for Instant {
fn add_assign(&mut self, other: Duration) {
*self = *self + other;
}
}
impl Sub<Duration> for Instant {
type Output = Instant;
fn sub(self, other: Duration) -> Instant {
self.checked_sub(other)
.expect("overflow when subtracting duration from instant")
}
}
impl SubAssign<Duration> for Instant {
fn sub_assign(&mut self, other: Duration) {
*self = *self - other;
}
}
impl Sub<Instant> for Instant {
type Output = Duration;
fn sub(self, other: Instant) -> Duration {
self.duration_since(other)
}
}
#[derive(defmt::Format, Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Duration {
ticks: u32,
}
impl Duration {
pub fn into_ticks(&self) -> u32 {
self.ticks
}
pub const fn from_ticks(ticks: u32) -> Duration {
Duration { ticks }
}
pub const fn from_secs(secs: u32) -> Duration {
Duration {
ticks: secs * TICKS_PER_SECOND,
}
}
pub const fn from_millis(millis: u32) -> Duration {
Duration {
ticks: millis * TICKS_PER_SECOND / 1000,
}
}
pub fn checked_add(self, rhs: Duration) -> Option<Duration> {
self.ticks
.checked_add(rhs.ticks)
.map(|ticks| Duration { ticks })
}
pub fn checked_sub(self, rhs: Duration) -> Option<Duration> {
self.ticks
.checked_sub(rhs.ticks)
.map(|ticks| Duration { ticks })
}
pub fn checked_mul(self, rhs: u32) -> Option<Duration> {
self.ticks.checked_mul(rhs).map(|ticks| Duration { ticks })
}
pub fn checked_div(self, rhs: u32) -> Option<Duration> {
self.ticks.checked_div(rhs).map(|ticks| Duration { ticks })
}
}
impl Add for Duration {
type Output = Duration;
fn add(self, rhs: Duration) -> Duration {
self.checked_add(rhs)
.expect("overflow when adding durations")
}
}
impl AddAssign for Duration {
fn add_assign(&mut self, rhs: Duration) {
*self = *self + rhs;
}
}
impl Sub for Duration {
type Output = Duration;
fn sub(self, rhs: Duration) -> Duration {
self.checked_sub(rhs)
.expect("overflow when subtracting durations")
}
}
impl SubAssign for Duration {
fn sub_assign(&mut self, rhs: Duration) {
*self = *self - rhs;
}
}
impl Mul<u32> for Duration {
type Output = Duration;
fn mul(self, rhs: u32) -> Duration {
self.checked_mul(rhs)
.expect("overflow when multiplying duration by scalar")
}
}
impl Mul<Duration> for u32 {
type Output = Duration;
fn mul(self, rhs: Duration) -> Duration {
rhs * self
}
}
impl MulAssign<u32> for Duration {
fn mul_assign(&mut self, rhs: u32) {
*self = *self * rhs;
}
}
impl Div<u32> for Duration {
type Output = Duration;
fn div(self, rhs: u32) -> Duration {
self.checked_div(rhs)
.expect("divide by zero error when dividing duration by scalar")
}
}
impl DivAssign<u32> for Duration {
fn div_assign(&mut self, rhs: u32) {
*self = *self / rhs;
}
}
pub(crate) struct IntrusiveClock;
impl fi::Clock for IntrusiveClock {
fn now(&self) -> u64 {
now()
}
}
pub(crate) type TimerService = fi::LocalTimerService<IntrusiveClock>;
static CURRENT_TIMER_SERVICE: AtomicPtr<TimerService> = AtomicPtr::new(ptr::null_mut());
pub(crate) fn with_timer_service<R>(svc: &'static TimerService, f: impl FnOnce() -> R) -> R {
let svc = svc as *const _ as *mut _;
let prev_svc = CURRENT_TIMER_SERVICE.swap(svc, Ordering::Relaxed);
let r = f();
let svc2 = CURRENT_TIMER_SERVICE.swap(prev_svc, Ordering::Relaxed);
assert_eq!(svc, svc2);
r
}
fn current_timer_service() -> &'static TimerService {
unsafe {
CURRENT_TIMER_SERVICE
.load(Ordering::Relaxed)
.as_ref()
.unwrap()
}
}
pub struct Timer {
inner: fi::LocalTimerFuture<'static>,
}
impl Timer {
pub fn at(when: Instant) -> Self {
let svc: &TimerService = current_timer_service();
Self {
inner: svc.deadline(when.into_ticks()),
}
}
pub fn after(dur: Duration) -> Self {
Self::at(Instant::now() + dur)
}
}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx)
}
}
/// Monotonic clock
pub trait Clock {
/// Return the current timestamp in ticks.
/// This is guaranteed to be monotonic, i.e. a call to now() will always return
/// a greater or equal value than earler calls.
fn now(&self) -> u64;
}
impl<T: Clock + ?Sized> Clock for &T {
fn now(&self) -> u64 {
T::now(self)
}
}
/// Trait to register a callback at a given timestamp.
pub trait Alarm {
/// Sets the callback function to be called when the alarm triggers.
/// The callback may be called from any context (interrupt or thread mode).
fn set_callback(&self, callback: fn());
/// Sets an alarm at the given timestamp. When the clock reaches that
/// timestamp, the provided callback funcion will be called.
///
/// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp.
///
/// Only one alarm can be active at a time. This overwrites any previously-set alarm if any.
fn set(&self, timestamp: u64);
/// Clears the previously-set alarm.
/// If no alarm was set, this is a noop.
fn clear(&self);
}
impl<T: Alarm + ?Sized> Alarm for &T {
fn set_callback(&self, callback: fn()) {
T::set_callback(self, callback);
}
fn set(&self, timestamp: u64) {
T::set(self, timestamp);
}
fn clear(&self) {
T::clear(self)
}
}

View File

@ -0,0 +1,118 @@
use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign};
use super::TICKS_PER_SECOND;
#[derive(defmt::Format, Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Duration {
pub(crate) ticks: u32,
}
impl Duration {
pub fn into_ticks(&self) -> u32 {
self.ticks
}
pub const fn from_ticks(ticks: u32) -> Duration {
Duration { ticks }
}
pub const fn from_secs(secs: u32) -> Duration {
Duration {
ticks: secs * TICKS_PER_SECOND,
}
}
pub const fn from_millis(millis: u32) -> Duration {
Duration {
ticks: millis * TICKS_PER_SECOND / 1000,
}
}
pub fn checked_add(self, rhs: Duration) -> Option<Duration> {
self.ticks
.checked_add(rhs.ticks)
.map(|ticks| Duration { ticks })
}
pub fn checked_sub(self, rhs: Duration) -> Option<Duration> {
self.ticks
.checked_sub(rhs.ticks)
.map(|ticks| Duration { ticks })
}
pub fn checked_mul(self, rhs: u32) -> Option<Duration> {
self.ticks.checked_mul(rhs).map(|ticks| Duration { ticks })
}
pub fn checked_div(self, rhs: u32) -> Option<Duration> {
self.ticks.checked_div(rhs).map(|ticks| Duration { ticks })
}
}
impl Add for Duration {
type Output = Duration;
fn add(self, rhs: Duration) -> Duration {
self.checked_add(rhs)
.expect("overflow when adding durations")
}
}
impl AddAssign for Duration {
fn add_assign(&mut self, rhs: Duration) {
*self = *self + rhs;
}
}
impl Sub for Duration {
type Output = Duration;
fn sub(self, rhs: Duration) -> Duration {
self.checked_sub(rhs)
.expect("overflow when subtracting durations")
}
}
impl SubAssign for Duration {
fn sub_assign(&mut self, rhs: Duration) {
*self = *self - rhs;
}
}
impl Mul<u32> for Duration {
type Output = Duration;
fn mul(self, rhs: u32) -> Duration {
self.checked_mul(rhs)
.expect("overflow when multiplying duration by scalar")
}
}
impl Mul<Duration> for u32 {
type Output = Duration;
fn mul(self, rhs: Duration) -> Duration {
rhs * self
}
}
impl MulAssign<u32> for Duration {
fn mul_assign(&mut self, rhs: u32) {
*self = *self * rhs;
}
}
impl Div<u32> for Duration {
type Output = Duration;
fn div(self, rhs: u32) -> Duration {
self.checked_div(rhs)
.expect("divide by zero error when dividing duration by scalar")
}
}
impl DivAssign<u32> for Duration {
fn div_assign(&mut self, rhs: u32) {
*self = *self / rhs;
}
}

102
embassy/src/time/instant.rs Normal file
View File

@ -0,0 +1,102 @@
use core::convert::TryInto;
use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign};
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::{AtomicPtr, Ordering};
use core::task::{Context, Poll};
use super::{now, Duration};
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Instant {
ticks: u64,
}
impl Instant {
pub fn now() -> Instant {
Instant { ticks: now() }
}
pub fn into_ticks(&self) -> u64 {
self.ticks
}
pub fn duration_since(&self, earlier: Instant) -> Duration {
Duration {
ticks: (self.ticks - earlier.ticks).try_into().unwrap(),
}
}
pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> {
if self.ticks < earlier.ticks {
None
} else {
Some(Duration {
ticks: (self.ticks - earlier.ticks).try_into().unwrap(),
})
}
}
pub fn saturating_duration_since(&self, earlier: Instant) -> Duration {
Duration {
ticks: if self.ticks < earlier.ticks {
0
} else {
(self.ticks - earlier.ticks).try_into().unwrap()
},
}
}
pub fn elapsed(&self) -> Duration {
Instant::now() - *self
}
pub fn checked_add(&self, duration: Duration) -> Option<Instant> {
self.ticks
.checked_add(duration.ticks.into())
.map(|ticks| Instant { ticks })
}
pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
self.ticks
.checked_sub(duration.ticks.into())
.map(|ticks| Instant { ticks })
}
}
impl Add<Duration> for Instant {
type Output = Instant;
fn add(self, other: Duration) -> Instant {
self.checked_add(other)
.expect("overflow when adding duration to instant")
}
}
impl AddAssign<Duration> for Instant {
fn add_assign(&mut self, other: Duration) {
*self = *self + other;
}
}
impl Sub<Duration> for Instant {
type Output = Instant;
fn sub(self, other: Duration) -> Instant {
self.checked_sub(other)
.expect("overflow when subtracting duration from instant")
}
}
impl SubAssign<Duration> for Instant {
fn sub_assign(&mut self, other: Duration) {
*self = *self - other;
}
}
impl Sub<Instant> for Instant {
type Output = Duration;
fn sub(self, other: Instant) -> Duration {
self.duration_since(other)
}
}

24
embassy/src/time/mod.rs Normal file
View File

@ -0,0 +1,24 @@
mod duration;
mod instant;
mod timer;
mod traits;
pub use duration::Duration;
pub use instant::Instant;
pub use timer::Timer;
pub use traits::*;
use crate::util::Dewrap;
// TODO allow customizing, probably via Cargo features `tick-hz-32768` or something.
pub const TICKS_PER_SECOND: u32 = 32768;
static mut CLOCK: Option<&'static dyn Clock> = None;
pub unsafe fn set_clock(clock: &'static dyn Clock) {
CLOCK = Some(clock);
}
pub(crate) fn now() -> u64 {
unsafe { CLOCK.dexpect(defmt::intern!("No clock set")).now() }
}

30
embassy/src/time/timer.rs Normal file
View File

@ -0,0 +1,30 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_intrusive::timer::{LocalTimer, LocalTimerFuture};
use super::{Duration, Instant};
use crate::executor::current_timer_queue;
pub struct Timer {
inner: LocalTimerFuture<'static>,
}
impl Timer {
pub fn at(when: Instant) -> Self {
Self {
inner: current_timer_queue().deadline(when.into_ticks()),
}
}
pub fn after(dur: Duration) -> Self {
Self::at(Instant::now() + dur)
}
}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx)
}
}

View File

@ -0,0 +1,44 @@
/// Monotonic clock
pub trait Clock {
/// Return the current timestamp in ticks.
/// This is guaranteed to be monotonic, i.e. a call to now() will always return
/// a greater or equal value than earler calls.
fn now(&self) -> u64;
}
impl<T: Clock + ?Sized> Clock for &T {
fn now(&self) -> u64 {
T::now(self)
}
}
/// Trait to register a callback at a given timestamp.
pub trait Alarm {
/// Sets the callback function to be called when the alarm triggers.
/// The callback may be called from any context (interrupt or thread mode).
fn set_callback(&self, callback: fn());
/// Sets an alarm at the given timestamp. When the clock reaches that
/// timestamp, the provided callback funcion will be called.
///
/// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp.
///
/// Only one alarm can be active at a time. This overwrites any previously-set alarm if any.
fn set(&self, timestamp: u64);
/// Clears the previously-set alarm.
/// If no alarm was set, this is a noop.
fn clear(&self);
}
impl<T: Alarm + ?Sized> Alarm for &T {
fn set_callback(&self, callback: fn()) {
T::set_callback(self, callback);
}
fn set(&self, timestamp: u64) {
T::set(self, timestamp);
}
fn clear(&self) {
T::clear(self)
}
}

View File

@ -26,5 +26,5 @@ panic-probe = "0.1.0"
nrf52840-hal = { version = "0.11.0" } nrf52840-hal = { version = "0.11.0" }
embassy = { version = "0.1.0", path = "../embassy" } embassy = { version = "0.1.0", path = "../embassy" }
embassy-nrf = { version = "0.1.0", path = "../embassy-nrf", features = ["defmt-trace", "52840"] } embassy-nrf = { version = "0.1.0", path = "../embassy-nrf", features = ["defmt-trace", "52840"] }
static-executor = { version = "0.1.0", features=["defmt"]} futures = { version = "0.3.5", default-features = false }
futures = { version = "0.3.5", default-features = false } cortex-m-rtic = { git = "https://github.com/rtic-rs/cortex-m-rtic", branch = "master"}

View File

@ -13,7 +13,7 @@ use embassy_nrf::gpiote;
use futures::pin_mut; use futures::pin_mut;
use nrf52840_hal::gpio; use nrf52840_hal::gpio;
use static_executor::{task, Executor}; use embassy::executor::{task, Executor};
static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev()); static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev());
#[task] #[task]

View File

@ -11,7 +11,7 @@ use embassy::flash::Flash;
use embassy_nrf::qspi; use embassy_nrf::qspi;
use nrf52840_hal::gpio; use nrf52840_hal::gpio;
use static_executor::{task, Executor}; use embassy::executor::{task, Executor};
static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev()); static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev());
const PAGE_SIZE: usize = 4096; const PAGE_SIZE: usize = 4096;

View File

@ -12,7 +12,7 @@ use embassy_nrf::uarte;
use futures::pin_mut; use futures::pin_mut;
use nrf52840_hal::gpio; use nrf52840_hal::gpio;
use static_executor::{task, Executor}; use embassy::executor::{task, Executor};
static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev()); static EXECUTOR: Executor = Executor::new(|| cortex_m::asm::sev());
#[task] #[task]