Remove the cell and trait

At the expense of exposing the channel types again. We do this as we want to avoid using dyn traits given their overhead for embedded environments.
This commit is contained in:
huntc 2021-07-09 12:04:22 +10:00
parent 56b3e927fe
commit 5f87c7808c
2 changed files with 90 additions and 87 deletions

View File

@ -51,55 +51,33 @@ use super::CriticalSectionMutex;
use super::Mutex; use super::Mutex;
use super::ThreadModeMutex; use super::ThreadModeMutex;
/// A ChannelCell permits a channel to be shared between senders and their receivers.
// Derived from UnsafeCell.
#[repr(transparent)]
pub struct ChannelCell<T: ?Sized> {
_value: T,
}
impl<T> ChannelCell<T> {
#[inline(always)]
pub const fn new<U>(value: T) -> ChannelCell<T>
where
T: ChannelLike<U>,
{
ChannelCell { _value: value }
}
}
impl<T: ?Sized> ChannelCell<T> {
#[inline(always)]
const fn get(&self) -> *mut T {
// As per UnsafeCell:
// We can just cast the pointer from `ChannelCell<T>` to `T` because of
// #[repr(transparent)]. This exploits libstd's special status, there is
// no guarantee for user code that this will work in future versions of the compiler!
self as *const ChannelCell<T> as *const T as *mut T
}
}
/// Send values to the associated `Receiver`. /// Send values to the associated `Receiver`.
/// ///
/// Instances are created by the [`split`](split) function. /// Instances are created by the [`split`](split) function.
pub struct Sender<'ch, T> { pub struct Sender<'ch, M, T, const N: usize>
channel: &'ch ChannelCell<dyn ChannelLike<T>>, where
M: Mutex<Data = ()>,
{
channel: &'ch Channel<M, T, N>,
} }
// Safe to pass the sender around // Safe to pass the sender around
unsafe impl<'ch, T> Send for Sender<'ch, T> {} unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {}
unsafe impl<'ch, T> Sync for Sender<'ch, T> {} unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {}
/// Receive values from the associated `Sender`. /// Receive values from the associated `Sender`.
/// ///
/// Instances are created by the [`split`](split) function. /// Instances are created by the [`split`](split) function.
pub struct Receiver<'ch, T> { pub struct Receiver<'ch, M, T, const N: usize>
channel: &'ch ChannelCell<dyn ChannelLike<T>>, where
M: Mutex<Data = ()>,
{
channel: &'ch Channel<M, T, N>,
} }
// Safe to pass the receiver around // Safe to pass the receiver around
unsafe impl<'ch, T> Send for Receiver<'ch, T> {} unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {}
unsafe impl<'ch, T> Sync for Receiver<'ch, T> {} unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {}
/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. /// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
/// ///
@ -125,18 +103,26 @@ unsafe impl<'ch, T> Sync for Receiver<'ch, T> {}
/// mpsc::split(&channel) /// mpsc::split(&channel)
/// }; /// };
/// ``` /// ```
pub fn split<T>(channel: &ChannelCell<dyn ChannelLike<T>>) -> (Sender<T>, Receiver<T>) { pub fn split<M, T, const N: usize>(
channel: &Channel<M, T, N>,
) -> (Sender<M, T, N>, Receiver<M, T, N>)
where
M: Mutex<Data = ()>,
{
let sender = Sender { channel: &channel }; let sender = Sender { channel: &channel };
let receiver = Receiver { channel: &channel }; let receiver = Receiver { channel: &channel };
{ {
let c = unsafe { &mut *channel.get() }; let c = channel.get();
c.register_receiver(); c.register_receiver();
c.register_sender(); c.register_sender();
} }
(sender, receiver) (sender, receiver)
} }
impl<'ch, T> Receiver<'ch, T> { impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
/// Receives the next value for this receiver. /// Receives the next value for this receiver.
/// ///
/// This method returns `None` if the channel has been closed and there are /// This method returns `None` if the channel has been closed and there are
@ -162,7 +148,7 @@ impl<'ch, T> Receiver<'ch, T> {
/// This method will either receive a message from the channel immediately or return an error /// This method will either receive a message from the channel immediately or return an error
/// if the channel is empty. /// if the channel is empty.
pub fn try_recv(&self) -> Result<T, TryRecvError> { pub fn try_recv(&self) -> Result<T, TryRecvError> {
unsafe { &mut *self.channel.get() }.try_recv() self.channel.get().try_recv()
} }
/// Closes the receiving half of a channel without dropping it. /// Closes the receiving half of a channel without dropping it.
@ -176,11 +162,14 @@ impl<'ch, T> Receiver<'ch, T> {
/// until those are released. /// until those are released.
/// ///
pub fn close(&mut self) { pub fn close(&mut self) {
unsafe { &mut *self.channel.get() }.close() self.channel.get().close()
} }
} }
impl<'ch, T> Future for Receiver<'ch, T> { impl<'ch, M, T, const N: usize> Future for Receiver<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
type Output = Option<T>; type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -188,20 +177,26 @@ impl<'ch, T> Future for Receiver<'ch, T> {
Ok(v) => Poll::Ready(Some(v)), Ok(v) => Poll::Ready(Some(v)),
Err(TryRecvError::Closed) => Poll::Ready(None), Err(TryRecvError::Closed) => Poll::Ready(None),
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
unsafe { &mut *self.channel.get() }.set_receiver_waker(cx.waker().clone()); self.channel.get().set_receiver_waker(cx.waker().clone());
Poll::Pending Poll::Pending
} }
} }
} }
} }
impl<'ch, T> Drop for Receiver<'ch, T> { impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
fn drop(&mut self) { fn drop(&mut self) {
unsafe { &mut *self.channel.get() }.deregister_receiver() self.channel.get().deregister_receiver()
} }
} }
impl<'ch, T> Sender<'ch, T> { impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
/// Sends a value, waiting until there is capacity. /// Sends a value, waiting until there is capacity.
/// ///
/// A successful send occurs when it is determined that the other end of the /// A successful send occurs when it is determined that the other end of the
@ -249,7 +244,7 @@ impl<'ch, T> Sender<'ch, T> {
/// [`channel`]: channel /// [`channel`]: channel
/// [`close`]: Receiver::close /// [`close`]: Receiver::close
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
unsafe { &mut *self.channel.get() }.try_send(message) self.channel.get().try_send(message)
} }
/// Completes when the receiver has dropped. /// Completes when the receiver has dropped.
@ -270,16 +265,22 @@ impl<'ch, T> Sender<'ch, T> {
/// [`Receiver`]: crate::sync::mpsc::Receiver /// [`Receiver`]: crate::sync::mpsc::Receiver
/// [`Receiver::close`]: crate::sync::mpsc::Receiver::close /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
pub fn is_closed(&self) -> bool { pub fn is_closed(&self) -> bool {
unsafe { &mut *self.channel.get() }.is_closed() self.channel.get().is_closed()
} }
} }
struct SendFuture<'ch, T> { struct SendFuture<'ch, M, T, const N: usize>
sender: Sender<'ch, T>, where
M: Mutex<Data = ()>,
{
sender: Sender<'ch, M, T, N>,
message: UnsafeCell<T>, message: UnsafeCell<T>,
} }
impl<'ch, T> Future for SendFuture<'ch, T> { impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
type Output = Result<(), SendError<T>>; type Output = Result<(), SendError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -287,7 +288,10 @@ impl<'ch, T> Future for SendFuture<'ch, T> {
Ok(..) => Poll::Ready(Ok(())), Ok(..) => Poll::Ready(Ok(())),
Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
Err(TrySendError::Full(..)) => { Err(TrySendError::Full(..)) => {
unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone()); self.sender
.channel
.get()
.set_senders_waker(cx.waker().clone());
Poll::Pending Poll::Pending
// Note we leave the existing UnsafeCell contents - they still // Note we leave the existing UnsafeCell contents - they still
// contain the original message. We could create another UnsafeCell // contain the original message. We could create another UnsafeCell
@ -297,33 +301,48 @@ impl<'ch, T> Future for SendFuture<'ch, T> {
} }
} }
struct CloseFuture<'ch, T> { struct CloseFuture<'ch, M, T, const N: usize>
sender: Sender<'ch, T>, where
M: Mutex<Data = ()>,
{
sender: Sender<'ch, M, T, N>,
} }
impl<'ch, T> Future for CloseFuture<'ch, T> { impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.sender.is_closed() { if self.sender.is_closed() {
Poll::Ready(()) Poll::Ready(())
} else { } else {
unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone()); self.sender
.channel
.get()
.set_senders_waker(cx.waker().clone());
Poll::Pending Poll::Pending
} }
} }
} }
impl<'ch, T> Drop for Sender<'ch, T> { impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
fn drop(&mut self) { fn drop(&mut self) {
unsafe { &mut *self.channel.get() }.deregister_sender() self.channel.get().deregister_sender()
} }
} }
impl<'ch, T> Clone for Sender<'ch, T> { impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
where
M: Mutex<Data = ()>,
{
#[allow(clippy::clone_double_ref)] #[allow(clippy::clone_double_ref)]
fn clone(&self) -> Self { fn clone(&self) -> Self {
unsafe { &mut *self.channel.get() }.register_sender(); self.channel.get().register_sender();
Sender { Sender {
channel: self.channel.clone(), channel: self.channel.clone(),
} }
@ -378,28 +397,6 @@ impl<T> fmt::Display for TrySendError<T> {
} }
} }
pub trait ChannelLike<T> {
fn try_recv(&mut self) -> Result<T, TryRecvError>;
fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>>;
fn close(&mut self);
fn is_closed(&mut self) -> bool;
fn register_receiver(&mut self);
fn deregister_receiver(&mut self);
fn register_sender(&mut self);
fn deregister_sender(&mut self);
fn set_receiver_waker(&mut self, receiver_waker: Waker);
fn set_senders_waker(&mut self, senders_waker: Waker);
}
struct ChannelState<T, const N: usize> { struct ChannelState<T, const N: usize> {
buf: [MaybeUninit<UnsafeCell<T>>; N], buf: [MaybeUninit<UnsafeCell<T>>; N],
read_pos: usize, read_pos: usize,
@ -505,10 +502,16 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
} }
} }
impl<M, T, const N: usize> ChannelLike<T> for Channel<M, T, N> impl<M, T, const N: usize> Channel<M, T, N>
where where
M: Mutex<Data = ()>, M: Mutex<Data = ()>,
{ {
fn get(&self) -> &mut Self {
let const_ptr = self as *const Self;
let mut_ptr = const_ptr as *mut Self;
unsafe { &mut *mut_ptr }
}
fn try_recv(&mut self) -> Result<T, TryRecvError> { fn try_recv(&mut self) -> Result<T, TryRecvError> {
let state = &mut self.state; let state = &mut self.state;
self.mutex.lock(|_| { self.mutex.lock(|_| {

View File

@ -11,7 +11,7 @@ mod example_common;
use defmt::panic; use defmt::panic;
use embassy::executor::Spawner; use embassy::executor::Spawner;
use embassy::time::{Duration, Timer}; use embassy::time::{Duration, Timer};
use embassy::util::mpsc::{ChannelCell, TryRecvError}; use embassy::util::mpsc::TryRecvError;
use embassy::util::{mpsc, Forever}; use embassy::util::{mpsc, Forever};
use embassy_nrf::gpio::{Level, Output, OutputDrive}; use embassy_nrf::gpio::{Level, Output, OutputDrive};
use embassy_nrf::Peripherals; use embassy_nrf::Peripherals;
@ -23,10 +23,10 @@ enum LedState {
Off, Off,
} }
static CHANNEL: Forever<ChannelCell<Channel<WithThreadModeOnly, LedState, 1>>> = Forever::new(); static CHANNEL: Forever<Channel<WithThreadModeOnly, LedState, 1>> = Forever::new();
#[embassy::task(pool_size = 1)] #[embassy::task(pool_size = 1)]
async fn my_task(sender: Sender<'static, LedState>) { async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) {
loop { loop {
let _ = sender.send(LedState::On).await; let _ = sender.send(LedState::On).await;
Timer::after(Duration::from_secs(1)).await; Timer::after(Duration::from_secs(1)).await;
@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, LedState>) {
async fn main(spawner: Spawner, p: Peripherals) { async fn main(spawner: Spawner, p: Peripherals) {
let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
let channel = CHANNEL.put(ChannelCell::new(Channel::with_thread_mode_only())); let channel = CHANNEL.put(Channel::with_thread_mode_only());
let (sender, mut receiver) = mpsc::split(channel); let (sender, mut receiver) = mpsc::split(channel);
spawner.spawn(my_task(sender)).unwrap(); spawner.spawn(my_task(sender)).unwrap();