Skip to content

Commit

Permalink
Merge pull request #911 from quartiq/issue/887/stream-target-in-flash
Browse files Browse the repository at this point in the history
Updating data stream management
  • Loading branch information
jordens authored Jun 22, 2024
2 parents 3ef00cb + 99c3207 commit f39a40f
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ console.
* MSRV removed. Stabilizer uses latest stable rust.
* The IIR (biquad) filter used for PID action has changed its serialization format.
See also the `iir_coefficients` Python CLI implementation.
* The stream target is now configures as a `1.2.3.4:4321` string

### Fixed
* Fixed an issue where the device would sometimes not enumerate on Windows
Expand Down
28 changes: 26 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ smoltcp-nal = { version = "0.5", features = ["shared-stack"] }
postcard = "1"
bit_field = "0.10.2"
embassy-futures = { version = "0.1", default-features = false }
serde_with = { version = "3.8", default-features = false, features = ["macros"] }

[build-dependencies]
built = { version = "0.7", features = ["git2"], default-features = false }
Expand Down
10 changes: 3 additions & 7 deletions hitl/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,12 @@ async def _main():

conf = await miniconf.Miniconf.create(args.broker, prefix)

stream_target = [int(x) for x in args.ip.split('.')]
if ipaddress.ip_address(args.ip).is_unspecified:
stream_target = get_local_ip(args.broker)
args.ip = get_local_ip(args.broker)

logger.info("Starting stream")
await conf.set(
"/stream_target", {
"ip": stream_target,
"port": args.port
}, retain=False)
"/stream_target", f"{args.ip}:{args.port}", retain=False)

try:
logger.info("Testing stream reception")
Expand All @@ -64,7 +60,7 @@ async def _main():
finally:
logger.info("Stopping stream")
await conf.set(
"/stream_target", {"ip": [0, 0, 0, 0], "port": 0}, retain=False)
"/stream_target", "0.0.0.0:0", retain=False)

logger.info("Draining queue")
await asyncio.sleep(.1)
Expand Down
3 changes: 1 addition & 2 deletions py/stabilizer/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ def get_local_ip(remote):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect((remote, 1883))
address = sock.getsockname()[0]
return sock.getsockname()[0]
finally:
sock.close()
return list(map(int, address.split(".")))


class AdcDac:
Expand Down
5 changes: 3 additions & 2 deletions src/bin/dual-iir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,9 @@ mod app {
}
}

let target = settings.dual_iir.stream_target.into();
c.shared.network.lock(|net| net.direct_stream(target));
c.shared
.network
.lock(|net| net.direct_stream(settings.dual_iir.stream_target));

c.shared
.active_settings
Expand Down
5 changes: 3 additions & 2 deletions src/bin/lockin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,9 @@ mod app {
c.local.afes.0.set_gain(settings.lockin.afe[0]);
c.local.afes.1.set_gain(settings.lockin.afe[1]);

let target = settings.lockin.stream_target.into();
c.shared.network.lock(|net| net.direct_stream(target));
c.shared
.network
.lock(|net| net.direct_stream(settings.lockin.stream_target));

c.shared
.active_settings
Expand Down
108 changes: 74 additions & 34 deletions src/net/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
//! # Example
//! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
//! of livestreamed data.
use core::mem::MaybeUninit;
use core::{fmt::Write, mem::MaybeUninit};
use heapless::{
pool::{Box, Init, Pool, Uninit},
spsc::{Consumer, Producer, Queue},
String,
};
use num_enum::IntoPrimitive;
use serde::{Deserialize, Serialize};
use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
use serde::Serialize;
use serde_with::DeserializeFromStr;
use smoltcp_nal::embedded_nal::{nb, SocketAddr, UdpClientStack};

use super::NetworkReference;

Expand Down Expand Up @@ -58,17 +60,47 @@ type Frame = [MaybeUninit<u8>; FRAME_SIZE];
/// Represents the destination for the UDP stream to send data to.
///
/// # Miniconf
/// `{"ip": <addr>, "port": <port>}`
/// `<addr>:<port>`
///
/// * `<addr>` is an array of 4 bytes. E.g. `[192, 168, 0, 1]`
/// * `<addr>` is an IPv4 address. E.g. `192.168.0.1`
/// * `<port>` is any unsigned 16-bit value.
///
/// ## Example
/// `{"ip": [192, 168,0, 1], "port": 1111}`
#[derive(Copy, Clone, Debug, Serialize, Deserialize, Default)]
pub struct StreamTarget {
pub ip: [u8; 4],
pub port: u16,
/// `192.168.0.1:1234`
#[derive(Copy, Clone, Debug, DeserializeFromStr, PartialEq, Eq)]
pub struct StreamTarget(pub SocketAddr);

impl Default for StreamTarget {
fn default() -> Self {
Self("0.0.0.0:0".parse().unwrap())
}
}

impl Serialize for StreamTarget {
fn serialize<S: serde::Serializer>(
&self,
serializer: S,
) -> Result<S::Ok, S::Error> {
let mut display: String<30> = String::new();
write!(&mut display, "{}", self.0).unwrap();
serializer.serialize_str(&display)
}
}

impl core::str::FromStr for StreamTarget {
type Err = &'static str;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let addr = SocketAddr::from_str(s)
.map_err(|_| "Invalid socket address format")?;
Ok(Self(addr))
}
}

impl core::fmt::Display for StreamTarget {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.0.fmt(f)
}
}

