1282: fix(pubsub): Pop messages with count=0 after unsubscribe r=Dirbaio a=rmja



Co-authored-by: Rasmus Melchior Jacobsen <rmja@laesoe.org>
This commit is contained in:
bors[bot] 2023-03-19 23:19:23 +00:00 committed by GitHub
commit 3e541c43e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -371,6 +371,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
.iter_mut() .iter_mut()
.skip(current_message_index) .skip(current_message_index)
.for_each(|(_, counter)| *counter -= 1); .for_each(|(_, counter)| *counter -= 1);
let mut wake_publishers = false;
while let Some((_, count)) = self.queue.front() {
if *count == 0 {
self.queue.pop_front().unwrap();
wake_publishers = true;
} else {
break;
}
}
if wake_publishers {
self.publisher_wakers.wake();
}
} }
} }
@ -612,4 +626,37 @@ mod tests {
sub1.next_message().await; sub1.next_message().await;
assert_eq!(pub0.space(), 4); assert_eq!(pub0.space(), 4);
} }
#[futures_test::test]
async fn empty_channel_when_last_subscriber_is_dropped() {
let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
let pub0 = channel.publisher().unwrap();
let mut sub0 = channel.subscriber().unwrap();
let mut sub1 = channel.subscriber().unwrap();
assert_eq!(4, pub0.space());
pub0.publish(1).await;
pub0.publish(2).await;
assert_eq!(2, channel.space());
assert_eq!(1, sub0.try_next_message_pure().unwrap());
assert_eq!(2, sub0.try_next_message_pure().unwrap());
assert_eq!(2, channel.space());
drop(sub0);
assert_eq!(2, channel.space());
assert_eq!(1, sub1.try_next_message_pure().unwrap());
assert_eq!(3, channel.space());
drop(sub1);
assert_eq!(4, channel.space());
}
} }