752: Replace embassy::io with embedded_io. r=Dirbaio a=Dirbaio

TODO:

- [x] Release embedded-io on crates.io
- [x] Remove git dep

Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
bors[bot]
2022-05-06 23:54:07 +00:00
committed by GitHub
58 changed files with 720 additions and 3086 deletions

View File

@ -1,142 +0,0 @@
/// Categories of errors that can occur.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum Error {
/// An entity was not found, often a file.
NotFound,
/// The operation lacked the necessary privileges to complete.
PermissionDenied,
/// The connection was refused by the remote server.
ConnectionRefused,
/// The connection was reset by the remote server.
ConnectionReset,
/// The connection was aborted (terminated) by the remote server.
ConnectionAborted,
/// The network operation failed because it was not connected yet.
NotConnected,
/// A socket address could not be bound because the address is already in
/// use elsewhere.
AddrInUse,
/// A nonexistent interface was requested or the requested address was not
/// local.
AddrNotAvailable,
/// The operation failed because a pipe was closed.
BrokenPipe,
/// An entity already exists, often a file.
AlreadyExists,
/// The operation needs to block to complete, but the blocking operation was
/// requested to not occur.
WouldBlock,
/// A parameter was incorrect.
InvalidInput,
/// Data not valid for the operation were encountered.
///
/// Unlike [`InvalidInput`], this typically means that the operation
/// parameters were valid, however the error was caused by malformed
/// input data.
///
/// For example, a function that reads a file into a string will error with
/// `InvalidData` if the file's contents are not valid UTF-8.
///
/// [`InvalidInput`]: #variant.InvalidInput
InvalidData,
/// The I/O operation's timeout expired, causing it to be canceled.
TimedOut,
/// An error returned when an operation could not be completed because a
/// call to [`write`] returned [`Ok(0)`].
///
/// This typically means that an operation could only succeed if it wrote a
/// particular number of bytes but only a smaller number of bytes could be
/// written.
///
/// [`write`]: ../../std/io/trait.Write.html#tymethod.write
/// [`Ok(0)`]: ../../std/io/type.Result.html
WriteZero,
/// This operation was interrupted.
///
/// Interrupted operations can typically be retried.
Interrupted,
/// An error returned when an operation could not be completed because an
/// "end of file" was reached prematurely.
///
/// This typically means that an operation could only succeed if it read a
/// particular number of bytes but only a smaller number of bytes could be
/// read.
UnexpectedEof,
/// An operation would have read more data if the given buffer was large.
///
/// This typically means that the buffer has been filled with the first N bytes
/// of the read data.
Truncated,
/// Any I/O error not part of this list.
Other,
}
pub type Result<T> = core::result::Result<T, Error>;
#[cfg(feature = "std")]
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
match err.kind() {
std::io::ErrorKind::NotFound => Error::NotFound,
std::io::ErrorKind::PermissionDenied => Error::PermissionDenied,
std::io::ErrorKind::ConnectionRefused => Error::ConnectionRefused,
std::io::ErrorKind::ConnectionReset => Error::ConnectionReset,
std::io::ErrorKind::ConnectionAborted => Error::ConnectionAborted,
std::io::ErrorKind::NotConnected => Error::NotConnected,
std::io::ErrorKind::AddrInUse => Error::AddrInUse,
std::io::ErrorKind::AddrNotAvailable => Error::AddrNotAvailable,
std::io::ErrorKind::BrokenPipe => Error::BrokenPipe,
std::io::ErrorKind::AlreadyExists => Error::AlreadyExists,
std::io::ErrorKind::WouldBlock => Error::WouldBlock,
std::io::ErrorKind::InvalidInput => Error::InvalidInput,
std::io::ErrorKind::InvalidData => Error::InvalidData,
std::io::ErrorKind::TimedOut => Error::TimedOut,
std::io::ErrorKind::WriteZero => Error::WriteZero,
std::io::ErrorKind::Interrupted => Error::Interrupted,
std::io::ErrorKind::UnexpectedEof => Error::UnexpectedEof,
_ => Error::Other,
}
}
}
#[cfg(feature = "std")]
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
let kind = match e {
Error::NotFound => std::io::ErrorKind::NotFound,
Error::PermissionDenied => std::io::ErrorKind::PermissionDenied,
Error::ConnectionRefused => std::io::ErrorKind::ConnectionRefused,
Error::ConnectionReset => std::io::ErrorKind::ConnectionReset,
Error::ConnectionAborted => std::io::ErrorKind::ConnectionAborted,
Error::NotConnected => std::io::ErrorKind::NotConnected,
Error::AddrInUse => std::io::ErrorKind::AddrInUse,
Error::AddrNotAvailable => std::io::ErrorKind::AddrNotAvailable,
Error::BrokenPipe => std::io::ErrorKind::BrokenPipe,
Error::AlreadyExists => std::io::ErrorKind::AlreadyExists,
Error::WouldBlock => std::io::ErrorKind::WouldBlock,
Error::InvalidInput => std::io::ErrorKind::InvalidInput,
Error::InvalidData => std::io::ErrorKind::InvalidData,
Error::TimedOut => std::io::ErrorKind::TimedOut,
Error::WriteZero => std::io::ErrorKind::WriteZero,
Error::Interrupted => std::io::ErrorKind::Interrupted,
Error::UnexpectedEof => std::io::ErrorKind::UnexpectedEof,
Error::Truncated => std::io::ErrorKind::Other,
Error::Other => std::io::ErrorKind::Other,
};
std::io::Error::new(kind, "embassy::io::Error")
}
}
impl core::fmt::Display for Error {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:?}", self)
}
}
#[cfg(feature = "std")]
impl std::error::Error for Error {}

