diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index a512e0c4..aa267471 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -321,7 +321,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -trait DynamicChannel { +pub(crate) trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 04148b56..13c407c2 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -7,17 +7,19 @@ use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; +use heapless::binary_heap::Kind; use heapless::BinaryHeap; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; +use crate::channel::{DynamicChannel, TryReceiveError, TrySendError}; use crate::waitqueue::WakerRegistration; /// Send-only access to a [`PriorityChannel`]. pub struct Sender<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -26,7 +28,7 @@ where impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn clone(&self) -> Self { @@ -37,7 +39,7 @@ where impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { } @@ -45,7 +47,7 @@ where impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { /// Sends a value. @@ -86,7 +88,7 @@ impl<'ch, T> Copy for DynamicSender<'ch, T> {} impl<'ch, M, T, K, const N: usize> From> for DynamicSender<'ch, T> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn from(s: Sender<'ch, M, T, K, N>) -> Self { @@ -124,7 +126,7 @@ impl<'ch, T> DynamicSender<'ch, T> { pub struct Receiver<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -133,7 +135,7 @@ where impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn clone(&self) -> Self { @@ -144,7 +146,7 @@ where impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { } @@ -152,7 +154,7 @@ where impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { /// Receive the next value. @@ -230,7 +232,7 @@ impl<'ch, T> DynamicReceiver<'ch, T> { impl<'ch, M, T, K, const N: usize> From> for DynamicReceiver<'ch, T> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn from(s: Receiver<'ch, M, T, K, N>) -> Self { @@ -243,7 +245,7 @@ where pub struct ReceiveFuture<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -252,7 +254,7 @@ where impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { type Output = T; @@ -284,7 +286,7 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { pub struct SendFuture<'ch, M, T, K, const N: usize> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { channel: &'ch PriorityChannel, @@ -294,7 +296,7 @@ where impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { type Output = (); @@ -316,7 +318,7 @@ where impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N> where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { } @@ -347,34 +349,6 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -trait DynamicChannel { - fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; - - fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; - - fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; - fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; - - fn poll_receive(&self, cx: &mut Context<'_>) -> Poll; -} - -/// Error returned by [`try_receive`](PriorityChannel::try_receive). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryReceiveError { - /// A message could not be received because the channel is empty. - Empty, -} - -/// Error returned by [`try_send`](PriorityChannel::try_send). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), -} - struct ChannelState { queue: BinaryHeap, receiver_waker: WakerRegistration, @@ -384,7 +358,7 @@ struct ChannelState { impl ChannelState where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, { const fn new() -> Self { ChannelState { @@ -477,7 +451,7 @@ where pub struct PriorityChannel where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { inner: Mutex>>, @@ -486,17 +460,18 @@ where impl PriorityChannel where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { /// Establish a new bounded channel. For example, to create one with a NoopMutex: /// /// ``` - /// use embassy_sync::channel::PriorityChannel; + /// # use heapless::binary_heap::Max; + /// use embassy_sync::priority_channel::PriorityChannel; /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; /// /// // Declare a bounded channel of 3 u32s. - /// let mut channel = PriorityChannel::::new(); + /// let mut channel = PriorityChannel::::new(); /// ``` pub const fn new() -> Self { Self { @@ -588,7 +563,7 @@ where impl DynamicChannel for PriorityChannel where T: Ord, - K: heapless::binary_heap::Kind, + K: Kind, M: RawMutex, { fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { @@ -619,25 +594,30 @@ mod tests { use futures_executor::ThreadPool; use futures_timer::Delay; use futures_util::task::SpawnExt; + use heapless::binary_heap::{Kind, Max}; use static_cell::StaticCell; use super::*; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - fn capacity(c: &ChannelState) -> usize { + fn capacity(c: &ChannelState) -> usize + where + T: Ord, + K: Kind, + { c.queue.capacity() - c.queue.len() } #[test] fn sending_once() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); assert!(c.try_send(1).is_ok()); assert_eq!(capacity(&c), 2); } #[test] fn sending_when_full() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); let _ = c.try_send(1); let _ = c.try_send(1); let _ = c.try_send(1); @@ -650,7 +630,7 @@ mod tests { #[test] fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); assert!(c.try_send(1).is_ok()); assert_eq!(c.try_receive().unwrap(), 1); assert_eq!(capacity(&c), 3); @@ -658,7 +638,7 @@ mod tests { #[test] fn receiving_when_empty() { - let mut c = ChannelState::::new(); + let mut c = ChannelState::::new(); match c.try_receive() { Err(TryReceiveError::Empty) => assert!(true), _ => assert!(false), @@ -668,14 +648,14 @@ mod tests { #[test] fn simple_send_and_receive() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); assert!(c.try_send(1).is_ok()); assert_eq!(c.try_receive().unwrap(), 1); } #[test] fn cloning() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); let r1 = c.receiver(); let s1 = c.sender(); @@ -685,7 +665,7 @@ mod tests { #[test] fn dynamic_dispatch() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); let s: DynamicSender<'_, u32> = c.sender().into(); let r: DynamicReceiver<'_, u32> = c.receiver().into(); @@ -697,7 +677,7 @@ mod tests { async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap(); - static CHANNEL: StaticCell> = StaticCell::new(); + static CHANNEL: StaticCell> = StaticCell::new(); let c = &*CHANNEL.init(PriorityChannel::new()); let c2 = c; assert!(executor @@ -710,7 +690,7 @@ mod tests { #[futures_test::test] async fn sender_send_completes_if_capacity() { - let c = PriorityChannel::::new(); + let c = PriorityChannel::::new(); c.send(1).await; assert_eq!(c.receive().await, 1); } @@ -719,7 +699,7 @@ mod tests { async fn senders_sends_wait_until_capacity() { let executor = ThreadPool::new().unwrap(); - static CHANNEL: StaticCell> = StaticCell::new(); + static CHANNEL: StaticCell> = StaticCell::new(); let c = &*CHANNEL.init(PriorityChannel::new()); assert!(c.try_send(1).is_ok());