Merge pull request #1763 from rubdos/sender-receiver-with-ctx
Refactor Channel/Sender/Receiver poll methods
This commit is contained in:
@ -65,6 +65,13 @@ where
|
||||
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
|
||||
self.channel.try_send(message)
|
||||
}
|
||||
|
||||
/// Allows a poll_fn to poll until the channel is ready to send
|
||||
///
|
||||
/// See [`Channel::poll_ready_to_send()`]
|
||||
pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.channel.poll_ready_to_send(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Send-only access to a [`Channel`] without knowing channel size.
|
||||
@ -106,6 +113,13 @@ impl<'ch, T> DynamicSender<'ch, T> {
|
||||
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
|
||||
self.channel.try_send_with_context(message, None)
|
||||
}
|
||||
|
||||
/// Allows a poll_fn to poll until the channel is ready to send
|
||||
///
|
||||
/// See [`Channel::poll_ready_to_send()`]
|
||||
pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.channel.poll_ready_to_send(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive-only access to a [`Channel`].
|
||||
@ -133,16 +147,30 @@ where
|
||||
{
|
||||
/// Receive the next value.
|
||||
///
|
||||
/// See [`Channel::recv()`].
|
||||
pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
|
||||
self.channel.recv()
|
||||
/// See [`Channel::receive()`].
|
||||
pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
|
||||
self.channel.receive()
|
||||
}
|
||||
|
||||
/// Attempt to immediately receive the next value.
|
||||
///
|
||||
/// See [`Channel::try_recv()`]
|
||||
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||
self.channel.try_recv()
|
||||
/// See [`Channel::try_receive()`]
|
||||
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
|
||||
self.channel.try_receive()
|
||||
}
|
||||
|
||||
/// Allows a poll_fn to poll until the channel is ready to receive
|
||||
///
|
||||
/// See [`Channel::poll_ready_to_receive()`]
|
||||
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.channel.poll_ready_to_receive(cx)
|
||||
}
|
||||
|
||||
/// Poll the channel for the next item
|
||||
///
|
||||
/// See [`Channel::poll_receive()`]
|
||||
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
|
||||
self.channel.poll_receive(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@ -162,16 +190,30 @@ impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
|
||||
impl<'ch, T> DynamicReceiver<'ch, T> {
|
||||
/// Receive the next value.
|
||||
///
|
||||
/// See [`Channel::recv()`].
|
||||
pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
|
||||
DynamicRecvFuture { channel: self.channel }
|
||||
/// See [`Channel::receive()`].
|
||||
pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
|
||||
DynamicReceiveFuture { channel: self.channel }
|
||||
}
|
||||
|
||||
/// Attempt to immediately receive the next value.
|
||||
///
|
||||
/// See [`Channel::try_recv()`]
|
||||
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||
self.channel.try_recv_with_context(None)
|
||||
/// See [`Channel::try_receive()`]
|
||||
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
|
||||
self.channel.try_receive_with_context(None)
|
||||
}
|
||||
|
||||
/// Allows a poll_fn to poll until the channel is ready to receive
|
||||
///
|
||||
/// See [`Channel::poll_ready_to_receive()`]
|
||||
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.channel.poll_ready_to_receive(cx)
|
||||
}
|
||||
|
||||
/// Poll the channel for the next item
|
||||
///
|
||||
/// See [`Channel::poll_receive()`]
|
||||
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
|
||||
self.channel.poll_receive(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@ -184,42 +226,39 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
|
||||
/// Future returned by [`Channel::receive`] and [`Receiver::receive`].
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct RecvFuture<'ch, M, T, const N: usize>
|
||||
pub struct ReceiveFuture<'ch, M, T, const N: usize>
|
||||
where
|
||||
M: RawMutex,
|
||||
{
|
||||
channel: &'ch Channel<M, T, N>,
|
||||
}
|
||||
|
||||
impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
|
||||
impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
|
||||
where
|
||||
M: RawMutex,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||
match self.channel.try_recv_with_context(Some(cx)) {
|
||||
Ok(v) => Poll::Ready(v),
|
||||
Err(TryRecvError::Empty) => Poll::Pending,
|
||||
}
|
||||
self.channel.poll_receive(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Future returned by [`DynamicReceiver::recv`].
|
||||
/// Future returned by [`DynamicReceiver::receive`].
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct DynamicRecvFuture<'ch, T> {
|
||||
pub struct DynamicReceiveFuture<'ch, T> {
|
||||
channel: &'ch dyn DynamicChannel<T>,
|
||||
}
|
||||
|
||||
impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
|
||||
impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
|
||||
type Output = T;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||
match self.channel.try_recv_with_context(Some(cx)) {
|
||||
match self.channel.try_receive_with_context(Some(cx)) {
|
||||
Ok(v) => Poll::Ready(v),
|
||||
Err(TryRecvError::Empty) => Poll::Pending,
|
||||
Err(TryReceiveError::Empty) => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -285,13 +324,18 @@ impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
|
||||
trait DynamicChannel<T> {
|
||||
fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
|
||||
|
||||
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
|
||||
fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
|
||||
|
||||
fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
|
||||
fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
|
||||
|
||||
fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
|
||||
}
|
||||
|
||||
/// Error returned by [`try_recv`](Channel::try_recv).
|
||||
/// Error returned by [`try_receive`](Channel::try_receive).
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||
pub enum TryRecvError {
|
||||
pub enum TryReceiveError {
|
||||
/// A message could not be received because the channel is empty.
|
||||
Empty,
|
||||
}
|
||||
@ -320,11 +364,11 @@ impl<T, const N: usize> ChannelState<T, N> {
|
||||
}
|
||||
}
|
||||
|
||||
fn try_recv(&mut self) -> Result<T, TryRecvError> {
|
||||
self.try_recv_with_context(None)
|
||||
fn try_receive(&mut self) -> Result<T, TryReceiveError> {
|
||||
self.try_receive_with_context(None)
|
||||
}
|
||||
|
||||
fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||
fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
|
||||
if self.queue.is_full() {
|
||||
self.senders_waker.wake();
|
||||
}
|
||||
@ -335,14 +379,31 @@ impl<T, const N: usize> ChannelState<T, N> {
|
||||
if let Some(cx) = cx {
|
||||
self.receiver_waker.register(cx.waker());
|
||||
}
|
||||
Err(TryRecvError::Empty)
|
||||
Err(TryReceiveError::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> bool {
|
||||
fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
|
||||
if self.queue.is_full() {
|
||||
self.senders_waker.wake();
|
||||
}
|
||||
|
||||
if let Some(message) = self.queue.pop_front() {
|
||||
Poll::Ready(message)
|
||||
} else {
|
||||
self.receiver_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.receiver_waker.register(cx.waker());
|
||||
|
||||
!self.queue.is_empty()
|
||||
if !self.queue.is_empty() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
|
||||
@ -364,10 +425,14 @@ impl<T, const N: usize> ChannelState<T, N> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> bool {
|
||||
fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.senders_waker.register(cx.waker());
|
||||
|
||||
!self.queue.is_full()
|
||||
if !self.queue.is_full() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -409,8 +474,13 @@ where
|
||||
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
||||
}
|
||||
|
||||
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||
self.lock(|c| c.try_recv_with_context(cx))
|
||||
fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
|
||||
self.lock(|c| c.try_receive_with_context(cx))
|
||||
}
|
||||
|
||||
/// Poll the channel for the next message
|
||||
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
|
||||
self.lock(|c| c.poll_receive(cx))
|
||||
}
|
||||
|
||||
fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
|
||||
@ -418,12 +488,12 @@ where
|
||||
}
|
||||
|
||||
/// Allows a poll_fn to poll until the channel is ready to receive
|
||||
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool {
|
||||
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.lock(|c| c.poll_ready_to_receive(cx))
|
||||
}
|
||||
|
||||
/// Allows a poll_fn to poll until the channel is ready to send
|
||||
pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool {
|
||||
pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.lock(|c| c.poll_ready_to_send(cx))
|
||||
}
|
||||
|
||||
@ -466,16 +536,16 @@ where
|
||||
///
|
||||
/// If there are no messages in the channel's buffer, this method will
|
||||
/// wait until a message is sent.
|
||||
pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
|
||||
RecvFuture { channel: self }
|
||||
pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
|
||||
ReceiveFuture { channel: self }
|
||||
}
|
||||
|
||||
/// Attempt to immediately receive a message.
|
||||
///
|
||||
/// This method will either receive a message from the channel immediately or return an error
|
||||
/// if the channel is empty.
|
||||
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||
self.lock(|c| c.try_recv())
|
||||
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
|
||||
self.lock(|c| c.try_receive())
|
||||
}
|
||||
}
|
||||
|
||||
@ -489,8 +559,20 @@ where
|
||||
Channel::try_send_with_context(self, m, cx)
|
||||
}
|
||||
|
||||
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||
Channel::try_recv_with_context(self, cx)
|
||||
fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
|
||||
Channel::try_receive_with_context(self, cx)
|
||||
}
|
||||
|
||||
fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
Channel::poll_ready_to_send(self, cx)
|
||||
}
|
||||
|
||||
fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
Channel::poll_ready_to_receive(self, cx)
|
||||
}
|
||||
|
||||
fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
|
||||
Channel::poll_receive(self, cx)
|
||||
}
|
||||
}
|
||||
|
||||
@ -534,15 +616,15 @@ mod tests {
|
||||
fn receiving_once_with_one_send() {
|
||||
let mut c = ChannelState::<u32, 3>::new();
|
||||
assert!(c.try_send(1).is_ok());
|
||||
assert_eq!(c.try_recv().unwrap(), 1);
|
||||
assert_eq!(c.try_receive().unwrap(), 1);
|
||||
assert_eq!(capacity(&c), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receiving_when_empty() {
|
||||
let mut c = ChannelState::<u32, 3>::new();
|
||||
match c.try_recv() {
|
||||
Err(TryRecvError::Empty) => assert!(true),
|
||||
match c.try_receive() {
|
||||
Err(TryReceiveError::Empty) => assert!(true),
|
||||
_ => assert!(false),
|
||||
}
|
||||
assert_eq!(capacity(&c), 3);
|
||||
@ -552,7 +634,7 @@ mod tests {
|
||||
fn simple_send_and_receive() {
|
||||
let c = Channel::<NoopRawMutex, u32, 3>::new();
|
||||
assert!(c.try_send(1).is_ok());
|
||||
assert_eq!(c.try_recv().unwrap(), 1);
|
||||
assert_eq!(c.try_receive().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -572,7 +654,7 @@ mod tests {
|
||||
let r: DynamicReceiver<'_, u32> = c.receiver().into();
|
||||
|
||||
assert!(s.try_send(1).is_ok());
|
||||
assert_eq!(r.try_recv().unwrap(), 1);
|
||||
assert_eq!(r.try_receive().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[futures_test::test]
|
||||
@ -587,14 +669,14 @@ mod tests {
|
||||
assert!(c2.try_send(1).is_ok());
|
||||
})
|
||||
.is_ok());
|
||||
assert_eq!(c.recv().await, 1);
|
||||
assert_eq!(c.receive().await, 1);
|
||||
}
|
||||
|
||||
#[futures_test::test]
|
||||
async fn sender_send_completes_if_capacity() {
|
||||
let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
|
||||
c.send(1).await;
|
||||
assert_eq!(c.recv().await, 1);
|
||||
assert_eq!(c.receive().await, 1);
|
||||
}
|
||||
|
||||
#[futures_test::test]
|
||||
@ -612,11 +694,11 @@ mod tests {
|
||||
// Wish I could think of a means of determining that the async send is waiting instead.
|
||||
// However, I've used the debugger to observe that the send does indeed wait.
|
||||
Delay::new(Duration::from_millis(500)).await;
|
||||
assert_eq!(c.recv().await, 1);
|
||||
assert_eq!(c.receive().await, 1);
|
||||
assert!(executor
|
||||
.spawn(async move {
|
||||
loop {
|
||||
c.recv().await;
|
||||
c.receive().await;
|
||||
}
|
||||
})
|
||||
.is_ok());
|
||||
|
Reference in New Issue
Block a user