From f159beec1cbd1406f63ca7c3e84a1d598bbadaa1 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 9 Jul 2021 12:13:07 +1000 Subject: [PATCH] Use of a NoopMutex --- embassy/src/util/mpsc.rs | 109 +++++++++++++++-------------------- embassy/src/util/mutex.rs | 27 +++++++++ examples/nrf/src/bin/mpsc.rs | 8 +-- 3 files changed, 78 insertions(+), 66 deletions(-) diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index 65e4bf7b..e54c507c 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -49,6 +49,7 @@ use futures::Future; use super::CriticalSectionMutex; use super::Mutex; +use super::NoopMutex; use super::ThreadModeMutex; /// Send values to the associated `Receiver`. @@ -96,10 +97,10 @@ unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: //// /// ```compile_fail /// use embassy::util::mpsc; -/// use embassy::util::mpsc::{Channel, ChannelCell, WithThreadModeOnly}; +/// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; /// /// let (sender, receiver) = { -/// let mut channel = ChannelCell::new(Channel::::with_thread_mode_only()); +/// let mut channel = Channel::::with_thread_mode_only(); /// mpsc::split(&channel) /// }; /// ``` @@ -464,10 +465,10 @@ impl Channel { /// /// ``` /// use embassy::util::mpsc; - /// use embassy::util::mpsc::{Channel, ChannelCell, WithCriticalSections}; + /// use embassy::util::mpsc::{Channel, WithCriticalSections}; /// /// // Declare a bounded channel of 3 u32s. - /// let mut channel = ChannelCell::new(mpsc::Channel::::with_critical_sections()); + /// let mut channel = Channel::::with_critical_sections(); /// // once we have a channel, obtain its sender and receiver /// let (sender, receiver) = mpsc::split(&channel); /// ``` @@ -488,10 +489,10 @@ impl Channel { /// /// ``` no_run /// use embassy::util::mpsc; - /// use embassy::util::mpsc::{Channel, ChannelCell, WithThreadModeOnly}; + /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; /// /// // Declare a bounded channel of 3 u32s. - /// let mut channel = ChannelCell::new(Channel::::with_thread_mode_only()); + /// let mut channel = Channel::::with_thread_mode_only(); /// // once we have a channel, obtain its sender and receiver /// let (sender, receiver) = mpsc::split(&channel); /// ``` @@ -502,6 +503,27 @@ impl Channel { } } +pub type WithNoThreads = NoopMutex<()>; + +impl Channel { + /// Establish a new bounded channel for within a single thread. To create one: + /// + /// ``` + /// use embassy::util::mpsc; + /// use embassy::util::mpsc::{Channel, WithNoThreads}; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = Channel::::with_no_threads(); + /// // once we have a channel, obtain its sender and receiver + /// let (sender, receiver) = mpsc::split(&channel); + /// ``` + pub const fn with_no_threads() -> Self { + let mutex = NoopMutex::new(()); + let state = ChannelState::new(); + Channel { mutex, state } + } +} + impl Channel where M: Mutex, @@ -675,43 +697,6 @@ mod tests { } } - /// A mutex that does nothing - useful for our testing purposes - pub struct NoopMutex { - inner: UnsafeCell, - } - - impl NoopMutex { - pub const fn new(value: T) -> Self { - NoopMutex { - inner: UnsafeCell::new(value), - } - } - } - - impl NoopMutex { - pub fn borrow(&self) -> &T { - unsafe { &*self.inner.get() } - } - } - - impl Mutex for NoopMutex { - type Data = T; - - fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { - f(self.borrow()) - } - } - - pub type WithNoThreads = NoopMutex<()>; - - impl Channel { - pub const fn with_no_threads() -> Self { - let mutex = NoopMutex::new(()); - let state = ChannelState::new(); - Channel { mutex, state } - } - } - #[test] fn sending_once() { let mut c = Channel::::with_no_threads(); @@ -772,7 +757,7 @@ mod tests { #[test] fn simple_send_and_receive() { - let c = ChannelCell::new(Channel::::with_no_threads()); + let c = Channel::::with_no_threads(); let (s, r) = split(&c); assert!(s.clone().try_send(1).is_ok()); assert_eq!(r.try_recv().unwrap(), 1); @@ -780,7 +765,7 @@ mod tests { #[test] fn should_close_without_sender() { - let c = ChannelCell::new(Channel::::with_no_threads()); + let c = Channel::::with_no_threads(); let (s, r) = split(&c); drop(s); match r.try_recv() { @@ -791,7 +776,7 @@ mod tests { #[test] fn should_close_once_drained() { - let c = ChannelCell::new(Channel::::with_no_threads()); + let c = Channel::::with_no_threads(); let (s, r) = split(&c); assert!(s.try_send(1).is_ok()); drop(s); @@ -804,7 +789,7 @@ mod tests { #[test] fn should_reject_send_when_receiver_dropped() { - let c = ChannelCell::new(Channel::::with_no_threads()); + let c = Channel::::with_no_threads(); let (s, r) = split(&c); drop(r); match s.try_send(1) { @@ -815,7 +800,7 @@ mod tests { #[test] fn should_reject_send_when_channel_closed() { - let c = ChannelCell::new(Channel::::with_no_threads()); + let c = Channel::::with_no_threads(); let (s, mut r) = split(&c); assert!(s.try_send(1).is_ok()); r.close(); @@ -831,8 +816,8 @@ mod tests { async fn receiver_closes_when_sender_dropped_async() { let executor = ThreadPool::new().unwrap(); - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s, mut r) = split(unsafe { &CHANNEL }); assert!(executor .spawn(async move { @@ -846,8 +831,8 @@ mod tests { async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap(); - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s, mut r) = split(unsafe { &CHANNEL }); assert!(executor .spawn(async move { @@ -859,8 +844,8 @@ mod tests { #[futures_test::test] async fn sender_send_completes_if_capacity() { - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s, mut r) = split(unsafe { &CHANNEL }); assert!(s.send(1).await.is_ok()); assert_eq!(r.recv().await, Some(1)); @@ -868,8 +853,8 @@ mod tests { #[futures_test::test] async fn sender_send_completes_if_closed() { - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s, r) = split(unsafe { &CHANNEL }); drop(r); match s.send(1).await { @@ -882,8 +867,8 @@ mod tests { async fn senders_sends_wait_until_capacity() { let executor = ThreadPool::new().unwrap(); - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s0, mut r) = split(unsafe { &CHANNEL }); assert!(s0.try_send(1).is_ok()); let s1 = s0.clone(); @@ -902,8 +887,8 @@ mod tests { #[futures_test::test] async fn sender_close_completes_if_closing() { - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s, mut r) = split(unsafe { &CHANNEL }); r.close(); s.closed().await; @@ -911,8 +896,8 @@ mod tests { #[futures_test::test] async fn sender_close_completes_if_closed() { - static mut CHANNEL: ChannelCell> = - ChannelCell::new(Channel::with_critical_sections()); + static mut CHANNEL: Channel = + Channel::with_critical_sections(); let (s, r) = split(unsafe { &CHANNEL }); drop(r); s.closed().await; diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs index 682fcb39..db3423cb 100644 --- a/embassy/src/util/mutex.rs +++ b/embassy/src/util/mutex.rs @@ -105,3 +105,30 @@ pub fn in_thread_mode() -> bool { return cortex_m::peripheral::SCB::vect_active() == cortex_m::peripheral::scb::VectActive::ThreadMode; } + +/// A "mutex" that does nothing and cannot be shared between threads. +pub struct NoopMutex { + inner: UnsafeCell, +} + +impl NoopMutex { + pub const fn new(value: T) -> Self { + NoopMutex { + inner: UnsafeCell::new(value), + } + } +} + +impl NoopMutex { + pub fn borrow(&self) -> &T { + unsafe { &*self.inner.get() } + } +} + +impl Mutex for NoopMutex { + type Data = T; + + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { + f(self.borrow()) + } +} diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 6a0f8f47..c2cb107e 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -16,17 +16,17 @@ use embassy::util::{mpsc, Forever}; use embassy_nrf::gpio::{Level, Output, OutputDrive}; use embassy_nrf::Peripherals; use embedded_hal::digital::v2::OutputPin; -use mpsc::{Channel, Sender, WithThreadModeOnly}; +use mpsc::{Channel, Sender, WithNoThreads}; enum LedState { On, Off, } -static CHANNEL: Forever> = Forever::new(); +static CHANNEL: Forever> = Forever::new(); #[embassy::task(pool_size = 1)] -async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { +async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { loop { let _ = sender.send(LedState::On).await; Timer::after(Duration::from_secs(1)).await; @@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { async fn main(spawner: Spawner, p: Peripherals) { let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(Channel::with_thread_mode_only()); + let channel = CHANNEL.put(Channel::with_no_threads()); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap();