From b3dfd06dd6da3369813cf469a7fcd87c22047e87 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 21 Sep 2022 06:00:35 +0200 Subject: [PATCH] Remove code-duplication in async bufferedUart implementations --- embassy-rp/src/uart/buffered.rs | 215 +++++++++++++------------------- 1 file changed, 89 insertions(+), 126 deletions(-) diff --git a/embassy-rp/src/uart/buffered.rs b/embassy-rp/src/uart/buffered.rs index 3eb96e3d..6d395b6f 100644 --- a/embassy-rp/src/uart/buffered.rs +++ b/embassy-rp/src/uart/buffered.rs @@ -1,5 +1,5 @@ use core::future::Future; -use core::task::Poll; +use core::task::{Poll, Waker}; use atomic_polyfill::{compiler_fence, Ordering}; use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; @@ -87,9 +87,9 @@ impl<'d, T: Instance> BufferedUart<'d, T> { let r = T::regs(); unsafe { r.uartimsc().modify(|w| { - // TODO: Should and more or fewer interrupts be enabled? w.set_rxim(true); w.set_rtim(true); + w.set_txim(true); }); } @@ -122,7 +122,6 @@ impl<'d, T: Instance> RxBufferedUart<'d, T> { let r = T::regs(); unsafe { r.uartimsc().modify(|w| { - // TODO: Should and more or fewer interrupts be enabled? w.set_rxim(true); w.set_rtim(true); }); @@ -151,9 +150,7 @@ impl<'d, T: Instance> TxBufferedUart<'d, T> { let r = T::regs(); unsafe { r.uartimsc().modify(|w| { - // TODO: Should and more or fewer interrupts be enabled? - w.set_rxim(true); - w.set_rtim(true); + w.set_txim(true); }); } @@ -179,6 +176,51 @@ where } } +impl<'d, T: Instance> RxStateInner<'d, T> +where + Self: 'd, +{ + fn read(&mut self, buf: &mut [u8], waker: &Waker) -> (Poll>, bool) { + // We have data ready in buffer? Return it. + let mut do_pend = false; + let data = self.buf.pop_buf(); + if !data.is_empty() { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + + if self.buf.is_full() { + do_pend = true; + } + self.buf.pop(len); + + return (Poll::Ready(Ok(len)), do_pend); + } + + self.waker.register(waker); + (Poll::Pending, do_pend) + } + + fn fill_buf<'a>(&mut self, waker: &Waker) -> Poll> { + // We have data ready in buffer? Return it. + let buf = self.buf.pop_buf(); + if !buf.is_empty() { + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + self.waker.register(waker); + Poll::Pending + } + + fn consume(&mut self, amt: usize) -> bool { + let full = self.buf.is_full(); + self.buf.pop(amt); + full + } +} + impl<'d, T: Instance> PeripheralState for RxStateInner<'d, T> where Self: 'd, @@ -240,6 +282,35 @@ where } } +impl<'d, T: Instance> TxStateInner<'d, T> +where + Self: 'd, +{ + fn write(&mut self, buf: &[u8], waker: &Waker) -> (Poll>, bool) { + let empty = self.buf.is_empty(); + let tx_buf = self.buf.push_buf(); + if tx_buf.is_empty() { + self.waker.register(waker); + return (Poll::Pending, empty); + } + + let n = core::cmp::min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + self.buf.push(n); + + (Poll::Ready(Ok(n)), empty) + } + + fn flush(&mut self, waker: &Waker) -> Poll> { + if !self.buf.is_empty() { + self.waker.register(waker); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + } +} + impl<'d, T: Instance> PeripheralState for TxStateInner<'d, T> where Self: 'd, @@ -299,26 +370,9 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> { fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.with(|state| { + let (res, do_pend) = self.inner.with(|state| { compiler_fence(Ordering::SeqCst); - - // We have data ready in buffer? Return it. - let data = state.rx.buf.pop_buf(); - if !data.is_empty() { - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - - if state.rx.buf.is_full() { - do_pend = true; - } - state.rx.buf.pop(len); - - return Poll::Ready(Ok(len)); - } - - state.rx.waker.register(cx.waker()); - Poll::Pending + state.rx.read(buf, cx.waker()) }); if do_pend { @@ -337,26 +391,9 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Read for RxBufferedUart<'d, T> { fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.with(|state| { + let (res, do_pend) = self.inner.with(|state| { compiler_fence(Ordering::SeqCst); - - // We have data ready in buffer? Return it. - let data = state.buf.pop_buf(); - if !data.is_empty() { - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - - if state.buf.is_full() { - do_pend = true; - } - state.buf.pop(len); - - return Poll::Ready(Ok(len)); - } - - state.waker.register(cx.waker()); - Poll::Pending + state.read(buf, cx.waker()) }); if do_pend { @@ -377,28 +414,13 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> poll_fn(move |cx| { self.inner.with(|state| { compiler_fence(Ordering::SeqCst); - - // We have data ready in buffer? Return it. - let buf = state.rx.buf.pop_buf(); - if !buf.is_empty() { - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - state.rx.waker.register(cx.waker()); - Poll::>::Pending + state.rx.fill_buf(cx.waker()) }) }) } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| { - let full = state.rx.buf.is_full(); - state.rx.buf.pop(amt); - full - }); + let signal = self.inner.with(|state| state.rx.consume(amt)); if signal { self.inner.pend(); } @@ -414,28 +436,13 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for RxBufferedUart<'d, T poll_fn(move |cx| { self.inner.with(|state| { compiler_fence(Ordering::SeqCst); - - // We have data ready in buffer? Return it. - let buf = state.buf.pop_buf(); - if !buf.is_empty() { - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - state.waker.register(cx.waker()); - Poll::>::Pending + state.fill_buf(cx.waker()) }) }) } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| { - let full = state.buf.is_full(); - state.buf.pop(amt); - full - }); + let signal = self.inner.with(|state| state.consume(amt)); if signal { self.inner.pend(); } @@ -449,20 +456,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> { fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { poll_fn(move |cx| { - let (poll, empty) = self.inner.with(|state| { - let empty = state.tx.buf.is_empty(); - let tx_buf = state.tx.buf.push_buf(); - if tx_buf.is_empty() { - state.tx.waker.register(cx.waker()); - return (Poll::Pending, empty); - } - - let n = core::cmp::min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.buf.push(n); - - (Poll::Ready(Ok(n)), empty) - }); + let (poll, empty) = self.inner.with(|state| state.tx.write(buf, cx.waker())); if empty { self.inner.pend(); } @@ -475,16 +469,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> { Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.with(|state| { - if !state.tx.buf.is_empty() { - state.tx.waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + poll_fn(move |cx| self.inner.with(|state| state.tx.flush(cx.waker()))) } } @@ -495,20 +480,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for TxBufferedUart<'d, T> fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { poll_fn(move |cx| { - let (poll, empty) = self.inner.with(|state| { - let empty = state.buf.is_empty(); - let tx_buf = state.buf.push_buf(); - if tx_buf.is_empty() { - state.waker.register(cx.waker()); - return (Poll::Pending, empty); - } - - let n = core::cmp::min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.buf.push(n); - - (Poll::Ready(Ok(n)), empty) - }); + let (poll, empty) = self.inner.with(|state| state.write(buf, cx.waker())); if empty { self.inner.pend(); } @@ -521,15 +493,6 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for TxBufferedUart<'d, T> Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.with(|state| { - if !state.buf.is_empty() { - state.waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + poll_fn(move |cx| self.inner.with(|state| state.flush(cx.waker()))) } }