View File

@ -1,11 +0,0 @@
mod error;
#[cfg(feature = "std")]
mod std;
mod traits;
mod util;
pub use self::error::*;
#[cfg(feature = "std")]
pub use self::std::*;
pub use self::traits::*;
pub use self::util::*;

View File

@ -1,41 +0,0 @@
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::io as std_io;
use super::{AsyncBufRead, AsyncWrite, Result};
pub struct FromStdIo<T>(T);
impl<T> FromStdIo<T> {
pub fn new(inner: T) -> Self {
Self(inner)
}
}
impl<T: std_io::AsyncBufRead> AsyncBufRead for FromStdIo<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
let Self(inner) = unsafe { self.get_unchecked_mut() };
unsafe { Pin::new_unchecked(inner) }
.poll_fill_buf(cx)
.map_err(|e| e.into())
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let Self(inner) = unsafe { self.get_unchecked_mut() };
unsafe { Pin::new_unchecked(inner) }.consume(amt)
}
}
impl<T: std_io::AsyncWrite> AsyncWrite for FromStdIo<T> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
let Self(inner) = unsafe { self.get_unchecked_mut() };
unsafe { Pin::new_unchecked(inner) }
.poll_write(cx, buf)
.map_err(|e| e.into())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let Self(inner) = unsafe { self.get_unchecked_mut() };
unsafe { Pin::new_unchecked(inner) }
.poll_flush(cx)
.map_err(|e| e.into())
}
}

View File

@ -1,175 +0,0 @@
use core::ops::DerefMut;
use core::pin::Pin;
use core::task::{Context, Poll};
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use super::error::Result;
/// Read bytes asynchronously.
///
/// This trait is analogous to the `std::io::BufRead` trait, but integrates
/// with the asynchronous task system. In particular, the `poll_fill_buf`
/// method, unlike `BufRead::fill_buf`, will automatically queue the current task
/// for wakeup and return if data is not yet available, rather than blocking
/// the calling thread.
pub trait AsyncBufRead {
/// Attempt to return the contents of the internal buffer, filling it with more data
/// from the inner reader if it is empty.
///
/// On success, returns `Poll::Ready(Ok(buf))`.
///
/// If no data is available for reading, the method returns
/// `Poll::Pending` and arranges for the current task (via
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// This function is a lower-level call. It needs to be paired with the
/// [`consume`] method to function properly. When calling this
/// method, none of the contents will be "read" in the sense that later
/// calling [`poll_fill_buf`] may return the same contents. As such, [`consume`] must
/// be called with the number of bytes that are consumed from this buffer to
/// ensure that the bytes are never returned twice.
///
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
/// [`consume`]: AsyncBufRead::consume
///
/// An empty buffer returned indicates that the stream has reached EOF.
///
/// # Implementation
///
/// This function may not return errors of kind `WouldBlock` or
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Poll::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
/// Tells this buffer that `amt` bytes have been consumed from the buffer,
/// so they should no longer be returned in calls to [`poll_fill_buf`].
///
/// This function is a lower-level call. It needs to be paired with the
/// [`poll_fill_buf`] method to function properly. This function does
/// not perform any I/O, it simply informs this object that some amount of
/// its buffer, returned from [`poll_fill_buf`], has been consumed and should
/// no longer be returned. As such, this function may do odd things if
/// [`poll_fill_buf`] isn't called before calling it.
///
/// The `amt` must be `<=` the number of bytes in the buffer returned by
/// [`poll_fill_buf`].
///
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
fn consume(self: Pin<&mut Self>, amt: usize);
}
/// Write bytes asynchronously.
///
/// This trait is analogous to the `core::io::Write` trait, but integrates
/// with the asynchronous task system. In particular, the `poll_write`
/// method, unlike `Write::write`, will automatically queue the current task
/// for wakeup and return if the writer cannot take more data, rather than blocking
/// the calling thread.
pub trait AsyncWrite {
/// Attempt to write bytes from `buf` into the object.
///
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
///
/// If the object is not ready for writing, the method returns
/// `Poll::Pending` and arranges for the current task (via
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// writable or is closed.
///
/// # Implementation
///
/// This function may not return errors of kind `WouldBlock` or
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Poll::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
///
/// `poll_write` must try to make progress by flushing the underlying object if
/// that is the only way the underlying object can become writable again.
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>;
/// Attempt to flush the object, ensuring that any buffered data reach their destination.
///
/// On success, returns Poll::Ready(Ok(())).
///
/// If flushing cannot immediately complete, this method returns [Poll::Pending] and arranges for the
/// current task (via cx.waker()) to receive a notification when the object can make progress
/// towards flushing.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
}
macro_rules! defer_async_read {
() => {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
Pin::new(&mut **self).consume(amt)
}
};
}
#[cfg(feature = "alloc")]
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
defer_async_read!();
}
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
defer_async_read!();
}
impl<P> AsyncBufRead for Pin<P>
where
P: DerefMut + Unpin,
P::Target: AsyncBufRead,
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
self.get_mut().as_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_mut().as_mut().consume(amt)
}
}
macro_rules! deref_async_write {
() => {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
Pin::new(&mut **self).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut **self).poll_flush(cx)
}
};
}
#[cfg(feature = "alloc")]
impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
deref_async_write!();
}
impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
deref_async_write!();
}
impl<P> AsyncWrite for Pin<P>
where
P: DerefMut + Unpin,
P::Target: AsyncWrite,
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
self.get_mut().as_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_mut().as_mut().poll_flush(cx)
}
}

View File

