util: add pipe
This commit is contained in:
		| @@ -8,9 +8,13 @@ | ||||
| // This mod MUST go first, so that the others see its macros. | ||||
| pub(crate) mod fmt; | ||||
|  | ||||
| // internal use | ||||
| mod ring_buffer; | ||||
|  | ||||
| pub mod blocking_mutex; | ||||
| pub mod channel; | ||||
| pub mod mutex; | ||||
| pub mod pipe; | ||||
| pub mod waitqueue; | ||||
|  | ||||
| mod forever; | ||||
|   | ||||
							
								
								
									
										413
									
								
								embassy-util/src/pipe.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										413
									
								
								embassy-util/src/pipe.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,413 @@ | ||||
| //! Async byte stream pipe. | ||||
|  | ||||
| use core::cell::RefCell; | ||||
| use core::future::Future; | ||||
| use core::pin::Pin; | ||||
| use core::task::{Context, Poll}; | ||||
|  | ||||
| use crate::blocking_mutex::raw::RawMutex; | ||||
| use crate::blocking_mutex::Mutex; | ||||
| use crate::ring_buffer::RingBuffer; | ||||
| use crate::waitqueue::WakerRegistration; | ||||
|  | ||||
| /// Write-only access to a [`Pipe`]. | ||||
| #[derive(Copy)] | ||||
| pub struct Writer<'p, M, const N: usize> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     pipe: &'p Pipe<M, N>, | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Clone for Writer<'p, M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     fn clone(&self) -> Self { | ||||
|         Writer { pipe: self.pipe } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Writer<'p, M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     /// Writes a value. | ||||
|     /// | ||||
|     /// See [`Pipe::write()`] | ||||
|     pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | ||||
|         self.pipe.write(buf) | ||||
|     } | ||||
|  | ||||
|     /// Attempt to immediately write a message. | ||||
|     /// | ||||
|     /// See [`Pipe::write()`] | ||||
|     pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||||
|         self.pipe.try_write(buf) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Future returned by [`Pipe::write`] and  [`Writer::write`]. | ||||
| pub struct WriteFuture<'p, M, const N: usize> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     pipe: &'p Pipe<M, N>, | ||||
|     buf: &'p [u8], | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     type Output = usize; | ||||
|  | ||||
|     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         match self.pipe.try_write_with_context(Some(cx), self.buf) { | ||||
|             Ok(n) => Poll::Ready(n), | ||||
|             Err(TryWriteError::Full) => Poll::Pending, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} | ||||
|  | ||||
| /// Read-only access to a [`Pipe`]. | ||||
| #[derive(Copy)] | ||||
| pub struct Reader<'p, M, const N: usize> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     pipe: &'p Pipe<M, N>, | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Clone for Reader<'p, M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     fn clone(&self) -> Self { | ||||
|         Reader { pipe: self.pipe } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Reader<'p, M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     /// Reads a value. | ||||
|     /// | ||||
|     /// See [`Pipe::read()`] | ||||
|     pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | ||||
|         self.pipe.read(buf) | ||||
|     } | ||||
|  | ||||
|     /// Attempt to immediately read a message. | ||||
|     /// | ||||
|     /// See [`Pipe::read()`] | ||||
|     pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||||
|         self.pipe.try_read(buf) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Future returned by [`Pipe::read`] and  [`Reader::read`]. | ||||
| pub struct ReadFuture<'p, M, const N: usize> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     pipe: &'p Pipe<M, N>, | ||||
|     buf: &'p mut [u8], | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     type Output = usize; | ||||
|  | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         match self.pipe.try_read_with_context(Some(cx), self.buf) { | ||||
|             Ok(n) => Poll::Ready(n), | ||||
|             Err(TryReadError::Empty) => Poll::Pending, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} | ||||
|  | ||||
| /// Error returned by [`try_read`](Pipe::try_read). | ||||
| #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||||
| #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||||
| pub enum TryReadError { | ||||
|     /// No data could be read from the pipe because it is currently | ||||
|     /// empty, and reading would require blocking. | ||||
|     Empty, | ||||
| } | ||||
|  | ||||
| /// Error returned by [`try_write`](Pipe::try_write). | ||||
| #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||||
| #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||||
| pub enum TryWriteError { | ||||
|     /// No data could be written to the pipe because it is | ||||
|     /// currently full, and writing would require blocking. | ||||
|     Full, | ||||
| } | ||||
|  | ||||
| struct PipeState<const N: usize> { | ||||
|     buffer: RingBuffer<N>, | ||||
|     read_waker: WakerRegistration, | ||||
|     write_waker: WakerRegistration, | ||||
| } | ||||
|  | ||||
| impl<const N: usize> PipeState<N> { | ||||
|     const fn new() -> Self { | ||||
|         PipeState { | ||||
|             buffer: RingBuffer::new(), | ||||
|             read_waker: WakerRegistration::new(), | ||||
|             write_waker: WakerRegistration::new(), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn clear(&mut self) { | ||||
|         self.buffer.clear(); | ||||
|         self.write_waker.wake(); | ||||
|     } | ||||
|  | ||||
|     fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||||
|         self.try_read_with_context(None, buf) | ||||
|     } | ||||
|  | ||||
|     fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||||
|         if self.buffer.is_full() { | ||||
|             self.write_waker.wake(); | ||||
|         } | ||||
|  | ||||
|         let available = self.buffer.pop_buf(); | ||||
|         if available.is_empty() { | ||||
|             if let Some(cx) = cx { | ||||
|                 self.read_waker.register(cx.waker()); | ||||
|             } | ||||
|             return Err(TryReadError::Empty); | ||||
|         } | ||||
|  | ||||
|         let n = available.len().min(buf.len()); | ||||
|         buf[..n].copy_from_slice(&available[..n]); | ||||
|         self.buffer.pop(n); | ||||
|         Ok(n) | ||||
|     } | ||||
|  | ||||
|     fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||||
|         self.try_write_with_context(None, buf) | ||||
|     } | ||||
|  | ||||
|     fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||||
|         if self.buffer.is_empty() { | ||||
|             self.read_waker.wake(); | ||||
|         } | ||||
|  | ||||
|         let available = self.buffer.push_buf(); | ||||
|         if available.is_empty() { | ||||
|             if let Some(cx) = cx { | ||||
|                 self.write_waker.register(cx.waker()); | ||||
|             } | ||||
|             return Err(TryWriteError::Full); | ||||
|         } | ||||
|  | ||||
|         let n = available.len().min(buf.len()); | ||||
|         available[..n].copy_from_slice(&buf[..n]); | ||||
|         self.buffer.push(n); | ||||
|         Ok(n) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// A bounded pipe for communicating between asynchronous tasks | ||||
| /// with backpressure. | ||||
| /// | ||||
| /// The pipe will buffer up to the provided number of messages.  Once the | ||||
| /// buffer is full, attempts to `write` new messages will wait until a message is | ||||
| /// read from the pipe. | ||||
| /// | ||||
| /// All data written will become available in the same order as it was written. | ||||
| pub struct Pipe<M, const N: usize> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     inner: Mutex<M, RefCell<PipeState<N>>>, | ||||
| } | ||||
|  | ||||
| impl<M, const N: usize> Pipe<M, N> | ||||
| where | ||||
|     M: RawMutex, | ||||
| { | ||||
|     /// Establish a new bounded pipe. For example, to create one with a NoopMutex: | ||||
|     /// | ||||
|     /// ``` | ||||
|     /// use embassy_util::pipe::Pipe; | ||||
|     /// use embassy_util::blocking_mutex::raw::NoopRawMutex; | ||||
|     /// | ||||
|     /// // Declare a bounded pipe, with a buffer of 256 bytes. | ||||
|     /// let mut pipe = Pipe::<NoopRawMutex, 256>::new(); | ||||
|     /// ``` | ||||
|     pub const fn new() -> Self { | ||||
|         Self { | ||||
|             inner: Mutex::new(RefCell::new(PipeState::new())), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R { | ||||
|         self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | ||||
|     } | ||||
|  | ||||
|     fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||||
|         self.lock(|c| c.try_read_with_context(cx, buf)) | ||||
|     } | ||||
|  | ||||
|     fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||||
|         self.lock(|c| c.try_write_with_context(cx, buf)) | ||||
|     } | ||||
|  | ||||
|     /// Get a writer for this pipe. | ||||
|     pub fn writer(&self) -> Writer<'_, M, N> { | ||||
|         Writer { pipe: self } | ||||
|     } | ||||
|  | ||||
|     /// Get a reader for this pipe. | ||||
|     pub fn reader(&self) -> Reader<'_, M, N> { | ||||
|         Reader { pipe: self } | ||||
|     } | ||||
|  | ||||
|     /// Write a value, waiting until there is capacity. | ||||
|     /// | ||||
|     /// Writeing completes when the value has been pushed to the pipe's queue. | ||||
|     /// This doesn't mean the value has been read yet. | ||||
|     pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | ||||
|         WriteFuture { pipe: self, buf } | ||||
|     } | ||||
|  | ||||
|     /// Attempt to immediately write a message. | ||||
|     /// | ||||
|     /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's | ||||
|     /// buffer is full, instead of waiting. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// If the pipe capacity has been reached, i.e., the pipe has `n` | ||||
|     /// buffered values where `n` is the argument passed to [`Pipe`], then an | ||||
|     /// error is returned. | ||||
|     pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||||
|         self.lock(|c| c.try_write(buf)) | ||||
|     } | ||||
|  | ||||
|     /// Receive the next value. | ||||
|     /// | ||||
|     /// If there are no messages in the pipe's buffer, this method will | ||||
|     /// wait until a message is written. | ||||
|     pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | ||||
|         ReadFuture { pipe: self, buf } | ||||
|     } | ||||
|  | ||||
|     /// Attempt to immediately read a message. | ||||
|     /// | ||||
|     /// This method will either read a message from the pipe immediately or return an error | ||||
|     /// if the pipe is empty. | ||||
|     pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||||
|         self.lock(|c| c.try_read(buf)) | ||||
|     } | ||||
|  | ||||
|     /// Clear the data in the pipe's buffer. | ||||
|     pub fn clear(&self) { | ||||
|         self.lock(|c| c.clear()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use futures_executor::ThreadPool; | ||||
|     use futures_util::task::SpawnExt; | ||||
|  | ||||
|     use super::*; | ||||
|     use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; | ||||
|     use crate::Forever; | ||||
|  | ||||
|     fn capacity<const N: usize>(c: &PipeState<N>) -> usize { | ||||
|         N - c.buffer.len() | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn writing_once() { | ||||
|         let mut c = PipeState::<3>::new(); | ||||
|         assert!(c.try_write(&[1]).is_ok()); | ||||
|         assert_eq!(capacity(&c), 2); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn writing_when_full() { | ||||
|         let mut c = PipeState::<3>::new(); | ||||
|         assert_eq!(c.try_write(&[42]), Ok(1)); | ||||
|         assert_eq!(c.try_write(&[43]), Ok(1)); | ||||
|         assert_eq!(c.try_write(&[44]), Ok(1)); | ||||
|         assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); | ||||
|         assert_eq!(capacity(&c), 0); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn receiving_once_with_one_send() { | ||||
|         let mut c = PipeState::<3>::new(); | ||||
|         assert!(c.try_write(&[42]).is_ok()); | ||||
|         let mut buf = [0; 16]; | ||||
|         assert_eq!(c.try_read(&mut buf), Ok(1)); | ||||
|         assert_eq!(buf[0], 42); | ||||
|         assert_eq!(capacity(&c), 3); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn receiving_when_empty() { | ||||
|         let mut c = PipeState::<3>::new(); | ||||
|         let mut buf = [0; 16]; | ||||
|         assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); | ||||
|         assert_eq!(capacity(&c), 3); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn simple_send_and_receive() { | ||||
|         let c = Pipe::<NoopRawMutex, 3>::new(); | ||||
|         assert!(c.try_write(&[42]).is_ok()); | ||||
|         let mut buf = [0; 16]; | ||||
|         assert_eq!(c.try_read(&mut buf), Ok(1)); | ||||
|         assert_eq!(buf[0], 42); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn cloning() { | ||||
|         let c = Pipe::<NoopRawMutex, 3>::new(); | ||||
|         let r1 = c.reader(); | ||||
|         let w1 = c.writer(); | ||||
|  | ||||
|         let _ = r1.clone(); | ||||
|         let _ = w1.clone(); | ||||
|     } | ||||
|  | ||||
|     #[futures_test::test] | ||||
|     async fn receiver_receives_given_try_write_async() { | ||||
|         let executor = ThreadPool::new().unwrap(); | ||||
|  | ||||
|         static CHANNEL: Forever<Pipe<CriticalSectionRawMutex, 3>> = Forever::new(); | ||||
|         let c = &*CHANNEL.put(Pipe::new()); | ||||
|         let c2 = c; | ||||
|         let f = async move { | ||||
|             assert_eq!(c2.try_write(&[42]), Ok(1)); | ||||
|         }; | ||||
|         executor.spawn(f).unwrap(); | ||||
|         let mut buf = [0; 16]; | ||||
|         assert_eq!(c.read(&mut buf).await, 1); | ||||
|         assert_eq!(buf[0], 42); | ||||
|     } | ||||
|  | ||||
|     #[futures_test::test] | ||||
|     async fn sender_send_completes_if_capacity() { | ||||
|         let c = Pipe::<CriticalSectionRawMutex, 1>::new(); | ||||
|         c.write(&[42]).await; | ||||
|         let mut buf = [0; 16]; | ||||
|         assert_eq!(c.read(&mut buf).await, 1); | ||||
|         assert_eq!(buf[0], 42); | ||||
|     } | ||||
| } | ||||
							
								
								
									
										146
									
								
								embassy-util/src/ring_buffer.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										146
									
								
								embassy-util/src/ring_buffer.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,146 @@ | ||||
| pub struct RingBuffer<const N: usize> { | ||||
|     buf: [u8; N], | ||||
|     start: usize, | ||||
|     end: usize, | ||||
|     empty: bool, | ||||
| } | ||||
|  | ||||
| impl<const N: usize> RingBuffer<N> { | ||||
|     pub const fn new() -> Self { | ||||
|         Self { | ||||
|             buf: [0; N], | ||||
|             start: 0, | ||||
|             end: 0, | ||||
|             empty: true, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn push_buf(&mut self) -> &mut [u8] { | ||||
|         if self.start == self.end && !self.empty { | ||||
|             trace!("  ringbuf: push_buf empty"); | ||||
|             return &mut self.buf[..0]; | ||||
|         } | ||||
|  | ||||
|         let n = if self.start <= self.end { | ||||
|             self.buf.len() - self.end | ||||
|         } else { | ||||
|             self.start - self.end | ||||
|         }; | ||||
|  | ||||
|         trace!("  ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); | ||||
|         &mut self.buf[self.end..self.end + n] | ||||
|     } | ||||
|  | ||||
|     pub fn push(&mut self, n: usize) { | ||||
|         trace!("  ringbuf: push {:?}", n); | ||||
|         if n == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         self.end = self.wrap(self.end + n); | ||||
|         self.empty = false; | ||||
|     } | ||||
|  | ||||
|     pub fn pop_buf(&mut self) -> &mut [u8] { | ||||
|         if self.empty { | ||||
|             trace!("  ringbuf: pop_buf empty"); | ||||
|             return &mut self.buf[..0]; | ||||
|         } | ||||
|  | ||||
|         let n = if self.end <= self.start { | ||||
|             self.buf.len() - self.start | ||||
|         } else { | ||||
|             self.end - self.start | ||||
|         }; | ||||
|  | ||||
|         trace!("  ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); | ||||
|         &mut self.buf[self.start..self.start + n] | ||||
|     } | ||||
|  | ||||
|     pub fn pop(&mut self, n: usize) { | ||||
|         trace!("  ringbuf: pop {:?}", n); | ||||
|         if n == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         self.start = self.wrap(self.start + n); | ||||
|         self.empty = self.start == self.end; | ||||
|     } | ||||
|  | ||||
|     pub fn is_full(&self) -> bool { | ||||
|         self.start == self.end && !self.empty | ||||
|     } | ||||
|  | ||||
|     pub fn is_empty(&self) -> bool { | ||||
|         self.empty | ||||
|     } | ||||
|  | ||||
|     #[allow(unused)] | ||||
|     pub fn len(&self) -> usize { | ||||
|         if self.empty { | ||||
|             0 | ||||
|         } else if self.start < self.end { | ||||
|             self.end - self.start | ||||
|         } else { | ||||
|             N + self.end - self.start | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn clear(&mut self) { | ||||
|         self.start = 0; | ||||
|         self.end = 0; | ||||
|         self.empty = true; | ||||
|     } | ||||
|  | ||||
|     fn wrap(&self, n: usize) -> usize { | ||||
|         assert!(n <= self.buf.len()); | ||||
|         if n == self.buf.len() { | ||||
|             0 | ||||
|         } else { | ||||
|             n | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|  | ||||
|     #[test] | ||||
|     fn push_pop() { | ||||
|         let mut rb: RingBuffer<4> = RingBuffer::new(); | ||||
|         let buf = rb.push_buf(); | ||||
|         assert_eq!(4, buf.len()); | ||||
|         buf[0] = 1; | ||||
|         buf[1] = 2; | ||||
|         buf[2] = 3; | ||||
|         buf[3] = 4; | ||||
|         rb.push(4); | ||||
|  | ||||
|         let buf = rb.pop_buf(); | ||||
|         assert_eq!(4, buf.len()); | ||||
|         assert_eq!(1, buf[0]); | ||||
|         rb.pop(1); | ||||
|  | ||||
|         let buf = rb.pop_buf(); | ||||
|         assert_eq!(3, buf.len()); | ||||
|         assert_eq!(2, buf[0]); | ||||
|         rb.pop(1); | ||||
|  | ||||
|         let buf = rb.pop_buf(); | ||||
|         assert_eq!(2, buf.len()); | ||||
|         assert_eq!(3, buf[0]); | ||||
|         rb.pop(1); | ||||
|  | ||||
|         let buf = rb.pop_buf(); | ||||
|         assert_eq!(1, buf.len()); | ||||
|         assert_eq!(4, buf[0]); | ||||
|         rb.pop(1); | ||||
|  | ||||
|         let buf = rb.pop_buf(); | ||||
|         assert_eq!(0, buf.len()); | ||||
|  | ||||
|         let buf = rb.push_buf(); | ||||
|         assert_eq!(4, buf.len()); | ||||
|     } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user