diff --git a/embassy/src/io/util/drain.rs b/embassy/src/io/util/drain.rs new file mode 100644 index 00000000..d3f21d07 --- /dev/null +++ b/embassy/src/io/util/drain.rs @@ -0,0 +1,43 @@ +use core::iter::Iterator; +use core::pin::Pin; +use futures::future::Future; +use futures::ready; +use futures::task::{Context, Poll}; + +use super::super::error::{Error, Result}; +use super::super::traits::AsyncBufRead; + +pub struct Drain<'a, R: ?Sized> { + reader: &'a mut R, +} + +impl Unpin for Drain<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> Drain<'a, R> { + pub(super) fn new(reader: &'a mut R) -> Self { + Self { reader } + } +} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for Drain<'a, R> { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader } = &mut *self; + let mut reader = Pin::new(reader); + + let mut n = 0; + + loop { + match reader.as_mut().poll_fill_buf(cx) { + Poll::Pending => return Poll::Ready(Ok(n)), + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(buf)) => { + let len = buf.len(); + n += len; + reader.as_mut().consume(len); + } + } + } + } +} diff --git a/embassy/src/io/util/mod.rs b/embassy/src/io/util/mod.rs index c95a23f0..a0bda6e3 100644 --- a/embassy/src/io/util/mod.rs +++ b/embassy/src/io/util/mod.rs @@ -24,6 +24,9 @@ pub use self::read_to_end::ReadToEnd; mod skip_while; pub use self::skip_while::SkipWhile; +mod drain; +pub use self::drain::Drain; + mod write; pub use self::write::Write; @@ -79,6 +82,13 @@ pub trait AsyncBufReadExt: AsyncBufRead { SkipWhile::new(self, f) } + fn drain<'a>(&'a mut self) -> Drain<'a, Self> + where + Self: Unpin, + { + Drain::new(self) + } + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin,