diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 62af544a..1bb75414 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -13,6 +13,7 @@ //! //! Please also see [crate::uarte] to understand when [BufferedUarte] should be used. +use core::cell::RefCell; use core::cmp::min; use core::future::Future; use core::sync::atomic::{compiler_fence, Ordering}; @@ -71,7 +72,7 @@ struct StateInner<'d, U: UarteInstance, T: TimerInstance> { /// Interface to a UARTE instance pub struct BufferedUarte<'d, U: UarteInstance, T: TimerInstance> { - inner: PeripheralMutex<'d, StateInner<'d, U, T>>, + inner: RefCell>>, } impl<'d, U: UarteInstance, T: TimerInstance> Unpin for BufferedUarte<'d, U, T> {} @@ -169,7 +170,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { ppi_ch2.enable(); Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || StateInner { + inner: RefCell::new(PeripheralMutex::new(irq, &mut state.0, move || StateInner { _peri: peri, timer, _ppi_ch1: ppi_ch1, @@ -182,13 +183,13 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { tx: RingBuffer::new(tx_buffer), tx_state: TxState::Idle, tx_waker: WakerRegistration::new(), - }), + })), } } /// Adjust the baud rate to the provided value. pub fn set_baudrate(&mut self, baudrate: Baudrate) { - self.inner.with(|state| { + self.inner.borrow_mut().with(|state| { let r = U::regs(); let timeout = 0x8000_0000 / (baudrate as u32 / 40); @@ -198,12 +199,35 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { r.baudrate.write(|w| w.baudrate().variant(baudrate)); }); } + + pub fn split<'u>(&'u mut self) -> (BufferedUarteRx<'u, 'd, U, T>, BufferedUarteTx<'u, 'd, U, T>) { + ( + BufferedUarteRx { inner: &self.inner }, + BufferedUarteTx { inner: &&self.inner }, + ) + } +} + +pub struct BufferedUarteTx<'u, 'd, U: UarteInstance, T: TimerInstance> { + inner: &'u RefCell>>, +} + +pub struct BufferedUarteRx<'u, 'd, U: UarteInstance, T: TimerInstance> { + inner: &'u RefCell>>, } impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarte<'d, U, T> { type Error = core::convert::Infallible; } +impl<'u, 'd, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarteRx<'u, 'd, U, T> { + type Error = core::convert::Infallible; +} + +impl<'u, 'd, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarteTx<'u, 'd, U, T> { + type Error = core::convert::Infallible; +} + impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for BufferedUarte<'d, U, T> { type ReadFuture<'a> = impl Future> where @@ -212,7 +236,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for Buffe fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { poll_fn(move |cx| { let mut do_pend = false; - let res = self.inner.with(|state| { + let res = self.inner.borrow_mut().with(|state| { compiler_fence(Ordering::SeqCst); trace!("poll_read"); @@ -232,7 +256,43 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for Buffe Poll::Pending }); if do_pend { - self.inner.pend(); + self.inner.borrow().pend(); + } + + res + }) + } +} + +impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for BufferedUarteRx<'u, 'd, U, T> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + poll_fn(move |cx| { + let mut do_pend = false; + let res = self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("poll_read"); + + // We have data ready in buffer? Return it. + let data = state.rx.pop_buf(); + if !data.is_empty() { + trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + state.rx.pop(len); + do_pend = true; + return Poll::Ready(Ok(len)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::Pending + }); + if do_pend { + self.inner.borrow().pend(); } res @@ -247,7 +307,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for Bu fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { poll_fn(move |cx| { - self.inner.with(|state| { + self.inner.borrow_mut().with(|state| { compiler_fence(Ordering::SeqCst); trace!("fill_buf"); @@ -269,13 +329,53 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for Bu } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| { + let signal = self.inner.borrow_mut().with(|state| { let full = state.rx.is_full(); state.rx.pop(amt); full }); if signal { - self.inner.pend(); + self.inner.borrow().pend(); + } + } +} + +impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for BufferedUarteRx<'u, 'd, U, T> { + type FillBufFuture<'a> = impl Future> + where + Self: 'a; + + fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("fill_buf"); + + // We have data ready in buffer? Return it. + let buf = state.rx.pop_buf(); + if !buf.is_empty() { + trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::>::Pending + }) + }) + } + + fn consume(&mut self, amt: usize) { + let signal = self.inner.borrow_mut().with(|state| { + let full = state.rx.is_full(); + state.rx.pop(amt); + full + }); + if signal { + self.inner.borrow().pend(); } } } @@ -287,7 +387,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { poll_fn(move |cx| { - let res = self.inner.with(|state| { + let res = self.inner.borrow_mut().with(|state| { trace!("poll_write: {:?}", buf.len()); let tx_buf = state.tx.push_buf(); @@ -308,7 +408,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff Poll::Ready(Ok(n)) }); - self.inner.pend(); + self.inner.borrow_mut().pend(); res }) @@ -320,7 +420,62 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { poll_fn(move |cx| { - self.inner.with(|state| { + self.inner.borrow_mut().with(|state| { + trace!("poll_flush"); + + if !state.tx.is_empty() { + trace!("poll_flush: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + }) + } +} + +impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for BufferedUarteTx<'u, 'd, U, T> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + poll_fn(move |cx| { + let res = self.inner.borrow_mut().with(|state| { + trace!("poll_write: {:?}", buf.len()); + + let tx_buf = state.tx.push_buf(); + if tx_buf.is_empty() { + trace!("poll_write: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + let n = min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + state.tx.push(n); + + trace!("poll_write: queued {:?}", n); + + compiler_fence(Ordering::SeqCst); + + Poll::Ready(Ok(n)) + }); + + self.inner.borrow_mut().pend(); + + res + }) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { trace!("poll_flush"); if !state.tx.is_empty() {