From d5ab0d3ebb119c7ffd95da4b67325f75cae05b7e Mon Sep 17 00:00:00 2001 From: Artur Kowalski Date: Thu, 28 Jul 2022 10:25:47 +0200 Subject: [PATCH 1/4] Add UDP socket support --- embassy-net/Cargo.toml | 1 + embassy-net/src/lib.rs | 5 + embassy-net/src/udp.rs | 227 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+) create mode 100644 embassy-net/src/udp.rs diff --git a/embassy-net/Cargo.toml b/embassy-net/Cargo.toml index fface207..e4d8c2c2 100644 --- a/embassy-net/Cargo.toml +++ b/embassy-net/Cargo.toml @@ -18,6 +18,7 @@ std = [] defmt = ["dep:defmt", "smoltcp/defmt"] +udp = ["smoltcp/socket-udp"] tcp = ["smoltcp/socket-tcp"] dns = ["smoltcp/socket-dns"] dhcpv4 = ["medium-ethernet", "smoltcp/socket-dhcpv4"] diff --git a/embassy-net/src/lib.rs b/embassy-net/src/lib.rs index 1c5ba103..83d36471 100644 --- a/embassy-net/src/lib.rs +++ b/embassy-net/src/lib.rs @@ -16,6 +16,9 @@ pub use stack::{Config, ConfigStrategy, Stack, StackResources}; #[cfg(feature = "tcp")] pub mod tcp; +#[cfg(feature = "udp")] +pub mod udp; + // smoltcp reexports pub use smoltcp::phy::{DeviceCapabilities, Medium}; pub use smoltcp::time::{Duration as SmolDuration, Instant as SmolInstant}; @@ -24,3 +27,5 @@ pub use smoltcp::wire::{EthernetAddress, HardwareAddress}; pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr}; #[cfg(feature = "proto-ipv6")] pub use smoltcp::wire::{Ipv6Address, Ipv6Cidr}; +#[cfg(feature = "udp")] +pub use smoltcp::{socket::udp::PacketMetadata, wire::IpListenEndpoint}; diff --git a/embassy-net/src/udp.rs b/embassy-net/src/udp.rs new file mode 100644 index 00000000..6b15805c --- /dev/null +++ b/embassy-net/src/udp.rs @@ -0,0 +1,227 @@ +use core::cell::UnsafeCell; +use core::mem; +use core::task::Poll; + +use futures::future::poll_fn; +use smoltcp::iface::{Interface, SocketHandle}; +use smoltcp::socket::udp::{self, PacketMetadata}; +use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; + +use super::stack::SocketStack; +use crate::{Device, Stack}; + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum BindError { + /// The socket was already open. + InvalidState, + /// No route to host. + NoRoute, +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum Error { + /// No route to host. + NoRoute, +} + +pub struct UdpSocket<'a> { + io: UdpIo<'a>, +} + +pub struct UdpReader<'a> { + io: UdpIo<'a>, +} + +pub struct UdpWriter<'a> { + io: UdpIo<'a>, +} + +impl<'a> UdpSocket<'a> { + pub fn new( + stack: &'a Stack, + rx_meta: &'a mut [PacketMetadata], + rx_buffer: &'a mut [u8], + tx_meta: &'a mut [PacketMetadata], + tx_buffer: &'a mut [u8], + ) -> Self { + // safety: not accessed reentrantly. + let s = unsafe { &mut *stack.socket.get() }; + + let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) }; + let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) }; + let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) }; + let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) }; + let handle = s.sockets.add(udp::Socket::new( + udp::PacketBuffer::new(rx_meta, rx_buffer), + udp::PacketBuffer::new(tx_meta, tx_buffer), + )); + + Self { + io: UdpIo { + stack: &stack.socket, + handle, + }, + } + } + + pub fn split(&mut self) -> (UdpReader<'_>, UdpWriter<'_>) { + (UdpReader { io: self.io }, UdpWriter { io: self.io }) + } + + pub fn bind(&mut self, endpoint: T) -> Result<(), BindError> + where + T: Into, + { + let mut endpoint = endpoint.into(); + + // safety: not accessed reentrantly. + if endpoint.port == 0 { + // If user didn't specify port allocate a dynamic port. + endpoint.port = unsafe { &mut *self.io.stack.get() }.get_local_port(); + } + + // safety: not accessed reentrantly. + match unsafe { self.io.with_mut(|s, _| s.bind(endpoint)) } { + Ok(()) => Ok(()), + Err(udp::BindError::InvalidState) => Err(BindError::InvalidState), + Err(udp::BindError::Unaddressable) => Err(BindError::NoRoute), + } + } + + pub async fn send_to(&mut self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> + where + T: Into, + { + self.io.write(buf, remote_endpoint.into()).await + } + + pub async fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { + self.io.read(buf).await + } + + pub async fn flush(&mut self) -> Result<(), Error> { + self.io.flush().await + } + + pub fn endpoint(&self) -> IpListenEndpoint { + unsafe { self.io.with(|s, _| s.endpoint()) } + } + + pub fn is_open(&self) -> bool { + unsafe { self.io.with(|s, _| s.is_open()) } + } + + pub fn close(&mut self) { + unsafe { self.io.with_mut(|s, _| s.close()) } + } + + pub fn may_send(&self) -> bool { + unsafe { self.io.with(|s, _| s.can_send()) } + } + + pub fn may_recv(&self) -> bool { + unsafe { self.io.with(|s, _| s.can_recv()) } + } +} + +impl Drop for UdpSocket<'_> { + fn drop(&mut self) { + // safety: not accessed reentrantly. + let s = unsafe { &mut *self.io.stack.get() }; + s.sockets.remove(self.io.handle); + } +} + +#[derive(Copy, Clone)] +pub struct UdpIo<'a> { + stack: &'a UnsafeCell, + handle: SocketHandle, +} + +impl UdpIo<'_> { + /// SAFETY: must not call reentrantly. + unsafe fn with(&self, f: impl FnOnce(&udp::Socket, &Interface) -> R) -> R { + let s = &*self.stack.get(); + let socket = s.sockets.get::(self.handle); + f(socket, &s.iface) + } + + /// SAFETY: must not call reentrantly. + unsafe fn with_mut(&mut self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R { + let s = &mut *self.stack.get(); + let socket = s.sockets.get_mut::(self.handle); + let res = f(socket, &mut s.iface); + s.waker.wake(); + res + } + + async fn read(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { + poll_fn(move |cx| unsafe { + self.with_mut(|s, _| match s.recv_slice(buf) { + Ok(x) => Poll::Ready(Ok(x)), + // No data ready + Err(udp::RecvError::Exhausted) => { + //s.register_recv_waker(cx.waker()); + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + }) + .await + } + + async fn write(&mut self, buf: &[u8], ep: IpEndpoint) -> Result<(), Error> { + poll_fn(move |cx| unsafe { + self.with_mut(|s, _| match s.send_slice(buf, ep) { + // Entire datagram has been sent + Ok(()) => Poll::Ready(Ok(())), + Err(udp::SendError::BufferFull) => { + s.register_send_waker(cx.waker()); + Poll::Pending + } + Err(udp::SendError::Unaddressable) => Poll::Ready(Err(Error::NoRoute)), + }) + }) + .await + } + + async fn flush(&mut self) -> Result<(), Error> { + poll_fn(move |_| { + Poll::Ready(Ok(())) // TODO: Is there a better implementation for this? + }) + .await + } +} + +impl UdpReader<'_> { + pub async fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { + self.io.read(buf).await + } +} + +impl UdpWriter<'_> { + pub async fn send_to(&mut self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> + where + T: Into, + { + self.io.write(buf, remote_endpoint.into()).await + } + + pub async fn flush(&mut self) -> Result<(), Error> { + self.io.flush().await + } +} + +impl embedded_io::Error for BindError { + fn kind(&self) -> embedded_io::ErrorKind { + embedded_io::ErrorKind::Other + } +} + +impl embedded_io::Error for Error { + fn kind(&self) -> embedded_io::ErrorKind { + embedded_io::ErrorKind::Other + } +} From 865a91976c5b6b5c45b37e0286e7c328e8404dde Mon Sep 17 00:00:00 2001 From: Artur Kowalski Date: Wed, 10 Aug 2022 19:38:34 +0200 Subject: [PATCH 2/4] Add UDP example app --- examples/std/Cargo.toml | 2 +- examples/std/src/bin/net_udp.rs | 109 ++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 examples/std/src/bin/net_udp.rs diff --git a/examples/std/Cargo.toml b/examples/std/Cargo.toml index 54499796..427b9343 100644 --- a/examples/std/Cargo.toml +++ b/examples/std/Cargo.toml @@ -6,7 +6,7 @@ version = "0.1.0" [dependencies] embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["log"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["log", "std", "time", "nightly"] } -embassy-net = { version = "0.1.0", path = "../../embassy-net", features=[ "std", "log", "medium-ethernet", "tcp", "dhcpv4", "pool-16"] } +embassy-net = { version = "0.1.0", path = "../../embassy-net", features=[ "std", "log", "medium-ethernet", "tcp", "udp", "dhcpv4", "pool-16"] } embedded-io = { version = "0.3.0", features = ["async", "std", "futures"] } async-io = "1.6.0" diff --git a/examples/std/src/bin/net_udp.rs b/examples/std/src/bin/net_udp.rs new file mode 100644 index 00000000..7fe36e23 --- /dev/null +++ b/examples/std/src/bin/net_udp.rs @@ -0,0 +1,109 @@ +#![feature(type_alias_impl_trait)] + +use clap::Parser; +use embassy_executor::executor::{Executor, Spawner}; +use embassy_net::udp::UdpSocket; +use embassy_net::{ConfigStrategy, Ipv4Address, Ipv4Cidr, PacketMetadata, Stack, StackResources}; +use embassy_util::Forever; +use heapless::Vec; +use log::*; +use rand_core::{OsRng, RngCore}; + +#[path = "../tuntap.rs"] +mod tuntap; + +use crate::tuntap::TunTapDevice; + +macro_rules! forever { + ($val:expr) => {{ + type T = impl Sized; + static FOREVER: Forever = Forever::new(); + FOREVER.put_with(move || $val) + }}; +} + +#[derive(Parser)] +#[clap(version = "1.0")] +struct Opts { + /// TAP device name + #[clap(long, default_value = "tap0")] + tap: String, + /// use a static IP instead of DHCP + #[clap(long)] + static_ip: bool, +} + +#[embassy_executor::task] +async fn net_task(stack: &'static Stack) -> ! { + stack.run().await +} + +#[embassy_executor::task] +async fn main_task(spawner: Spawner) { + let opts: Opts = Opts::parse(); + + // Init network device + let device = TunTapDevice::new(&opts.tap).unwrap(); + + // Choose between dhcp or static ip + let config = if opts.static_ip { + ConfigStrategy::Static(embassy_net::Config { + address: Ipv4Cidr::new(Ipv4Address::new(192, 168, 69, 2), 24), + dns_servers: Vec::new(), + gateway: Some(Ipv4Address::new(192, 168, 69, 1)), + }) + } else { + ConfigStrategy::Dhcp + }; + + // Generate random seed + let mut seed = [0; 8]; + OsRng.fill_bytes(&mut seed); + let seed = u64::from_le_bytes(seed); + + // Init network stack + let stack = &*forever!(Stack::new( + device, + config, + forever!(StackResources::<1, 2, 8>::new()), + seed + )); + + // Launch network task + spawner.spawn(net_task(stack)).unwrap(); + + // Then we can use it! + let mut rx_meta = [PacketMetadata::EMPTY; 16]; + let mut rx_buffer = [0; 4096]; + let mut tx_meta = [PacketMetadata::EMPTY; 16]; + let mut tx_buffer = [0; 4096]; + let mut buf = [0; 4096]; + + let mut socket = UdpSocket::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer); + socket.bind(9400).unwrap(); + + loop { + let (n, ep) = socket.recv_from(&mut buf).await.unwrap(); + if let Ok(s) = core::str::from_utf8(&buf[..n]) { + info!("ECHO (to {}): {}", ep, s); + } else { + info!("ECHO (to {}): bytearray len {}", ep, n); + } + socket.send_to(&buf[..n], ep).await.unwrap(); + } +} + +static EXECUTOR: Forever = Forever::new(); + +fn main() { + env_logger::builder() + .filter_level(log::LevelFilter::Debug) + .filter_module("async_io", log::LevelFilter::Info) + .format_timestamp_nanos() + .init(); + + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + spawner.spawn(main_task(spawner)).unwrap(); + }); +} From b97983242d16a321bab8c13f9df4c8af99d89a0f Mon Sep 17 00:00:00 2001 From: Artur Kowalski Date: Thu, 11 Aug 2022 08:23:18 +0200 Subject: [PATCH 3/4] Simplify UDP code Drop unneeded APIs: remove impls of embedded_io error traits, remove flush() and split() methods. --- embassy-net/src/udp.rs | 64 ++++-------------------------------------- 1 file changed, 5 insertions(+), 59 deletions(-) diff --git a/embassy-net/src/udp.rs b/embassy-net/src/udp.rs index 6b15805c..ee90c301 100644 --- a/embassy-net/src/udp.rs +++ b/embassy-net/src/udp.rs @@ -30,14 +30,6 @@ pub struct UdpSocket<'a> { io: UdpIo<'a>, } -pub struct UdpReader<'a> { - io: UdpIo<'a>, -} - -pub struct UdpWriter<'a> { - io: UdpIo<'a>, -} - impl<'a> UdpSocket<'a> { pub fn new( stack: &'a Stack, @@ -66,10 +58,6 @@ impl<'a> UdpSocket<'a> { } } - pub fn split(&mut self) -> (UdpReader<'_>, UdpWriter<'_>) { - (UdpReader { io: self.io }, UdpWriter { io: self.io }) - } - pub fn bind(&mut self, endpoint: T) -> Result<(), BindError> where T: Into, @@ -90,21 +78,17 @@ impl<'a> UdpSocket<'a> { } } - pub async fn send_to(&mut self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> + pub async fn send_to(&self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> where T: Into, { self.io.write(buf, remote_endpoint.into()).await } - pub async fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { + pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { self.io.read(buf).await } - pub async fn flush(&mut self) -> Result<(), Error> { - self.io.flush().await - } - pub fn endpoint(&self) -> IpListenEndpoint { unsafe { self.io.with(|s, _| s.endpoint()) } } @@ -149,7 +133,7 @@ impl UdpIo<'_> { } /// SAFETY: must not call reentrantly. - unsafe fn with_mut(&mut self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R { + unsafe fn with_mut(&self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R { let s = &mut *self.stack.get(); let socket = s.sockets.get_mut::(self.handle); let res = f(socket, &mut s.iface); @@ -157,7 +141,7 @@ impl UdpIo<'_> { res } - async fn read(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { + async fn read(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { poll_fn(move |cx| unsafe { self.with_mut(|s, _| match s.recv_slice(buf) { Ok(x) => Poll::Ready(Ok(x)), @@ -172,7 +156,7 @@ impl UdpIo<'_> { .await } - async fn write(&mut self, buf: &[u8], ep: IpEndpoint) -> Result<(), Error> { + async fn write(&self, buf: &[u8], ep: IpEndpoint) -> Result<(), Error> { poll_fn(move |cx| unsafe { self.with_mut(|s, _| match s.send_slice(buf, ep) { // Entire datagram has been sent @@ -186,42 +170,4 @@ impl UdpIo<'_> { }) .await } - - async fn flush(&mut self) -> Result<(), Error> { - poll_fn(move |_| { - Poll::Ready(Ok(())) // TODO: Is there a better implementation for this? - }) - .await - } -} - -impl UdpReader<'_> { - pub async fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { - self.io.read(buf).await - } -} - -impl UdpWriter<'_> { - pub async fn send_to(&mut self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> - where - T: Into, - { - self.io.write(buf, remote_endpoint.into()).await - } - - pub async fn flush(&mut self) -> Result<(), Error> { - self.io.flush().await - } -} - -impl embedded_io::Error for BindError { - fn kind(&self) -> embedded_io::ErrorKind { - embedded_io::ErrorKind::Other - } -} - -impl embedded_io::Error for Error { - fn kind(&self) -> embedded_io::ErrorKind { - embedded_io::ErrorKind::Other - } } From ef473827a2beaca120f45fbe490f84a0be7d381d Mon Sep 17 00:00:00 2001 From: Artur Kowalski Date: Thu, 11 Aug 2022 15:52:32 +0200 Subject: [PATCH 4/4] Remove UdpIo struct UdpIo was shared by split sender/receives halves. Since split() API is no more UdpIo is not needed and its APIs may be moved into UdpSocket. --- embassy-net/src/udp.rs | 98 ++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 57 deletions(-) diff --git a/embassy-net/src/udp.rs b/embassy-net/src/udp.rs index ee90c301..78b09a49 100644 --- a/embassy-net/src/udp.rs +++ b/embassy-net/src/udp.rs @@ -27,7 +27,8 @@ pub enum Error { } pub struct UdpSocket<'a> { - io: UdpIo<'a>, + stack: &'a UnsafeCell, + handle: SocketHandle, } impl<'a> UdpSocket<'a> { @@ -51,10 +52,8 @@ impl<'a> UdpSocket<'a> { )); Self { - io: UdpIo { - stack: &stack.socket, - handle, - }, + stack: &stack.socket, + handle, } } @@ -67,64 +66,17 @@ impl<'a> UdpSocket<'a> { // safety: not accessed reentrantly. if endpoint.port == 0 { // If user didn't specify port allocate a dynamic port. - endpoint.port = unsafe { &mut *self.io.stack.get() }.get_local_port(); + endpoint.port = unsafe { &mut *self.stack.get() }.get_local_port(); } // safety: not accessed reentrantly. - match unsafe { self.io.with_mut(|s, _| s.bind(endpoint)) } { + match unsafe { self.with_mut(|s, _| s.bind(endpoint)) } { Ok(()) => Ok(()), Err(udp::BindError::InvalidState) => Err(BindError::InvalidState), Err(udp::BindError::Unaddressable) => Err(BindError::NoRoute), } } - pub async fn send_to(&self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> - where - T: Into, - { - self.io.write(buf, remote_endpoint.into()).await - } - - pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { - self.io.read(buf).await - } - - pub fn endpoint(&self) -> IpListenEndpoint { - unsafe { self.io.with(|s, _| s.endpoint()) } - } - - pub fn is_open(&self) -> bool { - unsafe { self.io.with(|s, _| s.is_open()) } - } - - pub fn close(&mut self) { - unsafe { self.io.with_mut(|s, _| s.close()) } - } - - pub fn may_send(&self) -> bool { - unsafe { self.io.with(|s, _| s.can_send()) } - } - - pub fn may_recv(&self) -> bool { - unsafe { self.io.with(|s, _| s.can_recv()) } - } -} - -impl Drop for UdpSocket<'_> { - fn drop(&mut self) { - // safety: not accessed reentrantly. - let s = unsafe { &mut *self.io.stack.get() }; - s.sockets.remove(self.io.handle); - } -} - -#[derive(Copy, Clone)] -pub struct UdpIo<'a> { - stack: &'a UnsafeCell, - handle: SocketHandle, -} - -impl UdpIo<'_> { /// SAFETY: must not call reentrantly. unsafe fn with(&self, f: impl FnOnce(&udp::Socket, &Interface) -> R) -> R { let s = &*self.stack.get(); @@ -141,7 +93,7 @@ impl UdpIo<'_> { res } - async fn read(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { + pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> { poll_fn(move |cx| unsafe { self.with_mut(|s, _| match s.recv_slice(buf) { Ok(x) => Poll::Ready(Ok(x)), @@ -156,9 +108,13 @@ impl UdpIo<'_> { .await } - async fn write(&self, buf: &[u8], ep: IpEndpoint) -> Result<(), Error> { + pub async fn send_to(&self, buf: &[u8], remote_endpoint: T) -> Result<(), Error> + where + T: Into, + { + let remote_endpoint = remote_endpoint.into(); poll_fn(move |cx| unsafe { - self.with_mut(|s, _| match s.send_slice(buf, ep) { + self.with_mut(|s, _| match s.send_slice(buf, remote_endpoint) { // Entire datagram has been sent Ok(()) => Poll::Ready(Ok(())), Err(udp::SendError::BufferFull) => { @@ -170,4 +126,32 @@ impl UdpIo<'_> { }) .await } + + pub fn endpoint(&self) -> IpListenEndpoint { + unsafe { self.with(|s, _| s.endpoint()) } + } + + pub fn is_open(&self) -> bool { + unsafe { self.with(|s, _| s.is_open()) } + } + + pub fn close(&mut self) { + unsafe { self.with_mut(|s, _| s.close()) } + } + + pub fn may_send(&self) -> bool { + unsafe { self.with(|s, _| s.can_send()) } + } + + pub fn may_recv(&self) -> bool { + unsafe { self.with(|s, _| s.can_recv()) } + } +} + +impl Drop for UdpSocket<'_> { + fn drop(&mut self) { + // safety: not accessed reentrantly. + let s = unsafe { &mut *self.stack.get() }; + s.sockets.remove(self.handle); + } }