diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index ea0ccb2d..20878187 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs @@ -109,9 +109,7 @@ impl Subscriber<'a, T> { impl<'a, T: Clone> Drop for Subscriber<'a, T> { fn drop(&mut self) { - unsafe { - self.channel - .unregister_subscriber(self.subscriber_index, self.next_message_id) - } + self.channel + .unregister_subscriber(self.subscriber_index, self.next_message_id) } } @@ -340,10 +326,8 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { } // No, so we need to reregister our waker and sleep again None => { - unsafe { - this.channel - .register_subscriber_waker(this.subscriber_index, cx.waker()); - } + this.channel + .register_subscriber_waker(this.subscriber_index, cx.waker()); Poll::Pending } // We missed a couple of messages. We must do our internal bookkeeping. @@ -391,7 +375,7 @@ impl<'a, T: Clone> Publisher<'a, T> { impl<'a, T: Clone> Drop for Publisher<'a, T> { fn drop(&mut self) { - unsafe { self.channel.unregister_publisher(self.publisher_index) } + self.channel.unregister_publisher(self.publisher_index) } } @@ -434,31 +418,13 @@ trait PubSubBehavior { /// Tries to read the message if available fn get_message(&self, message_id: u64) -> Option>; /// Register the given waker for the given subscriber. - /// - /// ## Safety - /// - /// The subscriber index must be of a valid and active subscriber - unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); + fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); /// Register the given waker for the given publisher. - /// - /// ## Safety - /// - /// The subscriber index must be of a valid and active publisher - unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); + fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); /// Make the channel forget the subscriber. - /// - /// ## Safety - /// - /// The subscriber index must be of a valid and active subscriber which must not be used again - /// unless a new subscriber takes on that index. - unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); + fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); /// Make the channel forget the publisher. - /// - /// ## Safety - /// - /// The publisher index must be of a valid and active publisher which must not be used again - /// unless a new publisher takes on that index. - unsafe fn unregister_publisher(&self, publisher_index: usize); + fn unregister_publisher(&self, publisher_index: usize); } /// Future for the subscriber wait action @@ -479,11 +445,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { } // No, so we need to reregister our waker and sleep again None => { - unsafe { - self.subscriber - .channel - .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); - } + self.subscriber + .channel + .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); Poll::Pending } // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged @@ -517,11 +481,9 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { // The queue is full, so we need to reregister our waker and go to sleep Err(message) => { this.message = Some(message); - unsafe { - this.publisher - .channel - .register_publisher_waker(this.publisher.publisher_index, cx.waker()); - } + this.publisher + .channel + .register_publisher_waker(this.publisher.publisher_index, cx.waker()); Poll::Pending } }