Merge #891
891: Async Pipe r=Dirbaio a=Dirbaio Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
commit
ba67f6d3a8
@ -6,11 +6,14 @@ edition = "2021"
|
|||||||
[package.metadata.embassy_docs]
|
[package.metadata.embassy_docs]
|
||||||
src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/"
|
src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/"
|
||||||
src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/"
|
src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/"
|
||||||
features = ["nightly", "defmt", "unstable-traits", "time", "time-tick-1mhz"]
|
features = ["nightly"]
|
||||||
flavors = [
|
flavors = [
|
||||||
{ name = "default", target = "x86_64-unknown-linux-gnu" },
|
{ name = "default", target = "x86_64-unknown-linux-gnu" },
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[features]
|
||||||
|
nightly = ["embedded-io/async"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
defmt = { version = "0.3", optional = true }
|
defmt = { version = "0.3", optional = true }
|
||||||
log = { version = "0.4.14", optional = true }
|
log = { version = "0.4.14", optional = true }
|
||||||
@ -20,6 +23,7 @@ atomic-polyfill = "0.1.5"
|
|||||||
critical-section = "0.2.5"
|
critical-section = "0.2.5"
|
||||||
heapless = "0.7.5"
|
heapless = "0.7.5"
|
||||||
cfg-if = "1.0.0"
|
cfg-if = "1.0.0"
|
||||||
|
embedded-io = "0.3.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
|
futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
|
#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
|
||||||
#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
|
#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
|
||||||
#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))]
|
|
||||||
#![allow(clippy::new_without_default)]
|
#![allow(clippy::new_without_default)]
|
||||||
#![doc = include_str!("../../README.md")]
|
#![doc = include_str!("../../README.md")]
|
||||||
#![warn(missing_docs)]
|
#![warn(missing_docs)]
|
||||||
@ -8,9 +7,13 @@
|
|||||||
// This mod MUST go first, so that the others see its macros.
|
// This mod MUST go first, so that the others see its macros.
|
||||||
pub(crate) mod fmt;
|
pub(crate) mod fmt;
|
||||||
|
|
||||||
|
// internal use
|
||||||
|
mod ring_buffer;
|
||||||
|
|
||||||
pub mod blocking_mutex;
|
pub mod blocking_mutex;
|
||||||
pub mod channel;
|
pub mod channel;
|
||||||
pub mod mutex;
|
pub mod mutex;
|
||||||
|
pub mod pipe;
|
||||||
pub mod waitqueue;
|
pub mod waitqueue;
|
||||||
|
|
||||||
mod forever;
|
mod forever;
|
||||||
|
551
embassy-util/src/pipe.rs
Normal file
551
embassy-util/src/pipe.rs
Normal file
@ -0,0 +1,551 @@
|
|||||||
|
//! Async byte stream pipe.
|
||||||
|
|
||||||
|
use core::cell::RefCell;
|
||||||
|
use core::future::Future;
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::task::{Context, Poll};
|
||||||
|
|
||||||
|
use crate::blocking_mutex::raw::RawMutex;
|
||||||
|
use crate::blocking_mutex::Mutex;
|
||||||
|
use crate::ring_buffer::RingBuffer;
|
||||||
|
use crate::waitqueue::WakerRegistration;
|
||||||
|
|
||||||
|
/// Write-only access to a [`Pipe`].
|
||||||
|
#[derive(Copy)]
|
||||||
|
pub struct Writer<'p, M, const N: usize>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
pipe: &'p Pipe<M, N>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Writer { pipe: self.pipe }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Writer<'p, M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
/// Writes a value.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::write()`]
|
||||||
|
pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
|
||||||
|
self.pipe.write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately write a message.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::write()`]
|
||||||
|
pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
self.pipe.try_write(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by [`Pipe::write`] and [`Writer::write`].
|
||||||
|
pub struct WriteFuture<'p, M, const N: usize>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
pipe: &'p Pipe<M, N>,
|
||||||
|
buf: &'p [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
type Output = usize;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.pipe.try_write_with_context(Some(cx), self.buf) {
|
||||||
|
Ok(n) => Poll::Ready(n),
|
||||||
|
Err(TryWriteError::Full) => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
|
||||||
|
|
||||||
|
/// Read-only access to a [`Pipe`].
|
||||||
|
#[derive(Copy)]
|
||||||
|
pub struct Reader<'p, M, const N: usize>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
pipe: &'p Pipe<M, N>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Reader { pipe: self.pipe }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Reader<'p, M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
/// Reads a value.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::read()`]
|
||||||
|
pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
|
||||||
|
self.pipe.read(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately read a message.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::read()`]
|
||||||
|
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
self.pipe.try_read(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by [`Pipe::read`] and [`Reader::read`].
|
||||||
|
pub struct ReadFuture<'p, M, const N: usize>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
pipe: &'p Pipe<M, N>,
|
||||||
|
buf: &'p mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
type Output = usize;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.pipe.try_read_with_context(Some(cx), self.buf) {
|
||||||
|
Ok(n) => Poll::Ready(n),
|
||||||
|
Err(TryReadError::Empty) => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
|
||||||
|
|
||||||
|
/// Error returned by [`try_read`](Pipe::try_read).
|
||||||
|
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||||
|
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||||
|
pub enum TryReadError {
|
||||||
|
/// No data could be read from the pipe because it is currently
|
||||||
|
/// empty, and reading would require blocking.
|
||||||
|
Empty,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error returned by [`try_write`](Pipe::try_write).
|
||||||
|
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||||
|
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||||
|
pub enum TryWriteError {
|
||||||
|
/// No data could be written to the pipe because it is
|
||||||
|
/// currently full, and writing would require blocking.
|
||||||
|
Full,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PipeState<const N: usize> {
|
||||||
|
buffer: RingBuffer<N>,
|
||||||
|
read_waker: WakerRegistration,
|
||||||
|
write_waker: WakerRegistration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<const N: usize> PipeState<N> {
|
||||||
|
const fn new() -> Self {
|
||||||
|
PipeState {
|
||||||
|
buffer: RingBuffer::new(),
|
||||||
|
read_waker: WakerRegistration::new(),
|
||||||
|
write_waker: WakerRegistration::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&mut self) {
|
||||||
|
self.buffer.clear();
|
||||||
|
self.write_waker.wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
self.try_read_with_context(None, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
if self.buffer.is_full() {
|
||||||
|
self.write_waker.wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
let available = self.buffer.pop_buf();
|
||||||
|
if available.is_empty() {
|
||||||
|
if let Some(cx) = cx {
|
||||||
|
self.read_waker.register(cx.waker());
|
||||||
|
}
|
||||||
|
return Err(TryReadError::Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = available.len().min(buf.len());
|
||||||
|
buf[..n].copy_from_slice(&available[..n]);
|
||||||
|
self.buffer.pop(n);
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
self.try_write_with_context(None, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
if self.buffer.is_empty() {
|
||||||
|
self.read_waker.wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
let available = self.buffer.push_buf();
|
||||||
|
if available.is_empty() {
|
||||||
|
if let Some(cx) = cx {
|
||||||
|
self.write_waker.register(cx.waker());
|
||||||
|
}
|
||||||
|
return Err(TryWriteError::Full);
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = available.len().min(buf.len());
|
||||||
|
available[..n].copy_from_slice(&buf[..n]);
|
||||||
|
self.buffer.push(n);
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A bounded pipe for communicating between asynchronous tasks
|
||||||
|
/// with backpressure.
|
||||||
|
///
|
||||||
|
/// The pipe will buffer up to the provided number of messages. Once the
|
||||||
|
/// buffer is full, attempts to `write` new messages will wait until a message is
|
||||||
|
/// read from the pipe.
|
||||||
|
///
|
||||||
|
/// All data written will become available in the same order as it was written.
|
||||||
|
pub struct Pipe<M, const N: usize>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
inner: Mutex<M, RefCell<PipeState<N>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, const N: usize> Pipe<M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
/// Establish a new bounded pipe. For example, to create one with a NoopMutex:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use embassy_util::pipe::Pipe;
|
||||||
|
/// use embassy_util::blocking_mutex::raw::NoopRawMutex;
|
||||||
|
///
|
||||||
|
/// // Declare a bounded pipe, with a buffer of 256 bytes.
|
||||||
|
/// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
|
||||||
|
/// ```
|
||||||
|
pub const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Mutex::new(RefCell::new(PipeState::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
|
||||||
|
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
self.lock(|c| c.try_read_with_context(cx, buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
self.lock(|c| c.try_write_with_context(cx, buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a writer for this pipe.
|
||||||
|
pub fn writer(&self) -> Writer<'_, M, N> {
|
||||||
|
Writer { pipe: self }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a reader for this pipe.
|
||||||
|
pub fn reader(&self) -> Reader<'_, M, N> {
|
||||||
|
Reader { pipe: self }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write a value, waiting until there is capacity.
|
||||||
|
///
|
||||||
|
/// Writeing completes when the value has been pushed to the pipe's queue.
|
||||||
|
/// This doesn't mean the value has been read yet.
|
||||||
|
pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
|
||||||
|
WriteFuture { pipe: self, buf }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately write a message.
|
||||||
|
///
|
||||||
|
/// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
|
||||||
|
/// buffer is full, instead of waiting.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// If the pipe capacity has been reached, i.e., the pipe has `n`
|
||||||
|
/// buffered values where `n` is the argument passed to [`Pipe`], then an
|
||||||
|
/// error is returned.
|
||||||
|
pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
self.lock(|c| c.try_write(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receive the next value.
|
||||||
|
///
|
||||||
|
/// If there are no messages in the pipe's buffer, this method will
|
||||||
|
/// wait until a message is written.
|
||||||
|
pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
|
||||||
|
ReadFuture { pipe: self, buf }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately read a message.
|
||||||
|
///
|
||||||
|
/// This method will either read a message from the pipe immediately or return an error
|
||||||
|
/// if the pipe is empty.
|
||||||
|
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
self.lock(|c| c.try_read(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clear the data in the pipe's buffer.
|
||||||
|
pub fn clear(&self) {
|
||||||
|
self.lock(|c| c.clear())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return whether the pipe is full (no free space in the buffer)
|
||||||
|
pub fn is_full(&self) -> bool {
|
||||||
|
self.len() == N
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return whether the pipe is empty (no data buffered)
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Total byte capacity.
|
||||||
|
///
|
||||||
|
/// This is the same as the `N` generic param.
|
||||||
|
pub fn capacity(&self) -> usize {
|
||||||
|
N
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Used byte capacity.
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.lock(|c| c.buffer.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Free byte capacity.
|
||||||
|
///
|
||||||
|
/// This is equivalent to `capacity() - len()`
|
||||||
|
pub fn free_capacity(&self) -> usize {
|
||||||
|
N - self.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "nightly")]
|
||||||
|
mod io_impls {
|
||||||
|
use core::convert::Infallible;
|
||||||
|
|
||||||
|
use futures_util::FutureExt;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> {
|
||||||
|
type Error = Infallible;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> {
|
||||||
|
type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
|
||||||
|
Pipe::read(self, buf).map(Ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> {
|
||||||
|
type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
|
||||||
|
Pipe::write(self, buf).map(Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
|
||||||
|
futures_util::future::ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::Io for &Pipe<M, N> {
|
||||||
|
type Error = Infallible;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> {
|
||||||
|
type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
|
||||||
|
Pipe::read(self, buf).map(Ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> {
|
||||||
|
type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
|
||||||
|
Pipe::write(self, buf).map(Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
|
||||||
|
futures_util::future::ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::Io for Reader<'_, M, N> {
|
||||||
|
type Error = Infallible;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> {
|
||||||
|
type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
|
||||||
|
Reader::read(self, buf).map(Ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::Io for Writer<'_, M, N> {
|
||||||
|
type Error = Infallible;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> {
|
||||||
|
type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
|
||||||
|
Writer::write(self, buf).map(Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
|
||||||
|
futures_util::future::ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use futures_executor::ThreadPool;
|
||||||
|
use futures_util::task::SpawnExt;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
|
||||||
|
use crate::Forever;
|
||||||
|
|
||||||
|
fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
|
||||||
|
N - c.buffer.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writing_once() {
|
||||||
|
let mut c = PipeState::<3>::new();
|
||||||
|
assert!(c.try_write(&[1]).is_ok());
|
||||||
|
assert_eq!(capacity(&c), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writing_when_full() {
|
||||||
|
let mut c = PipeState::<3>::new();
|
||||||
|
assert_eq!(c.try_write(&[42]), Ok(1));
|
||||||
|
assert_eq!(c.try_write(&[43]), Ok(1));
|
||||||
|
assert_eq!(c.try_write(&[44]), Ok(1));
|
||||||
|
assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
|
||||||
|
assert_eq!(capacity(&c), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn receiving_once_with_one_send() {
|
||||||
|
let mut c = PipeState::<3>::new();
|
||||||
|
assert!(c.try_write(&[42]).is_ok());
|
||||||
|
let mut buf = [0; 16];
|
||||||
|
assert_eq!(c.try_read(&mut buf), Ok(1));
|
||||||
|
assert_eq!(buf[0], 42);
|
||||||
|
assert_eq!(capacity(&c), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn receiving_when_empty() {
|
||||||
|
let mut c = PipeState::<3>::new();
|
||||||
|
let mut buf = [0; 16];
|
||||||
|
assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
|
||||||
|
assert_eq!(capacity(&c), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn simple_send_and_receive() {
|
||||||
|
let c = Pipe::<NoopRawMutex, 3>::new();
|
||||||
|
assert!(c.try_write(&[42]).is_ok());
|
||||||
|
let mut buf = [0; 16];
|
||||||
|
assert_eq!(c.try_read(&mut buf), Ok(1));
|
||||||
|
assert_eq!(buf[0], 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cloning() {
|
||||||
|
let c = Pipe::<NoopRawMutex, 3>::new();
|
||||||
|
let r1 = c.reader();
|
||||||
|
let w1 = c.writer();
|
||||||
|
|
||||||
|
let _ = r1.clone();
|
||||||
|
let _ = w1.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[futures_test::test]
|
||||||
|
async fn receiver_receives_given_try_write_async() {
|
||||||
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
|
static CHANNEL: Forever<Pipe<CriticalSectionRawMutex, 3>> = Forever::new();
|
||||||
|
let c = &*CHANNEL.put(Pipe::new());
|
||||||
|
let c2 = c;
|
||||||
|
let f = async move {
|
||||||
|
assert_eq!(c2.try_write(&[42]), Ok(1));
|
||||||
|
};
|
||||||
|
executor.spawn(f).unwrap();
|
||||||
|
let mut buf = [0; 16];
|
||||||
|
assert_eq!(c.read(&mut buf).await, 1);
|
||||||
|
assert_eq!(buf[0], 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[futures_test::test]
|
||||||
|
async fn sender_send_completes_if_capacity() {
|
||||||
|
let c = Pipe::<CriticalSectionRawMutex, 1>::new();
|
||||||
|
c.write(&[42]).await;
|
||||||
|
let mut buf = [0; 16];
|
||||||
|
assert_eq!(c.read(&mut buf).await, 1);
|
||||||
|
assert_eq!(buf[0], 42);
|
||||||
|
}
|
||||||
|
}
|
146
embassy-util/src/ring_buffer.rs
Normal file
146
embassy-util/src/ring_buffer.rs
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
pub struct RingBuffer<const N: usize> {
|
||||||
|
buf: [u8; N],
|
||||||
|
start: usize,
|
||||||
|
end: usize,
|
||||||
|
empty: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<const N: usize> RingBuffer<N> {
|
||||||
|
pub const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
buf: [0; N],
|
||||||
|
start: 0,
|
||||||
|
end: 0,
|
||||||
|
empty: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_buf(&mut self) -> &mut [u8] {
|
||||||
|
if self.start == self.end && !self.empty {
|
||||||
|
trace!(" ringbuf: push_buf empty");
|
||||||
|
return &mut self.buf[..0];
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = if self.start <= self.end {
|
||||||
|
self.buf.len() - self.end
|
||||||
|
} else {
|
||||||
|
self.start - self.end
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
|
||||||
|
&mut self.buf[self.end..self.end + n]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push(&mut self, n: usize) {
|
||||||
|
trace!(" ringbuf: push {:?}", n);
|
||||||
|
if n == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.end = self.wrap(self.end + n);
|
||||||
|
self.empty = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pop_buf(&mut self) -> &mut [u8] {
|
||||||
|
if self.empty {
|
||||||
|
trace!(" ringbuf: pop_buf empty");
|
||||||
|
return &mut self.buf[..0];
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = if self.end <= self.start {
|
||||||
|
self.buf.len() - self.start
|
||||||
|
} else {
|
||||||
|
self.end - self.start
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
|
||||||
|
&mut self.buf[self.start..self.start + n]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pop(&mut self, n: usize) {
|
||||||
|
trace!(" ringbuf: pop {:?}", n);
|
||||||
|
if n == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.start = self.wrap(self.start + n);
|
||||||
|
self.empty = self.start == self.end;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_full(&self) -> bool {
|
||||||
|
self.start == self.end && !self.empty
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.empty
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(unused)]
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
if self.empty {
|
||||||
|
0
|
||||||
|
} else if self.start < self.end {
|
||||||
|
self.end - self.start
|
||||||
|
} else {
|
||||||
|
N + self.end - self.start
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.start = 0;
|
||||||
|
self.end = 0;
|
||||||
|
self.empty = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wrap(&self, n: usize) -> usize {
|
||||||
|
assert!(n <= self.buf.len());
|
||||||
|
if n == self.buf.len() {
|
||||||
|
0
|
||||||
|
} else {
|
||||||
|
n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn push_pop() {
|
||||||
|
let mut rb: RingBuffer<4> = RingBuffer::new();
|
||||||
|
let buf = rb.push_buf();
|
||||||
|
assert_eq!(4, buf.len());
|
||||||
|
buf[0] = 1;
|
||||||
|
buf[1] = 2;
|
||||||
|
buf[2] = 3;
|
||||||
|
buf[3] = 4;
|
||||||
|
rb.push(4);
|
||||||
|
|
||||||
|
let buf = rb.pop_buf();
|
||||||
|
assert_eq!(4, buf.len());
|
||||||
|
assert_eq!(1, buf[0]);
|
||||||
|
rb.pop(1);
|
||||||
|
|
||||||
|
let buf = rb.pop_buf();
|
||||||
|
assert_eq!(3, buf.len());
|
||||||
|
assert_eq!(2, buf[0]);
|
||||||
|
rb.pop(1);
|
||||||
|
|
||||||
|
let buf = rb.pop_buf();
|
||||||
|
assert_eq!(2, buf.len());
|
||||||
|
assert_eq!(3, buf[0]);
|
||||||
|
rb.pop(1);
|
||||||
|
|
||||||
|
let buf = rb.pop_buf();
|
||||||
|
assert_eq!(1, buf.len());
|
||||||
|
assert_eq!(4, buf[0]);
|
||||||
|
rb.pop(1);
|
||||||
|
|
||||||
|
let buf = rb.pop_buf();
|
||||||
|
assert_eq!(0, buf.len());
|
||||||
|
|
||||||
|
let buf = rb.push_buf();
|
||||||
|
assert_eq!(4, buf.len());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user