diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs index d749e597..c7a89793 100644 --- a/embassy/src/channel/channel.rs +++ b/embassy/src/channel/channel.rs @@ -66,6 +66,57 @@ where } } +/// Send-only access to a [`Channel`] without knowing channel size. +#[derive(Copy)] +pub struct DynamicSender<'ch, M, T> +where + M: RawMutex, +{ + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, M, T> Clone for DynamicSender<'ch, M, T> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + DynamicSender { + channel: self.channel, + } + } +} + +impl<'ch, M, T, const N: usize> From> for DynamicSender<'ch, M, T> +where + M: RawMutex, +{ + fn from(s: Sender<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +impl<'ch, M, T> DynamicSender<'ch, M, T> +where + M: RawMutex, +{ + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> DynamicSendFuture<'ch, M, T> { + DynamicSendFuture { + channel: self.channel, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send_with_context(message, None) + } +} + /// Receive-only access to a [`Channel`]. #[derive(Copy)] pub struct Receiver<'ch, M, T, const N: usize> @@ -105,6 +156,56 @@ where } } +/// Receive-only access to a [`Channel`] without knowing channel size. +#[derive(Copy)] +pub struct DynamicReceiver<'ch, M, T> +where + M: RawMutex, +{ + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, M, T> Clone for DynamicReceiver<'ch, M, T> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + DynamicReceiver { + channel: self.channel, + } + } +} + +impl<'ch, M, T> DynamicReceiver<'ch, M, T> +where + M: RawMutex, +{ + /// Receive the next value. + /// + /// See [`Channel::recv()`]. + pub fn recv(&self) -> DynamicRecvFuture<'_, M, T> { + DynamicRecvFuture { + channel: self.channel, + } + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_recv()`] + pub fn try_recv(&self) -> Result { + self.channel.try_recv_with_context(None) + } +} + +impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, M, T> +where + M: RawMutex, +{ + fn from(s: Receiver<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + pub struct RecvFuture<'ch, M, T, const N: usize> where M: RawMutex, @@ -119,11 +220,31 @@ where type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.channel - .lock(|c| match c.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - }) + match self.channel.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + } + } +} + +pub struct DynamicRecvFuture<'ch, M, T> +where + M: RawMutex, +{ + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, M, T> Future for DynamicRecvFuture<'ch, M, T> +where + M: RawMutex, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.channel.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + } } } @@ -143,7 +264,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.message.take() { - Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { Ok(..) => Poll::Ready(()), Err(TrySendError::Full(m)) => { self.message = Some(m); @@ -157,6 +278,49 @@ where impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} +pub struct DynamicSendFuture<'ch, M, T> +where + M: RawMutex, +{ + channel: &'ch dyn DynamicChannel, + message: Option, +} + +impl<'ch, M, T> Future for DynamicSendFuture<'ch, M, T> +where + M: RawMutex, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, M, T> Unpin for DynamicSendFuture<'ch, M, T> where M: RawMutex {} + +trait DynamicChannel +where + M: RawMutex, +{ + fn try_send_with_context( + &self, + message: T, + cx: Option<&mut Context<'_>>, + ) -> Result<(), TrySendError>; + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; +} + /// Error returned by [`try_recv`](Channel::try_recv). #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -287,6 +451,18 @@ where self.inner.lock(|rc| f(&mut *rc.borrow_mut())) } + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + self.lock(|c| c.try_recv_with_context(cx)) + } + + fn try_send_with_context( + &self, + m: T, + cx: Option<&mut Context<'_>>, + ) -> Result<(), TrySendError> { + self.lock(|c| c.try_send_with_context(m, cx)) + } + /// Get a sender for this channel. pub fn sender(&self) -> Sender<'_, M, T, N> { Sender { channel: self } @@ -339,6 +515,25 @@ where } } +/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the +/// tradeoff cost of dynamic dispatch. +impl DynamicChannel for Channel +where + M: RawMutex, +{ + fn try_send_with_context( + &self, + m: T, + cx: Option<&mut Context<'_>>, + ) -> Result<(), TrySendError> { + Channel::try_send_with_context(self, m, cx) + } + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + Channel::try_recv_with_context(self, cx) + } +} + #[cfg(test)] mod tests { use core::time::Duration; @@ -411,6 +606,16 @@ mod tests { let _ = s1.clone(); } + #[test] + fn dynamic_dispatch() { + let c = Channel::::new(); + let s: DynamicSender<'_, NoopRawMutex, u32> = c.sender().into(); + let r: DynamicReceiver<'_, NoopRawMutex, u32> = c.receiver().into(); + + assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_recv().unwrap(), 1); + } + #[futures_test::test] async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap();