From 7b838d03369f94e09d652982f994c5013e81457e Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 7 Nov 2022 00:27:21 +0100 Subject: [PATCH] rp/uart: use lockfree ringbuffer. This gets rid of another PeripheralMutex usage. --- embassy-hal-common/src/atomic_ring_buffer.rs | 331 +++++++++ embassy-hal-common/src/lib.rs | 1 + embassy-rp/Cargo.toml | 2 +- embassy-rp/src/uart/buffered.rs | 706 ++++++++++--------- embassy-rp/src/uart/mod.rs | 91 ++- tests/rp/src/bin/uart_buffered.rs | 13 +- 6 files changed, 758 insertions(+), 386 deletions(-) create mode 100644 embassy-hal-common/src/atomic_ring_buffer.rs diff --git a/embassy-hal-common/src/atomic_ring_buffer.rs b/embassy-hal-common/src/atomic_ring_buffer.rs new file mode 100644 index 00000000..c5e44430 --- /dev/null +++ b/embassy-hal-common/src/atomic_ring_buffer.rs @@ -0,0 +1,331 @@ +use core::slice; +use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; + +/// Atomic reusable ringbuffer +/// +/// This ringbuffer implementation is designed to be stored in a `static`, +/// therefore all methods take `&self` and not `&mut self`. +/// +/// It is "reusable": when created it has no backing buffer, you can give it +/// one with `init` and take it back with `deinit`, and init it again in the +/// future if needed. This is very non-idiomatic, but helps a lot when storing +/// it in a `static`. +/// +/// One concurrent writer and one concurrent reader are supported, even at +/// different execution priorities (like main and irq). +pub struct RingBuffer { + buf: AtomicPtr, + len: AtomicUsize, + start: AtomicUsize, + end: AtomicUsize, +} + +pub struct Reader<'a>(&'a RingBuffer); +pub struct Writer<'a>(&'a RingBuffer); + +impl RingBuffer { + /// Create a new empty ringbuffer. + pub const fn new() -> Self { + Self { + buf: AtomicPtr::new(core::ptr::null_mut()), + len: AtomicUsize::new(0), + start: AtomicUsize::new(0), + end: AtomicUsize::new(0), + } + } + + /// Initialize the ring buffer with a buffer. + /// + /// # Safety + /// - The buffer (`buf .. buf+len`) must be valid memory until `deinit` is called. + /// - Must not be called concurrently with any other methods. + pub unsafe fn init(&self, buf: *mut u8, len: usize) { + // Ordering: it's OK to use `Relaxed` because this is not called + // concurrently with other methods. + self.buf.store(buf, Ordering::Relaxed); + self.len.store(len, Ordering::Relaxed); + self.start.store(0, Ordering::Relaxed); + self.end.store(0, Ordering::Relaxed); + } + + /// Deinitialize the ringbuffer. + /// + /// After calling this, the ringbuffer becomes empty, as if it was + /// just created with `new()`. + /// + /// # Safety + /// - Must not be called concurrently with any other methods. + pub unsafe fn deinit(&self) { + // Ordering: it's OK to use `Relaxed` because this is not called + // concurrently with other methods. + self.len.store(0, Ordering::Relaxed); + self.start.store(0, Ordering::Relaxed); + self.end.store(0, Ordering::Relaxed); + } + + /// Create a reader. + /// + /// # Safety + /// + /// Only one reader can exist at a time. + pub unsafe fn reader(&self) -> Reader<'_> { + Reader(self) + } + + /// Create a writer. + /// + /// # Safety + /// + /// Only one writer can exist at a time. + pub unsafe fn writer(&self) -> Writer<'_> { + Writer(self) + } + + pub fn is_full(&self) -> bool { + let start = self.start.load(Ordering::Relaxed); + let end = self.end.load(Ordering::Relaxed); + + self.wrap(end + 1) == start + } + + pub fn is_empty(&self) -> bool { + let start = self.start.load(Ordering::Relaxed); + let end = self.end.load(Ordering::Relaxed); + + start == end + } + + fn wrap(&self, n: usize) -> usize { + let len = self.len.load(Ordering::Relaxed); + + assert!(n <= len); + if n == len { + 0 + } else { + n + } + } +} + +impl<'a> Writer<'a> { + /// Push data into the buffer in-place. + /// + /// The closure `f` is called with a free part of the buffer, it must write + /// some data to it and return the amount of bytes written. + pub fn push(&mut self, f: impl FnOnce(&mut [u8]) -> usize) -> usize { + let (p, n) = self.push_buf(); + let buf = unsafe { slice::from_raw_parts_mut(p, n) }; + let n = f(buf); + self.push_done(n); + n + } + + /// Push one data byte. + /// + /// Returns true if pushed succesfully. + pub fn push_one(&mut self, val: u8) -> bool { + let n = self.push(|f| match f { + [] => 0, + [x, ..] => { + *x = val; + 1 + } + }); + n != 0 + } + + /// Get a buffer where data can be pushed to. + /// + /// Write data to the start of the buffer, then call `push_done` with + /// however many bytes you've pushed. + /// + /// The buffer is suitable to DMA to. + /// + /// If the ringbuf is full, size=0 will be returned. + /// + /// The buffer stays valid as long as no other `Writer` method is called + /// and `init`/`deinit` aren't called on the ringbuf. + pub fn push_buf(&mut self) -> (*mut u8, usize) { + // Ordering: popping writes `start` last, so we read `start` first. + // Read it with Acquire ordering, so that the next accesses can't be reordered up past it. + let start = self.0.start.load(Ordering::Acquire); + let buf = self.0.buf.load(Ordering::Relaxed); + let len = self.0.len.load(Ordering::Relaxed); + let end = self.0.end.load(Ordering::Relaxed); + + let n = if start <= end { + len - end - (start == 0) as usize + } else { + start - end - 1 + }; + + trace!(" ringbuf: push_buf {:?}..{:?}", end, end + n); + (unsafe { buf.add(end) }, n) + } + + pub fn push_done(&mut self, n: usize) { + trace!(" ringbuf: push {:?}", n); + let end = self.0.end.load(Ordering::Relaxed); + + // Ordering: write `end` last, with Release ordering. + // The ordering ensures no preceding memory accesses (such as writing + // the actual data in the buffer) can be reordered down past it, which + // will guarantee the reader sees them after reading from `end`. + self.0.end.store(self.0.wrap(end + n), Ordering::Release); + } +} + +impl<'a> Reader<'a> { + /// Pop data from the buffer in-place. + /// + /// The closure `f` is called with the next data, it must process + /// some data from it and return the amount of bytes processed. + pub fn pop(&mut self, f: impl FnOnce(&[u8]) -> usize) -> usize { + let (p, n) = self.pop_buf(); + let buf = unsafe { slice::from_raw_parts(p, n) }; + let n = f(buf); + self.pop_done(n); + n + } + + /// Pop one data byte. + /// + /// Returns true if popped succesfully. + pub fn pop_one(&mut self) -> Option { + let mut res = None; + self.pop(|f| match f { + &[] => 0, + &[x, ..] => { + res = Some(x); + 1 + } + }); + res + } + + /// Get a buffer where data can be popped from. + /// + /// Read data from the start of the buffer, then call `pop_done` with + /// however many bytes you've processed. + /// + /// The buffer is suitable to DMA from. + /// + /// If the ringbuf is empty, size=0 will be returned. + /// + /// The buffer stays valid as long as no other `Reader` method is called + /// and `init`/`deinit` aren't called on the ringbuf. + pub fn pop_buf(&mut self) -> (*mut u8, usize) { + // Ordering: pushing writes `end` last, so we read `end` first. + // Read it with Acquire ordering, so that the next accesses can't be reordered up past it. + // This is needed to guarantee we "see" the data written by the writer. + let end = self.0.end.load(Ordering::Acquire); + let buf = self.0.buf.load(Ordering::Relaxed); + let len = self.0.len.load(Ordering::Relaxed); + let start = self.0.start.load(Ordering::Relaxed); + + let n = if end < start { len - start } else { end - start }; + + trace!(" ringbuf: pop_buf {:?}..{:?}", start, start + n); + (unsafe { buf.add(start) }, n) + } + + pub fn pop_done(&mut self, n: usize) { + trace!(" ringbuf: pop {:?}", n); + + let start = self.0.start.load(Ordering::Relaxed); + + // Ordering: write `start` last, with Release ordering. + // The ordering ensures no preceding memory accesses (such as reading + // the actual data) can be reordered down past it. This is necessary + // because writing to `start` is effectively freeing the read part of the + // buffer, which "gives permission" to the writer to write to it again. + // Therefore, all buffer accesses must be completed before this. + self.0.start.store(self.0.wrap(start + n), Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_pop() { + let mut b = [0; 4]; + let rb = RingBuffer::new(); + unsafe { + rb.init(b.as_mut_ptr(), 4); + + assert_eq!(rb.is_empty(), true); + assert_eq!(rb.is_full(), false); + + rb.writer().push(|buf| { + // If capacity is 4, we can fill it up to 3. + assert_eq!(3, buf.len()); + buf[0] = 1; + buf[1] = 2; + buf[2] = 3; + 3 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), true); + + rb.writer().push(|buf| { + // If it's full, we can push 0 bytes. + assert_eq!(0, buf.len()); + 0 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), true); + + rb.reader().pop(|buf| { + assert_eq!(3, buf.len()); + assert_eq!(1, buf[0]); + 1 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), false); + + rb.reader().pop(|buf| { + assert_eq!(2, buf.len()); + 0 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), false); + + rb.reader().pop(|buf| { + assert_eq!(2, buf.len()); + assert_eq!(2, buf[0]); + assert_eq!(3, buf[1]); + 2 + }); + + assert_eq!(rb.is_empty(), true); + assert_eq!(rb.is_full(), false); + + rb.reader().pop(|buf| { + assert_eq!(0, buf.len()); + 0 + }); + + rb.writer().push(|buf| { + assert_eq!(1, buf.len()); + buf[0] = 10; + 1 + }); + + rb.writer().push(|buf| { + assert_eq!(2, buf.len()); + buf[0] = 11; + buf[1] = 12; + 2 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), true); + } + } +} diff --git a/embassy-hal-common/src/lib.rs b/embassy-hal-common/src/lib.rs index 5d2649d0..b2a35cd3 100644 --- a/embassy-hal-common/src/lib.rs +++ b/embassy-hal-common/src/lib.rs @@ -4,6 +4,7 @@ // This mod MUST go first, so that the others see its macros. pub(crate) mod fmt; +pub mod atomic_ring_buffer; pub mod drop; mod macros; mod peripheral; diff --git a/embassy-rp/Cargo.toml b/embassy-rp/Cargo.toml index 770d8e25..daa60f9c 100644 --- a/embassy-rp/Cargo.toml +++ b/embassy-rp/Cargo.toml @@ -13,7 +13,7 @@ flavors = [ ] [features] -defmt = ["dep:defmt", "embassy-usb-driver?/defmt"] +defmt = ["dep:defmt", "embassy-usb-driver?/defmt", "embassy-hal-common/defmt"] # Reexport the PAC for the currently enabled chip at `embassy_rp::pac`. # This is unstable because semver-minor (non-breaking) releases of embassy-rp may major-bump (breaking) the PAC version. diff --git a/embassy-rp/src/uart/buffered.rs b/embassy-rp/src/uart/buffered.rs index fa466c8a..32029f81 100644 --- a/embassy-rp/src/uart/buffered.rs +++ b/embassy-rp/src/uart/buffered.rs @@ -1,337 +1,421 @@ -use core::future::poll_fn; -use core::task::{Poll, Waker}; +use core::future::{poll_fn, Future}; +use core::slice; +use core::task::Poll; -use atomic_polyfill::{compiler_fence, Ordering}; -use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; -use embassy_hal_common::ring_buffer::RingBuffer; -use embassy_sync::waitqueue::WakerRegistration; +use embassy_cortex_m::interrupt::{Interrupt, InterruptExt}; +use embassy_hal_common::atomic_ring_buffer::RingBuffer; +use embassy_sync::waitqueue::AtomicWaker; use super::*; -pub struct State<'d, T: Instance>(StateStorage>); -impl<'d, T: Instance> State<'d, T> { +pub struct State { + tx_waker: AtomicWaker, + tx_buf: RingBuffer, + rx_waker: AtomicWaker, + rx_buf: RingBuffer, +} + +impl State { pub const fn new() -> Self { - Self(StateStorage::new()) + Self { + rx_buf: RingBuffer::new(), + tx_buf: RingBuffer::new(), + rx_waker: AtomicWaker::new(), + tx_waker: AtomicWaker::new(), + } } } -pub struct RxState<'d, T: Instance>(StateStorage>); -impl<'d, T: Instance> RxState<'d, T> { - pub const fn new() -> Self { - Self(StateStorage::new()) - } -} - -pub struct TxState<'d, T: Instance>(StateStorage>); -impl<'d, T: Instance> TxState<'d, T> { - pub const fn new() -> Self { - Self(StateStorage::new()) - } -} - -struct RxStateInner<'d, T: Instance> { - phantom: PhantomData<&'d mut T>, - - waker: WakerRegistration, - buf: RingBuffer<'d>, -} - -struct TxStateInner<'d, T: Instance> { - phantom: PhantomData<&'d mut T>, - - waker: WakerRegistration, - buf: RingBuffer<'d>, -} - -struct FullStateInner<'d, T: Instance> { - rx: RxStateInner<'d, T>, - tx: TxStateInner<'d, T>, -} - -unsafe impl<'d, T: Instance> Send for RxStateInner<'d, T> {} -unsafe impl<'d, T: Instance> Sync for RxStateInner<'d, T> {} - -unsafe impl<'d, T: Instance> Send for TxStateInner<'d, T> {} -unsafe impl<'d, T: Instance> Sync for TxStateInner<'d, T> {} - -unsafe impl<'d, T: Instance> Send for FullStateInner<'d, T> {} -unsafe impl<'d, T: Instance> Sync for FullStateInner<'d, T> {} - pub struct BufferedUart<'d, T: Instance> { - inner: PeripheralMutex<'d, FullStateInner<'d, T>>, + phantom: PhantomData<&'d mut T>, } pub struct BufferedUartRx<'d, T: Instance> { - inner: PeripheralMutex<'d, RxStateInner<'d, T>>, + phantom: PhantomData<&'d mut T>, } pub struct BufferedUartTx<'d, T: Instance> { - inner: PeripheralMutex<'d, TxStateInner<'d, T>>, + phantom: PhantomData<&'d mut T>, } -impl<'d, T: Instance> Unpin for BufferedUart<'d, T> {} -impl<'d, T: Instance> Unpin for BufferedUartRx<'d, T> {} -impl<'d, T: Instance> Unpin for BufferedUartTx<'d, T> {} - impl<'d, T: Instance> BufferedUart<'d, T> { - pub fn new( - state: &'d mut State<'d, T>, - _uart: Uart<'d, T, M>, + pub fn new( + _uart: impl Peripheral

