From 18671b94ba173d6b5c2d2ec5e3569e39a03b61bb Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 8 Aug 2022 16:51:34 +0200 Subject: [PATCH 1/4] Implement embedded-nal-async traits for embassy-net --- embassy-net/Cargo.toml | 3 + embassy-net/src/tcp.rs | 167 ++++++++++++++++++++++++++++++++++++ examples/stm32h7/Cargo.toml | 2 +- 3 files changed, 171 insertions(+), 1 deletion(-) diff --git a/embassy-net/Cargo.toml b/embassy-net/Cargo.toml index 64cb5bd8..fface207 100644 --- a/embassy-net/Cargo.toml +++ b/embassy-net/Cargo.toml @@ -31,6 +31,7 @@ pool-16 = [] pool-32 = [] pool-64 = [] pool-128 = [] +unstable-traits = [] [dependencies] @@ -48,6 +49,8 @@ generic-array = { version = "0.14.4", default-features = false } stable_deref_trait = { version = "1.2.0", default-features = false } futures = { version = "0.3.17", default-features = false, features = [ "async-await" ] } atomic-pool = "0.2.1" +atomic-polyfill = "0.1.5" +embedded-nal-async = "0.2.0" [dependencies.smoltcp] version = "0.8.0" diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs index c18391ac..96a6dfe2 100644 --- a/embassy-net/src/tcp.rs +++ b/embassy-net/src/tcp.rs @@ -328,3 +328,170 @@ impl<'d> embedded_io::asynch::Write for TcpWriter<'d> { self.io.flush() } } + +#[cfg(feature = "unstable-traits")] +pub mod client { + use core::mem::MaybeUninit; + use core::ptr::NonNull; + + use atomic_polyfill::{AtomicBool, Ordering}; + use embedded_nal_async::IpAddr; + + use super::*; + + pub struct TcpClient<'d, D: Device, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> { + stack: &'d Stack, + tx: &'d BufferPool, + rx: &'d BufferPool, + } + + impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpClient<'d, D, N, TX_SZ, RX_SZ> { + pub fn new(stack: &'d Stack, tx: &'d BufferPool, rx: &'d BufferPool) -> Self { + Self { stack, tx, rx } + } + } + + impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_nal_async::TcpConnect + for TcpClient<'d, D, N, TX_SZ, RX_SZ> + { + type Error = Error; + type Connection<'m> = TcpConnection<'m, N, TX_SZ, RX_SZ> where Self: 'm; + type ConnectFuture<'m> = impl Future, Self::Error>> + 'm + where + Self: 'm; + + fn connect<'m>(&'m self, remote: embedded_nal_async::SocketAddr) -> Self::ConnectFuture<'m> { + async move { + let addr: crate::IpAddress = match remote.ip() { + IpAddr::V4(addr) => crate::IpAddress::Ipv4(crate::Ipv4Address::from_bytes(&addr.octets())), + #[cfg(feature = "proto-ipv6")] + IpAddr::V6(addr) => crate::IpAddress::Ipv6(crate::Ipv6Address::from_bytes(&addr.octets())), + #[cfg(not(feature = "proto-ipv6"))] + IpAddr::V6(_) => panic!("ipv6 support not enabled"), + }; + let remote_endpoint = (addr, remote.port()); + let mut socket = TcpConnection::new(&self.stack, self.tx, self.rx)?; + socket + .socket + .connect(remote_endpoint) + .await + .map_err(|_| Error::ConnectionReset)?; + Ok(socket) + } + } + } + + pub struct TcpConnection<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> { + socket: TcpSocket<'d>, + tx: &'d BufferPool, + rx: &'d BufferPool, + txb: NonNull<[u8; TX_SZ]>, + rxb: NonNull<[u8; RX_SZ]>, + } + + impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnection<'d, N, TX_SZ, RX_SZ> { + fn new( + stack: &'d Stack, + tx: &'d BufferPool, + rx: &'d BufferPool, + ) -> Result { + let mut txb = tx.alloc().ok_or(Error::ConnectionReset)?; + let mut rxb = rx.alloc().ok_or(Error::ConnectionReset)?; + Ok(Self { + socket: unsafe { TcpSocket::new(stack, rxb.as_mut(), txb.as_mut()) }, + tx, + rx, + txb, + rxb, + }) + } + } + + impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> Drop for TcpConnection<'d, N, TX_SZ, RX_SZ> { + fn drop(&mut self) { + unsafe { + self.socket.close(); + self.rx.free(self.rxb); + self.tx.free(self.txb); + } + } + } + + impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::Io + for TcpConnection<'d, N, TX_SZ, RX_SZ> + { + type Error = Error; + } + + impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::asynch::Read + for TcpConnection<'d, N, TX_SZ, RX_SZ> + { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + self.socket.read(buf) + } + } + + impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::asynch::Write + for TcpConnection<'d, N, TX_SZ, RX_SZ> + { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + self.socket.write(buf) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + self.socket.flush() + } + } + + pub type BufferPool = Pool<[u8; BUFSZ], N>; + + pub struct Pool { + used: [AtomicBool; N], + data: [UnsafeCell>; N], + } + + impl Pool { + const VALUE: AtomicBool = AtomicBool::new(false); + const UNINIT: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); + + pub const fn new() -> Self { + Self { + used: [Self::VALUE; N], + data: [Self::UNINIT; N], + } + } + } + + impl Pool { + fn alloc(&self) -> Option> { + for n in 0..N { + if self.used[n].swap(true, Ordering::SeqCst) == false { + let p = self.data[n].get() as *mut T; + return Some(unsafe { NonNull::new_unchecked(p) }); + } + } + None + } + + /// safety: p must be a pointer obtained from self.alloc that hasn't been freed yet. + unsafe fn free(&self, p: NonNull) { + let origin = self.data.as_ptr() as *mut T; + let n = p.as_ptr().offset_from(origin); + assert!(n >= 0); + assert!((n as usize) < N); + self.used[n as usize].store(false, Ordering::SeqCst); + } + } +} diff --git a/examples/stm32h7/Cargo.toml b/examples/stm32h7/Cargo.toml index 8b1999b3..07b7e493 100644 --- a/examples/stm32h7/Cargo.toml +++ b/examples/stm32h7/Cargo.toml @@ -7,7 +7,7 @@ version = "0.1.0" embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "defmt-timestamp-uptime", "unstable-traits", "time-tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32h743bi", "net", "time-driver-any", "exti", "unstable-pac", "unstable-traits"] } -embassy-net = { path = "../../embassy-net", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "pool-16"] } +embassy-net = { path = "../../embassy-net", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "pool-16", "unstable-traits"] } embedded-io = { version = "0.3.0", features = ["async"] } defmt = "0.3" From 80c1551153b06a14e5dc475f6fbd945db06b8117 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 9 Aug 2022 14:43:55 +0200 Subject: [PATCH 2/4] Wrap buffers in a single state type --- embassy-net/src/tcp.rs | 53 +++++++++++++++++++++---------------- examples/stm32h7/Cargo.toml | 1 + 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs index 96a6dfe2..814e7ab6 100644 --- a/embassy-net/src/tcp.rs +++ b/embassy-net/src/tcp.rs @@ -339,15 +339,16 @@ pub mod client { use super::*; + /// TCP client capable of creating up to N multiple connections with tx and rx buffers according to TX_SZ and RX_SZ. pub struct TcpClient<'d, D: Device, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> { stack: &'d Stack, - tx: &'d BufferPool, - rx: &'d BufferPool, + state: &'d TcpClientState, } impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpClient<'d, D, N, TX_SZ, RX_SZ> { - pub fn new(stack: &'d Stack, tx: &'d BufferPool, rx: &'d BufferPool) -> Self { - Self { stack, tx, rx } + /// Create a new TcpClient + pub fn new(stack: &'d Stack, state: &'d TcpClientState) -> Self { + Self { stack, state } } } @@ -370,7 +371,7 @@ pub mod client { IpAddr::V6(_) => panic!("ipv6 support not enabled"), }; let remote_endpoint = (addr, remote.port()); - let mut socket = TcpConnection::new(&self.stack, self.tx, self.rx)?; + let mut socket = TcpConnection::new(&self.stack, self.state)?; socket .socket .connect(remote_endpoint) @@ -383,26 +384,20 @@ pub mod client { pub struct TcpConnection<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> { socket: TcpSocket<'d>, - tx: &'d BufferPool, - rx: &'d BufferPool, - txb: NonNull<[u8; TX_SZ]>, - rxb: NonNull<[u8; RX_SZ]>, + state: &'d TcpClientState, + bufs: NonNull<([u8; TX_SZ], [u8; RX_SZ])>, } impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnection<'d, N, TX_SZ, RX_SZ> { fn new( stack: &'d Stack, - tx: &'d BufferPool, - rx: &'d BufferPool, + state: &'d TcpClientState, ) -> Result { - let mut txb = tx.alloc().ok_or(Error::ConnectionReset)?; - let mut rxb = rx.alloc().ok_or(Error::ConnectionReset)?; + let mut bufs = state.pool.alloc().ok_or(Error::ConnectionReset)?; Ok(Self { - socket: unsafe { TcpSocket::new(stack, rxb.as_mut(), txb.as_mut()) }, - tx, - rx, - txb, - rxb, + socket: unsafe { TcpSocket::new(stack, &mut bufs.as_mut().0, &mut bufs.as_mut().1) }, + state, + bufs, }) } } @@ -411,8 +406,7 @@ pub mod client { fn drop(&mut self) { unsafe { self.socket.close(); - self.rx.free(self.rxb); - self.tx.free(self.txb); + self.state.pool.free(self.bufs); } } } @@ -455,9 +449,22 @@ pub mod client { } } - pub type BufferPool = Pool<[u8; BUFSZ], N>; + /// State for TcpClient + pub struct TcpClientState { + pool: Pool<([u8; TX_SZ], [u8; RX_SZ]), N>, + } - pub struct Pool { + impl TcpClientState { + pub const fn new() -> Self { + Self { + pool: Pool::new() + } + } + } + + unsafe impl Sync for TcpClientState {} + + struct Pool { used: [AtomicBool; N], data: [UnsafeCell>; N], } @@ -466,7 +473,7 @@ pub mod client { const VALUE: AtomicBool = AtomicBool::new(false); const UNINIT: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); - pub const fn new() -> Self { + const fn new() -> Self { Self { used: [Self::VALUE; N], data: [Self::UNINIT; N], diff --git a/examples/stm32h7/Cargo.toml b/examples/stm32h7/Cargo.toml index 07b7e493..89604675 100644 --- a/examples/stm32h7/Cargo.toml +++ b/examples/stm32h7/Cargo.toml @@ -18,6 +18,7 @@ cortex-m-rt = "0.7.0" embedded-hal = "0.2.6" embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.8" } embedded-hal-async = { version = "0.1.0-alpha.1" } +embedded-nal-async = "0.2.0" panic-probe = { version = "0.3", features = ["print-defmt"] } futures = { version = "0.3.17", default-features = false, features = ["async-await"] } heapless = { version = "0.7.5", default-features = false } From 2e76b13a4c5fcf46b3a9fc58ceef91e9317c12b7 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 9 Aug 2022 14:44:18 +0200 Subject: [PATCH 3/4] Add example using embedded-nal-async traits --- examples/stm32h7/src/bin/eth_client.rs | 125 +++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 examples/stm32h7/src/bin/eth_client.rs diff --git a/examples/stm32h7/src/bin/eth_client.rs b/examples/stm32h7/src/bin/eth_client.rs new file mode 100644 index 00000000..a66c6f19 --- /dev/null +++ b/examples/stm32h7/src/bin/eth_client.rs @@ -0,0 +1,125 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use defmt::*; +use embassy_executor::executor::Spawner; +use embassy_executor::time::{Duration, Timer}; +use embassy_net::tcp::client::{TcpClient, TcpClientState}; +use embassy_net::{Stack, StackResources}; +use embassy_stm32::eth::generic_smi::GenericSMI; +use embassy_stm32::eth::{Ethernet, State}; +use embassy_stm32::peripherals::ETH; +use embassy_stm32::rng::Rng; +use embassy_stm32::time::mhz; +use embassy_stm32::{interrupt, Config, Peripherals}; +use embassy_util::Forever; +use embedded_io::asynch::Write; +use embedded_nal_async::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpConnect}; +use rand_core::RngCore; +use {defmt_rtt as _, panic_probe as _}; + +macro_rules! forever { + ($val:expr) => {{ + type T = impl Sized; + static FOREVER: Forever = Forever::new(); + FOREVER.put_with(move || $val) + }}; +} + +type Device = Ethernet<'static, ETH, GenericSMI, 4, 4>; + +#[embassy_executor::task] +async fn net_task(stack: &'static Stack) -> ! { + stack.run().await +} + +pub fn config() -> Config { + let mut config = Config::default(); + config.rcc.sys_ck = Some(mhz(400)); + config.rcc.hclk = Some(mhz(200)); + config.rcc.pll1.q_ck = Some(mhz(100)); + config +} + +#[embassy_executor::main(config = "config()")] +async fn main(spawner: Spawner, p: Peripherals) -> ! { + info!("Hello World!"); + + // Generate random seed. + let mut rng = Rng::new(p.RNG); + let mut seed = [0; 8]; + rng.fill_bytes(&mut seed); + let seed = u64::from_le_bytes(seed); + + let eth_int = interrupt::take!(ETH); + let mac_addr = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF]; + + let device = unsafe { + Ethernet::new( + forever!(State::new()), + p.ETH, + eth_int, + p.PA1, + p.PA2, + p.PC1, + p.PA7, + p.PC4, + p.PC5, + p.PG13, + p.PB13, + p.PG11, + GenericSMI, + mac_addr, + 0, + ) + }; + + let config = embassy_net::ConfigStrategy::Dhcp; + //let config = embassy_net::ConfigStrategy::Static(embassy_net::Config { + // address: Ipv4Cidr::new(Ipv4Address::new(10, 42, 0, 61), 24), + // dns_servers: Vec::new(), + // gateway: Some(Ipv4Address::new(10, 42, 0, 1)), + //}); + + // Init network stack + let stack = &*forever!(Stack::new( + device, + config, + forever!(StackResources::<1, 2, 8>::new()), + seed + )); + + // Launch network task + unwrap!(spawner.spawn(net_task(&stack))); + + info!("Network task initialized"); + + // To ensure DHCP configuration before trying connect + Timer::after(Duration::from_secs(20)).await; + + static STATE: TcpClientState<1, 1024, 1024> = TcpClientState::new(); + let client = TcpClient::new(&stack, &STATE); + + loop { + let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(10, 42, 0, 1), 8000)); + + info!("connecting..."); + let r = client.connect(addr).await; + if let Err(e) = r { + info!("connect error: {:?}", e); + Timer::after(Duration::from_secs(1)).await; + continue; + } + let mut connection = r.unwrap(); + info!("connected!"); + loop { + let r = connection.write_all(b"Hello\n").await; + if let Err(e) = r { + info!("write error: {:?}", e); + return; + } + Timer::after(Duration::from_secs(1)).await; + } + } +} From 87401c49b71eb22a4f1a9ce4b318ebd20ea38000 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 9 Aug 2022 14:51:32 +0200 Subject: [PATCH 4/4] Fix formatting --- embassy-net/src/tcp.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs index 814e7ab6..2e276ecd 100644 --- a/embassy-net/src/tcp.rs +++ b/embassy-net/src/tcp.rs @@ -389,10 +389,7 @@ pub mod client { } impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnection<'d, N, TX_SZ, RX_SZ> { - fn new( - stack: &'d Stack, - state: &'d TcpClientState, - ) -> Result { + fn new(stack: &'d Stack, state: &'d TcpClientState) -> Result { let mut bufs = state.pool.alloc().ok_or(Error::ConnectionReset)?; Ok(Self { socket: unsafe { TcpSocket::new(stack, &mut bufs.as_mut().0, &mut bufs.as_mut().1) }, @@ -456,9 +453,7 @@ pub mod client { impl TcpClientState { pub const fn new() -> Self { - Self { - pool: Pool::new() - } + Self { pool: Pool::new() } } }