@ -1,80 +0,0 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::ready;
use pin_project::pin_project;
use crate::io::{AsyncBufRead, AsyncWrite, Error, Result};
/// Creates a future which copies all the bytes from one object to another.
///
/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned.
///
/// # Examples
///
/// ``` ignore
/// # futures::executor::block_on(async {
/// use futures::io::{self, AsyncWriteExt, Cursor};
///
/// let reader = Cursor::new([1, 2, 3, 4]);
/// let mut writer = Cursor::new(vec![0u8; 5]);
///
/// let bytes = io::copy_buf(reader, &mut writer).await?;
/// writer.close().await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn copy_buf<R, W>(reader: R, writer: &mut W) -> CopyBuf<'_, R, W>
where
R: AsyncBufRead,
W: AsyncWrite + Unpin + ?Sized,
{
CopyBuf {
reader,
writer,
amt: 0,
}
}
/// Future for the [`copy_buf()`] function.
#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyBuf<'a, R, W: ?Sized> {
#[pin]
reader: R,
writer: &'a mut W,
amt: usize,
}
impl<R, W> Future for CopyBuf<'_, R, W>
where
R: AsyncBufRead,
W: AsyncWrite + Unpin + ?Sized,
{
type Output = Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if buffer.is_empty() {
return Poll::Ready(Ok(*this.amt));
}
let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(Error::WriteZero));
}
*this.amt += i;
this.reader.as_mut().consume(i);
}
}
}

View File

@ -1,41 +0,0 @@
use core::pin::Pin;
use futures::future::Future;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::AsyncBufRead;
pub struct Drain<'a, R: ?Sized> {
reader: &'a mut R,
}
impl<R: ?Sized + Unpin> Unpin for Drain<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Drain<'a, R> {
pub(super) fn new(reader: &'a mut R) -> Self {
Self { reader }
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for Drain<'a, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader } = &mut *self;
let mut reader = Pin::new(reader);
let mut n = 0;
loop {
match reader.as_mut().poll_fill_buf(cx) {
Poll::Pending => return Poll::Ready(Ok(n)),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(buf)) => {
let len = buf.len();
n += len;
reader.as_mut().consume(len);
}
}
}
}
}

View File

@ -1,32 +0,0 @@
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::AsyncWrite;
/// Future for the [`flush`](super::AsyncWriteExt::flush) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flush<'a, W: ?Sized> {
writer: &'a mut W,
}
impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
pub(super) fn new(writer: &'a mut W) -> Self {
Flush { writer }
}
}
impl<W: AsyncWrite + ?Sized + Unpin> Future for Flush<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = &mut *self;
let _ = ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
Poll::Ready(Ok(()))
}
}

View File

