From 12a6ddfbcd79f2ab62ba264acd997dca0ac64a99 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 12:28:12 +0200 Subject: [PATCH 01/18] Added a pubsub channel implementation --- embassy/src/channel/mod.rs | 2 +- embassy/src/channel/pubsub.rs | 590 ++++++++++++++++++++++++++++++++++ 2 files changed, 591 insertions(+), 1 deletion(-) create mode 100644 embassy/src/channel/pubsub.rs diff --git a/embassy/src/channel/mod.rs b/embassy/src/channel/mod.rs index 05edc55d..5df1f5c5 100644 --- a/embassy/src/channel/mod.rs +++ b/embassy/src/channel/mod.rs @@ -1,5 +1,5 @@ //! Async channels pub mod mpmc; - +pub mod pubsub; pub mod signal; diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs new file mode 100644 index 00000000..5e5cce9c --- /dev/null +++ b/embassy/src/channel/pubsub.rs @@ -0,0 +1,590 @@ +//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. + +use core::cell::RefCell; +use core::fmt::Debug; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; + +use heapless::Deque; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers +/// +/// Any published message can be read by all subscribers. +/// A publisher can choose how it sends its message. +/// +/// - With [Publisher::publish] the publisher has to wait until there is space in the internal message queue. +/// - With [Publisher::publish_immediate] the publisher doesn't await and instead lets the oldest message +/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive +/// an error to indicate that it has lagged. +pub struct PubSubChannel { + inner: Mutex>>, +} + +impl PubSubChannel { + /// Create a new channel + pub const fn new() -> Self { + Self { + inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), + } + } + + /// Create a new subscriber. It will only receive messages that are published after its creation. + /// + /// If there are no subscriber slots left, an error will be returned. + pub fn subscriber(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + // Search for an empty subscriber spot + for (i, sub_spot) in s.subscriber_wakers.iter_mut().enumerate() { + if sub_spot.is_none() { + // We've found a spot, so now fill it and create the subscriber + *sub_spot = Some(WakerRegistration::new()); + return Ok(Subscriber { + subscriber_index: i, + next_message_id: s.next_message_id, + channel: self, + }); + } + } + + // No spot was found, we're full + Err(Error::MaximumSubscribersReached) + }) + } + + /// Create a new publisher + /// + /// If there are no publisher slots left, an error will be returned. + pub fn publisher(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + // Search for an empty publisher spot + for (i, pub_spot) in s.publisher_wakers.iter_mut().enumerate() { + if pub_spot.is_none() { + // We've found a spot, so now fill it and create the subscriber + *pub_spot = Some(WakerRegistration::new()); + return Ok(Publisher { + publisher_index: i, + channel: self, + }); + } + } + + // No spot was found, we're full + Err(Error::MaximumPublishersReached) + }) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, T> { + ImmediatePublisher { channel: self } + } +} + +impl PubSubBehavior + for PubSubChannel +{ + fn try_publish(&self, message: T) -> Result<(), T> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + let active_subscriber_count = s.subscriber_wakers.iter().flatten().count(); + + if active_subscriber_count == 0 { + // We don't need to publish anything because there is no one to receive it + return Ok(()); + } + + if s.queue.is_full() { + return Err(message); + } + // We just did a check for this + unsafe { + s.queue.push_back_unchecked((message, active_subscriber_count)); + } + + s.next_message_id += 1; + + // Wake all of the subscribers + for active_subscriber in s.subscriber_wakers.iter_mut().flatten() { + active_subscriber.wake() + } + + Ok(()) + }) + } + + fn publish_immediate(&self, message: T) { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + // Make space in the queue if required + if s.queue.is_full() { + s.queue.pop_front(); + } + + // We are going to call something is Self again. + // The lock is fine, but we need to get rid of the refcell borrow + drop(s); + + // This will succeed because we made sure there is space + unsafe { self.try_publish(message).unwrap_unchecked() }; + }); + } + + fn get_message(&self, message_id: u64) -> Option> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + let start_id = s.next_message_id - s.queue.len() as u64; + + if message_id < start_id { + return Some(WaitResult::Lagged(start_id - message_id)); + } + + let current_message_index = (message_id - start_id) as usize; + + if current_message_index >= s.queue.len() { + return None; + } + + // We've checked that the index is valid + unsafe { + let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap_unchecked(); + + // We're reading this item, so decrement the counter + queue_item.1 -= 1; + let message = queue_item.0.clone(); + + if current_message_index == 0 && queue_item.1 == 0 { + s.queue.pop_front(); + s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); + } + + Some(WaitResult::Message(message)) + } + }) + } + + unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + s.subscriber_wakers + .get_unchecked_mut(subscriber_index) + .as_mut() + .unwrap_unchecked() + .register(waker); + }) + } + + unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + s.publisher_wakers + .get_unchecked_mut(publisher_index) + .as_mut() + .unwrap_unchecked() + .register(waker); + }) + } + + unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + // Remove the subscriber from the wakers + *s.subscriber_wakers.get_unchecked_mut(subscriber_index) = None; + + // All messages that haven't been read yet by this subscriber must have their counter decremented + let start_id = s.next_message_id - s.queue.len() as u64; + if subscriber_next_message_id >= start_id { + let current_message_index = (subscriber_next_message_id - start_id) as usize; + s.queue + .iter_mut() + .skip(current_message_index) + .for_each(|(_, counter)| *counter -= 1); + } + }) + } + + unsafe fn unregister_publisher(&self, publisher_index: usize) { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + // Remove the publisher from the wakers + *s.publisher_wakers.get_unchecked_mut(publisher_index) = None; + }) + } +} + +/// Internal state for the PubSub channel +struct PubSubState { + /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it + queue: Deque<(T, usize), CAP>, + /// Every message has an id. + /// Don't worry, we won't run out. + /// If a million messages were published every second, then the ID's would run out in about 584942 years. + next_message_id: u64, + /// Collection of wakers for Subscribers that are waiting. + /// The [Subscriber::subscriber_index] field indexes into this array. + subscriber_wakers: [Option; SUBS], + /// Collection of wakers for Publishers that are waiting. + /// The [Publisher::publisher_index] field indexes into this array. + publisher_wakers: [Option; PUBS], +} + +impl PubSubState { + /// Create a new internal channel state + const fn new() -> Self { + const WAKER_INIT: Option = None; + Self { + queue: Deque::new(), + next_message_id: 0, + subscriber_wakers: [WAKER_INIT; SUBS], + publisher_wakers: [WAKER_INIT; PUBS], + } + } +} + +/// A subscriber to a channel +/// +/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's +/// generics are erased on this subscriber +pub struct Subscriber<'a, T: Clone> { + /// Our index into the channel + subscriber_index: usize, + /// The message id of the next message we are yet to receive + next_message_id: u64, + /// The channel we are a subscriber to + channel: &'a dyn PubSubBehavior, +} + +impl<'a, T: Clone> Subscriber<'a, T> { + /// Wait for a published message + pub fn wait<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { + SubscriberWaitFuture { subscriber: self } + } + + /// Try to see if there's a published message we haven't received yet. + /// + /// This function does not peek. The message is received if there is one. + pub fn check(&mut self) -> Option> { + match self.channel.get_message(self.next_message_id) { + Some(WaitResult::Lagged(amount)) => { + self.next_message_id += amount; + Some(WaitResult::Lagged(amount)) + } + result => { + self.next_message_id += 1; + result + } + } + } +} + +impl<'a, T: Clone> Drop for Subscriber<'a, T> { + fn drop(&mut self) { + unsafe { + self.channel + .unregister_subscriber(self.subscriber_index, self.next_message_id) + } + } +} + +/// A publisher to a channel +/// +/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's +/// generics are erased on this subscriber +pub struct Publisher<'a, T: Clone> { + /// Our index into the channel + publisher_index: usize, + /// The channel we are a publisher for + channel: &'a dyn PubSubBehavior, +} + +impl<'a, T: Clone> Publisher<'a, T> { + /// Publish a message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, T> { + PublisherWaitFuture { + message: Some(message), + publisher: self, + } + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.try_publish(message) + } +} + +impl<'a, T: Clone> Drop for Publisher<'a, T> { + fn drop(&mut self) { + unsafe { self.channel.unregister_publisher(self.publisher_index) } + } +} + +/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. +/// (So an infinite amount is possible) +pub struct ImmediatePublisher<'a, T: Clone> { + /// The channel we are a publisher for + channel: &'a dyn PubSubBehavior, +} + +impl<'a, T: Clone> ImmediatePublisher<'a, T> { + /// Publish the message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&mut self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.try_publish(message) + } + +} + +/// Error type for the [PubSubChannel] +#[derive(Debug, PartialEq, Clone)] +pub enum Error { + /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or + /// the capacity of the channels must be increased. + MaximumSubscribersReached, + /// All publisher slots are used. To add another publisher, first another publisher must be dropped or + /// the capacity of the channels must be increased. + MaximumPublishersReached, +} + +trait PubSubBehavior { + /// Try to publish a message. If the queue is full it won't succeed + fn try_publish(&self, message: T) -> Result<(), T>; + /// Publish a message immediately. If the queue is full, just throw out the oldest one. + fn publish_immediate(&self, message: T); + /// Tries to read the message if available + fn get_message(&self, message_id: u64) -> Option>; + /// Register the given waker for the given subscriber. + /// + /// ## Safety + /// + /// The subscriber index must be of a valid and active subscriber + unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); + /// Register the given waker for the given publisher. + /// + /// ## Safety + /// + /// The subscriber index must be of a valid and active publisher + unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); + /// Make the channel forget the subscriber. + /// + /// ## Safety + /// + /// The subscriber index must be of a valid and active subscriber which must not be used again + /// unless a new subscriber takes on that index. + unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); + /// Make the channel forget the publisher. + /// + /// ## Safety + /// + /// The publisher index must be of a valid and active publisher which must not be used again + /// unless a new publisher takes on that index. + unsafe fn unregister_publisher(&self, publisher_index: usize); +} + +/// Future for the subscriber wait action +pub struct SubscriberWaitFuture<'s, 'a, T: Clone> { + subscriber: &'s mut Subscriber<'a, T>, +} + +impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { + type Output = WaitResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Check if we can read a message + match self.subscriber.channel.get_message(self.subscriber.next_message_id) { + // Yes, so we are done polling + Some(WaitResult::Message(message)) => { + self.subscriber.next_message_id += 1; + Poll::Ready(WaitResult::Message(message)) + } + // No, so we need to reregister our waker and sleep again + None => { + unsafe { + self.subscriber + .channel + .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); + } + Poll::Pending + } + // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + Some(WaitResult::Lagged(amount)) => { + self.subscriber.next_message_id += amount; + Poll::Ready(WaitResult::Lagged(amount)) + } + } + } +} + +/// Future for the publisher wait action +pub struct PublisherWaitFuture<'s, 'a, T: Clone> { + /// The message we need to publish + message: Option, + publisher: &'s Publisher<'a, T>, +} + +impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { self.get_unchecked_mut() }; + + // Try to publish the message + match this.publisher.channel.try_publish(this.message.take().unwrap()) { + // We did it, we are ready + Ok(()) => Poll::Ready(()), + // The queue is full, so we need to reregister our waker and go to sleep + Err(message) => { + this.message = Some(message); + unsafe { + this.publisher + .channel + .register_publisher_waker(this.publisher.publisher_index, cx.waker()); + } + Poll::Pending + } + } + } +} + +/// The result of the subscriber wait procedure +#[derive(Debug, Clone, PartialEq)] +pub enum WaitResult { + /// The subscriber did not receive all messages and lagged by the given amount of messages. + /// (This is the amount of messages that were missed) + Lagged(u64), + /// A message was received + Message(T), +} + +#[cfg(test)] +mod tests { + use crate::blocking_mutex::raw::NoopRawMutex; + use super::*; + + #[futures_test::test] + async fn all_subscribers_receive() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.wait().await, WaitResult::Message(42)); + assert_eq!(sub1.wait().await, WaitResult::Message(42)); + + assert_eq!(sub0.check(), None); + assert_eq!(sub1.check(), None); + } + + #[futures_test::test] + async fn lag_when_queue_full_on_immediate_publish() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + pub0.publish_immediate(42); + pub0.publish_immediate(43); + pub0.publish_immediate(44); + pub0.publish_immediate(45); + pub0.publish_immediate(46); + pub0.publish_immediate(47); + + assert_eq!(sub0.check(), Some(WaitResult::Lagged(2))); + assert_eq!(sub0.wait().await, WaitResult::Message(44)); + assert_eq!(sub0.wait().await, WaitResult::Message(45)); + assert_eq!(sub0.wait().await, WaitResult::Message(46)); + assert_eq!(sub0.wait().await, WaitResult::Message(47)); + assert_eq!(sub0.check(), None); + } + + #[test] + fn limited_subs_and_pubs() { + let channel = PubSubChannel::::new(); + + let sub0 = channel.subscriber(); + let sub1 = channel.subscriber(); + let sub2 = channel.subscriber(); + let sub3 = channel.subscriber(); + let sub4 = channel.subscriber(); + + assert!(sub0.is_ok()); + assert!(sub1.is_ok()); + assert!(sub2.is_ok()); + assert!(sub3.is_ok()); + assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); + + drop(sub0); + + let sub5 = channel.subscriber(); + assert!(sub5.is_ok()); + + // publishers + + let pub0 = channel.publisher(); + let pub1 = channel.publisher(); + let pub2 = channel.publisher(); + let pub3 = channel.publisher(); + let pub4 = channel.publisher(); + + assert!(pub0.is_ok()); + assert!(pub1.is_ok()); + assert!(pub2.is_ok()); + assert!(pub3.is_ok()); + assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); + + drop(pub0); + + let pub5 = channel.publisher(); + assert!(pub5.is_ok()); + } + + #[test] + fn publisher_wait_on_full_queue() { + let channel = PubSubChannel::::new(); + + let pub0 = channel.publisher().unwrap(); + + // There are no subscribers, so the queue will never be full + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + + let sub0 = channel.subscriber().unwrap(); + + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Err(0)); + + drop(sub0); + } + +} From 03996583a1811fd6926d0f5e4115b6165cbceda0 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 12:36:39 +0200 Subject: [PATCH 02/18] fmt --- embassy/src/channel/pubsub.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 5e5cce9c..fb8d0ef5 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -25,7 +25,9 @@ pub struct PubSubChannel>>, } -impl PubSubChannel { +impl + PubSubChannel +{ /// Create a new channel pub const fn new() -> Self { Self { @@ -134,7 +136,7 @@ impl ImmediatePublisher<'a, T> { pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.try_publish(message) } - } /// Error type for the [PubSubChannel] @@ -480,8 +481,8 @@ pub enum WaitResult { #[cfg(test)] mod tests { - use crate::blocking_mutex::raw::NoopRawMutex; use super::*; + use crate::blocking_mutex::raw::NoopRawMutex; #[futures_test::test] async fn all_subscribers_receive() { @@ -586,5 +587,4 @@ mod tests { drop(sub0); } - } From 36b363a5b71497d32d2968fcecf046773822c2e9 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 13:48:26 +0200 Subject: [PATCH 03/18] Changed names of subscriber methods and implemented the Stream trait for it --- embassy/src/channel/pubsub.rs | 55 +++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index fb8d0ef5..5d81431e 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -270,14 +270,14 @@ pub struct Subscriber<'a, T: Clone> { impl<'a, T: Clone> Subscriber<'a, T> { /// Wait for a published message - pub fn wait<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { + pub fn next<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { SubscriberWaitFuture { subscriber: self } } /// Try to see if there's a published message we haven't received yet. /// /// This function does not peek. The message is received if there is one. - pub fn check(&mut self) -> Option> { + pub fn try_next(&mut self) -> Option> { match self.channel.get_message(self.next_message_id) { Some(WaitResult::Lagged(amount)) => { self.next_message_id += amount; @@ -300,6 +300,37 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> { } } +impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { + type Item = WaitResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + + // Check if we can read a message + match this.channel.get_message(this.next_message_id) { + // Yes, so we are done polling + Some(WaitResult::Message(message)) => { + this.next_message_id += 1; + Poll::Ready(Some(WaitResult::Message(message))) + } + // No, so we need to reregister our waker and sleep again + None => { + unsafe { + this + .channel + .register_subscriber_waker(this.subscriber_index, cx.waker()); + } + Poll::Pending + } + // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + Some(WaitResult::Lagged(amount)) => { + this.next_message_id += amount; + Poll::Ready(Some(WaitResult::Lagged(amount))) + } + } + } +} + /// A publisher to a channel /// /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's @@ -494,11 +525,11 @@ mod tests { pub0.publish(42).await; - assert_eq!(sub0.wait().await, WaitResult::Message(42)); - assert_eq!(sub1.wait().await, WaitResult::Message(42)); + assert_eq!(sub0.next().await, WaitResult::Message(42)); + assert_eq!(sub1.next().await, WaitResult::Message(42)); - assert_eq!(sub0.check(), None); - assert_eq!(sub1.check(), None); + assert_eq!(sub0.try_next(), None); + assert_eq!(sub1.try_next(), None); } #[futures_test::test] @@ -515,12 +546,12 @@ mod tests { pub0.publish_immediate(46); pub0.publish_immediate(47); - assert_eq!(sub0.check(), Some(WaitResult::Lagged(2))); - assert_eq!(sub0.wait().await, WaitResult::Message(44)); - assert_eq!(sub0.wait().await, WaitResult::Message(45)); - assert_eq!(sub0.wait().await, WaitResult::Message(46)); - assert_eq!(sub0.wait().await, WaitResult::Message(47)); - assert_eq!(sub0.check(), None); + assert_eq!(sub0.try_next(), Some(WaitResult::Lagged(2))); + assert_eq!(sub0.next().await, WaitResult::Message(44)); + assert_eq!(sub0.next().await, WaitResult::Message(45)); + assert_eq!(sub0.next().await, WaitResult::Message(46)); + assert_eq!(sub0.next().await, WaitResult::Message(47)); + assert_eq!(sub0.try_next(), None); } #[test] From 98d0bb726c76c522e329e9a29f314e3d8c194b1d Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 13:51:35 +0200 Subject: [PATCH 04/18] fmt --- embassy/src/channel/pubsub.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 5d81431e..577f7902 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -316,8 +316,7 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { // No, so we need to reregister our waker and sleep again None => { unsafe { - this - .channel + this.channel .register_subscriber_waker(this.subscriber_index, cx.waker()); } Poll::Pending From c7cdecfc937185c9a8120ee1232c04cdedcab66d Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 14:07:04 +0200 Subject: [PATCH 05/18] Renamed subscriber messages --- embassy/src/channel/pubsub.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 577f7902..d81e822a 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -270,14 +270,14 @@ pub struct Subscriber<'a, T: Clone> { impl<'a, T: Clone> Subscriber<'a, T> { /// Wait for a published message - pub fn next<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { SubscriberWaitFuture { subscriber: self } } /// Try to see if there's a published message we haven't received yet. /// /// This function does not peek. The message is received if there is one. - pub fn try_next(&mut self) -> Option> { + pub fn try_next_message(&mut self) -> Option> { match self.channel.get_message(self.next_message_id) { Some(WaitResult::Lagged(amount)) => { self.next_message_id += amount; @@ -524,11 +524,11 @@ mod tests { pub0.publish(42).await; - assert_eq!(sub0.next().await, WaitResult::Message(42)); - assert_eq!(sub1.next().await, WaitResult::Message(42)); + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - assert_eq!(sub0.try_next(), None); - assert_eq!(sub1.try_next(), None); + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); } #[futures_test::test] @@ -545,12 +545,12 @@ mod tests { pub0.publish_immediate(46); pub0.publish_immediate(47); - assert_eq!(sub0.try_next(), Some(WaitResult::Lagged(2))); - assert_eq!(sub0.next().await, WaitResult::Message(44)); - assert_eq!(sub0.next().await, WaitResult::Message(45)); - assert_eq!(sub0.next().await, WaitResult::Message(46)); - assert_eq!(sub0.next().await, WaitResult::Message(47)); - assert_eq!(sub0.try_next(), None); + assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); + assert_eq!(sub0.next_message().await, WaitResult::Message(44)); + assert_eq!(sub0.next_message().await, WaitResult::Message(45)); + assert_eq!(sub0.next_message().await, WaitResult::Message(46)); + assert_eq!(sub0.next_message().await, WaitResult::Message(47)); + assert_eq!(sub0.try_next_message(), None); } #[test] From 790426e2f67ac5e2197139c354c3c201d098a59c Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 14:11:41 +0200 Subject: [PATCH 06/18] Stream now ignores lag --- embassy/src/channel/pubsub.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index d81e822a..4c360fea 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -300,8 +300,10 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> { } } +/// Warning: The stream implementation ignores lag results and returns all messages. +/// This might miss some messages without you knowing it. impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { - type Item = WaitResult; + type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = unsafe { self.get_unchecked_mut() }; @@ -311,7 +313,7 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { // Yes, so we are done polling Some(WaitResult::Message(message)) => { this.next_message_id += 1; - Poll::Ready(Some(WaitResult::Message(message))) + Poll::Ready(Some(message)) } // No, so we need to reregister our waker and sleep again None => { @@ -321,10 +323,12 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { } Poll::Pending } - // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + // We missed a couple of messages. We must do our internal bookkeeping. + // This stream impl doesn't return lag results, so we just ignore and start over Some(WaitResult::Lagged(amount)) => { this.next_message_id += amount; - Poll::Ready(Some(WaitResult::Lagged(amount))) + cx.waker().wake_by_ref(); + Poll::Pending } } } From f92f46f489827ad9518d42c6bfa6d55fc6145bcf Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 14:19:16 +0200 Subject: [PATCH 07/18] Added convenience methods that ignore lag --- embassy/src/channel/pubsub.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 4c360fea..02122580 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -274,6 +274,16 @@ impl<'a, T: Clone> Subscriber<'a, T> { SubscriberWaitFuture { subscriber: self } } + /// Wait for a published message (ignoring lag results) + pub async fn next_message_pure(&mut self) -> T { + loop { + match self.next_message().await { + WaitResult::Lagged(_) => continue, + WaitResult::Message(message) => break message, + } + } + } + /// Try to see if there's a published message we haven't received yet. /// /// This function does not peek. The message is received if there is one. @@ -289,6 +299,19 @@ impl<'a, T: Clone> Subscriber<'a, T> { } } } + + /// Try to see if there's a published message we haven't received yet (ignoring lag results). + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message_pure(&mut self) -> Option { + loop { + match self.try_next_message() { + Some(WaitResult::Lagged(_)) => continue, + Some(WaitResult::Message(message)) => break Some(message), + None => break None, + } + } + } } impl<'a, T: Clone> Drop for Subscriber<'a, T> { From cdacc44c5f00276b34f57c1a3db84fa1f429edd7 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 16:37:23 +0200 Subject: [PATCH 08/18] Added unpin impls to the futures --- embassy/src/channel/pubsub.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 02122580..ea0ccb2d 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -495,6 +495,8 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { } } +impl<'s, 'a, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, T> {} + /// Future for the publisher wait action pub struct PublisherWaitFuture<'s, 'a, T: Clone> { /// The message we need to publish @@ -526,6 +528,8 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { } } +impl<'s, 'a, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, T> {} + /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq)] pub enum WaitResult { From dfde157337b379ff0805cfe4aa5463c078ca1d41 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 20:57:11 +0200 Subject: [PATCH 09/18] Removed most unsafe code --- embassy/src/channel/pubsub.rs | 106 +++++++++++----------------------- 1 file changed, 34 insertions(+), 72 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index ea0ccb2d..20878187 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -109,9 +109,7 @@ impl Subscriber<'a, T> { impl<'a, T: Clone> Drop for Subscriber<'a, T> { fn drop(&mut self) { - unsafe { - self.channel - .unregister_subscriber(self.subscriber_index, self.next_message_id) - } + self.channel + .unregister_subscriber(self.subscriber_index, self.next_message_id) } } @@ -340,10 +326,8 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { } // No, so we need to reregister our waker and sleep again None => { - unsafe { - this.channel - .register_subscriber_waker(this.subscriber_index, cx.waker()); - } + this.channel + .register_subscriber_waker(this.subscriber_index, cx.waker()); Poll::Pending } // We missed a couple of messages. We must do our internal bookkeeping. @@ -391,7 +375,7 @@ impl<'a, T: Clone> Publisher<'a, T> { impl<'a, T: Clone> Drop for Publisher<'a, T> { fn drop(&mut self) { - unsafe { self.channel.unregister_publisher(self.publisher_index) } + self.channel.unregister_publisher(self.publisher_index) } } @@ -434,31 +418,13 @@ trait PubSubBehavior { /// Tries to read the message if available fn get_message(&self, message_id: u64) -> Option>; /// Register the given waker for the given subscriber. - /// - /// ## Safety - /// - /// The subscriber index must be of a valid and active subscriber - unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); + fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); /// Register the given waker for the given publisher. - /// - /// ## Safety - /// - /// The subscriber index must be of a valid and active publisher - unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); + fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); /// Make the channel forget the subscriber. - /// - /// ## Safety - /// - /// The subscriber index must be of a valid and active subscriber which must not be used again - /// unless a new subscriber takes on that index. - unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); + fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); /// Make the channel forget the publisher. - /// - /// ## Safety - /// - /// The publisher index must be of a valid and active publisher which must not be used again - /// unless a new publisher takes on that index. - unsafe fn unregister_publisher(&self, publisher_index: usize); + fn unregister_publisher(&self, publisher_index: usize); } /// Future for the subscriber wait action @@ -479,11 +445,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { } // No, so we need to reregister our waker and sleep again None => { - unsafe { - self.subscriber - .channel - .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); - } + self.subscriber + .channel + .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); Poll::Pending } // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged @@ -517,11 +481,9 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { // The queue is full, so we need to reregister our waker and go to sleep Err(message) => { this.message = Some(message); - unsafe { - this.publisher - .channel - .register_publisher_waker(this.publisher.publisher_index, cx.waker()); - } + this.publisher + .channel + .register_publisher_waker(this.publisher.publisher_index, cx.waker()); Poll::Pending } } From a614a55c7ddecc171d48c61bf9fa8c6c11ed16f4 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 22:11:29 +0200 Subject: [PATCH 10/18] Put most behaviour one level lower (under the mutex instead of above). Changed the PubSubBehavior to only have high level functions. --- embassy/src/channel/pubsub.rs | 347 ++++++++++++++++++---------------- 1 file changed, 181 insertions(+), 166 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 20878187..c5a8c01f 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -94,122 +94,74 @@ impl PubSubBehavior for PubSubChannel { - fn try_publish(&self, message: T) -> Result<(), T> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); + fn get_message_with_context( + &self, + next_message_id: &mut u64, + subscriber_index: usize, + cx: Option<&mut Context<'_>>, + ) -> Poll> { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); - let active_subscriber_count = s.subscriber_wakers.iter().flatten().count(); - - if active_subscriber_count == 0 { - // We don't need to publish anything because there is no one to receive it - return Ok(()); + // Check if we can read a message + match s.get_message(*next_message_id) { + // Yes, so we are done polling + Some(WaitResult::Message(message)) => { + *next_message_id += 1; + Poll::Ready(WaitResult::Message(message)) + } + // No, so we need to reregister our waker and sleep again + None => { + if let Some(cx) = cx { + s.register_subscriber_waker(subscriber_index, cx.waker()); + } + Poll::Pending + } + // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + Some(WaitResult::Lagged(amount)) => { + *next_message_id += amount; + Poll::Ready(WaitResult::Lagged(amount)) + } } + }) + } - if s.queue.is_full() { - return Err(message); + fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T> { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + // Try to publish the message + match s.try_publish(message) { + // We did it, we are ready + Ok(()) => Ok(()), + // The queue is full, so we need to reregister our waker and go to sleep + Err(message) => { + if let Some(cx) = cx { + s.register_publisher_waker(publisher_index, cx.waker()); + } + Err(message) + } } - // We just did a check for this - s.queue.push_back((message, active_subscriber_count)).ok().unwrap(); - - s.next_message_id += 1; - - // Wake all of the subscribers - for active_subscriber in s.subscriber_wakers.iter_mut().flatten() { - active_subscriber.wake() - } - - Ok(()) }) } fn publish_immediate(&self, message: T) { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - // Make space in the queue if required - if s.queue.is_full() { - s.queue.pop_front(); - } - - // We are going to call something is Self again. - // The lock is fine, but we need to get rid of the refcell borrow - drop(s); - - // This will succeed because we made sure there is space - self.try_publish(message).ok().unwrap(); - }); - } - - fn get_message(&self, message_id: u64) -> Option> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - let start_id = s.next_message_id - s.queue.len() as u64; - - if message_id < start_id { - return Some(WaitResult::Lagged(start_id - message_id)); - } - - let current_message_index = (message_id - start_id) as usize; - - if current_message_index >= s.queue.len() { - return None; - } - - // We've checked that the index is valid - let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap(); - - // We're reading this item, so decrement the counter - queue_item.1 -= 1; - let message = queue_item.0.clone(); - - if current_message_index == 0 && queue_item.1 == 0 { - s.queue.pop_front(); - s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); - } - - Some(WaitResult::Message(message)) - }) - } - - fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - s.subscriber_wakers[subscriber_index].as_mut().unwrap().register(waker); - }) - } - - fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - s.publisher_wakers[publisher_index].as_mut().unwrap().register(waker); + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.publish_immediate(message) }) } fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - // Remove the subscriber from the wakers - s.subscriber_wakers[subscriber_index] = None; - - // All messages that haven't been read yet by this subscriber must have their counter decremented - let start_id = s.next_message_id - s.queue.len() as u64; - if subscriber_next_message_id >= start_id { - let current_message_index = (subscriber_next_message_id - start_id) as usize; - s.queue - .iter_mut() - .skip(current_message_index) - .for_each(|(_, counter)| *counter -= 1); - } + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.unregister_subscriber(subscriber_index, subscriber_next_message_id) }) } fn unregister_publisher(&self, publisher_index: usize) { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - // Remove the publisher from the wakers - s.publisher_wakers[publisher_index] = None; + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.unregister_publisher(publisher_index) }) } } @@ -241,6 +193,99 @@ impl PubSubSta publisher_wakers: [WAKER_INIT; PUBS], } } + + fn try_publish(&mut self, message: T) -> Result<(), T> { + let active_subscriber_count = self.subscriber_wakers.iter().flatten().count(); + + if active_subscriber_count == 0 { + // We don't need to publish anything because there is no one to receive it + return Ok(()); + } + + if self.queue.is_full() { + return Err(message); + } + // We just did a check for this + self.queue.push_back((message, active_subscriber_count)).ok().unwrap(); + + self.next_message_id += 1; + + // Wake all of the subscribers + for active_subscriber in self.subscriber_wakers.iter_mut().flatten() { + active_subscriber.wake() + } + + Ok(()) + } + + fn publish_immediate(&mut self, message: T) { + // Make space in the queue if required + if self.queue.is_full() { + self.queue.pop_front(); + } + + // This will succeed because we made sure there is space + self.try_publish(message).ok().unwrap(); + } + + fn get_message(&mut self, message_id: u64) -> Option> { + let start_id = self.next_message_id - self.queue.len() as u64; + + if message_id < start_id { + return Some(WaitResult::Lagged(start_id - message_id)); + } + + let current_message_index = (message_id - start_id) as usize; + + if current_message_index >= self.queue.len() { + return None; + } + + // We've checked that the index is valid + let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); + + // We're reading this item, so decrement the counter + queue_item.1 -= 1; + let message = queue_item.0.clone(); + + if current_message_index == 0 && queue_item.1 == 0 { + self.queue.pop_front(); + self.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); + } + + Some(WaitResult::Message(message)) + } + + fn register_subscriber_waker(&mut self, subscriber_index: usize, waker: &Waker) { + self.subscriber_wakers[subscriber_index] + .as_mut() + .unwrap() + .register(waker); + } + + fn register_publisher_waker(&mut self, publisher_index: usize, waker: &Waker) { + self.publisher_wakers[publisher_index].as_mut().unwrap().register(waker); + } + + fn unregister_subscriber(&mut self, subscriber_index: usize, subscriber_next_message_id: u64) { + // Remove the subscriber from the wakers + self.subscriber_wakers[subscriber_index] = None; + + // All messages that haven't been read yet by this subscriber must have their counter decremented + let start_id = self.next_message_id - self.queue.len() as u64; + if subscriber_next_message_id >= start_id { + let current_message_index = (subscriber_next_message_id - start_id) as usize; + self.queue + .iter_mut() + .skip(current_message_index) + .for_each(|(_, counter)| *counter -= 1); + } + } + + fn unregister_publisher(&mut self, publisher_index: usize) { + // Remove the publisher from the wakers + self.publisher_wakers[publisher_index] = None; + } } /// A subscriber to a channel @@ -276,15 +321,12 @@ impl<'a, T: Clone> Subscriber<'a, T> { /// /// This function does not peek. The message is received if there is one. pub fn try_next_message(&mut self) -> Option> { - match self.channel.get_message(self.next_message_id) { - Some(WaitResult::Lagged(amount)) => { - self.next_message_id += amount; - Some(WaitResult::Lagged(amount)) - } - result => { - self.next_message_id += 1; - result - } + match self + .channel + .get_message_with_context(&mut self.next_message_id, self.subscriber_index, None) + { + Poll::Ready(result) => Some(result), + Poll::Pending => None, } } @@ -317,26 +359,16 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = unsafe { self.get_unchecked_mut() }; - // Check if we can read a message - match this.channel.get_message(this.next_message_id) { - // Yes, so we are done polling - Some(WaitResult::Message(message)) => { - this.next_message_id += 1; - Poll::Ready(Some(message)) - } - // No, so we need to reregister our waker and sleep again - None => { - this.channel - .register_subscriber_waker(this.subscriber_index, cx.waker()); - Poll::Pending - } - // We missed a couple of messages. We must do our internal bookkeeping. - // This stream impl doesn't return lag results, so we just ignore and start over - Some(WaitResult::Lagged(amount)) => { - this.next_message_id += amount; + match this + .channel + .get_message_with_context(&mut this.next_message_id, this.subscriber_index, Some(cx)) + { + Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), + Poll::Ready(WaitResult::Lagged(_)) => { cx.waker().wake_by_ref(); Poll::Pending } + Poll::Pending => Poll::Pending, } } } @@ -369,7 +401,7 @@ impl<'a, T: Clone> Publisher<'a, T> { /// Publish a message if there is space in the message queue pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.try_publish(message) + self.channel.publish_with_context(message, self.publisher_index, None) } } @@ -395,7 +427,7 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> { /// Publish a message if there is space in the message queue pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.try_publish(message) + self.channel.publish_with_context(message, usize::MAX, None) } } @@ -411,19 +443,19 @@ pub enum Error { } trait PubSubBehavior { - /// Try to publish a message. If the queue is full it won't succeed - fn try_publish(&self, message: T) -> Result<(), T>; - /// Publish a message immediately. If the queue is full, just throw out the oldest one. + fn get_message_with_context( + &self, + next_message_id: &mut u64, + subscriber_index: usize, + cx: Option<&mut Context<'_>>, + ) -> Poll>; + + fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T>; + fn publish_immediate(&self, message: T); - /// Tries to read the message if available - fn get_message(&self, message_id: u64) -> Option>; - /// Register the given waker for the given subscriber. - fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); - /// Register the given waker for the given publisher. - fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); - /// Make the channel forget the subscriber. + fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); - /// Make the channel forget the publisher. + fn unregister_publisher(&self, publisher_index: usize); } @@ -436,26 +468,10 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { type Output = WaitResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Check if we can read a message - match self.subscriber.channel.get_message(self.subscriber.next_message_id) { - // Yes, so we are done polling - Some(WaitResult::Message(message)) => { - self.subscriber.next_message_id += 1; - Poll::Ready(WaitResult::Message(message)) - } - // No, so we need to reregister our waker and sleep again - None => { - self.subscriber - .channel - .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); - Poll::Pending - } - // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged - Some(WaitResult::Lagged(amount)) => { - self.subscriber.next_message_id += amount; - Poll::Ready(WaitResult::Lagged(amount)) - } - } + let sub_index = self.subscriber.subscriber_index; + self.subscriber + .channel + .get_message_with_context(&mut self.subscriber.next_message_id, sub_index, Some(cx)) } } @@ -474,16 +490,15 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = unsafe { self.get_unchecked_mut() }; - // Try to publish the message - match this.publisher.channel.try_publish(this.message.take().unwrap()) { - // We did it, we are ready + let message = this.message.take().unwrap(); + match this + .publisher + .channel + .publish_with_context(message, this.publisher.publisher_index, Some(cx)) + { Ok(()) => Poll::Ready(()), - // The queue is full, so we need to reregister our waker and go to sleep Err(message) => { this.message = Some(message); - this.publisher - .channel - .register_publisher_waker(this.publisher.publisher_index, cx.waker()); Poll::Pending } } From 2a4cdd05faa40a7ce63f66e45080cdacb5171747 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 16 Jun 2022 22:13:26 +0200 Subject: [PATCH 11/18] Removed all unsafe --- embassy/src/channel/pubsub.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index c5a8c01f..cc00f47a 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -356,12 +356,11 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> { impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = unsafe { self.get_unchecked_mut() }; - - match this + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let sub_index = self.subscriber_index; + match self .channel - .get_message_with_context(&mut this.next_message_id, this.subscriber_index, Some(cx)) + .get_message_with_context(&mut self.next_message_id, sub_index, Some(cx)) { Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), Poll::Ready(WaitResult::Lagged(_)) => { @@ -487,18 +486,16 @@ pub struct PublisherWaitFuture<'s, 'a, T: Clone> { impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - - let message = this.message.take().unwrap(); - match this + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let message = self.message.take().unwrap(); + match self .publisher .channel - .publish_with_context(message, this.publisher.publisher_index, Some(cx)) + .publish_with_context(message, self.publisher.publisher_index, Some(cx)) { Ok(()) => Poll::Ready(()), Err(message) => { - this.message = Some(message); + self.message = Some(message); Poll::Pending } } From eb304c244839df742f4a76a4072f13179e397d0b Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 17 Jun 2022 13:54:34 +0200 Subject: [PATCH 12/18] Added a function to WakerRegistration to check if it's occupied. Created a MultiWakerRegistration that can hold multiple wakers. Got rid of some options and the pub/sub_index --- embassy/src/channel/pubsub.rs | 176 ++++++++++-------------- embassy/src/waitqueue/mod.rs | 3 + embassy/src/waitqueue/multi_waker.rs | 31 +++++ embassy/src/waitqueue/waker.rs | 5 + embassy/src/waitqueue/waker_agnostic.rs | 5 + 5 files changed, 120 insertions(+), 100 deletions(-) create mode 100644 embassy/src/waitqueue/multi_waker.rs diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index cc00f47a..41f275c4 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -10,7 +10,7 @@ use heapless::Deque; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; -use crate::waitqueue::WakerRegistration; +use crate::waitqueue::MultiWakerRegistration; /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers /// @@ -42,21 +42,15 @@ impl= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(Subscriber { + next_message_id: s.next_message_id, + channel: self, + }) } - - // No spot was found, we're full - Err(Error::MaximumSubscribersReached) }) } @@ -67,20 +61,12 @@ impl= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(Publisher { channel: self }) } - - // No spot was found, we're full - Err(Error::MaximumPublishersReached) }) } @@ -94,12 +80,7 @@ impl PubSubBehavior for PubSubChannel { - fn get_message_with_context( - &self, - next_message_id: &mut u64, - subscriber_index: usize, - cx: Option<&mut Context<'_>>, - ) -> Poll> { + fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -113,7 +94,7 @@ impl { if let Some(cx) = cx { - s.register_subscriber_waker(subscriber_index, cx.waker()); + s.register_subscriber_waker(cx.waker()); } Poll::Pending } @@ -126,7 +107,7 @@ impl>) -> Result<(), T> { + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { self.inner.lock(|s| { let mut s = s.borrow_mut(); // Try to publish the message @@ -136,7 +117,7 @@ impl { if let Some(cx) = cx { - s.register_publisher_waker(publisher_index, cx.waker()); + s.register_publisher_waker(cx.waker()); } Err(message) } @@ -151,17 +132,17 @@ impl; SUBS], + subscriber_wakers: MultiWakerRegistration, /// Collection of wakers for Publishers that are waiting. - /// The [Publisher::publisher_index] field indexes into this array. - publisher_wakers: [Option; PUBS], + publisher_wakers: MultiWakerRegistration, + /// The amount of subscribers that are active + subscriber_count: usize, + /// The amount of publishers that are active + publisher_count: usize, } impl PubSubState { /// Create a new internal channel state const fn new() -> Self { - const WAKER_INIT: Option = None; Self { queue: Deque::new(), next_message_id: 0, - subscriber_wakers: [WAKER_INIT; SUBS], - publisher_wakers: [WAKER_INIT; PUBS], + subscriber_wakers: MultiWakerRegistration::new(), + publisher_wakers: MultiWakerRegistration::new(), + subscriber_count: 0, + publisher_count: 0, } } fn try_publish(&mut self, message: T) -> Result<(), T> { - let active_subscriber_count = self.subscriber_wakers.iter().flatten().count(); - - if active_subscriber_count == 0 { + if self.subscriber_count == 0 { // We don't need to publish anything because there is no one to receive it return Ok(()); } @@ -206,14 +188,12 @@ impl PubSubSta return Err(message); } // We just did a check for this - self.queue.push_back((message, active_subscriber_count)).ok().unwrap(); + self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); self.next_message_id += 1; // Wake all of the subscribers - for active_subscriber in self.subscriber_wakers.iter_mut().flatten() { - active_subscriber.wake() - } + self.subscriber_wakers.wake(); Ok(()) } @@ -250,26 +230,42 @@ impl PubSubSta if current_message_index == 0 && queue_item.1 == 0 { self.queue.pop_front(); - self.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); + self.publisher_wakers.wake(); } Some(WaitResult::Message(message)) } - fn register_subscriber_waker(&mut self, subscriber_index: usize, waker: &Waker) { - self.subscriber_wakers[subscriber_index] - .as_mut() - .unwrap() - .register(waker); + fn register_subscriber_waker(&mut self, waker: &Waker) { + match self.subscriber_wakers.register(waker) { + Ok(()) => {} + Err(_) => { + // All waker slots were full. This can only happen when there was a subscriber that now has dropped. + // We need to throw it away. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.subscriber_wakers.wake(); + self.subscriber_wakers.register(waker).unwrap(); + } + } } - fn register_publisher_waker(&mut self, publisher_index: usize, waker: &Waker) { - self.publisher_wakers[publisher_index].as_mut().unwrap().register(waker); + fn register_publisher_waker(&mut self, waker: &Waker) { + match self.publisher_wakers.register(waker) { + Ok(()) => {} + Err(_) => { + // All waker slots were full. This can only happen when there was a publisher that now has dropped. + // We need to throw it away. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.publisher_wakers.wake(); + self.publisher_wakers.register(waker).unwrap(); + } + } } - fn unregister_subscriber(&mut self, subscriber_index: usize, subscriber_next_message_id: u64) { - // Remove the subscriber from the wakers - self.subscriber_wakers[subscriber_index] = None; + fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { + self.subscriber_count -= 1; // All messages that haven't been read yet by this subscriber must have their counter decremented let start_id = self.next_message_id - self.queue.len() as u64; @@ -282,9 +278,8 @@ impl PubSubSta } } - fn unregister_publisher(&mut self, publisher_index: usize) { - // Remove the publisher from the wakers - self.publisher_wakers[publisher_index] = None; + fn unregister_publisher(&mut self) { + self.publisher_count -= 1; } } @@ -293,8 +288,6 @@ impl PubSubSta /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's /// generics are erased on this subscriber pub struct Subscriber<'a, T: Clone> { - /// Our index into the channel - subscriber_index: usize, /// The message id of the next message we are yet to receive next_message_id: u64, /// The channel we are a subscriber to @@ -321,10 +314,7 @@ impl<'a, T: Clone> Subscriber<'a, T> { /// /// This function does not peek. The message is received if there is one. pub fn try_next_message(&mut self) -> Option> { - match self - .channel - .get_message_with_context(&mut self.next_message_id, self.subscriber_index, None) - { + match self.channel.get_message_with_context(&mut self.next_message_id, None) { Poll::Ready(result) => Some(result), Poll::Pending => None, } @@ -346,8 +336,7 @@ impl<'a, T: Clone> Subscriber<'a, T> { impl<'a, T: Clone> Drop for Subscriber<'a, T> { fn drop(&mut self) { - self.channel - .unregister_subscriber(self.subscriber_index, self.next_message_id) + self.channel.unregister_subscriber(self.next_message_id) } } @@ -357,10 +346,9 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let sub_index = self.subscriber_index; match self .channel - .get_message_with_context(&mut self.next_message_id, sub_index, Some(cx)) + .get_message_with_context(&mut self.next_message_id, Some(cx)) { Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), Poll::Ready(WaitResult::Lagged(_)) => { @@ -377,8 +365,6 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's /// generics are erased on this subscriber pub struct Publisher<'a, T: Clone> { - /// Our index into the channel - publisher_index: usize, /// The channel we are a publisher for channel: &'a dyn PubSubBehavior, } @@ -400,13 +386,13 @@ impl<'a, T: Clone> Publisher<'a, T> { /// Publish a message if there is space in the message queue pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, self.publisher_index, None) + self.channel.publish_with_context(message, None) } } impl<'a, T: Clone> Drop for Publisher<'a, T> { fn drop(&mut self) { - self.channel.unregister_publisher(self.publisher_index) + self.channel.unregister_publisher() } } @@ -426,7 +412,7 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> { /// Publish a message if there is space in the message queue pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, usize::MAX, None) + self.channel.publish_with_context(message, None) } } @@ -442,20 +428,15 @@ pub enum Error { } trait PubSubBehavior { - fn get_message_with_context( - &self, - next_message_id: &mut u64, - subscriber_index: usize, - cx: Option<&mut Context<'_>>, - ) -> Poll>; + fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; - fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T>; + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; fn publish_immediate(&self, message: T); - fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); + fn unregister_subscriber(&self, subscriber_next_message_id: u64); - fn unregister_publisher(&self, publisher_index: usize); + fn unregister_publisher(&self); } /// Future for the subscriber wait action @@ -467,10 +448,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { type Output = WaitResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let sub_index = self.subscriber.subscriber_index; self.subscriber .channel - .get_message_with_context(&mut self.subscriber.next_message_id, sub_index, Some(cx)) + .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) } } @@ -488,11 +468,7 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let message = self.message.take().unwrap(); - match self - .publisher - .channel - .publish_with_context(message, self.publisher.publisher_index, Some(cx)) - { + match self.publisher.channel.publish_with_context(message, Some(cx)) { Ok(()) => Poll::Ready(()), Err(message) => { self.message = Some(message); diff --git a/embassy/src/waitqueue/mod.rs b/embassy/src/waitqueue/mod.rs index a2bafad9..5c4e1bc3 100644 --- a/embassy/src/waitqueue/mod.rs +++ b/embassy/src/waitqueue/mod.rs @@ -3,3 +3,6 @@ #[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] mod waker; pub use waker::*; + +mod multi_waker; +pub use multi_waker::*; diff --git a/embassy/src/waitqueue/multi_waker.rs b/embassy/src/waitqueue/multi_waker.rs new file mode 100644 index 00000000..6e8710cb --- /dev/null +++ b/embassy/src/waitqueue/multi_waker.rs @@ -0,0 +1,31 @@ +use core::task::Waker; + +use super::WakerRegistration; + +pub struct MultiWakerRegistration { + wakers: [WakerRegistration; N], +} + +impl MultiWakerRegistration { + pub const fn new() -> Self { + const WAKER: WakerRegistration = WakerRegistration::new(); + Self { wakers: [WAKER; N] } + } + + /// Register a waker. If the buffer is full the function returns it in the error + pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { + if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { + waker_slot.register(w); + Ok(()) + } else { + Err(w) + } + } + + /// Wake all registered wakers. This clears the buffer + pub fn wake(&mut self) { + for waker_slot in self.wakers.iter_mut() { + waker_slot.wake() + } + } +} diff --git a/embassy/src/waitqueue/waker.rs b/embassy/src/waitqueue/waker.rs index da907300..a90154cc 100644 --- a/embassy/src/waitqueue/waker.rs +++ b/embassy/src/waitqueue/waker.rs @@ -50,6 +50,11 @@ impl WakerRegistration { unsafe { wake_task(w) } } } + + /// Returns true if a waker is currently registered + pub fn occupied(&self) -> bool { + self.waker.is_some() + } } // SAFETY: `WakerRegistration` effectively contains an `Option`, diff --git a/embassy/src/waitqueue/waker_agnostic.rs b/embassy/src/waitqueue/waker_agnostic.rs index 89430aa4..62e3adb7 100644 --- a/embassy/src/waitqueue/waker_agnostic.rs +++ b/embassy/src/waitqueue/waker_agnostic.rs @@ -47,6 +47,11 @@ impl WakerRegistration { w.wake() } } + + /// Returns true if a waker is currently registered + pub fn occupied(&self) -> bool { + self.waker.is_some() + } } /// Utility struct to register and wake a waker. From 5eae0200749e967c40e7eed5879518b48ca832cc Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 17 Jun 2022 14:44:19 +0200 Subject: [PATCH 13/18] Created the possibility to choose between dyn channel and generic channel --- embassy/src/channel/pubsub.rs | 259 +++++++++++++++++++++++++++++----- 1 file changed, 226 insertions(+), 33 deletions(-) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 41f275c4..33440188 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -3,6 +3,8 @@ use core::cell::RefCell; use core::fmt::Debug; use core::future::Future; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll, Waker}; @@ -38,7 +40,7 @@ impl Result, Error> { + pub fn subscriber(&self) -> Result, Error> { self.inner.lock(|inner| { let mut s = inner.borrow_mut(); @@ -46,10 +48,31 @@ impl(&'a self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.subscriber_count >= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(DynSubscriber(Sub { + next_message_id: s.next_message_id, + channel: self as _, + _phantom: Default::default(), + })) } }) } @@ -57,7 +80,7 @@ impl Result, Error> { + pub fn publisher(&self) -> Result, Error> { self.inner.lock(|inner| { let mut s = inner.borrow_mut(); @@ -65,15 +88,49 @@ impl(&'a self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.publisher_count >= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(DynPublisher(Pub { + channel: self, + _phantom: Default::default(), + })) } }) } /// Create a new publisher that can only send immediate messages. /// This kind of publisher does not take up a publisher slot. - pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, T> { - ImmediatePublisher { channel: self } + pub fn immediate_publisher(&self) -> ImmediatePublisher { + ImmediatePublisher(ImmediatePub { + channel: self, + _phantom: Default::default(), + }) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { + DynImmediatePublisher(ImmediatePub { + channel: self, + _phantom: Default::default(), + }) } } @@ -287,16 +344,17 @@ impl PubSubSta /// /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's /// generics are erased on this subscriber -pub struct Subscriber<'a, T: Clone> { +pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message id of the next message we are yet to receive next_message_id: u64, /// The channel we are a subscriber to - channel: &'a dyn PubSubBehavior, + channel: &'a PSB, + _phantom: PhantomData, } -impl<'a, T: Clone> Subscriber<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { SubscriberWaitFuture { subscriber: self } } @@ -334,15 +392,55 @@ impl<'a, T: Clone> Subscriber<'a, T> { } } -impl<'a, T: Clone> Drop for Subscriber<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { fn drop(&mut self) { self.channel.unregister_subscriber(self.next_message_id) } } +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} + +pub struct DynSubscriber<'a, T: Clone>(Sub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { + type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + Sub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Sub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + /// Warning: The stream implementation ignores lag results and returns all messages. /// This might miss some messages without you knowing it. -impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -364,12 +462,13 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { /// /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's /// generics are erased on this subscriber -pub struct Publisher<'a, T: Clone> { +pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The channel we are a publisher for - channel: &'a dyn PubSubBehavior, + channel: &'a PSB, + _phantom: PhantomData, } -impl<'a, T: Clone> Publisher<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { /// Publish a message right now even when the queue is full. /// This may cause a subscriber to miss an older message. pub fn publish_immediate(&self, message: T) { @@ -377,7 +476,7 @@ impl<'a, T: Clone> Publisher<'a, T> { } /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, T> { + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { PublisherWaitFuture { message: Some(message), publisher: self, @@ -390,20 +489,59 @@ impl<'a, T: Clone> Publisher<'a, T> { } } -impl<'a, T: Clone> Drop for Publisher<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { fn drop(&mut self) { self.channel.unregister_publisher() } } -/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. -/// (So an infinite amount is possible) -pub struct ImmediatePublisher<'a, T: Clone> { - /// The channel we are a publisher for - channel: &'a dyn PubSubBehavior, +pub struct DynPublisher<'a, T: Clone>(Pub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynPublisher<'a, T> { + type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } } -impl<'a, T: Clone> ImmediatePublisher<'a, T> { +impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + Pub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Pub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. +/// (So an infinite amount is possible) +pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { /// Publish the message right now even when the queue is full. /// This may cause a subscriber to miss an older message. pub fn publish_immediate(&mut self, message: T) { @@ -416,6 +554,44 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> { } } +pub struct DynImmediatePublisher<'a, T: Clone>(ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { + type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + ImmediatePub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = ImmediatePub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + /// Error type for the [PubSubChannel] #[derive(Debug, PartialEq, Clone)] pub enum Error { @@ -427,7 +603,7 @@ pub enum Error { MaximumPublishersReached, } -trait PubSubBehavior { +pub trait PubSubBehavior { fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; @@ -440,11 +616,11 @@ trait PubSubBehavior { } /// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, T: Clone> { - subscriber: &'s mut Subscriber<'a, T>, +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + subscriber: &'s mut Sub<'a, PSB, T>, } -impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { type Output = WaitResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -454,16 +630,16 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { } } -impl<'s, 'a, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, T> {} +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} /// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, T: Clone> { +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, - publisher: &'s Publisher<'a, T>, + publisher: &'s Pub<'a, PSB, T>, } -impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -478,7 +654,7 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { } } -impl<'s, 'a, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, T> {} +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq)] @@ -495,6 +671,23 @@ mod tests { use super::*; use crate::blocking_mutex::raw::NoopRawMutex; + #[futures_test::test] + async fn dyn_pub_sub_works() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.dyn_subscriber().unwrap(); + let mut sub1 = channel.dyn_subscriber().unwrap(); + let pub0 = channel.dyn_publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); + + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); + } + #[futures_test::test] async fn all_subscribers_receive() { let channel = PubSubChannel::::new(); From 4a5127aead15b913976138f94226f09026e04771 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 17 Jun 2022 14:45:07 +0200 Subject: [PATCH 14/18] Move the module into a folder --- embassy/src/channel/{pubsub.rs => pubsub/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename embassy/src/channel/{pubsub.rs => pubsub/mod.rs} (100%) diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub/mod.rs similarity index 100% rename from embassy/src/channel/pubsub.rs rename to embassy/src/channel/pubsub/mod.rs From 949b548d45910ca8ad7a3a5d4f65f9b230431c9e Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 17 Jun 2022 15:06:41 +0200 Subject: [PATCH 15/18] Refactor pub/sub impls into their own files --- embassy/src/channel/pubsub/mod.rs | 350 ++--------------------- embassy/src/channel/pubsub/publisher.rs | 183 ++++++++++++ embassy/src/channel/pubsub/subscriber.rs | 153 ++++++++++ 3 files changed, 363 insertions(+), 323 deletions(-) create mode 100644 embassy/src/channel/pubsub/publisher.rs create mode 100644 embassy/src/channel/pubsub/subscriber.rs diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs index 33440188..fdd67536 100644 --- a/embassy/src/channel/pubsub/mod.rs +++ b/embassy/src/channel/pubsub/mod.rs @@ -1,19 +1,25 @@ //! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. +#![deny(missing_docs)] + use core::cell::RefCell; use core::fmt::Debug; -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; use core::task::{Context, Poll, Waker}; use heapless::Deque; +use self::publisher::{ImmediatePub, Pub}; +use self::subscriber::Sub; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::MultiWakerRegistration; +pub mod publisher; +pub mod subscriber; + +pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; +pub use subscriber::{DynSubscriber, Subscriber}; + /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers /// /// Any published message can be read by all subscribers. @@ -48,11 +54,7 @@ impl ImmediatePublisher { - ImmediatePublisher(ImmediatePub { - channel: self, - _phantom: Default::default(), - }) + ImmediatePublisher(ImmediatePub::new(self)) } /// Create a new publisher that can only send immediate messages. /// This kind of publisher does not take up a publisher slot. pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { - DynImmediatePublisher(ImmediatePub { - channel: self, - _phantom: Default::default(), - }) + DynImmediatePublisher(ImmediatePub::new(self)) } } @@ -340,258 +326,6 @@ impl PubSubSta } } -/// A subscriber to a channel -/// -/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's -/// generics are erased on this subscriber -pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message id of the next message we are yet to receive - next_message_id: u64, - /// The channel we are a subscriber to - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { - /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { - SubscriberWaitFuture { subscriber: self } - } - - /// Wait for a published message (ignoring lag results) - pub async fn next_message_pure(&mut self) -> T { - loop { - match self.next_message().await { - WaitResult::Lagged(_) => continue, - WaitResult::Message(message) => break message, - } - } - } - - /// Try to see if there's a published message we haven't received yet. - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message(&mut self) -> Option> { - match self.channel.get_message_with_context(&mut self.next_message_id, None) { - Poll::Ready(result) => Some(result), - Poll::Pending => None, - } - } - - /// Try to see if there's a published message we haven't received yet (ignoring lag results). - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message_pure(&mut self) -> Option { - loop { - match self.try_next_message() { - Some(WaitResult::Lagged(_)) => continue, - Some(WaitResult::Message(message)) => break Some(message), - None => break None, - } - } - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_subscriber(self.next_message_id) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} - -pub struct DynSubscriber<'a, T: Clone>(Sub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { - type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - Sub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Sub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Warning: The stream implementation ignores lag results and returns all messages. -/// This might miss some messages without you knowing it. -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self - .channel - .get_message_with_context(&mut self.next_message_id, Some(cx)) - { - Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), - Poll::Ready(WaitResult::Lagged(_)) => { - cx.waker().wake_by_ref(); - Poll::Pending - } - Poll::Pending => Poll::Pending, - } - } -} - -/// A publisher to a channel -/// -/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's -/// generics are erased on this subscriber -pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { - /// Publish a message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { - PublisherWaitFuture { - message: Some(message), - publisher: self, - } - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_publisher() - } -} - -pub struct DynPublisher<'a, T: Clone>(Pub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynPublisher<'a, T> { - type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - Pub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Pub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. -/// (So an infinite amount is possible) -pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { - /// Publish the message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&mut self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -pub struct DynImmediatePublisher<'a, T: Clone>(ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { - type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - ImmediatePub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = ImmediatePub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - /// Error type for the [PubSubChannel] #[derive(Debug, PartialEq, Clone)] pub enum Error { @@ -603,59 +337,29 @@ pub enum Error { MaximumPublishersReached, } +/// 'Middle level' behaviour of the pubsub channel. +/// This trait is used so that Sub and Pub can be generic over the channel. pub trait PubSubBehavior { + /// Try to get a message from the queue with the given message id. + /// + /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; + /// Try to publish a message to the queue. + /// + /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; + /// Publish a message immediately fn publish_immediate(&self, message: T); + /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); + /// Let the channel know that a publisher has dropped fn unregister_publisher(&self); } -/// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - subscriber: &'s mut Sub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { - type Output = WaitResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.subscriber - .channel - .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} - -/// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message we need to publish - message: Option, - publisher: &'s Pub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let message = self.message.take().unwrap(); - match self.publisher.channel.publish_with_context(message, Some(cx)) { - Ok(()) => Poll::Ready(()), - Err(message) => { - self.message = Some(message); - Poll::Pending - } - } - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} - /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq)] pub enum WaitResult { diff --git a/embassy/src/channel/pubsub/publisher.rs b/embassy/src/channel/pubsub/publisher.rs new file mode 100644 index 00000000..89a0b924 --- /dev/null +++ b/embassy/src/channel/pubsub/publisher.rs @@ -0,0 +1,183 @@ +//! Implementation of anything directly publisher related + +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures::Future; + +use super::{PubSubBehavior, PubSubChannel}; +use crate::blocking_mutex::raw::RawMutex; + +/// A publisher to a channel +pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { + pub(super) fn new(channel: &'a PSB) -> Self { + Self { + channel, + _phantom: Default::default(), + } + } + + /// Publish a message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { + PublisherWaitFuture { + message: Some(message), + publisher: self, + } + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.publish_with_context(message, None) + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { + fn drop(&mut self) { + self.channel.unregister_publisher() + } +} + +/// A publisher that holds a dynamic reference to the channel +pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynPublisher<'a, T> { + type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that holds a generic reference to the channel +pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) Pub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Pub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. +/// (So an infinite amount is possible) +pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { + pub(super) fn new(channel: &'a PSB) -> Self { + Self { + channel, + _phantom: Default::default(), + } + } + /// Publish the message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&mut self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.publish_with_context(message, None) + } +} + +/// An immediate publisher that holds a dynamic reference to the channel +pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { + type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// An immediate publisher that holds a generic reference to the channel +pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) ImmediatePub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = ImmediatePub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the publisher wait action +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The message we need to publish + message: Option, + publisher: &'s Pub<'a, PSB, T>, +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let message = self.message.take().unwrap(); + match self.publisher.channel.publish_with_context(message, Some(cx)) { + Ok(()) => Poll::Ready(()), + Err(message) => { + self.message = Some(message); + Poll::Pending + } + } + } +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy/src/channel/pubsub/subscriber.rs b/embassy/src/channel/pubsub/subscriber.rs new file mode 100644 index 00000000..23c4938d --- /dev/null +++ b/embassy/src/channel/pubsub/subscriber.rs @@ -0,0 +1,153 @@ +//! Implementation of anything directly subscriber related + +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures::Future; + +use super::{PubSubBehavior, PubSubChannel, WaitResult}; +use crate::blocking_mutex::raw::RawMutex; + +/// A subscriber to a channel +pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The message id of the next message we are yet to receive + next_message_id: u64, + /// The channel we are a subscriber to + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { + pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { + Self { + next_message_id, + channel, + _phantom: Default::default(), + } + } + + /// Wait for a published message + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { + SubscriberWaitFuture { subscriber: self } + } + + /// Wait for a published message (ignoring lag results) + pub async fn next_message_pure(&mut self) -> T { + loop { + match self.next_message().await { + WaitResult::Lagged(_) => continue, + WaitResult::Message(message) => break message, + } + } + } + + /// Try to see if there's a published message we haven't received yet. + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message(&mut self) -> Option> { + match self.channel.get_message_with_context(&mut self.next_message_id, None) { + Poll::Ready(result) => Some(result), + Poll::Pending => None, + } + } + + /// Try to see if there's a published message we haven't received yet (ignoring lag results). + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message_pure(&mut self) -> Option { + loop { + match self.try_next_message() { + Some(WaitResult::Lagged(_)) => continue, + Some(WaitResult::Message(message)) => break Some(message), + None => break None, + } + } + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { + fn drop(&mut self) { + self.channel.unregister_subscriber(self.next_message_id) + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} + +/// Warning: The stream implementation ignores lag results and returns all messages. +/// This might miss some messages without you knowing it. +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self + .channel + .get_message_with_context(&mut self.next_message_id, Some(cx)) + { + Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), + Poll::Ready(WaitResult::Lagged(_)) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } +} + +/// A subscriber that holds a dynamic reference to the channel +pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { + type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A subscriber that holds a generic reference to the channel +pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) Sub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Sub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the subscriber wait action +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + subscriber: &'s mut Sub<'a, PSB, T>, +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { + type Output = WaitResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.subscriber + .channel + .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) + } +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} From 9887f18a51015c771d039f80f347a6ed18cce055 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 17 Jun 2022 15:08:31 +0200 Subject: [PATCH 16/18] fmt --- embassy/src/channel/pubsub/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs index fdd67536..e421df58 100644 --- a/embassy/src/channel/pubsub/mod.rs +++ b/embassy/src/channel/pubsub/mod.rs @@ -341,12 +341,12 @@ pub enum Error { /// This trait is used so that Sub and Pub can be generic over the channel. pub trait PubSubBehavior { /// Try to get a message from the queue with the given message id. - /// + /// /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; /// Try to publish a message to the queue. - /// + /// /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; From 1eec7e69f1be1bf91b8ca576cc8d8e30629705be Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 17 Jun 2022 15:29:42 +0200 Subject: [PATCH 17/18] Added some more docs --- embassy/src/channel/pubsub/mod.rs | 42 ++++++++++++++++++++++++++++ embassy/src/waitqueue/multi_waker.rs | 2 ++ 2 files changed, 44 insertions(+) diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs index e421df58..12f0d24a 100644 --- a/embassy/src/channel/pubsub/mod.rs +++ b/embassy/src/channel/pubsub/mod.rs @@ -29,6 +29,48 @@ pub use subscriber::{DynSubscriber, Subscriber}; /// - With [Publisher::publish_immediate] the publisher doesn't await and instead lets the oldest message /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive /// an error to indicate that it has lagged. +/// +/// ## Example +/// +/// ``` +/// # use embassy::blocking_mutex::raw::NoopRawMutex; +/// # use embassy::channel::pubsub::WaitResult; +/// # use embassy::channel::pubsub::PubSubChannel; +/// # use futures_executor::block_on; +/// # let test = async { +/// // Create the channel. This can be static as well +/// let channel = PubSubChannel::::new(); +/// +/// // This is a generic subscriber with a direct reference to the channel +/// let mut sub0 = channel.subscriber().unwrap(); +/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel +/// let mut sub1 = channel.dyn_subscriber().unwrap(); +/// +/// let pub0 = channel.publisher().unwrap(); +/// +/// // Publish a message, but wait if the queue is full +/// pub0.publish(42).await; +/// +/// // Publish a message, but if the queue is full, just kick out the oldest message. +/// // This may cause some subscribers to miss a message +/// pub0.publish_immediate(43); +/// +/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result +/// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); +/// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); +/// +/// // Wait again, but this time ignore any Lag results +/// assert_eq!(sub0.next_message_pure().await, 43); +/// assert_eq!(sub1.next_message_pure().await, 43); +/// +/// // There's also a polling interface +/// assert_eq!(sub0.try_next_message(), None); +/// assert_eq!(sub1.try_next_message(), None); +/// # }; +/// # +/// # block_on(test); +/// ``` +/// pub struct PubSubChannel { inner: Mutex>>, } diff --git a/embassy/src/waitqueue/multi_waker.rs b/embassy/src/waitqueue/multi_waker.rs index 6e8710cb..325d2cb3 100644 --- a/embassy/src/waitqueue/multi_waker.rs +++ b/embassy/src/waitqueue/multi_waker.rs @@ -2,11 +2,13 @@ use core::task::Waker; use super::WakerRegistration; +/// Utility struct to register and wake multiple wakers. pub struct MultiWakerRegistration { wakers: [WakerRegistration; N], } impl MultiWakerRegistration { + /// Create a new empty instance pub const fn new() -> Self { const WAKER: WakerRegistration = WakerRegistration::new(); Self { wakers: [WAKER; N] } From 78c546f3566d4289ebd9512fa3e0a03eee6fd87f Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Tue, 21 Jun 2022 15:47:20 +0200 Subject: [PATCH 18/18] Added example and some defmt --- embassy/src/channel/pubsub/mod.rs | 2 + examples/nrf/src/bin/pubsub.rs | 106 ++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 examples/nrf/src/bin/pubsub.rs diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs index 12f0d24a..9bfb845e 100644 --- a/embassy/src/channel/pubsub/mod.rs +++ b/embassy/src/channel/pubsub/mod.rs @@ -370,6 +370,7 @@ impl PubSubSta /// Error type for the [PubSubChannel] #[derive(Debug, PartialEq, Clone)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Error { /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or /// the capacity of the channels must be increased. @@ -404,6 +405,7 @@ pub trait PubSubBehavior { /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum WaitResult { /// The subscriber did not receive all messages and lagged by the given amount of messages. /// (This is the amount of messages that were missed) diff --git a/examples/nrf/src/bin/pubsub.rs b/examples/nrf/src/bin/pubsub.rs new file mode 100644 index 00000000..2c3a355c --- /dev/null +++ b/examples/nrf/src/bin/pubsub.rs @@ -0,0 +1,106 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use defmt::unwrap; +use embassy::blocking_mutex::raw::ThreadModeRawMutex; +use embassy::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber}; +use embassy::executor::Spawner; +use embassy::time::{Duration, Timer}; +use {defmt_rtt as _, panic_probe as _}; + +/// Create the message bus. It has a queue of 4, supports 3 subscribers and 1 publisher +static MESSAGE_BUS: PubSubChannel = PubSubChannel::new(); + +#[derive(Clone, defmt::Format)] +enum Message { + A, + B, + C, +} + +#[embassy::main] +async fn main(spawner: Spawner, _p: embassy_nrf::Peripherals) { + defmt::info!("Hello World!"); + + // It's good to set up the subscribers before publishing anything. + // A subscriber will only yield messages that have been published after its creation. + + spawner.must_spawn(fast_logger(unwrap!(MESSAGE_BUS.subscriber()))); + spawner.must_spawn(slow_logger(unwrap!(MESSAGE_BUS.dyn_subscriber()))); + spawner.must_spawn(slow_logger_pure(unwrap!(MESSAGE_BUS.dyn_subscriber()))); + + // Get a publisher + let message_publisher = unwrap!(MESSAGE_BUS.publisher()); + // We can't get more (normal) publishers + // We can have an infinite amount of immediate publishers. They can't await a publish, only do an immediate publish + defmt::assert!(MESSAGE_BUS.publisher().is_err()); + + let mut index = 0; + loop { + Timer::after(Duration::from_millis(500)).await; + + let message = match index % 3 { + 0 => Message::A, + 1 => Message::B, + 2..=u32::MAX => Message::C, + }; + + // We publish immediately and don't await anything. + // If the queue is full, it will cause the oldest message to not be received by some/all subscribers + message_publisher.publish_immediate(message); + + // Try to comment out the last one and uncomment this line below. + // The behaviour will change: + // - The subscribers won't miss any messages any more + // - Trying to publish now has some wait time when the queue is full + + // message_publisher.publish(message).await; + + index += 1; + } +} + +/// A logger task that just awaits the messages it receives +/// +/// This takes the generic `Subscriber`. This is most performant, but requires you to write down all of the generics +#[embassy::task] +async fn fast_logger(mut messages: Subscriber<'static, ThreadModeRawMutex, Message, 4, 3, 1>) { + loop { + let message = messages.next_message().await; + defmt::info!("Received message at fast logger: {:?}", message); + } +} + +/// A logger task that awaits the messages, but also does some other work. +/// Because of this, depeding on how the messages were published, the subscriber might miss some messages +/// +/// This takes the dynamic `DynSubscriber`. This is not as performant as the generic version, but let's you ignore some of the generics +#[embassy::task] +async fn slow_logger(mut messages: DynSubscriber<'static, Message>) { + loop { + // Do some work + Timer::after(Duration::from_millis(2000)).await; + + // If the publisher has used the `publish_immediate` function, then we may receive a lag message here + let message = messages.next_message().await; + defmt::info!("Received message at slow logger: {:?}", message); + + // If the previous one was a lag message, then we should receive the next message here immediately + let message = messages.next_message().await; + defmt::info!("Received message at slow logger: {:?}", message); + } +} + +/// Same as `slow_logger` but it ignores lag results +#[embassy::task] +async fn slow_logger_pure(mut messages: DynSubscriber<'static, Message>) { + loop { + // Do some work + Timer::after(Duration::from_millis(2000)).await; + + // Instead of receiving lags here, we just ignore that and read the next message + let message = messages.next_message_pure().await; + defmt::info!("Received message at slow logger pure: {:?}", message); + } +}