//! Async byte stream pipe. use core::cell::{RefCell, UnsafeCell}; use core::future::Future; use core::ops::Range; use core::pin::Pin; use core::task::{Context, Poll}; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::ring_buffer::RingBuffer; use crate::waitqueue::WakerRegistration; /// Write-only access to a [`Pipe`]. pub struct Writer<'p, M, const N: usize> where M: RawMutex, { pipe: &'p Pipe, } impl<'p, M, const N: usize> Clone for Writer<'p, M, N> where M: RawMutex, { fn clone(&self) -> Self { Writer { pipe: self.pipe } } } impl<'p, M, const N: usize> Copy for Writer<'p, M, N> where M: RawMutex {} impl<'p, M, const N: usize> Writer<'p, M, N> where M: RawMutex, { /// Write some bytes to the pipe. /// /// See [`Pipe::write()`] pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { self.pipe.write(buf) } /// Attempt to immediately write some bytes to the pipe. /// /// See [`Pipe::try_write()`] pub fn try_write(&self, buf: &[u8]) -> Result { self.pipe.try_write(buf) } } /// Future returned by [`Pipe::write`] and [`Writer::write`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteFuture<'p, M, const N: usize> where M: RawMutex, { pipe: &'p Pipe, buf: &'p [u8], } impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> where M: RawMutex, { type Output = usize; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.pipe.try_write_with_context(Some(cx), self.buf) { Ok(n) => Poll::Ready(n), Err(TryWriteError::Full) => Poll::Pending, } } } impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} /// Read-only access to a [`Pipe`]. pub struct Reader<'p, M, const N: usize> where M: RawMutex, { pipe: &'p Pipe, } impl<'p, M, const N: usize> Reader<'p, M, N> where M: RawMutex, { /// Read some bytes from the pipe. /// /// See [`Pipe::read()`] pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { self.pipe.read(buf) } /// Attempt to immediately read some bytes from the pipe. /// /// See [`Pipe::try_read()`] pub fn try_read(&self, buf: &mut [u8]) -> Result { self.pipe.try_read(buf) } /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty. /// /// If no bytes are currently available to read, this function waits until at least one byte is available. /// /// If the reader is at end-of-file (EOF), an empty slice is returned. pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> { FillBufFuture { pipe: Some(self.pipe) } } /// Try returning contents of the internal buffer. /// /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`. /// /// If the reader is at end-of-file (EOF), an empty slice is returned. pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> { unsafe { self.pipe.try_fill_buf_with_context(None) } } /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`. pub fn consume(&mut self, amt: usize) { self.pipe.consume(amt) } } /// Future returned by [`Pipe::read`] and [`Reader::read`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadFuture<'p, M, const N: usize> where M: RawMutex, { pipe: &'p Pipe, buf: &'p mut [u8], } impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> where M: RawMutex, { type Output = usize; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.pipe.try_read_with_context(Some(cx), self.buf) { Ok(n) => Poll::Ready(n), Err(TryReadError::Empty) => Poll::Pending, } } } impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} /// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct FillBufFuture<'p, M, const N: usize> where M: RawMutex, { pipe: Option<&'p Pipe>, } impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N> where M: RawMutex, { type Output = &'p [u8]; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let pipe = self.pipe.take().unwrap(); match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } { Ok(buf) => Poll::Ready(buf), Err(TryReadError::Empty) => { self.pipe = Some(pipe); Poll::Pending } } } } impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {} /// Error returned by [`try_read`](Pipe::try_read). #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum TryReadError { /// No data could be read from the pipe because it is currently /// empty, and reading would require blocking. Empty, } /// Error returned by [`try_write`](Pipe::try_write). #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum TryWriteError { /// No data could be written to the pipe because it is /// currently full, and writing would require blocking. Full, } struct PipeState { buffer: RingBuffer, read_waker: WakerRegistration, write_waker: WakerRegistration, } #[repr(transparent)] struct Buffer(UnsafeCell<[u8; N]>); impl Buffer { unsafe fn get<'a>(&self, r: Range) -> &'a [u8] { let p = self.0.get() as *const u8; core::slice::from_raw_parts(p.add(r.start), r.end - r.start) } unsafe fn get_mut<'a>(&self, r: Range) -> &'a mut [u8] { let p = self.0.get() as *mut u8; core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start) } } unsafe impl Send for Buffer {} unsafe impl Sync for Buffer {} /// A bounded byte-oriented pipe for communicating between asynchronous tasks /// with backpressure. /// /// The pipe will buffer up to the provided number of bytes. Once the /// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. /// /// All data written will become available in the same order as it was written. pub struct Pipe where M: RawMutex, { buf: Buffer, inner: Mutex>>, } impl Pipe where M: RawMutex, { /// Establish a new bounded pipe. For example, to create one with a NoopMutex: /// /// ``` /// use embassy_sync::pipe::Pipe; /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; /// /// // Declare a bounded pipe, with a buffer of 256 bytes. /// let mut pipe = Pipe::::new(); /// ``` pub const fn new() -> Self { Self { buf: Buffer(UnsafeCell::new([0; N])), inner: Mutex::new(RefCell::new(PipeState { buffer: RingBuffer::new(), read_waker: WakerRegistration::new(), write_waker: WakerRegistration::new(), })), } } fn lock(&self, f: impl FnOnce(&mut PipeState) -> R) -> R { self.inner.lock(|rc| f(&mut *rc.borrow_mut())) } fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { self.inner.lock(|rc: &RefCell>| { let s = &mut *rc.borrow_mut(); if s.buffer.is_full() { s.write_waker.wake(); } let available = unsafe { self.buf.get(s.buffer.pop_buf()) }; if available.is_empty() { if let Some(cx) = cx { s.read_waker.register(cx.waker()); } return Err(TryReadError::Empty); } let n = available.len().min(buf.len()); buf[..n].copy_from_slice(&available[..n]); s.buffer.pop(n); Ok(n) }) } // safety: While the returned slice is alive, // no `read` or `consume` methods in the pipe must be called. unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> { self.inner.lock(|rc: &RefCell>| { let s = &mut *rc.borrow_mut(); if s.buffer.is_full() { s.write_waker.wake(); } let available = unsafe { self.buf.get(s.buffer.pop_buf()) }; if available.is_empty() { if let Some(cx) = cx { s.read_waker.register(cx.waker()); } return Err(TryReadError::Empty); } Ok(available) }) } fn consume(&self, amt: usize) { self.inner.lock(|rc: &RefCell>| { let s = &mut *rc.borrow_mut(); let available = s.buffer.pop_buf(); assert!(amt <= available.len()); s.buffer.pop(amt); }) } fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { self.inner.lock(|rc: &RefCell>| { let s = &mut *rc.borrow_mut(); if s.buffer.is_empty() { s.read_waker.wake(); } let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) }; if available.is_empty() { if let Some(cx) = cx { s.write_waker.register(cx.waker()); } return Err(TryWriteError::Full); } let n = available.len().min(buf.len()); available[..n].copy_from_slice(&buf[..n]); s.buffer.push(n); Ok(n) }) } /// Split this pipe into a BufRead-capable reader and a writer. /// /// The reader and writer borrow the current pipe mutably, so it is not /// possible to use it directly while they exist. This is needed because /// implementing `BufRead` requires there is a single reader. /// /// The writer is cloneable, the reader is not. pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) { (Reader { pipe: self }, Writer { pipe: self }) } /// Write some bytes to the pipe. /// /// This method writes a nonzero amount of bytes from `buf` into the pipe, and /// returns the amount of bytes written. /// /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full, /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that /// returns an error instead of waiting. /// /// It is not guaranteed that all bytes in the buffer are written, even if there's enough /// free space in the pipe buffer for all. In other words, it is possible for `write` to return /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like /// `write_all` from the `embedded-io` crate. pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { WriteFuture { pipe: self, buf } } /// Write all bytes to the pipe. /// /// This method writes all bytes from `buf` into the pipe pub async fn write_all(&self, mut buf: &[u8]) { while !buf.is_empty() { let n = self.write(buf).await; buf = &buf[n..]; } } /// Attempt to immediately write some bytes to the pipe. /// /// This method will either write a nonzero amount of bytes to the pipe immediately, /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant /// that waits instead of returning an error. pub fn try_write(&self, buf: &[u8]) -> Result { self.try_write_with_context(None, buf) } /// Read some bytes from the pipe. /// /// This method reads a nonzero amount of bytes from the pipe into `buf` and /// returns the amount of bytes read. /// /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty, /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that /// returns an error instead of waiting. /// /// It is not guaranteed that all bytes in the buffer are read, even if there's enough /// space in `buf` for all. In other words, it is possible for `read` to return /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes /// in the pipe buffer. You should always `read` in a loop, or use helpers like /// `read_exact` from the `embedded-io` crate. pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { ReadFuture { pipe: self, buf } } /// Attempt to immediately read some bytes from the pipe. /// /// This method will either read a nonzero amount of bytes from the pipe immediately, /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant /// that waits instead of returning an error. pub fn try_read(&self, buf: &mut [u8]) -> Result { self.try_read_with_context(None, buf) } /// Clear the data in the pipe's buffer. pub fn clear(&self) { self.inner.lock(|rc: &RefCell>| { let s = &mut *rc.borrow_mut(); s.buffer.clear(); s.write_waker.wake(); }) } /// Return whether the pipe is full (no free space in the buffer) pub fn is_full(&self) -> bool { self.len() == N } /// Return whether the pipe is empty (no data buffered) pub fn is_empty(&self) -> bool { self.len() == 0 } /// Total byte capacity. /// /// This is the same as the `N` generic param. pub fn capacity(&self) -> usize { N } /// Used byte capacity. pub fn len(&self) -> usize { self.lock(|c| c.buffer.len()) } /// Free byte capacity. /// /// This is equivalent to `capacity() - len()` pub fn free_capacity(&self) -> usize { N - self.len() } } #[cfg(feature = "nightly")] mod io_impls { use core::convert::Infallible; use super::*; impl embedded_io_async::ErrorType for Pipe { type Error = Infallible; } impl embedded_io_async::Read for Pipe { async fn read(&mut self, buf: &mut [u8]) -> Result { Ok(Pipe::read(self, buf).await) } } impl embedded_io_async::Write for Pipe { async fn write(&mut self, buf: &[u8]) -> Result { Ok(Pipe::write(self, buf).await) } async fn flush(&mut self) -> Result<(), Self::Error> { Ok(()) } } impl embedded_io_async::ErrorType for &Pipe { type Error = Infallible; } impl embedded_io_async::Read for &Pipe { async fn read(&mut self, buf: &mut [u8]) -> Result { Ok(Pipe::read(self, buf).await) } } impl embedded_io_async::Write for &Pipe { async fn write(&mut self, buf: &[u8]) -> Result { Ok(Pipe::write(self, buf).await) } async fn flush(&mut self) -> Result<(), Self::Error> { Ok(()) } } impl embedded_io_async::ErrorType for Reader<'_, M, N> { type Error = Infallible; } impl embedded_io_async::Read for Reader<'_, M, N> { async fn read(&mut self, buf: &mut [u8]) -> Result { Ok(Reader::read(self, buf).await) } } impl embedded_io_async::BufRead for Reader<'_, M, N> { async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { Ok(Reader::fill_buf(self).await) } fn consume(&mut self, amt: usize) { Reader::consume(self, amt) } } impl embedded_io_async::ErrorType for Writer<'_, M, N> { type Error = Infallible; } impl embedded_io_async::Write for Writer<'_, M, N> { async fn write(&mut self, buf: &[u8]) -> Result { Ok(Writer::write(self, buf).await) } async fn flush(&mut self) -> Result<(), Self::Error> { Ok(()) } } } #[cfg(test)] mod tests { use futures_executor::ThreadPool; use futures_util::task::SpawnExt; use static_cell::StaticCell; use super::*; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; #[test] fn writing_once() { let c = Pipe::::new(); assert!(c.try_write(&[1]).is_ok()); assert_eq!(c.free_capacity(), 2); } #[test] fn writing_when_full() { let c = Pipe::::new(); assert_eq!(c.try_write(&[42]), Ok(1)); assert_eq!(c.try_write(&[43]), Ok(1)); assert_eq!(c.try_write(&[44]), Ok(1)); assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); assert_eq!(c.free_capacity(), 0); } #[test] fn receiving_once_with_one_send() { let c = Pipe::::new(); assert!(c.try_write(&[42]).is_ok()); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Ok(1)); assert_eq!(buf[0], 42); assert_eq!(c.free_capacity(), 3); } #[test] fn receiving_when_empty() { let c = Pipe::::new(); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); assert_eq!(c.free_capacity(), 3); } #[test] fn simple_send_and_receive() { let c = Pipe::::new(); assert!(c.try_write(&[42]).is_ok()); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Ok(1)); assert_eq!(buf[0], 42); } #[test] fn read_buf() { let mut c = Pipe::::new(); let (mut r, w) = c.split(); assert!(w.try_write(&[42, 43]).is_ok()); let buf = r.try_fill_buf().unwrap(); assert_eq!(buf, &[42, 43]); let buf = r.try_fill_buf().unwrap(); assert_eq!(buf, &[42, 43]); r.consume(1); let buf = r.try_fill_buf().unwrap(); assert_eq!(buf, &[43]); r.consume(1); assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty)); assert_eq!(w.try_write(&[44, 45, 46]), Ok(1)); assert_eq!(w.try_write(&[45, 46]), Ok(2)); let buf = r.try_fill_buf().unwrap(); assert_eq!(buf, &[44]); // only one byte due to wraparound. r.consume(1); let buf = r.try_fill_buf().unwrap(); assert_eq!(buf, &[45, 46]); assert!(w.try_write(&[47]).is_ok()); let buf = r.try_fill_buf().unwrap(); assert_eq!(buf, &[45, 46, 47]); r.consume(3); } #[test] fn writer_is_cloneable() { let mut c = Pipe::::new(); let (_r, w) = c.split(); let _ = w.clone(); } #[futures_test::test] async fn receiver_receives_given_try_write_async() { let executor = ThreadPool::new().unwrap(); static CHANNEL: StaticCell> = StaticCell::new(); let c = &*CHANNEL.init(Pipe::new()); let c2 = c; let f = async move { assert_eq!(c2.try_write(&[42]), Ok(1)); }; executor.spawn(f).unwrap(); let mut buf = [0; 16]; assert_eq!(c.read(&mut buf).await, 1); assert_eq!(buf[0], 42); } #[futures_test::test] async fn sender_send_completes_if_capacity() { let c = Pipe::::new(); c.write(&[42]).await; let mut buf = [0; 16]; assert_eq!(c.read(&mut buf).await, 1); assert_eq!(buf[0], 42); } }