/// Specifies the format of streamed data
Expand All @@ -92,20 +124,6 @@ pub enum StreamFormat {
Fls = 2,
}

impl From<StreamTarget> for SocketAddr {
fn from(target: StreamTarget) -> SocketAddr {
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(
target.ip[0],
target.ip[1],
target.ip[2],
target.ip[3],
)),
target.port,
)
}
}

/// Configure streaming on a device.
///
/// # Args
Expand Down Expand Up @@ -273,7 +291,7 @@ pub struct DataStream {
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
frame_pool: &'static Pool<Frame>,
remote: SocketAddr,
remote: StreamTarget,
}

impl DataStream {
Expand All @@ -291,7 +309,7 @@ impl DataStream {
Self {
stack,
socket: None,
remote: StreamTarget::default().into(),
remote: StreamTarget::default(),
queue: consumer,
frame_pool,
}
Expand All @@ -309,28 +327,30 @@ impl DataStream {
fn open(&mut self) -> Result<(), ()> {
// If there is already a socket of if remote address is unspecified,
// do not open a new socket.
if self.socket.is_some() || self.remote.ip().is_unspecified() {
if self.socket.is_some() || self.remote.0.ip().is_unspecified() {
return Err(());
}

log::info!("Opening stream");

let mut socket = self.stack.socket().or(Err(()))?;

// Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be
// bound.
self.stack.connect(&mut socket, self.remote).unwrap();
// We may fail to connect if we don't have an IP address yet.
if self.stack.connect(&mut socket, self.remote.0).is_err() {
self.stack.close(socket).unwrap();
return Err(());
}

self.socket.replace(socket);

log::info!("Opening stream");

Ok(())
}

/// Configure the remote endpoint of the stream.
///
/// # Args
/// * `remote` - The destination to send stream data to.
pub fn set_remote(&mut self, remote: SocketAddr) {
pub fn set_remote(&mut self, remote: StreamTarget) {
// Close socket to be reopened if the remote has changed.
if remote != self.remote {
self.close();
Expand Down Expand Up @@ -360,7 +380,27 @@ impl DataStream {
core::mem::size_of_val(buf),
)
};
self.stack.send(handle, data).ok();

// If we fail to send, it can only be because the socket got closed on us (i.e.
// address update due to DHCP). If this happens, reopen the socket.
match self.stack.send(handle, data) {
Ok(_) => {},

// Our IP address may have changedm so handle reopening the UDP stream.
Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::Unaddressable))) => {
log::warn!( "IP address updated during stream. Reopening socket");
let socket = self.socket.take().unwrap();
self.stack.close(socket).unwrap();
}

// The buffer should clear up once ICMP resolves the IP address, so ignore
// this error.
Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::BufferFull))) => {}

Err(other) => {
log::warn!("Unexpected UDP error during data stream: {other:?}");
}
}
self.frame_pool.free(frame.buffer)
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ use crate::hardware::{
SystemTimer,
};
use crate::settings::NetSettings;
use data_stream::{DataStream, FrameGenerator};
use data_stream::{DataStream, FrameGenerator, StreamTarget};
use network_processor::NetworkProcessor;
use telemetry::TelemetryClient;

use core::fmt::Write;
use heapless::String;
use miniconf::JsonCoreSlash;
use miniconf_mqtt::minimq;
use smoltcp_nal::embedded_nal::SocketAddr;

pub type NetworkReference =
smoltcp_nal::shared::NetworkStackProxy<'static, NetworkStack>;
Expand Down Expand Up @@ -174,7 +173,7 @@ where
///
/// # Args
/// * `remote` - The destination for the streamed data.
pub fn direct_stream(&mut self, remote: SocketAddr) {
pub fn direct_stream(&mut self, remote: StreamTarget) {
if self.generator.is_none() {
self.stream.set_remote(remote);
}
Expand Down

0 comments on commit f39a40f

Please sign in to comment.