diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs new file mode 100644 index 00000000..9084cd57 --- /dev/null +++ b/embassy/src/channel/channel.rs @@ -0,0 +1,430 @@ +//! A queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. +//! + +use core::cell::RefCell; +use core::pin::Pin; +use core::task::Context; +use core::task::Poll; + +use futures::Future; +use heapless::Deque; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// Send-only access to a [`Channel`]. +#[derive(Copy, Clone)] +pub struct Sender<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> +where + M: RawMutex, +{ + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { + self.channel.send(message) + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send(message) + } +} + +/// Receive-only access to a [`Channel`]. +#[derive(Copy, Clone)] +pub struct Receiver<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> +where + M: RawMutex, +{ + /// Receive the next value. + /// + /// See [`Channel::recv()`]. + pub fn recv(&self) -> RecvFuture<'_, M, T, N> { + self.channel.recv() + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_recv()`] + pub fn try_recv(&self) -> Result { + self.channel.try_recv() + } +} + +pub struct RecvFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.channel + .lock(|c| match c.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + }) + } +} + +pub struct SendFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, + message: Option, +} + +impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} + +/// Error returned by [`try_recv`](Channel::try_recv). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryRecvError { + /// A message could not be received because the channel is empty. + Empty, +} + +/// Error returned by [`try_send`](Channel::try_send). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TrySendError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), +} + +struct ChannelState { + queue: Deque, + receiver_waker: WakerRegistration, + senders_waker: WakerRegistration, +} + +impl ChannelState { + const fn new() -> Self { + ChannelState { + queue: Deque::new(), + receiver_waker: WakerRegistration::new(), + senders_waker: WakerRegistration::new(), + } + } + + fn try_recv(&mut self) -> Result { + self.try_recv_with_context(None) + } + + fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop_front() { + Ok(message) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryRecvError::Empty) + } + } + + fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.try_send_with_context(message, None) + } + + fn try_send_with_context( + &mut self, + message: T, + cx: Option<&mut Context<'_>>, + ) -> Result<(), TrySendError> { + match self.queue.push_back(message) { + Ok(()) => { + self.receiver_waker.wake(); + Ok(()) + } + Err(message) => { + if let Some(cx) = cx { + self.senders_waker.register(cx.waker()); + } + Err(TrySendError::Full(message)) + } + } + } +} + +/// A bounded channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +pub struct Channel +where + M: RawMutex, +{ + inner: Mutex>>, +} + +impl Channel +where + M: RawMutex, +{ + /// Establish a new bounded channel. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy::channel::channel::Channel; + /// use embassy::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = Channel::::new(); + /// ``` + #[cfg(feature = "nightly")] + pub const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(ChannelState::new())), + } + } + + /// Establish a new bounded channel. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy::channel::channel::Channel; + /// use embassy::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = Channel::::new(); + /// ``` + #[cfg(not(feature = "nightly"))] + pub fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(ChannelState::new())), + } + } + + fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { + self.inner.lock(|rc| f(&mut *rc.borrow_mut())) + } + + /// Get a sender for this channel. + pub fn sender(&self) -> Sender<'_, M, T, N> { + Sender { channel: self } + } + + /// Get a receiver for this channel. + pub fn receiver(&self) -> Receiver<'_, M, T, N> { + Receiver { channel: self } + } + + /// Send a value, waiting until there is capacity. + /// + /// Sending completes when the value has been pushed to the channel's queue. + /// This doesn't mean the value has been received yet. + pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { + SendFuture { + channel: self, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// This method differs from [`send`] by returning immediately if the channel's + /// buffer is full, instead of waiting. + /// + /// # Errors + /// + /// If the channel capacity has been reached, i.e., the channel has `n` + /// buffered values where `n` is the argument passed to [`Channel`], then an + /// error is returned. + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.lock(|c| c.try_send(message)) + } + + /// Receive the next value. + /// + /// If there are no messages in the channel's buffer, this method will + /// wait until a message is sent. + pub fn recv(&self) -> RecvFuture<'_, M, T, N> { + RecvFuture { channel: self } + } + + /// Attempt to immediately receive a message. + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + pub fn try_recv(&self) -> Result { + self.lock(|c| c.try_recv()) + } +} + +#[cfg(test)] +mod tests { + use core::time::Duration; + + use futures::task::SpawnExt; + use futures_executor::ThreadPool; + use futures_timer::Delay; + + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + use crate::util::Forever; + + use super::*; + + fn capacity(c: &ChannelState) -> usize { + c.queue.capacity() - c.queue.len() + } + + #[test] + fn sending_once() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn sending_when_full() { + let mut c = ChannelState::::new(); + let _ = c.try_send(1); + let _ = c.try_send(1); + let _ = c.try_send(1); + match c.try_send(2) { + Err(TrySendError::Full(2)) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 0); + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = ChannelState::::new(); + match c.try_recv() { + Err(TryRecvError::Empty) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 3); + } + + #[test] + fn simple_send_and_receive() { + let c = Channel::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + } + + #[futures_test::test] + async fn receiver_receives_given_try_send_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: Forever> = Forever::new(); + let c = &*CHANNEL.put(Channel::new()); + let c2 = c; + assert!(executor + .spawn(async move { + assert!(c2.try_send(1).is_ok()); + }) + .is_ok()); + assert_eq!(c.recv().await, 1); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let c = Channel::::new(); + c.send(1).await; + assert_eq!(c.recv().await, 1); + } + + #[futures_test::test] + async fn senders_sends_wait_until_capacity() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: Forever> = Forever::new(); + let c = &*CHANNEL.put(Channel::new()); + assert!(c.try_send(1).is_ok()); + + let c2 = c; + let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); + let c2 = c; + let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); + // Wish I could think of a means of determining that the async send is waiting instead. + // However, I've used the debugger to observe that the send does indeed wait. + Delay::new(Duration::from_millis(500)).await; + assert_eq!(c.recv().await, 1); + assert!(executor + .spawn(async move { + loop { + c.recv().await; + } + }) + .is_ok()); + send_task_1.unwrap().await; + send_task_2.unwrap().await; + } +} diff --git a/embassy/src/channel/mod.rs b/embassy/src/channel/mod.rs index 9e8c67ee..e51a442d 100644 --- a/embassy/src/channel/mod.rs +++ b/embassy/src/channel/mod.rs @@ -1,4 +1,4 @@ //! Async channels -pub mod mpsc; +pub mod channel; pub mod signal; diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs deleted file mode 100644 index 32787d81..00000000 --- a/embassy/src/channel/mpsc.rs +++ /dev/null @@ -1,822 +0,0 @@ -//! A multi-producer, single-consumer queue for sending values between -//! asynchronous tasks. This queue takes a Mutex type so that various -//! targets can be attained. For example, a ThreadModeMutex can be used -//! for single-core Cortex-M targets where messages are only passed -//! between tasks running in thread mode. Similarly, a CriticalSectionMutex -//! can also be used for single-core targets where messages are to be -//! passed from exception mode e.g. out of an interrupt handler. -//! -//! This module provides a bounded channel that has a limit on the number of -//! messages that it can store, and if this limit is reached, trying to send -//! another message will result in an error being returned. -//! -//! Similar to the `mpsc` channels provided by `std`, the channel constructor -//! functions provide separate send and receive handles, [`Sender`] and -//! [`Receiver`]. If there is no message to read, the current task will be -//! notified when a new value is sent. [`Sender`] allows sending values into -//! the channel. If the bounded channel is at capacity, the send is rejected. -//! -//! # Disconnection -//! -//! When all [`Sender`] handles have been dropped, it is no longer -//! possible to send values into the channel. This is considered the termination -//! event of the stream. -//! -//! If the [`Receiver`] handle is dropped, then messages can no longer -//! be read out of the channel. In this case, all further attempts to send will -//! result in an error. -//! -//! # Clean Shutdown -//! -//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to -//! remain in the channel. Instead, it is usually desirable to perform a "clean" -//! shutdown. To do this, the receiver first calls `close`, which will prevent -//! any further messages to be sent into the channel. Then, the receiver -//! consumes the channel to completion, at which point the receiver can be -//! dropped. -//! -//! This channel and its associated types were derived from - -use core::cell::RefCell; -use core::fmt; -use core::pin::Pin; -use core::task::Context; -use core::task::Poll; -use core::task::Waker; - -use futures::Future; -use heapless::Deque; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::WakerRegistration; - -/// Send values to the associated `Receiver`. -/// -/// Instances are created by the [`split`](split) function. -pub struct Sender<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -/// Receive values from the associated `Sender`. -/// -/// Instances are created by the [`split`](split) function. -pub struct Receiver<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. -/// -/// All data sent on `Sender` will become available on `Receiver` in the same -/// order as it was sent. -/// -/// The `Sender` can be cloned to `send` to the same channel from multiple code -/// locations. Only one `Receiver` is valid. -/// -/// If the `Receiver` is disconnected while trying to `send`, the `send` method -/// will return a `SendError`. Similarly, if `Sender` is disconnected while -/// trying to `recv`, the `recv` method will return a `RecvError`. -/// -/// Note that when splitting the channel, the sender and receiver cannot outlive -/// their channel. The following will therefore fail compilation: -//// -/// ```compile_fail -/// use embassy::channel::mpsc; -/// use embassy::channel::mpsc::{Channel, WithThreadModeOnly}; -/// -/// let (sender, receiver) = { -/// let mut channel = Channel::::with_thread_mode_only(); -/// mpsc::split(&mut channel) -/// }; -/// ``` -pub fn split( - channel: &mut Channel, -) -> (Sender, Receiver) -where - M: RawMutex, -{ - let sender = Sender { channel }; - let receiver = Receiver { channel }; - channel.lock(|c| { - c.register_receiver(); - c.register_sender(); - }); - (sender, receiver) -} - -impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - /// Receives the next value for this receiver. - /// - /// This method returns `None` if the channel has been closed and there are - /// no remaining messages in the channel's buffer. This indicates that no - /// further values can ever be received from this `Receiver`. The channel is - /// closed when all senders have been dropped, or when [`close`] is called. - /// - /// If there are no messages in the channel's buffer, but the channel has - /// not yet been closed, this method will sleep until a message is sent or - /// the channel is closed. - /// - /// Note that if [`close`] is called, but there are still outstanding - /// messages from before it was closed, the channel is not considered - /// closed by `recv` until they are all consumed. - /// - /// [`close`]: Self::close - pub fn recv(&mut self) -> RecvFuture<'_, M, T, N> { - RecvFuture { - channel: self.channel, - } - } - - /// Attempts to immediately receive a message on this `Receiver` - /// - /// This method will either receive a message from the channel immediately or return an error - /// if the channel is empty. - pub fn try_recv(&self) -> Result { - self.channel.lock(|c| c.try_recv()) - } - - /// Closes the receiving half of a channel without dropping it. - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - /// - /// To guarantee that no messages are dropped, after calling `close()`, - /// `recv()` must be called until `None` is returned. If there are - /// outstanding messages, the `recv` method will not return `None` - /// until those are released. - /// - pub fn close(&mut self) { - self.channel.lock(|c| c.close()) - } -} - -impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - fn drop(&mut self) { - self.channel.lock(|c| c.deregister_receiver()) - } -} - -pub struct RecvFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.channel - .lock(|c| match c.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(Some(v)), - Err(TryRecvError::Closed) => Poll::Ready(None), - Err(TryRecvError::Empty) => Poll::Pending, - }) - } -} - -impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> -where - M: RawMutex, -{ - /// Sends a value, waiting until there is capacity. - /// - /// A successful send occurs when it is determined that the other end of the - /// channel has not hung up already. An unsuccessful send would be one where - /// the corresponding receiver has already been closed. Note that a return - /// value of `Err` means that the data will never be received, but a return - /// value of `Ok` does not mean that the data will be received. It is - /// possible for the corresponding receiver to hang up immediately after - /// this function returns `Ok`. - /// - /// # Errors - /// - /// If the receive half of the channel is closed, either due to [`close`] - /// being called or the [`Receiver`] handle dropping, the function returns - /// an error. The error includes the value passed to `send`. - /// - /// [`close`]: Receiver::close - /// [`Receiver`]: Receiver - pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { - SendFuture { - channel: self.channel, - message: Some(message), - } - } - - /// Attempts to immediately send a message on this `Sender` - /// - /// This method differs from [`send`] by returning immediately if the channel's - /// buffer is full or no receiver is waiting to acquire some data. Compared - /// with [`send`], this function has two failure cases instead of one (one for - /// disconnection, one for a full buffer). - /// - /// # Errors - /// - /// If the channel capacity has been reached, i.e., the channel has `n` - /// buffered values where `n` is the argument passed to [`channel`], then an - /// error is returned. - /// - /// If the receive half of the channel is closed, either due to [`close`] - /// being called or the [`Receiver`] handle dropping, the function returns - /// an error. The error includes the value passed to `send`. - /// - /// [`send`]: Sender::send - /// [`channel`]: channel - /// [`close`]: Receiver::close - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.lock(|c| c.try_send(message)) - } - - /// Completes when the receiver has dropped. - /// - /// This allows the producers to get notified when interest in the produced - /// values is canceled and immediately stop doing work. - pub async fn closed(&self) { - CloseFuture { - channel: self.channel, - } - .await - } - - /// Checks if the channel has been closed. This happens when the - /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is - /// called. - /// - /// [`Receiver`]: Receiver - /// [`Receiver::close`]: Receiver::close - pub fn is_closed(&self) -> bool { - self.channel.lock(|c| c.is_closed()) - } -} - -pub struct SendFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, - message: Option, -} - -impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = Result<(), SendError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { - Ok(..) => Poll::Ready(Ok(())), - Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} - -struct CloseFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - -impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> -where - M: RawMutex, -{ - fn drop(&mut self) { - self.channel.lock(|c| c.deregister_sender()) - } -} - -impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - self.channel.lock(|c| c.register_sender()); - Sender { - channel: self.channel, - } - } -} - -/// An error returned from the [`try_recv`] method. -/// -/// [`try_recv`]: Receiver::try_recv -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryRecvError { - /// A message could not be received because the channel is empty. - Empty, - - /// The message could not be received because the channel is empty and closed. - Closed, -} - -/// Error returned by the `Sender`. -#[derive(Debug)] -pub struct SendError(pub T); - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -#[cfg(feature = "defmt")] -impl defmt::Format for SendError { - fn format(&self, fmt: defmt::Formatter<'_>) { - defmt::write!(fmt, "channel closed") - } -} - -/// This enumeration is the list of the possible error outcomes for the -/// [try_send](Sender::try_send) method. -#[derive(Debug)] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), - - /// The receive half of the channel was explicitly closed or has been - /// dropped. - Closed(T), -} - -impl fmt::Display for TrySendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "{}", - match self { - TrySendError::Full(..) => "no available capacity", - TrySendError::Closed(..) => "channel closed", - } - ) - } -} - -#[cfg(feature = "defmt")] -impl defmt::Format for TrySendError { - fn format(&self, fmt: defmt::Formatter<'_>) { - match self { - TrySendError::Full(..) => defmt::write!(fmt, "no available capacity"), - TrySendError::Closed(..) => defmt::write!(fmt, "channel closed"), - } - } -} - -struct ChannelState { - queue: Deque, - closed: bool, - receiver_registered: bool, - senders_registered: u32, - receiver_waker: WakerRegistration, - senders_waker: WakerRegistration, -} - -impl ChannelState { - const fn new() -> Self { - ChannelState { - queue: Deque::new(), - closed: false, - receiver_registered: false, - senders_registered: 0, - receiver_waker: WakerRegistration::new(), - senders_waker: WakerRegistration::new(), - } - } - - fn try_recv(&mut self) -> Result { - self.try_recv_with_context(None) - } - - fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { - if self.queue.is_full() { - self.senders_waker.wake(); - } - - if let Some(message) = self.queue.pop_front() { - Ok(message) - } else if !self.closed { - if let Some(cx) = cx { - self.set_receiver_waker(cx.waker()); - } - Err(TryRecvError::Empty) - } else { - Err(TryRecvError::Closed) - } - } - - fn try_send(&mut self, message: T) -> Result<(), TrySendError> { - self.try_send_with_context(message, None) - } - - fn try_send_with_context( - &mut self, - message: T, - cx: Option<&mut Context<'_>>, - ) -> Result<(), TrySendError> { - if self.closed { - return Err(TrySendError::Closed(message)); - } - - match self.queue.push_back(message) { - Ok(()) => { - self.receiver_waker.wake(); - - Ok(()) - } - Err(message) => { - cx.into_iter() - .for_each(|cx| self.set_senders_waker(cx.waker())); - Err(TrySendError::Full(message)) - } - } - } - - fn close(&mut self) { - self.receiver_waker.wake(); - self.closed = true; - } - - fn is_closed(&mut self) -> bool { - self.is_closed_with_context(None) - } - - fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool { - if self.closed { - cx.into_iter() - .for_each(|cx| self.set_senders_waker(cx.waker())); - true - } else { - false - } - } - - fn register_receiver(&mut self) { - assert!(!self.receiver_registered); - self.receiver_registered = true; - } - - fn deregister_receiver(&mut self) { - if self.receiver_registered { - self.closed = true; - self.senders_waker.wake(); - } - self.receiver_registered = false; - } - - fn register_sender(&mut self) { - self.senders_registered += 1; - } - - fn deregister_sender(&mut self) { - assert!(self.senders_registered > 0); - self.senders_registered -= 1; - if self.senders_registered == 0 { - self.receiver_waker.wake(); - self.closed = true; - } - } - - fn set_receiver_waker(&mut self, receiver_waker: &Waker) { - self.receiver_waker.register(receiver_waker); - } - - fn set_senders_waker(&mut self, senders_waker: &Waker) { - // Dispose of any existing sender causing them to be polled again. - // This could cause a spin given multiple concurrent senders, however given that - // most sends only block waiting for the receiver to become active, this should - // be a short-lived activity. The upside is a greatly simplified implementation - // that avoids the need for intrusive linked-lists and unsafe operations on pinned - // pointers. - self.senders_waker.wake(); - self.senders_waker.register(senders_waker); - } -} - -/// A a bounded mpsc channel for communicating between asynchronous tasks -/// with backpressure. -/// -/// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is -/// received from the channel. -/// -/// All data sent will become available in the same order as it was sent. -pub struct Channel -where - M: RawMutex, -{ - inner: Mutex>>, -} - -impl Channel -where - M: RawMutex, -{ - /// Establish a new bounded channel. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy::channel::mpsc; - /// use embassy::blocking_mutex::raw::NoopRawMutex; - /// use embassy::channel::mpsc::Channel; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::new(); - /// // once we have a channel, obtain its sender and receiver - /// let (sender, receiver) = mpsc::split(&mut channel); - /// ``` - #[cfg(feature = "nightly")] - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(ChannelState::new())), - } - } - - /// Establish a new bounded channel. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy::channel::mpsc; - /// use embassy::blocking_mutex::raw::NoopRawMutex; - /// use embassy::channel::mpsc::Channel; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::new(); - /// // once we have a channel, obtain its sender and receiver - /// let (sender, receiver) = mpsc::split(&mut channel); - /// ``` - #[cfg(not(feature = "nightly"))] - pub fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(ChannelState::new())), - } - } - - fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) - } -} - -#[cfg(test)] -mod tests { - use core::time::Duration; - - use futures::task::SpawnExt; - use futures_executor::ThreadPool; - use futures_timer::Delay; - - use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - use crate::util::Forever; - - use super::*; - - fn capacity(c: &ChannelState) -> usize { - c.queue.capacity() - c.queue.len() - } - - #[test] - fn sending_once() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn sending_when_full() { - let mut c = ChannelState::::new(); - let _ = c.try_send(1); - let _ = c.try_send(1); - let _ = c.try_send(1); - match c.try_send(2) { - Err(TrySendError::Full(2)) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 0); - } - - #[test] - fn sending_when_closed() { - let mut c = ChannelState::::new(); - c.closed = true; - match c.try_send(2) { - Err(TrySendError::Closed(2)) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = ChannelState::::new(); - match c.try_recv() { - Err(TryRecvError::Empty) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_closed() { - let mut c = ChannelState::::new(); - c.closed = true; - match c.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn simple_send_and_receive() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - assert!(s.clone().try_send(1).is_ok()); - assert_eq!(r.try_recv().unwrap(), 1); - } - - #[test] - fn should_close_without_sender() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - drop(s); - match r.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn should_close_once_drained() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - assert!(s.try_send(1).is_ok()); - drop(s); - assert_eq!(r.try_recv().unwrap(), 1); - match r.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn should_reject_send_when_receiver_dropped() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - drop(r); - match s.try_send(1) { - Err(TrySendError::Closed(1)) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn should_reject_send_when_channel_closed() { - let mut c = Channel::::new(); - let (s, mut r) = split(&mut c); - assert!(s.try_send(1).is_ok()); - r.close(); - assert_eq!(r.try_recv().unwrap(), 1); - match r.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - assert!(s.is_closed()); - } - - #[futures_test::test] - async fn receiver_closes_when_sender_dropped_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = split(c); - assert!(executor - .spawn(async move { - drop(s); - }) - .is_ok()); - assert_eq!(r.recv().await, None); - } - - #[futures_test::test] - async fn receiver_receives_given_try_send_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = split(c); - assert!(executor - .spawn(async move { - assert!(s.try_send(1).is_ok()); - }) - .is_ok()); - assert_eq!(r.recv().await, Some(1)); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let mut c = Channel::::new(); - let (s, mut r) = split(&mut c); - assert!(s.send(1).await.is_ok()); - assert_eq!(r.recv().await, Some(1)); - } - - #[futures_test::test] - async fn sender_send_completes_if_closed() { - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, r) = split(c); - drop(r); - match s.send(1).await { - Err(SendError(1)) => assert!(true), - _ => assert!(false), - } - } - - #[futures_test::test] - async fn senders_sends_wait_until_capacity() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s0, mut r) = split(c); - assert!(s0.try_send(1).is_ok()); - let s1 = s0.clone(); - let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); - let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await }); - // Wish I could think of a means of determining that the async send is waiting instead. - // However, I've used the debugger to observe that the send does indeed wait. - Delay::new(Duration::from_millis(500)).await; - assert_eq!(r.recv().await, Some(1)); - assert!(executor - .spawn(async move { while let Some(_) = r.recv().await {} }) - .is_ok()); - assert!(send_task_1.unwrap().await.is_ok()); - assert!(send_task_2.unwrap().await.is_ok()); - } - - #[futures_test::test] - async fn sender_close_completes_if_closing() { - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = split(c); - r.close(); - s.closed().await; - } - - #[futures_test::test] - async fn sender_close_completes_if_closed() { - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, r) = split(c); - drop(r); - s.closed().await; - } -} diff --git a/embassy/src/channel/signal.rs b/embassy/src/channel/signal.rs index 027f4f47..e1f6c4b1 100644 --- a/embassy/src/channel/signal.rs +++ b/embassy/src/channel/signal.rs @@ -5,7 +5,7 @@ use core::task::{Context, Poll, Waker}; /// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks. /// For a simple use-case where the receiver is only ever interested in the latest value of -/// something, Signals work well. For more advanced use cases, please consider [crate::channel::mpsc]. +/// something, Signals work well. For more advanced use cases, you might want to use [`Channel`](crate::channel::channel::Channel) instead.. /// /// Signals are generally declared as being a static const and then borrowed as required. /// diff --git a/examples/nrf/src/bin/channel.rs b/examples/nrf/src/bin/channel.rs new file mode 100644 index 00000000..476ec09a --- /dev/null +++ b/examples/nrf/src/bin/channel.rs @@ -0,0 +1,45 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use defmt::unwrap; +use embassy::blocking_mutex::raw::ThreadModeRawMutex; +use embassy::channel::channel::Channel; +use embassy::executor::Spawner; +use embassy::time::{Duration, Timer}; +use embassy_nrf::gpio::{Level, Output, OutputDrive}; +use embassy_nrf::Peripherals; + +use defmt_rtt as _; // global logger +use panic_probe as _; + +enum LedState { + On, + Off, +} + +static CHANNEL: Channel = Channel::new(); + +#[embassy::task] +async fn my_task() { + loop { + CHANNEL.send(LedState::On).await; + Timer::after(Duration::from_secs(1)).await; + CHANNEL.send(LedState::Off).await; + Timer::after(Duration::from_secs(1)).await; + } +} + +#[embassy::main] +async fn main(spawner: Spawner, p: Peripherals) { + let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); + + unwrap!(spawner.spawn(my_task())); + + loop { + match CHANNEL.recv().await { + LedState::On => led.set_high(), + LedState::Off => led.set_low(), + } + } +} diff --git a/examples/nrf/src/bin/channel_sender_receiver.rs b/examples/nrf/src/bin/channel_sender_receiver.rs new file mode 100644 index 00000000..c79f2fd6 --- /dev/null +++ b/examples/nrf/src/bin/channel_sender_receiver.rs @@ -0,0 +1,52 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use defmt::unwrap; +use embassy::blocking_mutex::raw::NoopRawMutex; +use embassy::channel::channel::{Channel, Receiver, Sender}; +use embassy::executor::Spawner; +use embassy::time::{Duration, Timer}; +use embassy::util::Forever; +use embassy_nrf::gpio::{AnyPin, Level, Output, OutputDrive, Pin}; +use embassy_nrf::Peripherals; + +use defmt_rtt as _; // global logger +use panic_probe as _; + +enum LedState { + On, + Off, +} + +static CHANNEL: Forever> = Forever::new(); + +#[embassy::task] +async fn send_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) { + loop { + sender.send(LedState::On).await; + Timer::after(Duration::from_secs(1)).await; + sender.send(LedState::Off).await; + Timer::after(Duration::from_secs(1)).await; + } +} + +#[embassy::task] +async fn recv_task(led: AnyPin, receiver: Receiver<'static, NoopRawMutex, LedState, 1>) { + let mut led = Output::new(led, Level::Low, OutputDrive::Standard); + + loop { + match receiver.recv().await { + LedState::On => led.set_high(), + LedState::Off => led.set_low(), + } + } +} + +#[embassy::main] +async fn main(spawner: Spawner, p: Peripherals) { + let channel = CHANNEL.put(Channel::new()); + + unwrap!(spawner.spawn(send_task(channel.sender()))); + unwrap!(spawner.spawn(recv_task(p.P0_13.degrade(), channel.receiver()))); +} diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs deleted file mode 100644 index 0cb18275..00000000 --- a/examples/nrf/src/bin/mpsc.rs +++ /dev/null @@ -1,60 +0,0 @@ -#![no_std] -#![no_main] -#![feature(type_alias_impl_trait)] - -use defmt::unwrap; -use embassy::blocking_mutex::raw::NoopRawMutex; -use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError}; -use embassy::executor::Spawner; -use embassy::time::{Duration, Timer}; -use embassy::util::Forever; -use embassy_nrf::gpio::{Level, Output, OutputDrive}; -use embassy_nrf::Peripherals; - -use defmt_rtt as _; // global logger -use panic_probe as _; - -enum LedState { - On, - Off, -} - -static CHANNEL: Forever> = Forever::new(); - -#[embassy::task(pool_size = 1)] -async fn my_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) { - loop { - let _ = sender.send(LedState::On).await; - Timer::after(Duration::from_secs(1)).await; - let _ = sender.send(LedState::Off).await; - Timer::after(Duration::from_secs(1)).await; - } -} - -#[embassy::main] -async fn main(spawner: Spawner, p: Peripherals) { - let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - - let channel = CHANNEL.put(Channel::new()); - let (sender, mut receiver) = mpsc::split(channel); - - unwrap!(spawner.spawn(my_task(sender))); - - // We could just loop on `receiver.recv()` for simplicity. The code below - // is optimized to drain the queue as fast as possible in the spirit of - // handling events as fast as possible. This optimization is benign when in - // thread mode, but can be useful when interrupts are sending messages - // with the channel having been created via with_critical_sections. - loop { - let maybe_message = match receiver.try_recv() { - m @ Ok(..) => m.ok(), - Err(TryRecvError::Empty) => receiver.recv().await, - Err(TryRecvError::Closed) => break, - }; - match maybe_message { - Some(LedState::On) => led.set_high(), - Some(LedState::Off) => led.set_low(), - _ => (), - } - } -} diff --git a/examples/nrf/src/bin/uart_split.rs b/examples/nrf/src/bin/uart_split.rs index 909429b1..3fde2f0d 100644 --- a/examples/nrf/src/bin/uart_split.rs +++ b/examples/nrf/src/bin/uart_split.rs @@ -3,10 +3,9 @@ #![feature(type_alias_impl_trait)] use defmt::*; -use embassy::blocking_mutex::raw::NoopRawMutex; -use embassy::channel::mpsc::{self, Channel, Sender}; +use embassy::blocking_mutex::raw::ThreadModeRawMutex; +use embassy::channel::channel::Channel; use embassy::executor::Spawner; -use embassy::util::Forever; use embassy_nrf::peripherals::UARTE0; use embassy_nrf::uarte::UarteRx; use embassy_nrf::{interrupt, uarte, Peripherals}; @@ -14,7 +13,7 @@ use embassy_nrf::{interrupt, uarte, Peripherals}; use defmt_rtt as _; // global logger use panic_probe as _; -static CHANNEL: Forever> = Forever::new(); +static CHANNEL: Channel = Channel::new(); #[embassy::main] async fn main(spawner: Spawner, p: Peripherals) { @@ -26,14 +25,11 @@ async fn main(spawner: Spawner, p: Peripherals) { let uart = uarte::Uarte::new(p.UARTE0, irq, p.P0_08, p.P0_06, config); let (mut tx, rx) = uart.split(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = mpsc::split(c); - info!("uarte initialized!"); // Spawn a task responsible purely for reading - unwrap!(spawner.spawn(reader(rx, s))); + unwrap!(spawner.spawn(reader(rx))); // Message must be in SRAM { @@ -48,19 +44,18 @@ async fn main(spawner: Spawner, p: Peripherals) { // back out the buffer we receive from the read // task. loop { - if let Some(buf) = r.recv().await { - info!("writing..."); - unwrap!(tx.write(&buf).await); - } + let buf = CHANNEL.recv().await; + info!("writing..."); + unwrap!(tx.write(&buf).await); } } #[embassy::task] -async fn reader(mut rx: UarteRx<'static, UARTE0>, s: Sender<'static, NoopRawMutex, [u8; 8], 1>) { +async fn reader(mut rx: UarteRx<'static, UARTE0>) { let mut buf = [0; 8]; loop { info!("reading..."); unwrap!(rx.read(&mut buf).await); - unwrap!(s.send(buf).await); + CHANNEL.send(buf).await; } } diff --git a/examples/stm32f3/src/bin/button_events.rs b/examples/stm32f3/src/bin/button_events.rs index 99aab302..06e8eec1 100644 --- a/examples/stm32f3/src/bin/button_events.rs +++ b/examples/stm32f3/src/bin/button_events.rs @@ -11,11 +11,10 @@ #![feature(type_alias_impl_trait)] use defmt::*; -use embassy::blocking_mutex::raw::NoopRawMutex; -use embassy::channel::mpsc::{self, Channel, Receiver, Sender}; +use embassy::blocking_mutex::raw::ThreadModeRawMutex; +use embassy::channel::channel::Channel; use embassy::executor::Spawner; use embassy::time::{with_timeout, Duration, Timer}; -use embassy::util::Forever; use embassy_stm32::exti::ExtiInput; use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed}; use embassy_stm32::peripherals::PA0; @@ -51,14 +50,15 @@ impl<'a> Leds<'a> { } } - async fn show(&mut self, queue: &mut Receiver<'static, NoopRawMutex, ButtonEvent, 4>) { + async fn show(&mut self) { self.leds[self.current_led].set_high(); - if let Ok(new_message) = with_timeout(Duration::from_millis(500), queue.recv()).await { + if let Ok(new_message) = with_timeout(Duration::from_millis(500), CHANNEL.recv()).await { self.leds[self.current_led].set_low(); self.process_event(new_message).await; } else { self.leds[self.current_led].set_low(); - if let Ok(new_message) = with_timeout(Duration::from_millis(200), queue.recv()).await { + if let Ok(new_message) = with_timeout(Duration::from_millis(200), CHANNEL.recv()).await + { self.process_event(new_message).await; } } @@ -77,15 +77,18 @@ impl<'a> Leds<'a> { } } - async fn process_event(&mut self, event: Option) { + async fn process_event(&mut self, event: ButtonEvent) { match event { - Some(ButtonEvent::SingleClick) => self.move_next(), - Some(ButtonEvent::DoubleClick) => { - self.change_direction(); - self.move_next() + ButtonEvent::SingleClick => { + self.move_next(); + } + ButtonEvent::DoubleClick => { + self.change_direction(); + self.move_next(); + } + ButtonEvent::Hold => { + self.flash().await; } - Some(ButtonEvent::Hold) => self.flash().await, - _ => {} } } } @@ -97,7 +100,7 @@ enum ButtonEvent { Hold, } -static BUTTON_EVENTS_QUEUE: Forever> = Forever::new(); +static CHANNEL: Channel = Channel::new(); #[embassy::main] async fn main(spawner: Spawner, p: Peripherals) { @@ -116,27 +119,19 @@ async fn main(spawner: Spawner, p: Peripherals) { ]; let leds = Leds::new(leds); - let buttons_queue = BUTTON_EVENTS_QUEUE.put(Channel::new()); - let (sender, receiver) = mpsc::split(buttons_queue); - spawner.spawn(button_waiter(button, sender)).unwrap(); - spawner.spawn(led_blinker(leds, receiver)).unwrap(); + spawner.spawn(button_waiter(button)).unwrap(); + spawner.spawn(led_blinker(leds)).unwrap(); } #[embassy::task] -async fn led_blinker( - mut leds: Leds<'static>, - mut queue: Receiver<'static, NoopRawMutex, ButtonEvent, 4>, -) { +async fn led_blinker(mut leds: Leds<'static>) { loop { - leds.show(&mut queue).await; + leds.show().await; } } #[embassy::task] -async fn button_waiter( - mut button: ExtiInput<'static, PA0>, - queue: Sender<'static, NoopRawMutex, ButtonEvent, 4>, -) { +async fn button_waiter(mut button: ExtiInput<'static, PA0>) { const DOUBLE_CLICK_DELAY: u64 = 250; const HOLD_DELAY: u64 = 1000; @@ -150,9 +145,7 @@ async fn button_waiter( .is_err() { info!("Hold"); - if queue.send(ButtonEvent::Hold).await.is_err() { - break; - } + CHANNEL.send(ButtonEvent::Hold).await; button.wait_for_falling_edge().await; } else if with_timeout( Duration::from_millis(DOUBLE_CLICK_DELAY), @@ -161,15 +154,11 @@ async fn button_waiter( .await .is_err() { - if queue.send(ButtonEvent::SingleClick).await.is_err() { - break; - } info!("Single click"); + CHANNEL.send(ButtonEvent::SingleClick).await; } else { info!("Double click"); - if queue.send(ButtonEvent::DoubleClick).await.is_err() { - break; - } + CHANNEL.send(ButtonEvent::DoubleClick).await; button.wait_for_falling_edge().await; } button.wait_for_rising_edge().await; diff --git a/examples/stm32h7/src/bin/usart_split.rs b/examples/stm32h7/src/bin/usart_split.rs index ee1763aa..40a7c3e4 100644 --- a/examples/stm32h7/src/bin/usart_split.rs +++ b/examples/stm32h7/src/bin/usart_split.rs @@ -4,10 +4,9 @@ use defmt::*; use defmt_rtt as _; // global logger -use embassy::blocking_mutex::raw::NoopRawMutex; -use embassy::channel::mpsc::{self, Channel, Sender}; +use embassy::blocking_mutex::raw::ThreadModeRawMutex; +use embassy::channel::channel::Channel; use embassy::executor::Spawner; -use embassy::util::Forever; use embassy_stm32::dma::NoDma; use embassy_stm32::{ peripherals::{DMA1_CH1, UART7}, @@ -28,7 +27,7 @@ async fn writer(mut usart: Uart<'static, UART7, NoDma, NoDma>) { } } -static CHANNEL: Forever> = Forever::new(); +static CHANNEL: Channel = Channel::new(); #[embassy::main] async fn main(spawner: Spawner, p: Peripherals) -> ! { @@ -40,28 +39,21 @@ async fn main(spawner: Spawner, p: Peripherals) -> ! { let (mut tx, rx) = usart.split(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = mpsc::split(c); - - unwrap!(spawner.spawn(reader(rx, s))); + unwrap!(spawner.spawn(reader(rx))); loop { - if let Some(buf) = r.recv().await { - info!("writing..."); - unwrap!(tx.write(&buf).await); - } + let buf = CHANNEL.recv().await; + info!("writing..."); + unwrap!(tx.write(&buf).await); } } #[embassy::task] -async fn reader( - mut rx: UartRx<'static, UART7, DMA1_CH1>, - s: Sender<'static, NoopRawMutex, [u8; 8], 1>, -) { +async fn reader(mut rx: UartRx<'static, UART7, DMA1_CH1>) { let mut buf = [0; 8]; loop { info!("reading..."); unwrap!(rx.read(&mut buf).await); - unwrap!(s.send(buf).await); + CHANNEL.send(buf).await; } }