@ -1,177 +0,0 @@
use core::cmp::min;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::ready;
mod read;
pub use self::read::Read;
mod read_buf;
pub use self::read_buf::ReadBuf;
mod read_byte;
pub use self::read_byte::ReadByte;
mod read_exact;
pub use self::read_exact::ReadExact;
mod read_while;
pub use self::read_while::ReadWhile;
mod read_to_end;
pub use self::read_to_end::ReadToEnd;
mod skip_while;
pub use self::skip_while::SkipWhile;
mod drain;
pub use self::drain::Drain;
mod flush;
pub use self::flush::Flush;
mod write;
pub use self::write::Write;
mod write_all;
pub use self::write_all::WriteAll;
mod write_byte;
pub use self::write_byte::WriteByte;
#[cfg(feature = "alloc")]
mod split;
#[cfg(feature = "alloc")]
pub use self::split::{split, ReadHalf, WriteHalf};
mod copy_buf;
pub use self::copy_buf::{copy_buf, CopyBuf};
use super::error::Result;
use super::traits::{AsyncBufRead, AsyncWrite};
pub trait AsyncBufReadExt: AsyncBufRead {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>>
where
Self: Unpin,
{
let mut this = &mut *self;
let rbuf = ready!(Pin::new(&mut this).poll_fill_buf(cx))?;
let n = min(buf.len(), rbuf.len());
buf[..n].copy_from_slice(&rbuf[..n]);
Pin::new(&mut this).consume(n);
Poll::Ready(Ok(n))
}
fn read_while<'a, F: Fn(u8) -> bool>(
&'a mut self,
buf: &'a mut [u8],
f: F,
) -> ReadWhile<'a, Self, F>
where
Self: Unpin,
{
ReadWhile::new(self, f, buf)
}
fn skip_while<F: Fn(u8) -> bool>(&mut self, f: F) -> SkipWhile<Self, F>
where
Self: Unpin,
{
SkipWhile::new(self, f)
}
fn drain(&mut self) -> Drain<Self>
where
Self: Unpin,
{
Drain::new(self)
}
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
where
Self: Unpin,
{
Read::new(self, buf)
}
fn read_buf(&mut self) -> ReadBuf<Self>
where
Self: Unpin,
{
ReadBuf::new(self)
}
fn read_byte(&mut self) -> ReadByte<Self>
where
Self: Unpin,
{
ReadByte::new(self)
}
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
where
Self: Unpin,
{
ReadExact::new(self, buf)
}
fn read_to_end<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadToEnd<'a, Self>
where
Self: Unpin,
{
ReadToEnd::new(self, buf)
}
}
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
pub async fn read_line<R: AsyncBufRead + Unpin + ?Sized>(
r: &mut R,
buf: &mut [u8],
) -> Result<usize> {
r.skip_while(|b| b == b'\r' || b == b'\n').await?;
let n = r.read_while(buf, |b| b != b'\r' && b != b'\n').await?;
r.skip_while(|b| b == b'\r').await?;
//assert_eq!(b'\n', r.read_byte().await?);
r.read_byte().await?;
Ok(n)
}
pub trait AsyncWriteExt: AsyncWrite {
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
where
Self: Unpin,
{
WriteAll::new(self, buf)
}
fn write_byte(&mut self, byte: u8) -> WriteByte<Self>
where
Self: Unpin,
{
WriteByte::new(self, byte)
}
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
where
Self: Unpin,
{
Write::new(self, buf)
}
/// Awaits until all bytes have actually been written, and
/// not just enqueued as per the other "write" methods.
fn flush<'a>(&mut self) -> Flush<Self>
where
Self: Unpin,
{
Flush::new(self)
}
}
impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {}

View File

@ -1,39 +0,0 @@
use super::super::error::Result;
use super::super::traits::AsyncBufRead;
use core::cmp::min;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
/// Future for the [`read_exact`](super::AsyncBufReadExt::read_exact) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Read<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
}
impl<R: ?Sized + Unpin> Unpin for Read<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Read<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
Read { reader, buf }
}
}
impl<R: AsyncBufRead + ?Sized + Unpin> Future for Read<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let buf = ready!(Pin::new(&mut this.reader).poll_fill_buf(cx))?;
let n = min(this.buf.len(), buf.len());
this.buf[..n].copy_from_slice(&buf[..n]);
Pin::new(&mut this.reader).consume(n);
Poll::Ready(Ok(n))
}
}

View File

@ -1,34 +0,0 @@
use super::super::error::Result;
use super::super::traits::AsyncBufRead;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
pub struct ReadBuf<'a, R: ?Sized> {
reader: Option<&'a mut R>,
}
impl<R: ?Sized + Unpin> Unpin for ReadBuf<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadBuf<'a, R> {
pub(super) fn new(reader: &'a mut R) -> Self {
ReadBuf {
reader: Some(reader),
}
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for ReadBuf<'a, R> {
type Output = Result<&'a [u8]>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let buf = ready!(Pin::new(this.reader.as_mut().unwrap()).poll_fill_buf(cx))?;
let buf: &'a [u8] = unsafe { core::mem::transmute(buf) };
this.reader = None;
Poll::Ready(Ok(buf))
}
}

View File

@ -1,36 +0,0 @@
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::{Error, Result};
use super::super::traits::AsyncBufRead;
pub struct ReadByte<'a, R: ?Sized> {
reader: &'a mut R,
}
impl<R: ?Sized + Unpin> Unpin for ReadByte<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadByte<'a, R> {
pub(super) fn new(reader: &'a mut R) -> Self {
Self { reader }
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for ReadByte<'a, R> {
type Output = Result<u8>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader } = &mut *self;
let mut reader = Pin::new(reader);
let rbuf = ready!(reader.as_mut().poll_fill_buf(cx))?;
if rbuf.is_empty() {
return Poll::Ready(Err(Error::UnexpectedEof));
}
let r = rbuf[0];
reader.as_mut().consume(1);
Poll::Ready(Ok(r))
}
}

