Migrated to the waker registration functionality for Embassy specific optimisations

This commit is contained in:
huntc 2021-07-11 11:47:09 +10:00
parent dcd0c38109
commit 108cffcba0
2 changed files with 20 additions and 42 deletions

View File

@ -39,6 +39,7 @@ embedded-hal = "0.2.5"
cast = { version = "=0.2.3", default-features = false } cast = { version = "=0.2.3", default-features = false }
[dev-dependencies] [dev-dependencies]
embassy = { path = ".", features = ["executor-agnostic"] }
futures-executor = { version = "0.3", features = [ "thread-pool" ] } futures-executor = { version = "0.3", features = [ "thread-pool" ] }
futures-test = "0.3" futures-test = "0.3"
futures-timer = "0.3" futures-timer = "0.3"

View File

@ -51,6 +51,7 @@ use super::CriticalSectionMutex;
use super::Mutex; use super::Mutex;
use super::NoopMutex; use super::NoopMutex;
use super::ThreadModeMutex; use super::ThreadModeMutex;
use super::WakerRegistration;
/// Send values to the associated `Receiver`. /// Send values to the associated `Receiver`.
/// ///
@ -149,7 +150,7 @@ where
Ok(v) => Poll::Ready(Some(v)), Ok(v) => Poll::Ready(Some(v)),
Err(TryRecvError::Closed) => Poll::Ready(None), Err(TryRecvError::Closed) => Poll::Ready(None),
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
self.channel.get().set_receiver_waker(cx.waker().clone()); self.channel.get().set_receiver_waker(&cx.waker());
Poll::Pending Poll::Pending
} }
} }
@ -282,10 +283,7 @@ where
Ok(..) => Poll::Ready(Ok(())), Ok(..) => Poll::Ready(Ok(())),
Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
Err(TrySendError::Full(..)) => { Err(TrySendError::Full(..)) => {
self.sender self.sender.channel.get().set_senders_waker(&cx.waker());
.channel
.get()
.set_senders_waker(cx.waker().clone());
Poll::Pending Poll::Pending
// Note we leave the existing UnsafeCell contents - they still // Note we leave the existing UnsafeCell contents - they still
// contain the original message. We could create another UnsafeCell // contain the original message. We could create another UnsafeCell
@ -312,10 +310,7 @@ where
if self.sender.is_closed() { if self.sender.is_closed() {
Poll::Ready(()) Poll::Ready(())
} else { } else {
self.sender self.sender.channel.get().set_senders_waker(&cx.waker());
.channel
.get()
.set_senders_waker(cx.waker().clone());
Poll::Pending Poll::Pending
} }
} }
@ -400,8 +395,8 @@ struct ChannelState<T, const N: usize> {
closed: bool, closed: bool,
receiver_registered: bool, receiver_registered: bool,
senders_registered: u32, senders_registered: u32,
receiver_waker: Option<Waker>, receiver_waker: WakerRegistration,
senders_waker: Option<Waker>, senders_waker: WakerRegistration,
} }
impl<T, const N: usize> ChannelState<T, N> { impl<T, const N: usize> ChannelState<T, N> {
@ -416,8 +411,8 @@ impl<T, const N: usize> ChannelState<T, N> {
let closed = false; let closed = false;
let receiver_registered = false; let receiver_registered = false;
let senders_registered = 0; let senders_registered = 0;
let receiver_waker = None; let receiver_waker = WakerRegistration::new();
let senders_waker = None; let senders_waker = WakerRegistration::new();
ChannelState { ChannelState {
buf, buf,
read_pos, read_pos,
@ -534,9 +529,7 @@ where
if state.read_pos != state.write_pos || state.full { if state.read_pos != state.write_pos || state.full {
if state.full { if state.full {
state.full = false; state.full = false;
if let Some(w) = state.senders_waker.take() { state.senders_waker.wake();
w.wake();
}
} }
let message = let message =
unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() }; unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() };
@ -546,9 +539,7 @@ where
Err(TryRecvError::Empty) Err(TryRecvError::Empty)
} else { } else {
state.closed = true; state.closed = true;
if let Some(w) = state.senders_waker.take() { state.senders_waker.wake();
w.wake();
}
Err(TryRecvError::Closed) Err(TryRecvError::Closed)
} }
} else { } else {
@ -567,9 +558,7 @@ where
if state.write_pos == state.read_pos { if state.write_pos == state.read_pos {
state.full = true; state.full = true;
} }
if let Some(w) = state.receiver_waker.take() { state.receiver_waker.wake();
w.wake();
}
Ok(()) Ok(())
} else { } else {
Err(TrySendError::Full(message)) Err(TrySendError::Full(message))
@ -583,9 +572,7 @@ where
fn close(&mut self) { fn close(&mut self) {
let state = &mut self.state; let state = &mut self.state;
self.mutex.lock(|_| { self.mutex.lock(|_| {
if let Some(w) = state.receiver_waker.take() { state.receiver_waker.wake();
w.wake();
}
state.closing = true; state.closing = true;
}); });
} }
@ -608,9 +595,7 @@ where
self.mutex.lock(|_| { self.mutex.lock(|_| {
if state.receiver_registered { if state.receiver_registered {
state.closed = true; state.closed = true;
if let Some(w) = state.senders_waker.take() { state.senders_waker.wake();
w.wake();
}
} }
state.receiver_registered = false; state.receiver_registered = false;
}) })
@ -629,38 +614,30 @@ where
assert!(state.senders_registered > 0); assert!(state.senders_registered > 0);
state.senders_registered -= 1; state.senders_registered -= 1;
if state.senders_registered == 0 { if state.senders_registered == 0 {
if let Some(w) = state.receiver_waker.take() { state.receiver_waker.wake();
w.wake();
}
state.closing = true; state.closing = true;
} }
}) })
} }
fn set_receiver_waker(&mut self, receiver_waker: Waker) { fn set_receiver_waker(&mut self, receiver_waker: &Waker) {
let state = &mut self.state; let state = &mut self.state;
self.mutex.lock(|_| { self.mutex.lock(|_| {
state.receiver_waker = Some(receiver_waker); state.receiver_waker.register(receiver_waker);
}) })
} }
fn set_senders_waker(&mut self, senders_waker: Waker) { fn set_senders_waker(&mut self, senders_waker: &Waker) {
let state = &mut self.state; let state = &mut self.state;
self.mutex.lock(|_| { self.mutex.lock(|_| {
// Dispose of any existing sender causing them to be polled again. // Dispose of any existing sender causing them to be polled again.
// This could cause a spin given multiple concurrent senders, however given that // This could cause a spin given multiple concurrent senders, however given that
// most sends only block waiting for the receiver to become active, this should // most sends only block waiting for the receiver to become active, this should
// be a short-lived activity. The upside is a greatly simplified implementation // be a short-lived activity. The upside is a greatly simplified implementation
// that avoids the need for intrusive linked-lists and unsafe operations on pinned // that avoids the need for intrusive linked-lists and unsafe operations on pinned
// pointers. // pointers.
if let Some(waker) = state.senders_waker.clone() { state.senders_waker.wake();
if !senders_waker.will_wake(&waker) { state.senders_waker.register(senders_waker);
trace!("Waking an an active send waker due to being superseded with a new one. While benign, please report this.");
waker.wake();
}
}
state.senders_waker = Some(senders_waker);
}) })
} }
} }