diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index cc9e2a5d..d41c8629 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -156,18 +156,10 @@ where /// closed by `recv` until they are all consumed. /// /// [`close`]: Self::close - pub async fn recv(&mut self) -> Option { - futures::future::poll_fn(|cx| self.recv_poll(cx)).await - } - - fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll> { - Channel::lock(self.channel_cell, |c| { - match c.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(Some(v)), - Err(TryRecvError::Closed) => Poll::Ready(None), - Err(TryRecvError::Empty) => Poll::Pending, - } - }) + pub fn recv(&mut self) -> RecvFuture<'ch, M, T, N> { + RecvFuture { + channel_cell: self.channel_cell, + } } /// Attempts to immediately receive a message on this `Receiver` @@ -202,6 +194,30 @@ where } } +pub struct RecvFuture<'ch, M, T, const N: usize> +where + M: Mutex, +{ + channel_cell: &'ch UnsafeCell>, +} + +impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +where + M: Mutex, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Channel::lock(self.channel_cell, |c| { + match c.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(Some(v)), + Err(TryRecvError::Closed) => Poll::Ready(None), + Err(TryRecvError::Empty) => Poll::Pending, + } + }) + } +} + impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> where M: Mutex, @@ -224,12 +240,11 @@ where /// /// [`close`]: Receiver::close /// [`Receiver`]: Receiver - pub async fn send(&self, message: T) -> Result<(), SendError> { + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { SendFuture { sender: self.clone(), message: Some(message), } - .await } /// Attempts to immediately send a message on this `Sender` @@ -278,7 +293,7 @@ where } } -struct SendFuture<'ch, M, T, const N: usize> +pub struct SendFuture<'ch, M, T, const N: usize> where M: Mutex, {