View File

@ -1,48 +0,0 @@
use super::super::error::{Error, Result};
use super::super::traits::AsyncBufRead;
use core::cmp::min;
use core::mem;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
/// Future for the [`read_exact`](super::AsyncBufReadExt::read_exact) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadExact<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
}
impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadExact<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
ReadExact { reader, buf }
}
}
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadExact<'_, R> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
while !this.buf.is_empty() {
let buf = ready!(Pin::new(&mut this.reader).poll_fill_buf(cx))?;
if buf.is_empty() {
return Poll::Ready(Err(Error::UnexpectedEof));
}
let n = min(this.buf.len(), buf.len());
this.buf[..n].copy_from_slice(&buf[..n]);
Pin::new(&mut this.reader).consume(n);
{
let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
this.buf = rest;
}
}
Poll::Ready(Ok(()))
}
}

View File

@ -1,48 +0,0 @@
use core::cmp::min;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::{Error, Result};
use super::super::traits::AsyncBufRead;
pub struct ReadToEnd<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
n: usize,
}
impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadToEnd<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
Self { reader, buf, n: 0 }
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for ReadToEnd<'a, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, n } = &mut *self;
let mut reader = Pin::new(reader);
loop {
let rbuf = ready!(reader.as_mut().poll_fill_buf(cx))?;
if rbuf.is_empty() {
return Poll::Ready(Ok(*n));
}
if *n == buf.len() {
return Poll::Ready(Err(Error::Truncated));
}
// truncate data if it doesn't fit in buf
let p = min(rbuf.len(), buf.len() - *n);
buf[*n..*n + p].copy_from_slice(&rbuf[..p]);
*n += p;
reader.as_mut().consume(p);
}
}
}

View File

@ -1,61 +0,0 @@
use core::cmp::min;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::{Error, Result};
use super::super::traits::AsyncBufRead;
pub struct ReadWhile<'a, R: ?Sized, F> {
reader: &'a mut R,
buf: &'a mut [u8],
n: usize,
f: F,
}
impl<R: ?Sized + Unpin, F> Unpin for ReadWhile<'_, R, F> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin, F: Fn(u8) -> bool> ReadWhile<'a, R, F> {
pub(super) fn new(reader: &'a mut R, f: F, buf: &'a mut [u8]) -> Self {
Self {
reader,
f,
buf,
n: 0,
}
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin, F: Fn(u8) -> bool> Future for ReadWhile<'a, R, F> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, f, buf, n } = &mut *self;
let mut reader = Pin::new(reader);
loop {
let rbuf = ready!(reader.as_mut().poll_fill_buf(cx))?;
if rbuf.is_empty() {
return Poll::Ready(Err(Error::UnexpectedEof));
}
let (p, done) = match rbuf.iter().position(|&b| !f(b)) {
Some(p) => (p, true),
None => (rbuf.len(), false),
};
// truncate data if it doesn't fit in buf
let p2 = min(p, buf.len() - *n);
buf[*n..*n + p2].copy_from_slice(&rbuf[..p2]);
*n += p2;
// consume it all, even if it doesn't fit.
// Otherwise we can deadlock because we never read to the ending char
reader.as_mut().consume(p);
if done {
return Poll::Ready(Ok(*n));
}
}
}
}

View File

