Add types for channel dynamic dispatch
* Add internal DynamicChannel trait implemented by Channel that allows polling for internal state in a lock safe manner and does not require knowing the channel size. * Existing usage of Sender and Receiver is preserved and does not use dynamic dispatch. * Add DynamicSender and DynamicReceiver types that references the channel using the DynamicChannel trait and does not require the const generic channel size parameter.
This commit is contained in:
parent
e844893095
commit
bc1dff34c0
@ -66,6 +66,57 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send-only access to a [`Channel`] without knowing channel size.
|
||||||
|
#[derive(Copy)]
|
||||||
|
pub struct DynamicSender<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
channel: &'ch dyn DynamicChannel<M, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> Clone for DynamicSender<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
DynamicSender {
|
||||||
|
channel: self.channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(s: Sender<'ch, M, T, N>) -> Self {
|
||||||
|
Self { channel: s.channel }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> DynamicSender<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
/// Sends a value.
|
||||||
|
///
|
||||||
|
/// See [`Channel::send()`]
|
||||||
|
pub fn send(&self, message: T) -> DynamicSendFuture<'ch, M, T> {
|
||||||
|
DynamicSendFuture {
|
||||||
|
channel: self.channel,
|
||||||
|
message: Some(message),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately send a message.
|
||||||
|
///
|
||||||
|
/// See [`Channel::send()`]
|
||||||
|
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
|
||||||
|
self.channel.try_send_with_context(message, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Receive-only access to a [`Channel`].
|
/// Receive-only access to a [`Channel`].
|
||||||
#[derive(Copy)]
|
#[derive(Copy)]
|
||||||
pub struct Receiver<'ch, M, T, const N: usize>
|
pub struct Receiver<'ch, M, T, const N: usize>
|
||||||
@ -105,6 +156,56 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Receive-only access to a [`Channel`] without knowing channel size.
|
||||||
|
#[derive(Copy)]
|
||||||
|
pub struct DynamicReceiver<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
channel: &'ch dyn DynamicChannel<M, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> Clone for DynamicReceiver<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
DynamicReceiver {
|
||||||
|
channel: self.channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> DynamicReceiver<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
/// Receive the next value.
|
||||||
|
///
|
||||||
|
/// See [`Channel::recv()`].
|
||||||
|
pub fn recv(&self) -> DynamicRecvFuture<'_, M, T> {
|
||||||
|
DynamicRecvFuture {
|
||||||
|
channel: self.channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately receive the next value.
|
||||||
|
///
|
||||||
|
/// See [`Channel::try_recv()`]
|
||||||
|
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||||
|
self.channel.try_recv_with_context(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(s: Receiver<'ch, M, T, N>) -> Self {
|
||||||
|
Self { channel: s.channel }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RecvFuture<'ch, M, T, const N: usize>
|
pub struct RecvFuture<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: RawMutex,
|
M: RawMutex,
|
||||||
@ -119,11 +220,31 @@ where
|
|||||||
type Output = T;
|
type Output = T;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
self.channel
|
match self.channel.try_recv_with_context(Some(cx)) {
|
||||||
.lock(|c| match c.try_recv_with_context(Some(cx)) {
|
Ok(v) => Poll::Ready(v),
|
||||||
Ok(v) => Poll::Ready(v),
|
Err(TryRecvError::Empty) => Poll::Pending,
|
||||||
Err(TryRecvError::Empty) => Poll::Pending,
|
}
|
||||||
})
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DynamicRecvFuture<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
channel: &'ch dyn DynamicChannel<M, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> Future for DynamicRecvFuture<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
type Output = T;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
|
match self.channel.try_recv_with_context(Some(cx)) {
|
||||||
|
Ok(v) => Poll::Ready(v),
|
||||||
|
Err(TryRecvError::Empty) => Poll::Pending,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,7 +264,7 @@ where
|
|||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.message.take() {
|
match self.message.take() {
|
||||||
Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
|
Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
|
||||||
Ok(..) => Poll::Ready(()),
|
Ok(..) => Poll::Ready(()),
|
||||||
Err(TrySendError::Full(m)) => {
|
Err(TrySendError::Full(m)) => {
|
||||||
self.message = Some(m);
|
self.message = Some(m);
|
||||||
@ -157,6 +278,49 @@ where
|
|||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
|
impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
|
||||||
|
|
||||||
|
pub struct DynamicSendFuture<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
channel: &'ch dyn DynamicChannel<M, T>,
|
||||||
|
message: Option<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> Future for DynamicSendFuture<'ch, M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.message.take() {
|
||||||
|
Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
|
||||||
|
Ok(..) => Poll::Ready(()),
|
||||||
|
Err(TrySendError::Full(m)) => {
|
||||||
|
self.message = Some(m);
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => panic!("Message cannot be None"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T> Unpin for DynamicSendFuture<'ch, M, T> where M: RawMutex {}
|
||||||
|
|
||||||
|
trait DynamicChannel<M, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn try_send_with_context(
|
||||||
|
&self,
|
||||||
|
message: T,
|
||||||
|
cx: Option<&mut Context<'_>>,
|
||||||
|
) -> Result<(), TrySendError<T>>;
|
||||||
|
|
||||||
|
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
|
||||||
|
}
|
||||||
|
|
||||||
/// Error returned by [`try_recv`](Channel::try_recv).
|
/// Error returned by [`try_recv`](Channel::try_recv).
|
||||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||||
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||||
@ -287,6 +451,18 @@ where
|
|||||||
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||||
|
self.lock(|c| c.try_recv_with_context(cx))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_send_with_context(
|
||||||
|
&self,
|
||||||
|
m: T,
|
||||||
|
cx: Option<&mut Context<'_>>,
|
||||||
|
) -> Result<(), TrySendError<T>> {
|
||||||
|
self.lock(|c| c.try_send_with_context(m, cx))
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a sender for this channel.
|
/// Get a sender for this channel.
|
||||||
pub fn sender(&self) -> Sender<'_, M, T, N> {
|
pub fn sender(&self) -> Sender<'_, M, T, N> {
|
||||||
Sender { channel: self }
|
Sender { channel: self }
|
||||||
@ -339,6 +515,25 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
|
||||||
|
/// tradeoff cost of dynamic dispatch.
|
||||||
|
impl<M, T, const N: usize> DynamicChannel<M, T> for Channel<M, T, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn try_send_with_context(
|
||||||
|
&self,
|
||||||
|
m: T,
|
||||||
|
cx: Option<&mut Context<'_>>,
|
||||||
|
) -> Result<(), TrySendError<T>> {
|
||||||
|
Channel::try_send_with_context(self, m, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||||
|
Channel::try_recv_with_context(self, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use core::time::Duration;
|
use core::time::Duration;
|
||||||
@ -411,6 +606,16 @@ mod tests {
|
|||||||
let _ = s1.clone();
|
let _ = s1.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dynamic_dispatch() {
|
||||||
|
let c = Channel::<NoopRawMutex, u32, 3>::new();
|
||||||
|
let s: DynamicSender<'_, NoopRawMutex, u32> = c.sender().into();
|
||||||
|
let r: DynamicReceiver<'_, NoopRawMutex, u32> = c.receiver().into();
|
||||||
|
|
||||||
|
assert!(s.try_send(1).is_ok());
|
||||||
|
assert_eq!(r.try_recv().unwrap(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn receiver_receives_given_try_send_async() {
|
async fn receiver_receives_given_try_send_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
Loading…
Reference in New Issue
Block a user