diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index d81e822a..4c360fea 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -300,8 +300,10 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> { } } +/// Warning: The stream implementation ignores lag results and returns all messages. +/// This might miss some messages without you knowing it. impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { - type Item = WaitResult; + type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = unsafe { self.get_unchecked_mut() }; @@ -311,7 +313,7 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { // Yes, so we are done polling Some(WaitResult::Message(message)) => { this.next_message_id += 1; - Poll::Ready(Some(WaitResult::Message(message))) + Poll::Ready(Some(message)) } // No, so we need to reregister our waker and sleep again None => { @@ -321,10 +323,12 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { } Poll::Pending } - // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + // We missed a couple of messages. We must do our internal bookkeeping. + // This stream impl doesn't return lag results, so we just ignore and start over Some(WaitResult::Lagged(amount)) => { this.next_message_id += amount; - Poll::Ready(Some(WaitResult::Lagged(amount))) + cx.waker().wake_by_ref(); + Poll::Pending } } }