From a2656f402b1c59461cec5f5dc685b2692119b996 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 12:04:30 +0200 Subject: [PATCH 1/4] Move embassy-net-driver-channel::zerocopy_channel to embassy_sync::zero_copy_channel --- embassy-net-driver-channel/src/lib.rs | 237 ++------------------------ embassy-sync/src/lib.rs | 1 + embassy-sync/src/zero_copy_channel.rs | 209 +++++++++++++++++++++++ 3 files changed, 223 insertions(+), 224 deletions(-) create mode 100644 embassy-sync/src/zero_copy_channel.rs diff --git a/embassy-net-driver-channel/src/lib.rs b/embassy-net-driver-channel/src/lib.rs index f2aa6b25..e8cd66f8 100644 --- a/embassy-net-driver-channel/src/lib.rs +++ b/embassy-net-driver-channel/src/lib.rs @@ -14,6 +14,7 @@ use embassy_net_driver::{Capabilities, LinkState, Medium}; use embassy_sync::blocking_mutex::raw::NoopRawMutex; use embassy_sync::blocking_mutex::Mutex; use embassy_sync::waitqueue::WakerRegistration; +use embassy_sync::zero_copy_channel; pub struct State { rx: [PacketBuf; N_RX], @@ -34,8 +35,8 @@ impl State { - rx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf>, - tx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf>, + rx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf>, + tx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf>, shared: Mutex>, } @@ -47,8 +48,8 @@ struct Shared { } pub struct Runner<'d, const MTU: usize> { - tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, - rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf>, + tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, + rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf>, shared: &'d Mutex>, } @@ -58,11 +59,11 @@ pub struct StateRunner<'d> { } pub struct RxRunner<'d, const MTU: usize> { - rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf>, + rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf>, } pub struct TxRunner<'d, const MTU: usize> { - tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, + tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, } impl<'d, const MTU: usize> Runner<'d, MTU> { @@ -243,8 +244,8 @@ pub fn new<'d, const MTU: usize, const N_RX: usize, const N_TX: usize>( let state_uninit: *mut MaybeUninit> = (&mut state.inner as *mut MaybeUninit>).cast(); let state = unsafe { &mut *state_uninit }.write(StateInner { - rx: zerocopy_channel::Channel::new(&mut state.rx[..]), - tx: zerocopy_channel::Channel::new(&mut state.tx[..]), + rx: zero_copy_channel::Channel::new(&mut state.rx[..]), + tx: zero_copy_channel::Channel::new(&mut state.tx[..]), shared: Mutex::new(RefCell::new(Shared { link_state: LinkState::Down, hardware_address, @@ -282,8 +283,8 @@ impl PacketBuf { } pub struct Device<'d, const MTU: usize> { - rx: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, - tx: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf>, + rx: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, + tx: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf>, shared: &'d Mutex>, caps: Capabilities, } @@ -328,7 +329,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> { } pub struct RxToken<'a, const MTU: usize> { - rx: zerocopy_channel::Receiver<'a, NoopRawMutex, PacketBuf>, + rx: zero_copy_channel::Receiver<'a, NoopRawMutex, PacketBuf>, } impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { @@ -345,7 +346,7 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { } pub struct TxToken<'a, const MTU: usize> { - tx: zerocopy_channel::Sender<'a, NoopRawMutex, PacketBuf>, + tx: zero_copy_channel::Sender<'a, NoopRawMutex, PacketBuf>, } impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> { @@ -361,215 +362,3 @@ impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> { r } } - -mod zerocopy_channel { - use core::cell::RefCell; - use core::future::poll_fn; - use core::marker::PhantomData; - use core::task::{Context, Poll}; - - use embassy_sync::blocking_mutex::raw::RawMutex; - use embassy_sync::blocking_mutex::Mutex; - use embassy_sync::waitqueue::WakerRegistration; - - pub struct Channel<'a, M: RawMutex, T> { - buf: *mut T, - phantom: PhantomData<&'a mut T>, - state: Mutex>, - } - - impl<'a, M: RawMutex, T> Channel<'a, M, T> { - pub fn new(buf: &'a mut [T]) -> Self { - let len = buf.len(); - assert!(len != 0); - - Self { - buf: buf.as_mut_ptr(), - phantom: PhantomData, - state: Mutex::new(RefCell::new(State { - len, - front: 0, - back: 0, - full: false, - send_waker: WakerRegistration::new(), - recv_waker: WakerRegistration::new(), - })), - } - } - - pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { - (Sender { channel: self }, Receiver { channel: self }) - } - } - - pub struct Sender<'a, M: RawMutex, T> { - channel: &'a Channel<'a, M, T>, - } - - impl<'a, M: RawMutex, T> Sender<'a, M, T> { - pub fn borrow(&mut self) -> Sender<'_, M, T> { - Sender { channel: self.channel } - } - - pub fn try_send(&mut self) -> Option<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.push_index() { - Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), - None => None, - } - }) - } - - pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.push_index() { - Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), - None => { - s.recv_waker.register(cx.waker()); - Poll::Pending - } - } - }) - } - - pub async fn send(&mut self) -> &mut T { - let i = poll_fn(|cx| { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.push_index() { - Some(i) => Poll::Ready(i), - None => { - s.recv_waker.register(cx.waker()); - Poll::Pending - } - } - }) - }) - .await; - unsafe { &mut *self.channel.buf.add(i) } - } - - pub fn send_done(&mut self) { - self.channel.state.lock(|s| s.borrow_mut().push_done()) - } - } - pub struct Receiver<'a, M: RawMutex, T> { - channel: &'a Channel<'a, M, T>, - } - - impl<'a, M: RawMutex, T> Receiver<'a, M, T> { - pub fn borrow(&mut self) -> Receiver<'_, M, T> { - Receiver { channel: self.channel } - } - - pub fn try_recv(&mut self) -> Option<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.pop_index() { - Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), - None => None, - } - }) - } - - pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.pop_index() { - Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), - None => { - s.send_waker.register(cx.waker()); - Poll::Pending - } - } - }) - } - - pub async fn recv(&mut self) -> &mut T { - let i = poll_fn(|cx| { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.pop_index() { - Some(i) => Poll::Ready(i), - None => { - s.send_waker.register(cx.waker()); - Poll::Pending - } - } - }) - }) - .await; - unsafe { &mut *self.channel.buf.add(i) } - } - - pub fn recv_done(&mut self) { - self.channel.state.lock(|s| s.borrow_mut().pop_done()) - } - } - - struct State { - len: usize, - - /// Front index. Always 0..=(N-1) - front: usize, - /// Back index. Always 0..=(N-1). - back: usize, - - /// Used to distinguish "empty" and "full" cases when `front == back`. - /// May only be `true` if `front == back`, always `false` otherwise. - full: bool, - - send_waker: WakerRegistration, - recv_waker: WakerRegistration, - } - - impl State { - fn increment(&self, i: usize) -> usize { - if i + 1 == self.len { - 0 - } else { - i + 1 - } - } - - fn is_full(&self) -> bool { - self.full - } - - fn is_empty(&self) -> bool { - self.front == self.back && !self.full - } - - fn push_index(&mut self) -> Option { - match self.is_full() { - true => None, - false => Some(self.back), - } - } - - fn push_done(&mut self) { - assert!(!self.is_full()); - self.back = self.increment(self.back); - if self.back == self.front { - self.full = true; - } - self.send_waker.wake(); - } - - fn pop_index(&mut self) -> Option { - match self.is_empty() { - true => None, - false => Some(self.front), - } - } - - fn pop_done(&mut self) { - assert!(!self.is_empty()); - self.front = self.increment(self.front); - self.full = false; - self.recv_waker.wake(); - } - } -} diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 53d95d08..48a7b13f 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -17,3 +17,4 @@ pub mod pipe; pub mod pubsub; pub mod signal; pub mod waitqueue; +pub mod zero_copy_channel; diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs new file mode 100644 index 00000000..3701ccf1 --- /dev/null +++ b/embassy-sync/src/zero_copy_channel.rs @@ -0,0 +1,209 @@ +use core::cell::RefCell; +use core::future::poll_fn; +use core::marker::PhantomData; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +pub struct Channel<'a, M: RawMutex, T> { + buf: *mut T, + phantom: PhantomData<&'a mut T>, + state: Mutex>, +} + +impl<'a, M: RawMutex, T> Channel<'a, M, T> { + pub fn new(buf: &'a mut [T]) -> Self { + let len = buf.len(); + assert!(len != 0); + + Self { + buf: buf.as_mut_ptr(), + phantom: PhantomData, + state: Mutex::new(RefCell::new(State { + len, + front: 0, + back: 0, + full: false, + send_waker: WakerRegistration::new(), + recv_waker: WakerRegistration::new(), + })), + } + } + + pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { + (Sender { channel: self }, Receiver { channel: self }) + } +} + +pub struct Sender<'a, M: RawMutex, T> { + channel: &'a Channel<'a, M, T>, +} + +impl<'a, M: RawMutex, T> Sender<'a, M, T> { + pub fn borrow(&mut self) -> Sender<'_, M, T> { + Sender { channel: self.channel } + } + + pub fn try_send(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), + None => None, + } + }) + } + + pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), + None => { + s.recv_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + pub async fn send(&mut self) -> &mut T { + let i = poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(i), + None => { + s.recv_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + .await; + unsafe { &mut *self.channel.buf.add(i) } + } + + pub fn send_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().push_done()) + } +} +pub struct Receiver<'a, M: RawMutex, T> { + channel: &'a Channel<'a, M, T>, +} + +impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + pub fn borrow(&mut self) -> Receiver<'_, M, T> { + Receiver { channel: self.channel } + } + + pub fn try_recv(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), + None => None, + } + }) + } + + pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + pub async fn recv(&mut self) -> &mut T { + let i = poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(i), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + .await; + unsafe { &mut *self.channel.buf.add(i) } + } + + pub fn recv_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().pop_done()) + } +} + +struct State { + len: usize, + + /// Front index. Always 0..=(N-1) + front: usize, + /// Back index. Always 0..=(N-1). + back: usize, + + /// Used to distinguish "empty" and "full" cases when `front == back`. + /// May only be `true` if `front == back`, always `false` otherwise. + full: bool, + + send_waker: WakerRegistration, + recv_waker: WakerRegistration, +} + +impl State { + fn increment(&self, i: usize) -> usize { + if i + 1 == self.len { + 0 + } else { + i + 1 + } + } + + fn is_full(&self) -> bool { + self.full + } + + fn is_empty(&self) -> bool { + self.front == self.back && !self.full + } + + fn push_index(&mut self) -> Option { + match self.is_full() { + true => None, + false => Some(self.back), + } + } + + fn push_done(&mut self) { + assert!(!self.is_full()); + self.back = self.increment(self.back); + if self.back == self.front { + self.full = true; + } + self.send_waker.wake(); + } + + fn pop_index(&mut self) -> Option { + match self.is_empty() { + true => None, + false => Some(self.front), + } + } + + fn pop_done(&mut self) { + assert!(!self.is_empty()); + self.front = self.increment(self.front); + self.full = false; + self.recv_waker.wake(); + } +} From 1eb03dc41a4a5fa8435f9a49d26e29ceea6d498e Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 12:07:30 +0200 Subject: [PATCH 2/4] Prefer `receive` over `recv` --- embassy-net-driver-channel/src/lib.rs | 22 +++++++++++----------- embassy-sync/src/zero_copy_channel.rs | 18 +++++++++--------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/embassy-net-driver-channel/src/lib.rs b/embassy-net-driver-channel/src/lib.rs index e8cd66f8..a2076074 100644 --- a/embassy-net-driver-channel/src/lib.rs +++ b/embassy-net-driver-channel/src/lib.rs @@ -131,24 +131,24 @@ impl<'d, const MTU: usize> Runner<'d, MTU> { } pub async fn tx_buf(&mut self) -> &mut [u8] { - let p = self.tx_chan.recv().await; + let p = self.tx_chan.receive().await; &mut p.buf[..p.len] } pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { - let p = self.tx_chan.try_recv()?; + let p = self.tx_chan.try_receive()?; Some(&mut p.buf[..p.len]) } pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { - match self.tx_chan.poll_recv(cx) { + match self.tx_chan.poll_receive(cx) { Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), Poll::Pending => Poll::Pending, } } pub fn tx_done(&mut self) { - self.tx_chan.recv_done(); + self.tx_chan.receive_done(); } } @@ -205,24 +205,24 @@ impl<'d, const MTU: usize> RxRunner<'d, MTU> { impl<'d, const MTU: usize> TxRunner<'d, MTU> { pub async fn tx_buf(&mut self) -> &mut [u8] { - let p = self.tx_chan.recv().await; + let p = self.tx_chan.receive().await; &mut p.buf[..p.len] } pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { - let p = self.tx_chan.try_recv()?; + let p = self.tx_chan.try_receive()?; Some(&mut p.buf[..p.len]) } pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { - match self.tx_chan.poll_recv(cx) { + match self.tx_chan.poll_receive(cx) { Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), Poll::Pending => Poll::Pending, } } pub fn tx_done(&mut self) { - self.tx_chan.recv_done(); + self.tx_chan.receive_done(); } } @@ -294,7 +294,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> { type TxToken<'a> = TxToken<'a, MTU> where Self: 'a ; fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { - if self.rx.poll_recv(cx).is_ready() && self.tx.poll_send(cx).is_ready() { + if self.rx.poll_receive(cx).is_ready() && self.tx.poll_send(cx).is_ready() { Some((RxToken { rx: self.rx.borrow() }, TxToken { tx: self.tx.borrow() })) } else { None @@ -338,9 +338,9 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { F: FnOnce(&mut [u8]) -> R, { // NOTE(unwrap): we checked the queue wasn't full when creating the token. - let pkt = unwrap!(self.rx.try_recv()); + let pkt = unwrap!(self.rx.try_receive()); let r = f(&mut pkt.buf[..pkt.len]); - self.rx.recv_done(); + self.rx.receive_done(); r } } diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs index 3701ccf1..cbb8cb52 100644 --- a/embassy-sync/src/zero_copy_channel.rs +++ b/embassy-sync/src/zero_copy_channel.rs @@ -27,7 +27,7 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { back: 0, full: false, send_waker: WakerRegistration::new(), - recv_waker: WakerRegistration::new(), + receive_waker: WakerRegistration::new(), })), } } @@ -62,7 +62,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { match s.push_index() { Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), None => { - s.recv_waker.register(cx.waker()); + s.receive_waker.register(cx.waker()); Poll::Pending } } @@ -76,7 +76,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { match s.push_index() { Some(i) => Poll::Ready(i), None => { - s.recv_waker.register(cx.waker()); + s.receive_waker.register(cx.waker()); Poll::Pending } } @@ -99,7 +99,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { Receiver { channel: self.channel } } - pub fn try_recv(&mut self) -> Option<&mut T> { + pub fn try_receive(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); match s.pop_index() { @@ -109,7 +109,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } - pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> { + pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); match s.pop_index() { @@ -122,7 +122,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } - pub async fn recv(&mut self) -> &mut T { + pub async fn receive(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -139,7 +139,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } - pub fn recv_done(&mut self) { + pub fn receive_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().pop_done()) } } @@ -157,7 +157,7 @@ struct State { full: bool, send_waker: WakerRegistration, - recv_waker: WakerRegistration, + receive_waker: WakerRegistration, } impl State { @@ -204,6 +204,6 @@ impl State { assert!(!self.is_empty()); self.front = self.increment(self.front); self.full = false; - self.recv_waker.wake(); + self.receive_waker.wake(); } } From 6e38b0764253ba07d3106ce3d57c2fd3509d7beb Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 13:50:12 +0200 Subject: [PATCH 3/4] Add docs to zero-copy-channel --- embassy-sync/src/zero_copy_channel.rs | 51 +++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs index cbb8cb52..f704cbd5 100644 --- a/embassy-sync/src/zero_copy_channel.rs +++ b/embassy-sync/src/zero_copy_channel.rs @@ -1,3 +1,22 @@ +//! A zero-copy queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! Receivers are competing for messages. So a message that is received by +//! one receiver is not received by any other. +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. + use core::cell::RefCell; use core::future::poll_fn; use core::marker::PhantomData; @@ -7,6 +26,17 @@ use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::WakerRegistration; +/// A bounded zero-copy channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +/// +/// The channel requires a buffer of recyclable elements. Writing to the channel is done through +/// an `&mut T`. pub struct Channel<'a, M: RawMutex, T> { buf: *mut T, phantom: PhantomData<&'a mut T>, @@ -14,6 +44,10 @@ pub struct Channel<'a, M: RawMutex, T> { } impl<'a, M: RawMutex, T> Channel<'a, M, T> { + /// Initialize a new [`Channel`]. + /// + /// The provided buffer will be used and reused by the channel's logic, and thus dictates the + /// channel's capacity. pub fn new(buf: &'a mut [T]) -> Self { let len = buf.len(); assert!(len != 0); @@ -32,20 +66,27 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { } } + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { (Sender { channel: self }, Receiver { channel: self }) } } +/// Send-only access to a [`Channel`]. pub struct Sender<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } impl<'a, M: RawMutex, T> Sender<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. pub fn borrow(&mut self) -> Sender<'_, M, T> { Sender { channel: self.channel } } + /// Attempts to send a value over the channel. pub fn try_send(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -56,6 +97,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { }) } + /// Attempts to send a value over the channel. pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -69,6 +111,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { }) } + /// Asynchronously send a value over the channel. pub async fn send(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { @@ -86,19 +129,24 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } + /// Notify the channel that the sending of the value has been finalized. pub fn send_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().push_done()) } } + +/// Receive-only access to a [`Channel`]. pub struct Receiver<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. pub fn borrow(&mut self) -> Receiver<'_, M, T> { Receiver { channel: self.channel } } + /// Attempts to receive a value over the channel. pub fn try_receive(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -109,6 +157,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } + /// Attempts to asynchronously receive a value over the channel. pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -122,6 +171,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } + /// Asynchronously receive a value over the channel. pub async fn receive(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { @@ -139,6 +189,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } + /// Notify the channel that the receiving of the value has been finalized. pub fn receive_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().pop_done()) } From 615882ebd67f4e7e60fb8aa1505b1272655c4fa4 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 4 Sep 2023 22:16:28 +0200 Subject: [PATCH 4/4] Rename zero_copy -> zerocopy. --- embassy-net-driver-channel/src/lib.rs | 26 +++++++++---------- embassy-sync/src/lib.rs | 2 +- ...ro_copy_channel.rs => zerocopy_channel.rs} | 0 3 files changed, 14 insertions(+), 14 deletions(-) rename embassy-sync/src/{zero_copy_channel.rs => zerocopy_channel.rs} (100%) diff --git a/embassy-net-driver-channel/src/lib.rs b/embassy-net-driver-channel/src/lib.rs index a2076074..bf7ae521 100644 --- a/embassy-net-driver-channel/src/lib.rs +++ b/embassy-net-driver-channel/src/lib.rs @@ -14,7 +14,7 @@ use embassy_net_driver::{Capabilities, LinkState, Medium}; use embassy_sync::blocking_mutex::raw::NoopRawMutex; use embassy_sync::blocking_mutex::Mutex; use embassy_sync::waitqueue::WakerRegistration; -use embassy_sync::zero_copy_channel; +use embassy_sync::zerocopy_channel; pub struct State { rx: [PacketBuf; N_RX], @@ -35,8 +35,8 @@ impl State { - rx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf>, - tx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf>, + rx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf>, + tx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf>, shared: Mutex>, } @@ -48,8 +48,8 @@ struct Shared { } pub struct Runner<'d, const MTU: usize> { - tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, - rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf>, + tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, + rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf>, shared: &'d Mutex>, } @@ -59,11 +59,11 @@ pub struct StateRunner<'d> { } pub struct RxRunner<'d, const MTU: usize> { - rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf>, + rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf>, } pub struct TxRunner<'d, const MTU: usize> { - tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, + tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, } impl<'d, const MTU: usize> Runner<'d, MTU> { @@ -244,8 +244,8 @@ pub fn new<'d, const MTU: usize, const N_RX: usize, const N_TX: usize>( let state_uninit: *mut MaybeUninit> = (&mut state.inner as *mut MaybeUninit>).cast(); let state = unsafe { &mut *state_uninit }.write(StateInner { - rx: zero_copy_channel::Channel::new(&mut state.rx[..]), - tx: zero_copy_channel::Channel::new(&mut state.tx[..]), + rx: zerocopy_channel::Channel::new(&mut state.rx[..]), + tx: zerocopy_channel::Channel::new(&mut state.tx[..]), shared: Mutex::new(RefCell::new(Shared { link_state: LinkState::Down, hardware_address, @@ -283,8 +283,8 @@ impl PacketBuf { } pub struct Device<'d, const MTU: usize> { - rx: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, - tx: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf>, + rx: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf>, + tx: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf>, shared: &'d Mutex>, caps: Capabilities, } @@ -329,7 +329,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> { } pub struct RxToken<'a, const MTU: usize> { - rx: zero_copy_channel::Receiver<'a, NoopRawMutex, PacketBuf>, + rx: zerocopy_channel::Receiver<'a, NoopRawMutex, PacketBuf>, } impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { @@ -346,7 +346,7 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { } pub struct TxToken<'a, const MTU: usize> { - tx: zero_copy_channel::Sender<'a, NoopRawMutex, PacketBuf>, + tx: zerocopy_channel::Sender<'a, NoopRawMutex, PacketBuf>, } impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> { diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 48a7b13f..8a9f841e 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -17,4 +17,4 @@ pub mod pipe; pub mod pubsub; pub mod signal; pub mod waitqueue; -pub mod zero_copy_channel; +pub mod zerocopy_channel; diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zerocopy_channel.rs similarity index 100% rename from embassy-sync/src/zero_copy_channel.rs rename to embassy-sync/src/zerocopy_channel.rs