Provides a cleaner construction of the channel with the common "new" naming
This commit is contained in:
parent
6f78527aeb
commit
3778f55d80
@ -581,75 +581,27 @@ where
|
|||||||
|
|
||||||
pub type WithCriticalSections = CriticalSectionMutex<()>;
|
pub type WithCriticalSections = CriticalSectionMutex<()>;
|
||||||
|
|
||||||
impl<T, const N: usize> Channel<WithCriticalSections, T, N> {
|
|
||||||
/// Establish a new bounded channel using critical sections. Critical sections
|
|
||||||
/// should be used only single core targets where communication is required
|
|
||||||
/// from exception mode e.g. interrupt handlers. To create one:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use embassy::util::mpsc;
|
|
||||||
/// use embassy::util::mpsc::{Channel, WithCriticalSections};
|
|
||||||
///
|
|
||||||
/// // Declare a bounded channel of 3 u32s.
|
|
||||||
/// let mut channel = Channel::<WithCriticalSections, u32, 3>::with_critical_sections();
|
|
||||||
/// // once we have a channel, obtain its sender and receiver
|
|
||||||
/// let (sender, receiver) = mpsc::split(&mut channel);
|
|
||||||
/// ```
|
|
||||||
pub const fn with_critical_sections() -> Self {
|
|
||||||
let mutex = CriticalSectionMutex::new(());
|
|
||||||
let state = ChannelState::new();
|
|
||||||
let channel_cell = ChannelCell { mutex, state };
|
|
||||||
Channel {
|
|
||||||
channel_cell: UnsafeCell::new(channel_cell),
|
|
||||||
receiver_consumed: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type WithThreadModeOnly = ThreadModeMutex<()>;
|
pub type WithThreadModeOnly = ThreadModeMutex<()>;
|
||||||
|
|
||||||
impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
|
|
||||||
/// Establish a new bounded channel for use in Cortex-M thread mode. Thread
|
|
||||||
/// mode is intended for application threads on a single core, not interrupts.
|
|
||||||
/// As such, only one task at a time can acquire a resource and so this
|
|
||||||
/// channel avoids all locks. To create one:
|
|
||||||
///
|
|
||||||
/// ``` no_run
|
|
||||||
/// use embassy::util::mpsc;
|
|
||||||
/// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
|
|
||||||
///
|
|
||||||
/// // Declare a bounded channel of 3 u32s.
|
|
||||||
/// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
|
|
||||||
/// // once we have a channel, obtain its sender and receiver
|
|
||||||
/// let (sender, receiver) = mpsc::split(&mut channel);
|
|
||||||
/// ```
|
|
||||||
pub const fn with_thread_mode_only() -> Self {
|
|
||||||
let mutex = ThreadModeMutex::new(());
|
|
||||||
let state = ChannelState::new();
|
|
||||||
let channel_cell = ChannelCell { mutex, state };
|
|
||||||
Channel {
|
|
||||||
channel_cell: UnsafeCell::new(channel_cell),
|
|
||||||
receiver_consumed: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type WithNoThreads = NoopMutex<()>;
|
pub type WithNoThreads = NoopMutex<()>;
|
||||||
|
|
||||||
impl<T, const N: usize> Channel<WithNoThreads, T, N> {
|
impl<M, T, const N: usize> Channel<M, T, N>
|
||||||
/// Establish a new bounded channel for within a single thread. To create one:
|
where
|
||||||
|
M: Mutex<Data = ()>,
|
||||||
|
{
|
||||||
|
/// Establish a new bounded channel. For example, to create one with a NoopMutex:
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// use embassy::util::mpsc;
|
/// use embassy::util::mpsc;
|
||||||
/// use embassy::util::mpsc::{Channel, WithNoThreads};
|
/// use embassy::util::mpsc::{Channel, WithNoThreads};
|
||||||
///
|
///
|
||||||
/// // Declare a bounded channel of 3 u32s.
|
/// // Declare a bounded channel of 3 u32s.
|
||||||
/// let mut channel = Channel::<WithNoThreads, u32, 3>::with_no_threads();
|
/// let mut channel = Channel::<WithNoThreads, u32, 3>::new();
|
||||||
/// // once we have a channel, obtain its sender and receiver
|
/// // once we have a channel, obtain its sender and receiver
|
||||||
/// let (sender, receiver) = mpsc::split(&mut channel);
|
/// let (sender, receiver) = mpsc::split(&mut channel);
|
||||||
/// ```
|
/// ```
|
||||||
pub const fn with_no_threads() -> Self {
|
pub fn new() -> Self {
|
||||||
let mutex = NoopMutex::new(());
|
let mutex = M::new(());
|
||||||
let state = ChannelState::new();
|
let state = ChannelState::new();
|
||||||
let channel_cell = ChannelCell { mutex, state };
|
let channel_cell = ChannelCell { mutex, state };
|
||||||
Channel {
|
Channel {
|
||||||
@ -657,12 +609,7 @@ impl<T, const N: usize> Channel<WithNoThreads, T, N> {
|
|||||||
receiver_consumed: PhantomData,
|
receiver_consumed: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<M, T, const N: usize> Channel<M, T, N>
|
|
||||||
where
|
|
||||||
M: Mutex<Data = ()>,
|
|
||||||
{
|
|
||||||
fn lock<R>(
|
fn lock<R>(
|
||||||
channel_cell: &UnsafeCell<ChannelCell<M, T, N>>,
|
channel_cell: &UnsafeCell<ChannelCell<M, T, N>>,
|
||||||
f: impl FnOnce(&mut ChannelState<T, N>) -> R,
|
f: impl FnOnce(&mut ChannelState<T, N>) -> R,
|
||||||
@ -684,6 +631,8 @@ mod tests {
|
|||||||
use futures_executor::ThreadPool;
|
use futures_executor::ThreadPool;
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
|
|
||||||
|
use crate::util::Forever;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
|
fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
|
||||||
@ -758,7 +707,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn simple_send_and_receive() {
|
fn simple_send_and_receive() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
|
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
assert!(s.clone().try_send(1).is_ok());
|
assert!(s.clone().try_send(1).is_ok());
|
||||||
assert_eq!(r.try_recv().unwrap(), 1);
|
assert_eq!(r.try_recv().unwrap(), 1);
|
||||||
@ -766,7 +715,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_close_without_sender() {
|
fn should_close_without_sender() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
|
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
drop(s);
|
drop(s);
|
||||||
match r.try_recv() {
|
match r.try_recv() {
|
||||||
@ -777,7 +726,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_close_once_drained() {
|
fn should_close_once_drained() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
|
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
assert!(s.try_send(1).is_ok());
|
assert!(s.try_send(1).is_ok());
|
||||||
drop(s);
|
drop(s);
|
||||||
@ -790,7 +739,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_reject_send_when_receiver_dropped() {
|
fn should_reject_send_when_receiver_dropped() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
|
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
drop(r);
|
drop(r);
|
||||||
match s.try_send(1) {
|
match s.try_send(1) {
|
||||||
@ -801,7 +750,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_reject_send_when_channel_closed() {
|
fn should_reject_send_when_channel_closed() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
|
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
||||||
let (s, mut r) = split(&mut c);
|
let (s, mut r) = split(&mut c);
|
||||||
assert!(s.try_send(1).is_ok());
|
assert!(s.try_send(1).is_ok());
|
||||||
r.close();
|
r.close();
|
||||||
@ -817,9 +766,9 @@ mod tests {
|
|||||||
async fn receiver_closes_when_sender_dropped_async() {
|
async fn receiver_closes_when_sender_dropped_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 3> =
|
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
|
||||||
Channel::with_critical_sections();
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, mut r) = split(unsafe { &mut CHANNEL });
|
let (s, mut r) = split(c);
|
||||||
assert!(executor
|
assert!(executor
|
||||||
.spawn(async move {
|
.spawn(async move {
|
||||||
drop(s);
|
drop(s);
|
||||||
@ -832,9 +781,9 @@ mod tests {
|
|||||||
async fn receiver_receives_given_try_send_async() {
|
async fn receiver_receives_given_try_send_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 3> =
|
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
|
||||||
Channel::with_critical_sections();
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, mut r) = split(unsafe { &mut CHANNEL });
|
let (s, mut r) = split(c);
|
||||||
assert!(executor
|
assert!(executor
|
||||||
.spawn(async move {
|
.spawn(async move {
|
||||||
assert!(s.try_send(1).is_ok());
|
assert!(s.try_send(1).is_ok());
|
||||||
@ -845,18 +794,17 @@ mod tests {
|
|||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_send_completes_if_capacity() {
|
async fn sender_send_completes_if_capacity() {
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
|
let mut c = Channel::<WithCriticalSections, u32, 1>::new();
|
||||||
Channel::with_critical_sections();
|
let (s, mut r) = split(&mut c);
|
||||||
let (s, mut r) = split(unsafe { &mut CHANNEL });
|
|
||||||
assert!(s.send(1).await.is_ok());
|
assert!(s.send(1).await.is_ok());
|
||||||
assert_eq!(r.recv().await, Some(1));
|
assert_eq!(r.recv().await, Some(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_send_completes_if_closed() {
|
async fn sender_send_completes_if_closed() {
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
|
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
||||||
Channel::with_critical_sections();
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, r) = split(unsafe { &mut CHANNEL });
|
let (s, r) = split(c);
|
||||||
drop(r);
|
drop(r);
|
||||||
match s.send(1).await {
|
match s.send(1).await {
|
||||||
Err(SendError(1)) => assert!(true),
|
Err(SendError(1)) => assert!(true),
|
||||||
@ -868,9 +816,9 @@ mod tests {
|
|||||||
async fn senders_sends_wait_until_capacity() {
|
async fn senders_sends_wait_until_capacity() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
|
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
||||||
Channel::with_critical_sections();
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s0, mut r) = split(unsafe { &mut CHANNEL });
|
let (s0, mut r) = split(c);
|
||||||
assert!(s0.try_send(1).is_ok());
|
assert!(s0.try_send(1).is_ok());
|
||||||
let s1 = s0.clone();
|
let s1 = s0.clone();
|
||||||
let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
|
let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
|
||||||
@ -888,18 +836,18 @@ mod tests {
|
|||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_close_completes_if_closing() {
|
async fn sender_close_completes_if_closing() {
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
|
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
||||||
Channel::with_critical_sections();
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, mut r) = split(unsafe { &mut CHANNEL });
|
let (s, mut r) = split(c);
|
||||||
r.close();
|
r.close();
|
||||||
s.closed().await;
|
s.closed().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_close_completes_if_closed() {
|
async fn sender_close_completes_if_closed() {
|
||||||
static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
|
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
||||||
Channel::with_critical_sections();
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, r) = split(unsafe { &mut CHANNEL });
|
let (s, r) = split(c);
|
||||||
drop(r);
|
drop(r);
|
||||||
s.closed().await;
|
s.closed().await;
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,8 @@ pub trait Mutex {
|
|||||||
/// Data protected by the mutex.
|
/// Data protected by the mutex.
|
||||||
type Data;
|
type Data;
|
||||||
|
|
||||||
|
fn new(data: Self::Data) -> Self;
|
||||||
|
|
||||||
/// Creates a critical section and grants temporary access to the protected data.
|
/// Creates a critical section and grants temporary access to the protected data.
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R;
|
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R;
|
||||||
}
|
}
|
||||||
@ -47,6 +49,10 @@ impl<T> CriticalSectionMutex<T> {
|
|||||||
impl<T> Mutex for CriticalSectionMutex<T> {
|
impl<T> Mutex for CriticalSectionMutex<T> {
|
||||||
type Data = T;
|
type Data = T;
|
||||||
|
|
||||||
|
fn new(data: T) -> Self {
|
||||||
|
Self::new(data)
|
||||||
|
}
|
||||||
|
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
||||||
critical_section::with(|cs| f(self.borrow(cs)))
|
critical_section::with(|cs| f(self.borrow(cs)))
|
||||||
}
|
}
|
||||||
@ -92,6 +98,10 @@ impl<T> ThreadModeMutex<T> {
|
|||||||
impl<T> Mutex for ThreadModeMutex<T> {
|
impl<T> Mutex for ThreadModeMutex<T> {
|
||||||
type Data = T;
|
type Data = T;
|
||||||
|
|
||||||
|
fn new(data: T) -> Self {
|
||||||
|
Self::new(data)
|
||||||
|
}
|
||||||
|
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
||||||
f(self.borrow())
|
f(self.borrow())
|
||||||
}
|
}
|
||||||
@ -126,6 +136,10 @@ impl<T> NoopMutex<T> {
|
|||||||
impl<T> Mutex for NoopMutex<T> {
|
impl<T> Mutex for NoopMutex<T> {
|
||||||
type Data = T;
|
type Data = T;
|
||||||
|
|
||||||
|
fn new(data: T) -> Self {
|
||||||
|
Self::new(data)
|
||||||
|
}
|
||||||
|
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
||||||
f(self.borrow())
|
f(self.borrow())
|
||||||
}
|
}
|
||||||
|
@ -37,9 +37,10 @@ async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) {
|
|||||||
|
|
||||||
#[embassy::main]
|
#[embassy::main]
|
||||||
async fn main(spawner: Spawner, p: Peripherals) {
|
async fn main(spawner: Spawner, p: Peripherals) {
|
||||||
|
|
||||||
let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
|
let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
|
||||||
|
|
||||||
let channel = CHANNEL.put(Channel::with_no_threads());
|
let channel = CHANNEL.put(Channel::new());
|
||||||
let (sender, mut receiver) = mpsc::split(channel);
|
let (sender, mut receiver) = mpsc::split(channel);
|
||||||
|
|
||||||
spawner.spawn(my_task(sender)).unwrap();
|
spawner.spawn(my_task(sender)).unwrap();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user