Merge pull request #337 from lulf/introduce-future-types

Introduce future types
This commit is contained in:
Dario Nieuwenhuis 2021-08-02 13:20:29 +02:00 committed by GitHub
commit c458ad52e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -156,18 +156,10 @@ where
/// closed by `recv` until they are all consumed. /// closed by `recv` until they are all consumed.
/// ///
/// [`close`]: Self::close /// [`close`]: Self::close
pub async fn recv(&mut self) -> Option<T> { pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> {
futures::future::poll_fn(|cx| self.recv_poll(cx)).await RecvFuture {
} channel_cell: self.channel_cell,
}
fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
Channel::lock(self.channel_cell, |c| {
match c.try_recv_with_context(Some(cx)) {
Ok(v) => Poll::Ready(Some(v)),
Err(TryRecvError::Closed) => Poll::Ready(None),
Err(TryRecvError::Empty) => Poll::Pending,
}
})
} }
/// Attempts to immediately receive a message on this `Receiver` /// Attempts to immediately receive a message on this `Receiver`
@ -202,6 +194,40 @@ where
} }
} }
pub struct RecvFuture<'ch, M, T, const N: usize>
where
M: Mutex<Data = ()>,
{
channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
}
impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Channel::lock(self.channel_cell, |c| {
match c.try_recv_with_context(Some(cx)) {
Ok(v) => Poll::Ready(Some(v)),
Err(TryRecvError::Closed) => Poll::Ready(None),
Err(TryRecvError::Empty) => Poll::Pending,
}
})
}
}
// Safe to pass the receive future around since it locks channel whenever polled
unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where
M: Mutex<Data = ()> + Sync
{
}
unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where
M: Mutex<Data = ()> + Sync
{
}
impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
where where
M: Mutex<Data = ()>, M: Mutex<Data = ()>,
@ -224,12 +250,11 @@ where
/// ///
/// [`close`]: Receiver::close /// [`close`]: Receiver::close
/// [`Receiver`]: Receiver /// [`Receiver`]: Receiver
pub async fn send(&self, message: T) -> Result<(), SendError<T>> { pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
SendFuture { SendFuture {
sender: self.clone(), sender: self.clone(),
message: Some(message), message: Some(message),
} }
.await
} }
/// Attempts to immediately send a message on this `Sender` /// Attempts to immediately send a message on this `Sender`
@ -278,7 +303,7 @@ where
} }
} }
struct SendFuture<'ch, M, T, const N: usize> pub struct SendFuture<'ch, M, T, const N: usize>
where where
M: Mutex<Data = ()>, M: Mutex<Data = ()>,
{ {