Stream now ignores lag
This commit is contained in:
parent
c7cdecfc93
commit
790426e2f6
@ -300,8 +300,10 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Warning: The stream implementation ignores lag results and returns all messages.
|
||||||
|
/// This might miss some messages without you knowing it.
|
||||||
impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
|
impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
|
||||||
type Item = WaitResult<T>;
|
type Item = T;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
let this = unsafe { self.get_unchecked_mut() };
|
let this = unsafe { self.get_unchecked_mut() };
|
||||||
@ -311,7 +313,7 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
|
|||||||
// Yes, so we are done polling
|
// Yes, so we are done polling
|
||||||
Some(WaitResult::Message(message)) => {
|
Some(WaitResult::Message(message)) => {
|
||||||
this.next_message_id += 1;
|
this.next_message_id += 1;
|
||||||
Poll::Ready(Some(WaitResult::Message(message)))
|
Poll::Ready(Some(message))
|
||||||
}
|
}
|
||||||
// No, so we need to reregister our waker and sleep again
|
// No, so we need to reregister our waker and sleep again
|
||||||
None => {
|
None => {
|
||||||
@ -321,10 +323,12 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
|
|||||||
}
|
}
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
// We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
|
// We missed a couple of messages. We must do our internal bookkeeping.
|
||||||
|
// This stream impl doesn't return lag results, so we just ignore and start over
|
||||||
Some(WaitResult::Lagged(amount)) => {
|
Some(WaitResult::Lagged(amount)) => {
|
||||||
this.next_message_id += amount;
|
this.next_message_id += amount;
|
||||||
Poll::Ready(Some(WaitResult::Lagged(amount)))
|
cx.waker().wake_by_ref();
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user