+ 'd, irq: impl Peripheral

+ 'd, + tx: impl Peripheral

> + 'd, + rx: impl Peripheral

> + 'd, tx_buffer: &'d mut [u8], rx_buffer: &'d mut [u8], - ) -> BufferedUart<'d, T> { - into_ref!(irq); + config: Config, + ) -> Self { + into_ref!(tx, rx); + Self::new_inner( + irq, + tx.map_into(), + rx.map_into(), + None, + None, + tx_buffer, + rx_buffer, + config, + ) + } + + pub fn new_with_rtscts( + _uart: impl Peripheral

+ 'd, + irq: impl Peripheral

+ 'd, + tx: impl Peripheral

> + 'd, + rx: impl Peripheral

> + 'd, + rts: impl Peripheral

> + 'd, + cts: impl Peripheral

> + 'd, + tx_buffer: &'d mut [u8], + rx_buffer: &'d mut [u8], + config: Config, + ) -> Self { + into_ref!(tx, rx, cts, rts); + Self::new_inner( + irq, + tx.map_into(), + rx.map_into(), + Some(rts.map_into()), + Some(cts.map_into()), + tx_buffer, + rx_buffer, + config, + ) + } + + fn new_inner( + irq: impl Peripheral

+ 'd, + mut tx: PeripheralRef<'d, AnyPin>, + mut rx: PeripheralRef<'d, AnyPin>, + mut rts: Option>, + mut cts: Option>, + tx_buffer: &'d mut [u8], + rx_buffer: &'d mut [u8], + config: Config, + ) -> Self { + into_ref!(irq); + super::Uart::<'d, T, Async>::init( + Some(tx.reborrow()), + Some(rx.reborrow()), + rts.as_mut().map(|x| x.reborrow()), + cts.as_mut().map(|x| x.reborrow()), + config, + ); + + let state = T::state(); + let regs = T::regs(); + + let len = tx_buffer.len(); + unsafe { state.tx_buf.init(tx_buffer.as_mut_ptr(), len) }; + let len = rx_buffer.len(); + unsafe { state.rx_buf.init(rx_buffer.as_mut_ptr(), len) }; - let r = T::regs(); unsafe { - r.uartimsc().modify(|w| { + regs.uartimsc().modify(|w| { w.set_rxim(true); w.set_rtim(true); w.set_txim(true); }); } - Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || FullStateInner { - tx: TxStateInner { - phantom: PhantomData, - waker: WakerRegistration::new(), - buf: RingBuffer::new(tx_buffer), - }, - rx: RxStateInner { - phantom: PhantomData, - waker: WakerRegistration::new(), - buf: RingBuffer::new(rx_buffer), - }, - }), - } + irq.set_handler(on_interrupt::); + irq.unpend(); + irq.enable(); + + Self { phantom: PhantomData } } } impl<'d, T: Instance> BufferedUartRx<'d, T> { - pub fn new( - state: &'d mut RxState<'d, T>, - _uart: UartRx<'d, T, M>, + pub fn new( + _uart: impl Peripheral

+ 'd, irq: impl Peripheral

+ 'd, + rx: impl Peripheral

> + 'd, rx_buffer: &'d mut [u8], - ) -> BufferedUartRx<'d, T> { - into_ref!(irq); + config: Config, + ) -> Self { + into_ref!(rx); + Self::new_inner(irq, rx.map_into(), None, rx_buffer, config) + } + + pub fn new_with_rts( + _uart: impl Peripheral

+ 'd, + irq: impl Peripheral

+ 'd, + rx: impl Peripheral

> + 'd, + rts: impl Peripheral

> + 'd, + rx_buffer: &'d mut [u8], + config: Config, + ) -> Self { + into_ref!(rx, rts); + Self::new_inner(irq, rx.map_into(), Some(rts.map_into()), rx_buffer, config) + } + + fn new_inner( + irq: impl Peripheral

+ 'd, + mut rx: PeripheralRef<'d, AnyPin>, + mut rts: Option>, + rx_buffer: &'d mut [u8], + config: Config, + ) -> Self { + into_ref!(irq); + super::Uart::<'d, T, Async>::init( + None, + Some(rx.reborrow()), + rts.as_mut().map(|x| x.reborrow()), + None, + config, + ); + + let state = T::state(); + let regs = T::regs(); + + let len = rx_buffer.len(); + unsafe { state.rx_buf.init(rx_buffer.as_mut_ptr(), len) }; - let r = T::regs(); unsafe { - r.uartimsc().modify(|w| { + regs.uartimsc().modify(|w| { w.set_rxim(true); w.set_rtim(true); }); } - Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || RxStateInner { - phantom: PhantomData, + irq.set_handler(on_interrupt::); + irq.unpend(); + irq.enable(); - buf: RingBuffer::new(rx_buffer), - waker: WakerRegistration::new(), - }), - } + Self { phantom: PhantomData } + } + + fn read<'a>(buf: &'a mut [u8]) -> impl Future> + 'a { + poll_fn(move |cx| { + let state = T::state(); + let mut rx_reader = unsafe { state.rx_buf.reader() }; + let n = rx_reader.pop(|data| { + let n = data.len().min(buf.len()); + buf[..n].copy_from_slice(&data[..n]); + n + }); + if n == 0 { + state.rx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(n)) + }) + } + + fn fill_buf<'a>() -> impl Future> { + poll_fn(move |cx| { + let state = T::state(); + let mut rx_reader = unsafe { state.rx_buf.reader() }; + let (p, n) = rx_reader.pop_buf(); + if n == 0 { + state.rx_waker.register(cx.waker()); + return Poll::Pending; + } + + let buf = unsafe { slice::from_raw_parts(p, n) }; + Poll::Ready(Ok(buf)) + }) + } + + fn consume(amt: usize) { + let state = T::state(); + let mut rx_reader = unsafe { state.rx_buf.reader() }; + rx_reader.pop_done(amt) } } impl<'d, T: Instance> BufferedUartTx<'d, T> { - pub fn new( - state: &'d mut TxState<'d, T>, - _uart: UartTx<'d, T, M>, + pub fn new( + _uart: impl Peripheral

+ 'd, irq: impl Peripheral

+ 'd, + tx: impl Peripheral

> + 'd, tx_buffer: &'d mut [u8], - ) -> BufferedUartTx<'d, T> { - into_ref!(irq); + config: Config, + ) -> Self { + into_ref!(tx); + Self::new_inner(irq, tx.map_into(), None, tx_buffer, config) + } + + pub fn new_with_cts( + _uart: impl Peripheral

+ 'd, + irq: impl Peripheral

+ 'd, + tx: impl Peripheral

> + 'd, + cts: impl Peripheral

> + 'd, + tx_buffer: &'d mut [u8], + config: Config, + ) -> Self { + into_ref!(tx, cts); + Self::new_inner(irq, tx.map_into(), Some(cts.map_into()), tx_buffer, config) + } + + fn new_inner( + irq: impl Peripheral

+ 'd, + mut tx: PeripheralRef<'d, AnyPin>, + mut cts: Option>, + tx_buffer: &'d mut [u8], + config: Config, + ) -> Self { + into_ref!(irq); + super::Uart::<'d, T, Async>::init( + Some(tx.reborrow()), + None, + None, + cts.as_mut().map(|x| x.reborrow()), + config, + ); + + let state = T::state(); + let regs = T::regs(); + + let len = tx_buffer.len(); + unsafe { state.tx_buf.init(tx_buffer.as_mut_ptr(), len) }; - let r = T::regs(); unsafe { - r.uartimsc().modify(|w| { + regs.uartimsc().modify(|w| { w.set_txim(true); }); } - Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || TxStateInner { - phantom: PhantomData, + irq.set_handler(on_interrupt::); + irq.unpend(); + irq.enable(); - buf: RingBuffer::new(tx_buffer), - waker: WakerRegistration::new(), - }), - } - } -} - -impl<'d, T: Instance> PeripheralState for FullStateInner<'d, T> -where - Self: 'd, -{ - type Interrupt = T::Interrupt; - fn on_interrupt(&mut self) { - self.rx.on_interrupt(); - self.tx.on_interrupt(); - } -} - -impl<'d, T: Instance> RxStateInner<'d, T> -where - Self: 'd, -{ - fn read(&mut self, buf: &mut [u8], waker: &Waker) -> (Poll>, bool) { - // We have data ready in buffer? Return it. - let mut do_pend = false; - let data = self.buf.pop_buf(); - if !data.is_empty() { - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - - if self.buf.is_full() { - do_pend = true; - } - self.buf.pop(len); - - return (Poll::Ready(Ok(len)), do_pend); - } - - self.waker.register(waker); - (Poll::Pending, do_pend) + Self { phantom: PhantomData } } - fn fill_buf<'a>(&mut self, waker: &Waker) -> Poll> { - // We have data ready in buffer? Return it. - let buf = self.buf.pop_buf(); - if !buf.is_empty() { - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - self.waker.register(waker); - Poll::Pending - } - - fn consume(&mut self, amt: usize) -> bool { - let full = self.buf.is_full(); - self.buf.pop(amt); - full - } -} - -impl<'d, T: Instance> PeripheralState for RxStateInner<'d, T> -where - Self: 'd, -{ - type Interrupt = T::Interrupt; - fn on_interrupt(&mut self) { - let r = T::regs(); - unsafe { - let ris = r.uartris().read(); - // Clear interrupt flags - r.uarticr().modify(|w| { - w.set_rxic(true); - w.set_rtic(true); + fn write<'a>(buf: &'a [u8]) -> impl Future> + 'a { + poll_fn(move |cx| { + let state = T::state(); + let mut tx_writer = unsafe { state.tx_buf.writer() }; + let n = tx_writer.push(|data| { + let n = data.len().min(buf.len()); + data[..n].copy_from_slice(&buf[..n]); + n }); - - if ris.peris() { - warn!("Parity error"); - r.uarticr().modify(|w| { - w.set_peic(true); - }); - } - if ris.feris() { - warn!("Framing error"); - r.uarticr().modify(|w| { - w.set_feic(true); - }); - } - if ris.beris() { - warn!("Break error"); - r.uarticr().modify(|w| { - w.set_beic(true); - }); - } - if ris.oeris() { - warn!("Overrun error"); - r.uarticr().modify(|w| { - w.set_oeic(true); - }); - } - - if !r.uartfr().read().rxfe() { - let buf = self.buf.push_buf(); - if !buf.is_empty() { - buf[0] = r.uartdr().read().data(); - self.buf.push(1); - } else { - warn!("RX buffer full, discard received byte"); - } - - if self.buf.is_full() { - self.waker.wake(); - } - } - - if ris.rtris() { - self.waker.wake(); - }; - } - } -} - -impl<'d, T: Instance> TxStateInner<'d, T> -where - Self: 'd, -{ - fn write(&mut self, buf: &[u8], waker: &Waker) -> (Poll>, bool) { - let empty = self.buf.is_empty(); - let tx_buf = self.buf.push_buf(); - if tx_buf.is_empty() { - self.waker.register(waker); - return (Poll::Pending, empty); - } - - let n = core::cmp::min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - self.buf.push(n); - - (Poll::Ready(Ok(n)), empty) - } - - fn flush(&mut self, waker: &Waker) -> Poll> { - if !self.buf.is_empty() { - self.waker.register(waker); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - } -} - -impl<'d, T: Instance> PeripheralState for TxStateInner<'d, T> -where - Self: 'd, -{ - type Interrupt = T::Interrupt; - fn on_interrupt(&mut self) { - let r = T::regs(); - unsafe { - let buf = self.buf.pop_buf(); - if !buf.is_empty() { - r.uartimsc().modify(|w| { - w.set_txim(true); - }); - r.uartdr().write(|w| w.set_data(buf[0].into())); - self.buf.pop(1); - self.waker.wake(); + if n == 0 { + state.tx_waker.register(cx.waker()); + return Poll::Pending; } else { - // Disable interrupt until we have something to transmit again - r.uartimsc().modify(|w| { - w.set_txim(false); - }); + unsafe { T::Interrupt::steal() }.pend(); } + + Poll::Ready(Ok(n)) + }) + } + + fn flush() -> impl Future> { + poll_fn(move |cx| { + let state = T::state(); + if !state.tx_buf.is_empty() { + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + } +} + +impl<'d, T: Instance> Drop for BufferedUart<'d, T> { + fn drop(&mut self) { + unsafe { + T::Interrupt::steal().disable(); + let state = T::state(); + state.tx_buf.deinit(); + state.rx_buf.deinit(); + } + } +} + +impl<'d, T: Instance> Drop for BufferedUartRx<'d, T> { + fn drop(&mut self) { + unsafe { + T::Interrupt::steal().disable(); + let state = T::state(); + state.tx_buf.deinit(); + state.rx_buf.deinit(); + } + } +} + +impl<'d, T: Instance> Drop for BufferedUartTx<'d, T> { + fn drop(&mut self) { + unsafe { + T::Interrupt::steal().disable(); + let state = T::state(); + state.tx_buf.deinit(); + state.rx_buf.deinit(); + } + } +} + +pub(crate) unsafe fn on_interrupt(_: *mut ()) { + trace!("on_interrupt"); + + let r = T::regs(); + let s = T::state(); + + unsafe { + // RX + + let ris = r.uartris().read(); + // Clear interrupt flags + r.uarticr().write(|w| { + w.set_rxic(true); + w.set_rtic(true); + }); + + if ris.peris() { + warn!("Parity error"); + r.uarticr().write(|w| { + w.set_peic(true); + }); + } + if ris.feris() { + warn!("Framing error"); + r.uarticr().write(|w| { + w.set_feic(true); + }); + } + if ris.beris() { + warn!("Break error"); + r.uarticr().write(|w| { + w.set_beic(true); + }); + } + if ris.oeris() { + warn!("Overrun error"); + r.uarticr().write(|w| { + w.set_oeic(true); + }); + } + + let mut rx_writer = s.rx_buf.writer(); + if !r.uartfr().read().rxfe() { + let val = r.uartdr().read().data(); + if !rx_writer.push_one(val) { + warn!("RX buffer full, discard received byte"); + } + s.rx_waker.wake(); + } + + // TX + let mut tx_reader = s.tx_buf.reader(); + if let Some(val) = tx_reader.pop_one() { + r.uartimsc().modify(|w| { + w.set_txim(true); + }); + r.uartdr().write(|w| w.set_data(val)); + s.tx_waker.wake(); + } else { + // Disable interrupt until we have something to transmit again + r.uartimsc().modify(|w| { + w.set_txim(false); + }); } } } @@ -356,108 +440,52 @@ impl<'d, T: Instance> embedded_io::Io for BufferedUartTx<'d, T> { impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> { async fn read(&mut self, buf: &mut [u8]) -> Result { - poll_fn(move |cx| { - let (res, do_pend) = self.inner.with(|state| { - compiler_fence(Ordering::SeqCst); - state.rx.read(buf, cx.waker()) - }); - - if do_pend { - self.inner.pend(); - } - - res - }) - .await + BufferedUartRx::<'d, T>::read(buf).await } } impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUartRx<'d, T> { async fn read(&mut self, buf: &mut [u8]) -> Result { - poll_fn(move |cx| { - let (res, do_pend) = self.inner.with(|state| { - compiler_fence(Ordering::SeqCst); - state.read(buf, cx.waker()) - }); - - if do_pend { - self.inner.pend(); - } - - res - }) - .await + Self::read(buf).await } } impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> { async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { - poll_fn(move |cx| { - self.inner.with(|state| { - compiler_fence(Ordering::SeqCst); - state.rx.fill_buf(cx.waker()) - }) - }) - .await + BufferedUartRx::<'d, T>::fill_buf().await } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| state.rx.consume(amt)); - if signal { - self.inner.pend(); - } + BufferedUartRx::<'d, T>::consume(amt) } } impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUartRx<'d, T> { async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { - poll_fn(move |cx| { - self.inner.with(|state| { - compiler_fence(Ordering::SeqCst); - state.fill_buf(cx.waker()) - }) - }) - .await + Self::fill_buf().await } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| state.consume(amt)); - if signal { - self.inner.pend(); - } + Self::consume(amt) } } impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> { async fn write(&mut self, buf: &[u8]) -> Result { - poll_fn(move |cx| { - let (poll, empty) = self.inner.with(|state| state.tx.write(buf, cx.waker())); - if empty { - self.inner.pend(); - } - poll - }) - .await + BufferedUartTx::<'d, T>::write(buf).await } async fn flush(&mut self) -> Result<(), Self::Error> { - poll_fn(move |cx| self.inner.with(|state| state.tx.flush(cx.waker()))).await + BufferedUartTx::<'d, T>::flush().await } } impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUartTx<'d, T> { async fn write(&mut self, buf: &[u8]) -> Result { - poll_fn(move |cx| { - let (poll, empty) = self.inner.with(|state| state.write(buf, cx.waker())); - if empty { - self.inner.pend(); - } - poll - }) - .await + Self::write(buf).await } async fn flush(&mut self) -> Result<(), Self::Error> { - poll_fn(move |cx| self.inner.with(|state| state.flush(cx.waker()))).await + Self::flush().await } } diff --git a/embassy-rp/src/uart/mod.rs b/embassy-rp/src/uart/mod.rs index 56c25e18..7e7bcaf3 100644 --- a/embassy-rp/src/uart/mod.rs +++ b/embassy-rp/src/uart/mod.rs @@ -7,6 +7,11 @@ use crate::gpio::sealed::Pin; use crate::gpio::AnyPin; use crate::{pac, peripherals, Peripheral}; +#[cfg(feature = "nightly")] +mod buffered; +#[cfg(feature = "nightly")] +pub use buffered::{BufferedUart, BufferedUartRx, BufferedUartTx}; + #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum DataBits { DataBits5, @@ -196,7 +201,7 @@ impl<'d, T: Instance> Uart<'d, T, Blocking> { config: Config, ) -> Self { into_ref!(tx, rx); - Self::new_inner(uart, rx.map_into(), tx.map_into(), None, None, None, None, config) + Self::new_inner(uart, tx.map_into(), rx.map_into(), None, None, None, None, config) } /// Create a new UART with hardware flow control (RTS/CTS) @@ -211,8 +216,8 @@ impl<'d, T: Instance> Uart<'d, T, Blocking> { into_ref!(tx, rx, cts, rts); Self::new_inner( uart, - rx.map_into(), tx.map_into(), + rx.map_into(), Some(rts.map_into()), Some(cts.map_into()), None, @@ -235,8 +240,8 @@ impl<'d, T: Instance> Uart<'d, T, Async> { into_ref!(tx, rx, tx_dma, rx_dma); Self::new_inner( uart, - rx.map_into(), tx.map_into(), + rx.map_into(), None, None, Some(tx_dma.map_into()), @@ -259,8 +264,8 @@ impl<'d, T: Instance> Uart<'d, T, Async> { into_ref!(tx, rx, cts, rts, tx_dma, rx_dma); Self::new_inner( uart, - rx.map_into(), tx.map_into(), + rx.map_into(), Some(rts.map_into()), Some(cts.map_into()), Some(tx_dma.map_into()), @@ -273,41 +278,52 @@ impl<'d, T: Instance> Uart<'d, T, Async> { impl<'d, T: Instance, M: Mode> Uart<'d, T, M> { fn new_inner( _uart: impl Peripheral

+ 'd, - tx: PeripheralRef<'d, AnyPin>, - rx: PeripheralRef<'d, AnyPin>, - rts: Option>, - cts: Option>, + mut tx: PeripheralRef<'d, AnyPin>, + mut rx: PeripheralRef<'d, AnyPin>, + mut rts: Option>, + mut cts: Option>, tx_dma: Option>, rx_dma: Option>, config: Config, ) -> Self { - into_ref!(_uart); + Self::init( + Some(tx.reborrow()), + Some(rx.reborrow()), + rts.as_mut().map(|x| x.reborrow()), + cts.as_mut().map(|x| x.reborrow()), + config, + ); + Self { + tx: UartTx::new(tx_dma), + rx: UartRx::new(rx_dma), + } + } + + fn init( + tx: Option>, + rx: Option>, + rts: Option>, + cts: Option>, + config: Config, + ) { + let r = T::regs(); unsafe { - let r = T::regs(); - - tx.io().ctrl().write(|w| w.set_funcsel(2)); - rx.io().ctrl().write(|w| w.set_funcsel(2)); - - tx.pad_ctrl().write(|w| { - w.set_ie(true); - }); - - rx.pad_ctrl().write(|w| { - w.set_ie(true); - }); - + if let Some(pin) = &tx { + pin.io().ctrl().write(|w| w.set_funcsel(2)); + pin.pad_ctrl().write(|w| w.set_ie(true)); + } + if let Some(pin) = &rx { + pin.io().ctrl().write(|w| w.set_funcsel(2)); + pin.pad_ctrl().write(|w| w.set_ie(true)); + } if let Some(pin) = &cts { pin.io().ctrl().write(|w| w.set_funcsel(2)); - pin.pad_ctrl().write(|w| { - w.set_ie(true); - }); + pin.pad_ctrl().write(|w| w.set_ie(true)); } if let Some(pin) = &rts { pin.io().ctrl().write(|w| w.set_funcsel(2)); - pin.pad_ctrl().write(|w| { - w.set_ie(true); - }); + pin.pad_ctrl().write(|w| w.set_ie(true)); } let clk_base = crate::clocks::clk_peri_freq(); @@ -359,11 +375,6 @@ impl<'d, T: Instance, M: Mode> Uart<'d, T, M> { w.set_rtsen(rts.is_some()); }); } - - Self { - tx: UartTx::new(tx_dma), - rx: UartRx::new(rx_dma), - } } } @@ -611,11 +622,6 @@ mod eha { } } -#[cfg(feature = "nightly")] -mod buffered; -#[cfg(feature = "nightly")] -pub use buffered::*; - mod sealed { use super::*; @@ -628,6 +634,9 @@ mod sealed { type Interrupt: crate::interrupt::Interrupt; fn regs() -> pac::uart::Uart; + + #[cfg(feature = "nightly")] + fn state() -> &'static buffered::State; } pub trait TxPin {} pub trait RxPin {} @@ -663,6 +672,12 @@ macro_rules! impl_instance { fn regs() -> pac::uart::Uart { pac::$inst } + + #[cfg(feature = "nightly")] + fn state() -> &'static buffered::State { + static STATE: buffered::State = buffered::State::new(); + &STATE + } } impl Instance for peripherals::$inst {} }; diff --git a/tests/rp/src/bin/uart_buffered.rs b/tests/rp/src/bin/uart_buffered.rs index 9cc20bb9..bea9283e 100644 --- a/tests/rp/src/bin/uart_buffered.rs +++ b/tests/rp/src/bin/uart_buffered.rs @@ -5,7 +5,7 @@ use defmt::{assert_eq, *}; use embassy_executor::Spawner; use embassy_rp::interrupt; -use embassy_rp::uart::{BufferedUart, Config, State, Uart}; +use embassy_rp::uart::{BufferedUart, Config}; use embedded_io::asynch::{Read, Write}; use {defmt_rtt as _, panic_probe as _}; @@ -17,25 +17,22 @@ async fn main(_spawner: Spawner) { let (tx, rx, uart) = (p.PIN_0, p.PIN_1, p.UART0); let config = Config::default(); - let uart = Uart::new_blocking(uart, tx, rx, config); - let irq = interrupt::take!(UART0_IRQ); let tx_buf = &mut [0u8; 16]; let rx_buf = &mut [0u8; 16]; - let mut state = State::new(); - let mut uart = BufferedUart::new(&mut state, uart, irq, tx_buf, rx_buf); + let mut uart = BufferedUart::new(uart, irq, tx, rx, tx_buf, rx_buf, config); // Make sure we send more bytes than fits in the FIFO, to test the actual // bufferedUart. let data = [ - 1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, - 30, 31, 32, + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, ]; uart.write_all(&data).await.unwrap(); info!("Done writing"); - let mut buf = [0; 32]; + let mut buf = [0; 31]; uart.read_exact(&mut buf).await.unwrap(); assert_eq!(buf, data);