Removed most unsafe code

This commit is contained in:
Dion Dokter 2022-06-16 20:57:11 +02:00
parent cdacc44c5f
commit dfde157337

View File

@ -109,9 +109,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
return Err(message);
}
// We just did a check for this
unsafe {
s.queue.push_back_unchecked((message, active_subscriber_count));
}
s.queue.push_back((message, active_subscriber_count)).ok().unwrap();
s.next_message_id += 1;
@ -138,7 +136,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
drop(s);
// This will succeed because we made sure there is space
unsafe { self.try_publish(message).unwrap_unchecked() };
self.try_publish(message).ok().unwrap();
});
}
@ -159,51 +157,41 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
}
// We've checked that the index is valid
unsafe {
let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap_unchecked();
let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap();
// We're reading this item, so decrement the counter
queue_item.1 -= 1;
let message = queue_item.0.clone();
// We're reading this item, so decrement the counter
queue_item.1 -= 1;
let message = queue_item.0.clone();
if current_message_index == 0 && queue_item.1 == 0 {
s.queue.pop_front();
s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake());
}
Some(WaitResult::Message(message))
if current_message_index == 0 && queue_item.1 == 0 {
s.queue.pop_front();
s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake());
}
Some(WaitResult::Message(message))
})
}
unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) {
fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) {
self.inner.lock(|inner| {
let mut s = inner.borrow_mut();
s.subscriber_wakers
.get_unchecked_mut(subscriber_index)
.as_mut()
.unwrap_unchecked()
.register(waker);
s.subscriber_wakers[subscriber_index].as_mut().unwrap().register(waker);
})
}
unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) {
fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) {
self.inner.lock(|inner| {
let mut s = inner.borrow_mut();
s.publisher_wakers
.get_unchecked_mut(publisher_index)
.as_mut()
.unwrap_unchecked()
.register(waker);
s.publisher_wakers[publisher_index].as_mut().unwrap().register(waker);
})
}
unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) {
fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) {
self.inner.lock(|inner| {
let mut s = inner.borrow_mut();
// Remove the subscriber from the wakers
*s.subscriber_wakers.get_unchecked_mut(subscriber_index) = None;
s.subscriber_wakers[subscriber_index] = None;
// All messages that haven't been read yet by this subscriber must have their counter decremented
let start_id = s.next_message_id - s.queue.len() as u64;
@ -217,11 +205,11 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
})
}
unsafe fn unregister_publisher(&self, publisher_index: usize) {
fn unregister_publisher(&self, publisher_index: usize) {
self.inner.lock(|inner| {
let mut s = inner.borrow_mut();
// Remove the publisher from the wakers
*s.publisher_wakers.get_unchecked_mut(publisher_index) = None;
s.publisher_wakers[publisher_index] = None;
})
}
}
@ -316,10 +304,8 @@ impl<'a, T: Clone> Subscriber<'a, T> {
impl<'a, T: Clone> Drop for Subscriber<'a, T> {
fn drop(&mut self) {
unsafe {
self.channel
.unregister_subscriber(self.subscriber_index, self.next_message_id)
}
self.channel
.unregister_subscriber(self.subscriber_index, self.next_message_id)
}
}
@ -340,10 +326,8 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
}
// No, so we need to reregister our waker and sleep again
None => {
unsafe {
this.channel
.register_subscriber_waker(this.subscriber_index, cx.waker());
}
this.channel
.register_subscriber_waker(this.subscriber_index, cx.waker());
Poll::Pending
}
// We missed a couple of messages. We must do our internal bookkeeping.
@ -391,7 +375,7 @@ impl<'a, T: Clone> Publisher<'a, T> {
impl<'a, T: Clone> Drop for Publisher<'a, T> {
fn drop(&mut self) {
unsafe { self.channel.unregister_publisher(self.publisher_index) }
self.channel.unregister_publisher(self.publisher_index)
}
}
@ -434,31 +418,13 @@ trait PubSubBehavior<T> {
/// Tries to read the message if available
fn get_message(&self, message_id: u64) -> Option<WaitResult<T>>;
/// Register the given waker for the given subscriber.
///
/// ## Safety
///
/// The subscriber index must be of a valid and active subscriber
unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker);
fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker);
/// Register the given waker for the given publisher.
///
/// ## Safety
///
/// The subscriber index must be of a valid and active publisher
unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker);
fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker);
/// Make the channel forget the subscriber.
///
/// ## Safety
///
/// The subscriber index must be of a valid and active subscriber which must not be used again
/// unless a new subscriber takes on that index.
unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64);
fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64);
/// Make the channel forget the publisher.
///
/// ## Safety
///
/// The publisher index must be of a valid and active publisher which must not be used again
/// unless a new publisher takes on that index.
unsafe fn unregister_publisher(&self, publisher_index: usize);
fn unregister_publisher(&self, publisher_index: usize);
}
/// Future for the subscriber wait action
@ -479,11 +445,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> {
}
// No, so we need to reregister our waker and sleep again
None => {
unsafe {
self.subscriber
.channel
.register_subscriber_waker(self.subscriber.subscriber_index, cx.waker());
}
self.subscriber
.channel
.register_subscriber_waker(self.subscriber.subscriber_index, cx.waker());
Poll::Pending
}
// We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
@ -517,11 +481,9 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> {
// The queue is full, so we need to reregister our waker and go to sleep
Err(message) => {
this.message = Some(message);
unsafe {
this.publisher
.channel
.register_publisher_waker(this.publisher.publisher_index, cx.waker());
}
this.publisher
.channel
.register_publisher_waker(this.publisher.publisher_index, cx.waker());
Poll::Pending
}
}