@ -1,45 +0,0 @@
use core::iter::Iterator;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::{Error, Result};
use super::super::traits::AsyncBufRead;
pub struct SkipWhile<'a, R: ?Sized, F> {
reader: &'a mut R,
f: F,
}
impl<R: ?Sized + Unpin, F> Unpin for SkipWhile<'_, R, F> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin, F: Fn(u8) -> bool> SkipWhile<'a, R, F> {
pub(super) fn new(reader: &'a mut R, f: F) -> Self {
Self { reader, f }
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin, F: Fn(u8) -> bool> Future for SkipWhile<'a, R, F> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, f } = &mut *self;
let mut reader = Pin::new(reader);
loop {
let buf = ready!(reader.as_mut().poll_fill_buf(cx))?;
if buf.is_empty() {
return Poll::Ready(Err(Error::UnexpectedEof));
}
let (p, done) = match buf.iter().position(|b| !f(*b)) {
Some(p) => (p, true),
None => (buf.len(), false),
};
reader.as_mut().consume(p);
if done {
return Poll::Ready(Ok(()));
}
}
}
}

View File

@ -1,43 +0,0 @@
use alloc::rc::Rc;
use core::cell::UnsafeCell;
use core::pin::Pin;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::{AsyncBufRead, AsyncWrite};
/// The readable half of an object returned from `AsyncBufRead::split`.
#[derive(Debug)]
pub struct ReadHalf<T> {
handle: Rc<UnsafeCell<T>>,
}
/// The writable half of an object returned from `AsyncBufRead::split`.
#[derive(Debug)]
pub struct WriteHalf<T> {
handle: Rc<UnsafeCell<T>>,
}
impl<T: AsyncBufRead + Unpin> AsyncBufRead for ReadHalf<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
Pin::new(unsafe { &mut *self.handle.get() }).poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
Pin::new(unsafe { &mut *self.handle.get() }).consume(amt)
}
}
impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(unsafe { &mut *self.handle.get() }).poll_flush(cx)
}
}
pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
let c = Rc::new(UnsafeCell::new(t));
(ReadHalf { handle: c.clone() }, WriteHalf { handle: c })
}

View File

@ -1,33 +0,0 @@
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::AsyncWrite;
/// Future for the [`write_all`](super::AsyncWriteExt::write_all) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Write<'a, W: ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
}
impl<W: ?Sized + Unpin> Unpin for Write<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> Write<'a, W> {
pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
Write { writer, buf }
}
}
impl<W: AsyncWrite + ?Sized + Unpin> Future for Write<'_, W> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = &mut *self;
let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
Poll::Ready(Ok(n))
}
}

View File

@ -1,44 +0,0 @@
use core::mem;
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::AsyncWrite;
/// Future for the [`write_all`](super::AsyncWriteExt::write_all) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAll<'a, W: ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
}
impl<W: ?Sized + Unpin> Unpin for WriteAll<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> {
pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
WriteAll { writer, buf }
}
}
impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = &mut *self;
while !this.buf.is_empty() {
let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
{
let (_, rest) = mem::take(&mut this.buf).split_at(n);
this.buf = rest;
}
if n == 0 {
panic!();
}
}
Poll::Ready(Ok(()))
}
}

View File

@ -1,39 +0,0 @@
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::AsyncWrite;
/// Future for the [`write_all`](super::AsyncWriteExt::write_all) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteByte<'a, W: ?Sized> {
writer: &'a mut W,
byte: u8,
}
impl<W: ?Sized + Unpin> Unpin for WriteByte<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteByte<'a, W> {
pub(super) fn new(writer: &'a mut W, byte: u8) -> Self {
WriteByte { writer, byte }
}
}
impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteByte<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = &mut *self;
let buf = [this.byte; 1];
let n = ready!(Pin::new(&mut this.writer).poll_write(cx, &buf))?;
if n == 0 {
panic!();
}
assert!(n == 1);
Poll::Ready(Ok(()))
}
}

View File

@ -13,7 +13,6 @@ pub mod channel;
pub mod executor;
#[cfg(cortex_m)]
pub mod interrupt;
pub mod io;
pub mod mutex;
#[cfg(feature = "time")]
pub mod time;