diff --git a/embassy-util/Cargo.toml b/embassy-util/Cargo.toml index 32b796c0..ef5acc0f 100644 --- a/embassy-util/Cargo.toml +++ b/embassy-util/Cargo.toml @@ -6,11 +6,14 @@ edition = "2021" [package.metadata.embassy_docs] src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/" src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/" -features = ["nightly", "defmt", "unstable-traits", "time", "time-tick-1mhz"] +features = ["nightly"] flavors = [ { name = "default", target = "x86_64-unknown-linux-gnu" }, ] +[features] +nightly = ["embedded-io/async"] + [dependencies] defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } @@ -20,6 +23,7 @@ atomic-polyfill = "0.1.5" critical-section = "0.2.5" heapless = "0.7.5" cfg-if = "1.0.0" +embedded-io = "0.3.0" [dev-dependencies] futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs index 07b1633e..110c7281 100644 --- a/embassy-util/src/lib.rs +++ b/embassy-util/src/lib.rs @@ -1,6 +1,5 @@ #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] #![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] -#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))] #![allow(clippy::new_without_default)] #![doc = include_str!("../../README.md")] #![warn(missing_docs)] @@ -8,9 +7,13 @@ // This mod MUST go first, so that the others see its macros. pub(crate) mod fmt; +// internal use +mod ring_buffer; + pub mod blocking_mutex; pub mod channel; pub mod mutex; +pub mod pipe; pub mod waitqueue; mod forever; diff --git a/embassy-util/src/pipe.rs b/embassy-util/src/pipe.rs new file mode 100644 index 00000000..9c20aeef --- /dev/null +++ b/embassy-util/src/pipe.rs @@ -0,0 +1,551 @@ +//! Async byte stream pipe. + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::ring_buffer::RingBuffer; +use crate::waitqueue::WakerRegistration; + +/// Write-only access to a [`Pipe`]. +#[derive(Copy)] +pub struct Writer<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, +} + +impl<'p, M, const N: usize> Clone for Writer<'p, M, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Writer { pipe: self.pipe } + } +} + +impl<'p, M, const N: usize> Writer<'p, M, N> +where + M: RawMutex, +{ + /// Writes a value. + /// + /// See [`Pipe::write()`] + pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { + self.pipe.write(buf) + } + + /// Attempt to immediately write a message. + /// + /// See [`Pipe::write()`] + pub fn try_write(&self, buf: &[u8]) -> Result { + self.pipe.try_write(buf) + } +} + +/// Future returned by [`Pipe::write`] and [`Writer::write`]. +pub struct WriteFuture<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, + buf: &'p [u8], +} + +impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> +where + M: RawMutex, +{ + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.pipe.try_write_with_context(Some(cx), self.buf) { + Ok(n) => Poll::Ready(n), + Err(TryWriteError::Full) => Poll::Pending, + } + } +} + +impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} + +/// Read-only access to a [`Pipe`]. +#[derive(Copy)] +pub struct Reader<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, +} + +impl<'p, M, const N: usize> Clone for Reader<'p, M, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Reader { pipe: self.pipe } + } +} + +impl<'p, M, const N: usize> Reader<'p, M, N> +where + M: RawMutex, +{ + /// Reads a value. + /// + /// See [`Pipe::read()`] + pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { + self.pipe.read(buf) + } + + /// Attempt to immediately read a message. + /// + /// See [`Pipe::read()`] + pub fn try_read(&self, buf: &mut [u8]) -> Result { + self.pipe.try_read(buf) + } +} + +/// Future returned by [`Pipe::read`] and [`Reader::read`]. +pub struct ReadFuture<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, + buf: &'p mut [u8], +} + +impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> +where + M: RawMutex, +{ + type Output = usize; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.pipe.try_read_with_context(Some(cx), self.buf) { + Ok(n) => Poll::Ready(n), + Err(TryReadError::Empty) => Poll::Pending, + } + } +} + +impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} + +/// Error returned by [`try_read`](Pipe::try_read). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryReadError { + /// No data could be read from the pipe because it is currently + /// empty, and reading would require blocking. + Empty, +} + +/// Error returned by [`try_write`](Pipe::try_write). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryWriteError { + /// No data could be written to the pipe because it is + /// currently full, and writing would require blocking. + Full, +} + +struct PipeState { + buffer: RingBuffer, + read_waker: WakerRegistration, + write_waker: WakerRegistration, +} + +impl PipeState { + const fn new() -> Self { + PipeState { + buffer: RingBuffer::new(), + read_waker: WakerRegistration::new(), + write_waker: WakerRegistration::new(), + } + } + + fn clear(&mut self) { + self.buffer.clear(); + self.write_waker.wake(); + } + + fn try_read(&mut self, buf: &mut [u8]) -> Result { + self.try_read_with_context(None, buf) + } + + fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { + if self.buffer.is_full() { + self.write_waker.wake(); + } + + let available = self.buffer.pop_buf(); + if available.is_empty() { + if let Some(cx) = cx { + self.read_waker.register(cx.waker()); + } + return Err(TryReadError::Empty); + } + + let n = available.len().min(buf.len()); + buf[..n].copy_from_slice(&available[..n]); + self.buffer.pop(n); + Ok(n) + } + + fn try_write(&mut self, buf: &[u8]) -> Result { + self.try_write_with_context(None, buf) + } + + fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { + if self.buffer.is_empty() { + self.read_waker.wake(); + } + + let available = self.buffer.push_buf(); + if available.is_empty() { + if let Some(cx) = cx { + self.write_waker.register(cx.waker()); + } + return Err(TryWriteError::Full); + } + + let n = available.len().min(buf.len()); + available[..n].copy_from_slice(&buf[..n]); + self.buffer.push(n); + Ok(n) + } +} + +/// A bounded pipe for communicating between asynchronous tasks +/// with backpressure. +/// +/// The pipe will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `write` new messages will wait until a message is +/// read from the pipe. +/// +/// All data written will become available in the same order as it was written. +pub struct Pipe +where + M: RawMutex, +{ + inner: Mutex>>, +} + +impl Pipe +where + M: RawMutex, +{ + /// Establish a new bounded pipe. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy_util::pipe::Pipe; + /// use embassy_util::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded pipe, with a buffer of 256 bytes. + /// let mut pipe = Pipe::::new(); + /// ``` + pub const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(PipeState::new())), + } + } + + fn lock(&self, f: impl FnOnce(&mut PipeState) -> R) -> R { + self.inner.lock(|rc| f(&mut *rc.borrow_mut())) + } + + fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { + self.lock(|c| c.try_read_with_context(cx, buf)) + } + + fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { + self.lock(|c| c.try_write_with_context(cx, buf)) + } + + /// Get a writer for this pipe. + pub fn writer(&self) -> Writer<'_, M, N> { + Writer { pipe: self } + } + + /// Get a reader for this pipe. + pub fn reader(&self) -> Reader<'_, M, N> { + Reader { pipe: self } + } + + /// Write a value, waiting until there is capacity. + /// + /// Writeing completes when the value has been pushed to the pipe's queue. + /// This doesn't mean the value has been read yet. + pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { + WriteFuture { pipe: self, buf } + } + + /// Attempt to immediately write a message. + /// + /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's + /// buffer is full, instead of waiting. + /// + /// # Errors + /// + /// If the pipe capacity has been reached, i.e., the pipe has `n` + /// buffered values where `n` is the argument passed to [`Pipe`], then an + /// error is returned. + pub fn try_write(&self, buf: &[u8]) -> Result { + self.lock(|c| c.try_write(buf)) + } + + /// Receive the next value. + /// + /// If there are no messages in the pipe's buffer, this method will + /// wait until a message is written. + pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { + ReadFuture { pipe: self, buf } + } + + /// Attempt to immediately read a message. + /// + /// This method will either read a message from the pipe immediately or return an error + /// if the pipe is empty. + pub fn try_read(&self, buf: &mut [u8]) -> Result { + self.lock(|c| c.try_read(buf)) + } + + /// Clear the data in the pipe's buffer. + pub fn clear(&self) { + self.lock(|c| c.clear()) + } + + /// Return whether the pipe is full (no free space in the buffer) + pub fn is_full(&self) -> bool { + self.len() == N + } + + /// Return whether the pipe is empty (no data buffered) + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Total byte capacity. + /// + /// This is the same as the `N` generic param. + pub fn capacity(&self) -> usize { + N + } + + /// Used byte capacity. + pub fn len(&self) -> usize { + self.lock(|c| c.buffer.len()) + } + + /// Free byte capacity. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + N - self.len() + } +} + +#[cfg(feature = "nightly")] +mod io_impls { + use core::convert::Infallible; + + use futures_util::FutureExt; + + use super::*; + + impl embedded_io::Io for Pipe { + type Error = Infallible; + } + + impl embedded_io::asynch::Read for Pipe { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + Pipe::read(self, buf).map(Ok) + } + } + + impl embedded_io::asynch::Write for Pipe { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + Pipe::write(self, buf).map(Ok) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + futures_util::future::ready(Ok(())) + } + } + + impl embedded_io::Io for &Pipe { + type Error = Infallible; + } + + impl embedded_io::asynch::Read for &Pipe { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + Pipe::read(self, buf).map(Ok) + } + } + + impl embedded_io::asynch::Write for &Pipe { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + Pipe::write(self, buf).map(Ok) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + futures_util::future::ready(Ok(())) + } + } + + impl embedded_io::Io for Reader<'_, M, N> { + type Error = Infallible; + } + + impl embedded_io::asynch::Read for Reader<'_, M, N> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + Reader::read(self, buf).map(Ok) + } + } + + impl embedded_io::Io for Writer<'_, M, N> { + type Error = Infallible; + } + + impl embedded_io::asynch::Write for Writer<'_, M, N> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + Writer::write(self, buf).map(Ok) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + futures_util::future::ready(Ok(())) + } + } +} + +#[cfg(test)] +mod tests { + use futures_executor::ThreadPool; + use futures_util::task::SpawnExt; + + use super::*; + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + use crate::Forever; + + fn capacity(c: &PipeState) -> usize { + N - c.buffer.len() + } + + #[test] + fn writing_once() { + let mut c = PipeState::<3>::new(); + assert!(c.try_write(&[1]).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn writing_when_full() { + let mut c = PipeState::<3>::new(); + assert_eq!(c.try_write(&[42]), Ok(1)); + assert_eq!(c.try_write(&[43]), Ok(1)); + assert_eq!(c.try_write(&[44]), Ok(1)); + assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); + assert_eq!(capacity(&c), 0); + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = PipeState::<3>::new(); + assert!(c.try_write(&[42]).is_ok()); + let mut buf = [0; 16]; + assert_eq!(c.try_read(&mut buf), Ok(1)); + assert_eq!(buf[0], 42); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = PipeState::<3>::new(); + let mut buf = [0; 16]; + assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn simple_send_and_receive() { + let c = Pipe::::new(); + assert!(c.try_write(&[42]).is_ok()); + let mut buf = [0; 16]; + assert_eq!(c.try_read(&mut buf), Ok(1)); + assert_eq!(buf[0], 42); + } + + #[test] + fn cloning() { + let c = Pipe::::new(); + let r1 = c.reader(); + let w1 = c.writer(); + + let _ = r1.clone(); + let _ = w1.clone(); + } + + #[futures_test::test] + async fn receiver_receives_given_try_write_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: Forever> = Forever::new(); + let c = &*CHANNEL.put(Pipe::new()); + let c2 = c; + let f = async move { + assert_eq!(c2.try_write(&[42]), Ok(1)); + }; + executor.spawn(f).unwrap(); + let mut buf = [0; 16]; + assert_eq!(c.read(&mut buf).await, 1); + assert_eq!(buf[0], 42); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let c = Pipe::::new(); + c.write(&[42]).await; + let mut buf = [0; 16]; + assert_eq!(c.read(&mut buf).await, 1); + assert_eq!(buf[0], 42); + } +} diff --git a/embassy-util/src/ring_buffer.rs b/embassy-util/src/ring_buffer.rs new file mode 100644 index 00000000..52108402 --- /dev/null +++ b/embassy-util/src/ring_buffer.rs @@ -0,0 +1,146 @@ +pub struct RingBuffer { + buf: [u8; N], + start: usize, + end: usize, + empty: bool, +} + +impl RingBuffer { + pub const fn new() -> Self { + Self { + buf: [0; N], + start: 0, + end: 0, + empty: true, + } + } + + pub fn push_buf(&mut self) -> &mut [u8] { + if self.start == self.end && !self.empty { + trace!(" ringbuf: push_buf empty"); + return &mut self.buf[..0]; + } + + let n = if self.start <= self.end { + self.buf.len() - self.end + } else { + self.start - self.end + }; + + trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); + &mut self.buf[self.end..self.end + n] + } + + pub fn push(&mut self, n: usize) { + trace!(" ringbuf: push {:?}", n); + if n == 0 { + return; + } + + self.end = self.wrap(self.end + n); + self.empty = false; + } + + pub fn pop_buf(&mut self) -> &mut [u8] { + if self.empty { + trace!(" ringbuf: pop_buf empty"); + return &mut self.buf[..0]; + } + + let n = if self.end <= self.start { + self.buf.len() - self.start + } else { + self.end - self.start + }; + + trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); + &mut self.buf[self.start..self.start + n] + } + + pub fn pop(&mut self, n: usize) { + trace!(" ringbuf: pop {:?}", n); + if n == 0 { + return; + } + + self.start = self.wrap(self.start + n); + self.empty = self.start == self.end; + } + + pub fn is_full(&self) -> bool { + self.start == self.end && !self.empty + } + + pub fn is_empty(&self) -> bool { + self.empty + } + + #[allow(unused)] + pub fn len(&self) -> usize { + if self.empty { + 0 + } else if self.start < self.end { + self.end - self.start + } else { + N + self.end - self.start + } + } + + pub fn clear(&mut self) { + self.start = 0; + self.end = 0; + self.empty = true; + } + + fn wrap(&self, n: usize) -> usize { + assert!(n <= self.buf.len()); + if n == self.buf.len() { + 0 + } else { + n + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_pop() { + let mut rb: RingBuffer<4> = RingBuffer::new(); + let buf = rb.push_buf(); + assert_eq!(4, buf.len()); + buf[0] = 1; + buf[1] = 2; + buf[2] = 3; + buf[3] = 4; + rb.push(4); + + let buf = rb.pop_buf(); + assert_eq!(4, buf.len()); + assert_eq!(1, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(3, buf.len()); + assert_eq!(2, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(2, buf.len()); + assert_eq!(3, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(1, buf.len()); + assert_eq!(4, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(0, buf.len()); + + let buf = rb.push_buf(); + assert_eq!(4, buf.len()); + } +}