diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index 41f275c4..33440188 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -3,6 +3,8 @@ use core::cell::RefCell; use core::fmt::Debug; use core::future::Future; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll, Waker}; @@ -38,7 +40,7 @@ impl Result, Error> { + pub fn subscriber(&self) -> Result, Error> { self.inner.lock(|inner| { let mut s = inner.borrow_mut(); @@ -46,10 +48,31 @@ impl(&'a self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.subscriber_count >= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(DynSubscriber(Sub { + next_message_id: s.next_message_id, + channel: self as _, + _phantom: Default::default(), + })) } }) } @@ -57,7 +80,7 @@ impl Result, Error> { + pub fn publisher(&self) -> Result, Error> { self.inner.lock(|inner| { let mut s = inner.borrow_mut(); @@ -65,15 +88,49 @@ impl(&'a self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.publisher_count >= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(DynPublisher(Pub { + channel: self, + _phantom: Default::default(), + })) } }) } /// Create a new publisher that can only send immediate messages. /// This kind of publisher does not take up a publisher slot. - pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, T> { - ImmediatePublisher { channel: self } + pub fn immediate_publisher(&self) -> ImmediatePublisher { + ImmediatePublisher(ImmediatePub { + channel: self, + _phantom: Default::default(), + }) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { + DynImmediatePublisher(ImmediatePub { + channel: self, + _phantom: Default::default(), + }) } } @@ -287,16 +344,17 @@ impl PubSubSta /// /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's /// generics are erased on this subscriber -pub struct Subscriber<'a, T: Clone> { +pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message id of the next message we are yet to receive next_message_id: u64, /// The channel we are a subscriber to - channel: &'a dyn PubSubBehavior, + channel: &'a PSB, + _phantom: PhantomData, } -impl<'a, T: Clone> Subscriber<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { SubscriberWaitFuture { subscriber: self } } @@ -334,15 +392,55 @@ impl<'a, T: Clone> Subscriber<'a, T> { } } -impl<'a, T: Clone> Drop for Subscriber<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { fn drop(&mut self) { self.channel.unregister_subscriber(self.next_message_id) } } +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} + +pub struct DynSubscriber<'a, T: Clone>(Sub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { + type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + Sub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Sub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + /// Warning: The stream implementation ignores lag results and returns all messages. /// This might miss some messages without you knowing it. -impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -364,12 +462,13 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { /// /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's /// generics are erased on this subscriber -pub struct Publisher<'a, T: Clone> { +pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The channel we are a publisher for - channel: &'a dyn PubSubBehavior, + channel: &'a PSB, + _phantom: PhantomData, } -impl<'a, T: Clone> Publisher<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { /// Publish a message right now even when the queue is full. /// This may cause a subscriber to miss an older message. pub fn publish_immediate(&self, message: T) { @@ -377,7 +476,7 @@ impl<'a, T: Clone> Publisher<'a, T> { } /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, T> { + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { PublisherWaitFuture { message: Some(message), publisher: self, @@ -390,20 +489,59 @@ impl<'a, T: Clone> Publisher<'a, T> { } } -impl<'a, T: Clone> Drop for Publisher<'a, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { fn drop(&mut self) { self.channel.unregister_publisher() } } -/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. -/// (So an infinite amount is possible) -pub struct ImmediatePublisher<'a, T: Clone> { - /// The channel we are a publisher for - channel: &'a dyn PubSubBehavior, +pub struct DynPublisher<'a, T: Clone>(Pub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynPublisher<'a, T> { + type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } } -impl<'a, T: Clone> ImmediatePublisher<'a, T> { +impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + Pub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Pub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. +/// (So an infinite amount is possible) +pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { /// Publish the message right now even when the queue is full. /// This may cause a subscriber to miss an older message. pub fn publish_immediate(&mut self, message: T) { @@ -416,6 +554,44 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> { } } +pub struct DynImmediatePublisher<'a, T: Clone>(ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { + type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + ImmediatePub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = ImmediatePub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + /// Error type for the [PubSubChannel] #[derive(Debug, PartialEq, Clone)] pub enum Error { @@ -427,7 +603,7 @@ pub enum Error { MaximumPublishersReached, } -trait PubSubBehavior { +pub trait PubSubBehavior { fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; @@ -440,11 +616,11 @@ trait PubSubBehavior { } /// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, T: Clone> { - subscriber: &'s mut Subscriber<'a, T>, +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + subscriber: &'s mut Sub<'a, PSB, T>, } -impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { type Output = WaitResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -454,16 +630,16 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { } } -impl<'s, 'a, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, T> {} +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} /// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, T: Clone> { +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, - publisher: &'s Publisher<'a, T>, + publisher: &'s Pub<'a, PSB, T>, } -impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -478,7 +654,7 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { } } -impl<'s, 'a, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, T> {} +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq)] @@ -495,6 +671,23 @@ mod tests { use super::*; use crate::blocking_mutex::raw::NoopRawMutex; + #[futures_test::test] + async fn dyn_pub_sub_works() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.dyn_subscriber().unwrap(); + let mut sub1 = channel.dyn_subscriber().unwrap(); + let pub0 = channel.dyn_publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); + + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); + } + #[futures_test::test] async fn all_subscribers_receive() { let channel = PubSubChannel::::new();