From ca0d02933b2551e89f0862493be217d1fb9097f1 Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Sat, 18 Nov 2023 14:21:43 +0000 Subject: [PATCH 1/7] Priority channel using binary heap --- embassy-sync/src/lib.rs | 1 + embassy-sync/src/priority_channel.rs | 744 +++++++++++++++++++++++++++ 2 files changed, 745 insertions(+) create mode 100644 embassy-sync/src/priority_channel.rs diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index c40fa3b6..3ffcb913 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -15,6 +15,7 @@ pub mod blocking_mutex; pub mod channel; pub mod mutex; pub mod pipe; +pub mod priority_channel; pub mod pubsub; pub mod signal; pub mod waitqueue; diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs new file mode 100644 index 00000000..04148b56 --- /dev/null +++ b/embassy-sync/src/priority_channel.rs @@ -0,0 +1,744 @@ +//! A queue for sending values between asynchronous tasks. +//! +//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use heapless::BinaryHeap; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// Send-only access to a [`PriorityChannel`]. +pub struct Sender<'ch, M, T, K, const N: usize> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + channel: &'ch PriorityChannel, +} + +impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + fn clone(&self) -> Self { + Sender { channel: self.channel } + } +} + +impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ +} + +impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + /// Sends a value. + /// + /// See [`PriorityChannel::send()`] + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> { + self.channel.send(message) + } + + /// Attempt to immediately send a message. + /// + /// See [`PriorityChannel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send(message) + } + + /// Allows a poll_fn to poll until the channel is ready to send + /// + /// See [`PriorityChannel::poll_ready_to_send()`] + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_send(cx) + } +} + +/// Send-only access to a [`PriorityChannel`] without knowing channel size. +pub struct DynamicSender<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for DynamicSender<'ch, T> { + fn clone(&self) -> Self { + DynamicSender { channel: self.channel } + } +} + +impl<'ch, T> Copy for DynamicSender<'ch, T> {} + +impl<'ch, M, T, K, const N: usize> From> for DynamicSender<'ch, T> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + fn from(s: Sender<'ch, M, T, K, N>) -> Self { + Self { channel: s.channel } + } +} + +impl<'ch, T> DynamicSender<'ch, T> { + /// Sends a value. + /// + /// See [`PriorityChannel::send()`] + pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { + DynamicSendFuture { + channel: self.channel, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// See [`PriorityChannel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send_with_context(message, None) + } + + /// Allows a poll_fn to poll until the channel is ready to send + /// + /// See [`PriorityChannel::poll_ready_to_send()`] + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_send(cx) + } +} + +/// Receive-only access to a [`PriorityChannel`]. +pub struct Receiver<'ch, M, T, K, const N: usize> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + channel: &'ch PriorityChannel, +} + +impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + fn clone(&self) -> Self { + Receiver { channel: self.channel } + } +} + +impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ +} + +impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + /// Receive the next value. + /// + /// See [`PriorityChannel::receive()`]. + pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> { + self.channel.receive() + } + + /// Attempt to immediately receive the next value. + /// + /// See [`PriorityChannel::try_receive()`] + pub fn try_receive(&self) -> Result { + self.channel.try_receive() + } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`PriorityChannel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_receive(cx) + } + + /// Poll the channel for the next item + /// + /// See [`PriorityChannel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } +} + +/// Receive-only access to a [`PriorityChannel`] without knowing channel size. +pub struct DynamicReceiver<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for DynamicReceiver<'ch, T> { + fn clone(&self) -> Self { + DynamicReceiver { channel: self.channel } + } +} + +impl<'ch, T> Copy for DynamicReceiver<'ch, T> {} + +impl<'ch, T> DynamicReceiver<'ch, T> { + /// Receive the next value. + /// + /// See [`PriorityChannel::receive()`]. + pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { + DynamicReceiveFuture { channel: self.channel } + } + + /// Attempt to immediately receive the next value. + /// + /// See [`PriorityChannel::try_receive()`] + pub fn try_receive(&self) -> Result { + self.channel.try_receive_with_context(None) + } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`PriorityChannel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_receive(cx) + } + + /// Poll the channel for the next item + /// + /// See [`PriorityChannel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } +} + +impl<'ch, M, T, K, const N: usize> From> for DynamicReceiver<'ch, T> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + fn from(s: Receiver<'ch, M, T, K, N>) -> Self { + Self { channel: s.channel } + } +} + +/// Future returned by [`PriorityChannel::receive`] and [`Receiver::receive`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReceiveFuture<'ch, M, T, K, const N: usize> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + channel: &'ch PriorityChannel, +} + +impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } +} + +/// Future returned by [`DynamicReceiver::receive`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct DynamicReceiveFuture<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.channel.try_receive_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryReceiveError::Empty) => Poll::Pending, + } + } +} + +/// Future returned by [`PriorityChannel::send`] and [`Sender::send`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SendFuture<'ch, M, T, K, const N: usize> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + channel: &'ch PriorityChannel, + message: Option, +} + +impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N> +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ +} + +/// Future returned by [`DynamicSender::send`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct DynamicSendFuture<'ch, T> { + channel: &'ch dyn DynamicChannel, + message: Option, +} + +impl<'ch, T> Future for DynamicSendFuture<'ch, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} + +trait DynamicChannel { + fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; + + fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; + + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; + fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; + + fn poll_receive(&self, cx: &mut Context<'_>) -> Poll; +} + +/// Error returned by [`try_receive`](PriorityChannel::try_receive). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryReceiveError { + /// A message could not be received because the channel is empty. + Empty, +} + +/// Error returned by [`try_send`](PriorityChannel::try_send). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TrySendError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), +} + +struct ChannelState { + queue: BinaryHeap, + receiver_waker: WakerRegistration, + senders_waker: WakerRegistration, +} + +impl ChannelState +where + T: Ord, + K: heapless::binary_heap::Kind, +{ + const fn new() -> Self { + ChannelState { + queue: BinaryHeap::new(), + receiver_waker: WakerRegistration::new(), + senders_waker: WakerRegistration::new(), + } + } + + fn try_receive(&mut self) -> Result { + self.try_receive_with_context(None) + } + + fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { + if self.queue.len() == self.queue.capacity() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop() { + Ok(message) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryReceiveError::Empty) + } + } + + fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll { + if self.queue.len() == self.queue.capacity() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop() { + Poll::Ready(message) + } else { + self.receiver_waker.register(cx.waker()); + Poll::Pending + } + } + + fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { + self.receiver_waker.register(cx.waker()); + + if !self.queue.is_empty() { + Poll::Ready(()) + } else { + Poll::Pending + } + } + + fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.try_send_with_context(message, None) + } + + fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + match self.queue.push(message) { + Ok(()) => { + self.receiver_waker.wake(); + Ok(()) + } + Err(message) => { + if let Some(cx) = cx { + self.senders_waker.register(cx.waker()); + } + Err(TrySendError::Full(message)) + } + } + } + + fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> { + self.senders_waker.register(cx.waker()); + + if !self.queue.len() == self.queue.capacity() { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + +/// A bounded channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +pub struct PriorityChannel +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + inner: Mutex>>, +} + +impl PriorityChannel +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + /// Establish a new bounded channel. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy_sync::channel::PriorityChannel; + /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = PriorityChannel::::new(); + /// ``` + pub const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(ChannelState::new())), + } + } + + fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { + self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut()))) + } + + fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + self.lock(|c| c.try_receive_with_context(cx)) + } + + /// Poll the channel for the next message + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.lock(|c| c.poll_receive(cx)) + } + + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + self.lock(|c| c.try_send_with_context(m, cx)) + } + + /// Allows a poll_fn to poll until the channel is ready to receive + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { + self.lock(|c| c.poll_ready_to_receive(cx)) + } + + /// Allows a poll_fn to poll until the channel is ready to send + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { + self.lock(|c| c.poll_ready_to_send(cx)) + } + + /// Get a sender for this channel. + pub fn sender(&self) -> Sender<'_, M, T, K, N> { + Sender { channel: self } + } + + /// Get a receiver for this channel. + pub fn receiver(&self) -> Receiver<'_, M, T, K, N> { + Receiver { channel: self } + } + + /// Send a value, waiting until there is capacity. + /// + /// Sending completes when the value has been pushed to the channel's queue. + /// This doesn't mean the value has been received yet. + pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> { + SendFuture { + channel: self, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// This method differs from [`send`](PriorityChannel::send) by returning immediately if the channel's + /// buffer is full, instead of waiting. + /// + /// # Errors + /// + /// If the channel capacity has been reached, i.e., the channel has `n` + /// buffered values where `n` is the argument passed to [`PriorityChannel`], then an + /// error is returned. + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.lock(|c| c.try_send(message)) + } + + /// Receive the next value. + /// + /// If there are no messages in the channel's buffer, this method will + /// wait until a message is sent. + pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> { + ReceiveFuture { channel: self } + } + + /// Attempt to immediately receive a message. + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + pub fn try_receive(&self) -> Result { + self.lock(|c| c.try_receive()) + } +} + +/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the +/// tradeoff cost of dynamic dispatch. +impl DynamicChannel for PriorityChannel +where + T: Ord, + K: heapless::binary_heap::Kind, + M: RawMutex, +{ + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + PriorityChannel::try_send_with_context(self, m, cx) + } + + fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + PriorityChannel::try_receive_with_context(self, cx) + } + + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { + PriorityChannel::poll_ready_to_send(self, cx) + } + + fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { + PriorityChannel::poll_ready_to_receive(self, cx) + } + + fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + PriorityChannel::poll_receive(self, cx) + } +} + +#[cfg(test)] +mod tests { + use core::time::Duration; + + use futures_executor::ThreadPool; + use futures_timer::Delay; + use futures_util::task::SpawnExt; + use static_cell::StaticCell; + + use super::*; + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + + fn capacity(c: &ChannelState) -> usize { + c.queue.capacity() - c.queue.len() + } + + #[test] + fn sending_once() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn sending_when_full() { + let mut c = ChannelState::::new(); + let _ = c.try_send(1); + let _ = c.try_send(1); + let _ = c.try_send(1); + match c.try_send(2) { + Err(TrySendError::Full(2)) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 0); + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_receive().unwrap(), 1); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = ChannelState::::new(); + match c.try_receive() { + Err(TryReceiveError::Empty) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 3); + } + + #[test] + fn simple_send_and_receive() { + let c = PriorityChannel::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_receive().unwrap(), 1); + } + + #[test] + fn cloning() { + let c = PriorityChannel::::new(); + let r1 = c.receiver(); + let s1 = c.sender(); + + let _ = r1.clone(); + let _ = s1.clone(); + } + + #[test] + fn dynamic_dispatch() { + let c = PriorityChannel::::new(); + let s: DynamicSender<'_, u32> = c.sender().into(); + let r: DynamicReceiver<'_, u32> = c.receiver().into(); + + assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_receive().unwrap(), 1); + } + + #[futures_test::test] + async fn receiver_receives_given_try_send_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(PriorityChannel::new()); + let c2 = c; + assert!(executor + .spawn(async move { + assert!(c2.try_send(1).is_ok()); + }) + .is_ok()); + assert_eq!(c.receive().await, 1); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let c = PriorityChannel::::new(); + c.send(1).await; + assert_eq!(c.receive().await, 1); + } + + #[futures_test::test] + async fn senders_sends_wait_until_capacity() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(PriorityChannel::new()); + assert!(c.try_send(1).is_ok()); + + let c2 = c; + let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); + let c2 = c; + let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); + // Wish I could think of a means of determining that the async send is waiting instead. + // However, I've used the debugger to observe that the send does indeed wait. + Delay::new(Duration::from_millis(500)).await; + assert_eq!(c.receive().await, 1); + assert!(executor + .spawn(async move { + loop { + c.receive().await; + } + }) + .is_ok()); + send_task_1.unwrap().await; + send_task_2.unwrap().await; + } +} From 270ec324b0f4bdf400043bbb4023a32c6c7d94d4 Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Sat, 18 Nov 2023 14:31:09 +0000 Subject: [PATCH 2/7] Reduce duplication, fix tests --- embassy-sync/src/channel.rs | 2 +- embassy-sync/src/priority_channel.rs | 100 +++++++++++---------------- 2 files changed, 41 insertions(+), 61 deletions(-) diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index a512e0c4..aa267471 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -321,7 +321,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -trait DynamicChannel { +pub(crate) trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 04148b56..13c407c2 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -7,17 +7,19 @@ use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; +use heapless::binary_heap::Kind; use heapless::BinaryHeap; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; +use crate::channel::{DynamicChannel, TryReceiveError, TrySendError}; use crate::waitqueue::WakerRegistration; /// Send-only access to a [`PriorityChannel`]. pub struct Sender<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -26,7 +28,7 @@ where impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn clone(&self) -> Self { @@ -37,7 +39,7 @@ where impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { } @@ -45,7 +47,7 @@ where impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { /// Sends a value. @@ -86,7 +88,7 @@ impl<'ch, T> Copy for DynamicSender<'ch, T> {} impl<'ch, M, T, K, const N: usize> From> for DynamicSender<'ch, T> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn from(s: Sender<'ch, M, T, K, N>) -> Self { @@ -124,7 +126,7 @@ impl<'ch, T> DynamicSender<'ch, T> { pub struct Receiver<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -133,7 +135,7 @@ where impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn clone(&self) -> Self { @@ -144,7 +146,7 @@ where impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { } @@ -152,7 +154,7 @@ where impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { /// Receive the next value. @@ -230,7 +232,7 @@ impl<'ch, T> DynamicReceiver<'ch, T> { impl<'ch, M, T, K, const N: usize> From> for DynamicReceiver<'ch, T> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn from(s: Receiver<'ch, M, T, K, N>) -> Self { @@ -243,7 +245,7 @@ where pub struct ReceiveFuture<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -252,7 +254,7 @@ where impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { type Output = T; @@ -284,7 +286,7 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { pub struct SendFuture<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -294,7 +296,7 @@ where impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { type Output = (); @@ -316,7 +318,7 @@ where impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { } @@ -347,34 +349,6 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -trait DynamicChannel { - fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; - - fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; - - fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; - fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; - - fn poll_receive(&self, cx: &mut Context<'_>) -> Poll; -} - -/// Error returned by [`try_receive`](PriorityChannel::try_receive). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryReceiveError { - /// A message could not be received because the channel is empty. - Empty, -} - -/// Error returned by [`try_send`](PriorityChannel::try_send). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), -} - struct ChannelState { queue: BinaryHeap, receiver_waker: WakerRegistration, @@ -384,7 +358,7 @@ struct ChannelState { impl ChannelState where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, { const fn new() -> Self { ChannelState { @@ -477,7 +451,7 @@ where pub struct PriorityChannel where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { inner: Mutex>>, @@ -486,17 +460,18 @@ where impl PriorityChannel where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { /// Establish a new bounded channel. For example, to create one with a NoopMutex: /// /// ``` - /// use embassy_sync::channel::PriorityChannel; + /// # use heapless::binary_heap::Max; + /// use embassy_sync::priority_channel::PriorityChannel; /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; /// /// // Declare a bounded channel of 3 u32s. - /// let mut channel = PriorityChannel::::new(); + /// let mut channel = PriorityChannel::::new(); /// ``` pub const fn new() -> Self { Self { @@ -588,7 +563,7 @@ where impl DynamicChannel for PriorityChannel where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { @@ -619,25 +594,30 @@ mod tests { use futures_executor::ThreadPool; use futures_timer::Delay; use futures_util::task::SpawnExt; + use heapless::binary_heap::{Kind, Max}; use static_cell::StaticCell; use super::*; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - fn capacity(c: &ChannelState) -> usize { + fn capacity(c: &ChannelState) -> usize + where + T: Ord, + K: Kind, + { c.queue.capacity() - c.queue.len() } #[test] fn sending_once() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); assert!(c.try_send(1).is_ok()); assert_eq!(capacity(&c), 2); } #[test] fn sending_when_full() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); let _ = c.try_send(1); let _ = c.try_send(1); let _ = c.try_send(1); @@ -650,7 +630,7 @@ mod tests { #[test] fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); assert!(c.try_send(1).is_ok()); assert_eq!(c.try_receive().unwrap(), 1); assert_eq!(capacity(&c), 3); @@ -658,7 +638,7 @@ mod tests { #[test] fn receiving_when_empty() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); match c.try_receive() { Err(TryReceiveError::Empty) => assert!(true), _ => assert!(false), @@ -668,14 +648,14 @@ mod tests { #[test] fn simple_send_and_receive() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); assert!(c.try_send(1).is_ok()); assert_eq!(c.try_receive().unwrap(), 1); } #[test] fn cloning() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); let r1 = c.receiver(); let s1 = c.sender(); @@ -685,7 +665,7 @@ mod tests { #[test] fn dynamic_dispatch() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); let s: DynamicSender<'_, u32> = c.sender().into(); let r: DynamicReceiver<'_, u32> = c.receiver().into(); @@ -697,7 +677,7 @@ mod tests { async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap(); - static CHANNEL: StaticCell> = StaticCell::new(); + static CHANNEL: StaticCell> = StaticCell::new(); let c = &*CHANNEL.init(PriorityChannel::new()); let c2 = c; assert!(executor @@ -710,7 +690,7 @@ mod tests { #[futures_test::test] async fn sender_send_completes_if_capacity() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); c.send(1).await; assert_eq!(c.receive().await, 1); } @@ -719,7 +699,7 @@ mod tests { async fn senders_sends_wait_until_capacity() { let executor = ThreadPool::new().unwrap(); - static CHANNEL: StaticCell> = StaticCell::new(); + static CHANNEL: StaticCell> = StaticCell::new(); let c = &*CHANNEL.init(PriorityChannel::new()); assert!(c.try_send(1).is_ok()); From 2efa73f4313aeaad9b65b634425b689520096b9a Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Sat, 18 Nov 2023 14:37:15 +0000 Subject: [PATCH 3/7] docs and simple test for priority --- embassy-sync/src/priority_channel.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 13c407c2..6f419aa6 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -1,6 +1,7 @@ //! A queue for sending values between asynchronous tasks. //! //! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. +//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel. use core::cell::RefCell; use core::future::Future; @@ -628,6 +629,16 @@ mod tests { assert_eq!(capacity(&c), 0); } + #[test] + fn send_priority() { + // Prio channel with kind `Max` sifts larger numbers to the front of the queue + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert!(c.try_send(3).is_ok()); + assert_eq!(c.try_receive().unwrap(), 3); + assert_eq!(c.try_receive().unwrap(), 1); + } + #[test] fn receiving_once_with_one_send() { let mut c = ChannelState::::new(); From 7589b5e13e0e01922804e603918fea0aa4dac30a Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Sat, 18 Nov 2023 14:56:29 +0000 Subject: [PATCH 4/7] reduce duplication further by sharing Dynamic sender/receiver --- embassy-sync/src/channel.rs | 4 +- embassy-sync/src/priority_channel.rs | 84 +--------------------------- 2 files changed, 3 insertions(+), 85 deletions(-) diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index aa267471..ff712930 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -76,7 +76,7 @@ where /// Send-only access to a [`Channel`] without knowing channel size. pub struct DynamicSender<'ch, T> { - channel: &'ch dyn DynamicChannel, + pub(crate) channel: &'ch dyn DynamicChannel, } impl<'ch, T> Clone for DynamicSender<'ch, T> { @@ -176,7 +176,7 @@ where /// Receive-only access to a [`Channel`] without knowing channel size. pub struct DynamicReceiver<'ch, T> { - channel: &'ch dyn DynamicChannel, + pub(crate) channel: &'ch dyn DynamicChannel, } impl<'ch, T> Clone for DynamicReceiver<'ch, T> { diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 6f419aa6..14366281 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -13,7 +13,7 @@ use heapless::BinaryHeap; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; -use crate::channel::{DynamicChannel, TryReceiveError, TrySendError}; +use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError}; use crate::waitqueue::WakerRegistration; /// Send-only access to a [`PriorityChannel`]. @@ -73,19 +73,6 @@ where } } -/// Send-only access to a [`PriorityChannel`] without knowing channel size. -pub struct DynamicSender<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicSender<'ch, T> { - fn clone(&self) -> Self { - DynamicSender { channel: self.channel } - } -} - -impl<'ch, T> Copy for DynamicSender<'ch, T> {} - impl<'ch, M, T, K, const N: usize> From> for DynamicSender<'ch, T> where T: Ord, @@ -97,32 +84,6 @@ where } } -impl<'ch, T> DynamicSender<'ch, T> { - /// Sends a value. - /// - /// See [`PriorityChannel::send()`] - pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { - DynamicSendFuture { - channel: self.channel, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// See [`PriorityChannel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send_with_context(message, None) - } - - /// Allows a poll_fn to poll until the channel is ready to send - /// - /// See [`PriorityChannel::poll_ready_to_send()`] - pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { - self.channel.poll_ready_to_send(cx) - } -} - /// Receive-only access to a [`PriorityChannel`]. pub struct Receiver<'ch, M, T, K, const N: usize> where @@ -187,49 +148,6 @@ where } } -/// Receive-only access to a [`PriorityChannel`] without knowing channel size. -pub struct DynamicReceiver<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicReceiver<'ch, T> { - fn clone(&self) -> Self { - DynamicReceiver { channel: self.channel } - } -} - -impl<'ch, T> Copy for DynamicReceiver<'ch, T> {} - -impl<'ch, T> DynamicReceiver<'ch, T> { - /// Receive the next value. - /// - /// See [`PriorityChannel::receive()`]. - pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { - DynamicReceiveFuture { channel: self.channel } - } - - /// Attempt to immediately receive the next value. - /// - /// See [`PriorityChannel::try_receive()`] - pub fn try_receive(&self) -> Result { - self.channel.try_receive_with_context(None) - } - - /// Allows a poll_fn to poll until the channel is ready to receive - /// - /// See [`PriorityChannel::poll_ready_to_receive()`] - pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { - self.channel.poll_ready_to_receive(cx) - } - - /// Poll the channel for the next item - /// - /// See [`PriorityChannel::poll_receive()`] - pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { - self.channel.poll_receive(cx) - } -} - impl<'ch, M, T, K, const N: usize> From> for DynamicReceiver<'ch, T> where T: Ord, From f482a105b8491f3c21d41cb7e6f52fe6d778258f Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Sat, 18 Nov 2023 15:01:12 +0000 Subject: [PATCH 5/7] more clean up, refactor channel into module to share code --- embassy-sync/src/channel.rs | 8 ++-- .../priority.rs} | 45 +------------------ embassy-sync/src/lib.rs | 1 - 3 files changed, 6 insertions(+), 48 deletions(-) rename embassy-sync/src/{priority_channel.rs => channel/priority.rs} (92%) diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index ff712930..1843bbae 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -29,6 +29,8 @@ use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::WakerRegistration; +pub mod priority; + /// Send-only access to a [`Channel`]. pub struct Sender<'ch, M, T, const N: usize> where @@ -76,7 +78,7 @@ where /// Send-only access to a [`Channel`] without knowing channel size. pub struct DynamicSender<'ch, T> { - pub(crate) channel: &'ch dyn DynamicChannel, + channel: &'ch dyn DynamicChannel, } impl<'ch, T> Clone for DynamicSender<'ch, T> { @@ -176,7 +178,7 @@ where /// Receive-only access to a [`Channel`] without knowing channel size. pub struct DynamicReceiver<'ch, T> { - pub(crate) channel: &'ch dyn DynamicChannel, + channel: &'ch dyn DynamicChannel, } impl<'ch, T> Clone for DynamicReceiver<'ch, T> { @@ -321,7 +323,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -pub(crate) trait DynamicChannel { +trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/channel/priority.rs similarity index 92% rename from embassy-sync/src/priority_channel.rs rename to embassy-sync/src/channel/priority.rs index 14366281..1fd137db 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/channel/priority.rs @@ -183,23 +183,6 @@ where } } -/// Future returned by [`DynamicReceiver::receive`]. -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct DynamicReceiveFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_receive_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryReceiveError::Empty) => Poll::Pending, - } - } -} - /// Future returned by [`PriorityChannel::send`] and [`Sender::send`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SendFuture<'ch, M, T, K, const N: usize> @@ -242,32 +225,6 @@ where { } -/// Future returned by [`DynamicSender::send`]. -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct DynamicSendFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, - message: Option, -} - -impl<'ch, T> Future for DynamicSendFuture<'ch, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} - struct ChannelState { queue: BinaryHeap, receiver_waker: WakerRegistration, @@ -386,7 +343,7 @@ where /// /// ``` /// # use heapless::binary_heap::Max; - /// use embassy_sync::priority_channel::PriorityChannel; + /// use embassy_sync::channel::priority::PriorityChannel; /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; /// /// // Declare a bounded channel of 3 u32s. diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 3ffcb913..c40fa3b6 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -15,7 +15,6 @@ pub mod blocking_mutex; pub mod channel; pub mod mutex; pub mod pipe; -pub mod priority_channel; pub mod pubsub; pub mod signal; pub mod waitqueue; From 5a60024af71b70c059d4a2a2eacdfd7f73a3398d Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Sat, 18 Nov 2023 15:08:16 +0000 Subject: [PATCH 6/7] docs --- embassy-sync/README.md | 1 + embassy-sync/src/channel/priority.rs | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/embassy-sync/README.md b/embassy-sync/README.md index cc65cf6e..55618f72 100644 --- a/embassy-sync/README.md +++ b/embassy-sync/README.md @@ -5,6 +5,7 @@ An [Embassy](https://embassy.dev) project. Synchronization primitives and data structures with async support: - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. +- [`PriorityChannel`](channel::priority::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are sifted to the front of the channel. - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. - [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. diff --git a/embassy-sync/src/channel/priority.rs b/embassy-sync/src/channel/priority.rs index 1fd137db..61dc7be6 100644 --- a/embassy-sync/src/channel/priority.rs +++ b/embassy-sync/src/channel/priority.rs @@ -323,7 +323,9 @@ where /// buffer is full, attempts to `send` new messages will wait until a message is /// received from the channel. /// -/// All data sent will become available in the same order as it was sent. +/// Sent data may be reordered based on their priorty within the channel. +/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] +/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be recieved as `[3, 2, 1]`. pub struct PriorityChannel where T: Ord, @@ -509,8 +511,10 @@ mod tests { // Prio channel with kind `Max` sifts larger numbers to the front of the queue let mut c = ChannelState::::new(); assert!(c.try_send(1).is_ok()); + assert!(c.try_send(2).is_ok()); assert!(c.try_send(3).is_ok()); assert_eq!(c.try_receive().unwrap(), 3); + assert_eq!(c.try_receive().unwrap(), 2); assert_eq!(c.try_receive().unwrap(), 1); } From 454828accbfa3eecfbe782a6a23435c7a01ee29b Mon Sep 17 00:00:00 2001 From: Scott Mabin Date: Mon, 20 Nov 2023 11:28:31 +0000 Subject: [PATCH 7/7] revert module changes, reexport heapless relevant items --- embassy-sync/src/channel.rs | 8 +++----- embassy-sync/src/lib.rs | 1 + .../src/{channel/priority.rs => priority_channel.rs} | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) rename embassy-sync/src/{channel/priority.rs => priority_channel.rs} (99%) diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 1843bbae..ff712930 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -29,8 +29,6 @@ use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::WakerRegistration; -pub mod priority; - /// Send-only access to a [`Channel`]. pub struct Sender<'ch, M, T, const N: usize> where @@ -78,7 +76,7 @@ where /// Send-only access to a [`Channel`] without knowing channel size. pub struct DynamicSender<'ch, T> { - channel: &'ch dyn DynamicChannel, + pub(crate) channel: &'ch dyn DynamicChannel, } impl<'ch, T> Clone for DynamicSender<'ch, T> { @@ -178,7 +176,7 @@ where /// Receive-only access to a [`Channel`] without knowing channel size. pub struct DynamicReceiver<'ch, T> { - channel: &'ch dyn DynamicChannel, + pub(crate) channel: &'ch dyn DynamicChannel, } impl<'ch, T> Clone for DynamicReceiver<'ch, T> { @@ -323,7 +321,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -trait DynamicChannel { +pub(crate) trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index c40fa3b6..3ffcb913 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -15,6 +15,7 @@ pub mod blocking_mutex; pub mod channel; pub mod mutex; pub mod pipe; +pub mod priority_channel; pub mod pubsub; pub mod signal; pub mod waitqueue; diff --git a/embassy-sync/src/channel/priority.rs b/embassy-sync/src/priority_channel.rs similarity index 99% rename from embassy-sync/src/channel/priority.rs rename to embassy-sync/src/priority_channel.rs index 61dc7be6..bd75c013 100644 --- a/embassy-sync/src/channel/priority.rs +++ b/embassy-sync/src/priority_channel.rs @@ -8,7 +8,7 @@ use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; -use heapless::binary_heap::Kind; +pub use heapless::binary_heap::{Kind, Max, Min}; use heapless::BinaryHeap; use crate::blocking_mutex::raw::RawMutex; @@ -344,8 +344,7 @@ where /// Establish a new bounded channel. For example, to create one with a NoopMutex: /// /// ``` - /// # use heapless::binary_heap::Max; - /// use embassy_sync::channel::priority::PriorityChannel; + /// use embassy_sync::priority_channel::{PriorityChannel, Max}; /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; /// /// // Declare a bounded channel of 3 u32s.