streaming #150

Closed
jordens opened this issue Oct 13, 2020 · 3 comments

@jordens
Member

jordens commented Oct 13, 2020

  • Minimum overhead, timestamped raw binary sample burst packets
  • Containing a configurable subset of DAC/ADC/DDS high bandwidth data or anything from the signal crossbar
  • Maybe UDP
  • Maybe target controlled and configured via MQTT
  • Best effort task, should not harm the main processing task performance.

forked from #54
details in #99 and #147

@jordens jordens added the enhancement New feature or request label Oct 13, 2020
@jordens jordens mentioned this issue Nov 30, 2020
@dnadlinger
Contributor

dnadlinger commented Feb 17, 2021

Some notes on an MVP streaming implementation (which I may or may not get around to actually doing):

  • Addressed use cases:
    • Interactive tuning of feedback parameters/inspection of lock quality (stream error signal/IIR output, display in UI, or acquire several seconds and compute FFT, …)
    • Bespoke one-off scripts running on a PC for prototyping/handling more complex use cases, e.g. automatic lock acquisition for a few-Hz laser system where Stabilizer is used for auxiliary loops
  • (Soft) requirements/desiderata:
    • General focus: Best-effort, but with a design focus on lowest realistic overhead
    • Desirable throughput: One, ideally two, 16-bit channels at full rate (~25 Mbit/s).
    • Single client support is enough; can always direct to a pubsub server of some description in the future
    • Seamless streaming of samples over many seconds should be supported (e.g. for tracking loop performance down to low frequencies)
  • Design proposal:
    • UDP, single target host/port configured via MQTT interface.
    • Regular samples every N processing cycles from a selected subset of channels (ADC/DAC streams, internal lockin/… signals, …), configured via MQTT interface.
    • Each UDP message is a "streaming frame", consisting of a fixed-size header and a configurable-length payload.
      • Payload: Row-major i16 array of samples (make generic in the future?), where rows are time and columns are different channels
      • Header: Channels selected, sample divider used, samples per frame, global timestamp (e.g. ticks since Stabilizer boot) of first sample.
    • Firmware keeps a ring buffer of frames
      • DSP task: If not currently filling a frame, pull one from the ring buffer (try next iteration if none available), fill in current timestamp and used settings. If space in frame, copy in requested data samples. If frame full (or streaming stopped (?)), push back into ring buffer.
      • Idle task: Pull out any completed frames, push onto wire (ideally zero-copy, though I don't know whether that's feasible in current smoltcp), return to queue.
      • No atomics necessary on either side, but barriers/volatile for interrupt safety.
    • Changing streaming settings requires stop and restart (possibly missing some samples) for simplicity.
  • Comments:
    • Global timestamps allow clients to detect whether any data was dropped due to throughput limitations/network hiccups/…
    • For applications that use batching, should time granularity be individual samples or batches?
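
To make the frame proposal above concrete, here is a minimal host-side sketch of one possible frame layout and of the drop check that the global timestamp enables. Everything in it is an assumption for illustration, not an agreed format: the `FrameHeader` field names and widths, the little-endian encoding, the `encode_frame`/`decode_header`/`missed_rows` helpers, and the choice to count the timestamp in processing cycles.

```rust
/// Hypothetical fixed-size frame header; field names and widths are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    /// Bit i set means channel i is present in the payload.
    pub channel_mask: u8,
    /// Every `divider`-th processing cycle contributes one row.
    pub divider: u16,
    /// Number of rows (time steps) in the payload.
    pub rows: u16,
    /// Processing-cycle count of the first row since boot (assumed unit).
    pub timestamp: u64,
}

/// Encoded header size in bytes: mask + divider + rows + timestamp.
pub const HEADER_LEN: usize = 1 + 2 + 2 + 8;

impl FrameHeader {
    /// Number of columns (selected channels) per row.
    pub fn channels(&self) -> usize {
        self.channel_mask.count_ones() as usize
    }

    /// Timestamp the following frame carries if nothing was dropped.
    pub fn next_timestamp(&self) -> u64 {
        self.timestamp + self.rows as u64 * self.divider as u64
    }
}

/// Write the header and the row-major i16 payload (little-endian) into `buf`.
/// Returns the number of bytes written, or `None` if the sizes do not match.
pub fn encode_frame(h: &FrameHeader, samples: &[i16], buf: &mut [u8]) -> Option<usize> {
    let n = h.rows as usize * h.channels();
    let len = HEADER_LEN + 2 * n;
    if samples.len() != n || buf.len() < len {
        return None;
    }
    buf[0] = h.channel_mask;
    buf[1..3].copy_from_slice(&h.divider.to_le_bytes());
    buf[3..5].copy_from_slice(&h.rows.to_le_bytes());
    buf[5..13].copy_from_slice(&h.timestamp.to_le_bytes());
    for (chunk, s) in buf[HEADER_LEN..len].chunks_exact_mut(2).zip(samples) {
        chunk.copy_from_slice(&s.to_le_bytes());
    }
    Some(len)
}

/// Parse the header back out of a received datagram.
pub fn decode_header(buf: &[u8]) -> Option<FrameHeader> {
    if buf.len() < HEADER_LEN {
        return None;
    }
    let mut ts = [0u8; 8];
    ts.copy_from_slice(&buf[5..13]);
    Some(FrameHeader {
        channel_mask: buf[0],
        divider: u16::from_le_bytes([buf[1], buf[2]]),
        rows: u16::from_le_bytes([buf[3], buf[4]]),
        timestamp: u64::from_le_bytes(ts),
    })
}

/// Rows lost between two consecutively received frames (0 if contiguous).
pub fn missed_rows(prev: &FrameHeader, next: &FrameHeader) -> u64 {
    next.timestamp.saturating_sub(prev.next_timestamp()) / prev.divider.max(1) as u64
}
```

A client would call `decode_header` on each datagram and compare it with the previous header via `missed_rows`. Because the header repeats the channel selection and divider in every frame, each frame is self-describing, at the cost of a few bytes per datagram.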

@jordens
Member Author

jordens commented Feb 18, 2021

Yeah. With that I was able to stream timestamps plus four channels at ~50 Mb/s over TCP with room to spare. I let the queue/frame granularity be the data of a single process invocation, i.e. a batch, and left the packaging to TCP. With UDP you'd want to concatenate a few batches per datagram.
I'm uncertain how much of this can and should be made configurable at runtime (other than stream target and on/off) and whether all that configuration can be represented in the header. You certainly want to get a full description of the payload every now and then.
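
As a rough sketch of the "concatenate a few" idea for UDP: the function below fills one datagram buffer with as many whole records as fit. It follows the shape of the `socket.send` closure in the patch below, but only dequeues a record when a complete one still fits. The `Record` alias, `RECORD_LEN`, and `pack_records` are hypothetical names, and the dequeue source is passed in as a closure, so nothing here depends on a particular queue type.

```rust
/// One queued record: five i16 values (time plus four signals). This layout
/// is an assumption borrowed from the patch's `[i16; 5]` queue element.
pub type Record = [i16; 5];

/// Encoded size of one record in bytes.
pub const RECORD_LEN: usize = 2 * 5;

/// Fill `buf` with as many whole records as fit, pulling them from `dequeue`.
/// Returns the number of bytes written. A record is only dequeued when there
/// is room for all of it, so no sample is lost to a partially filled buffer.
pub fn pack_records<F>(mut dequeue: F, buf: &mut [u8]) -> usize
where
    F: FnMut() -> Option<Record>,
{
    let mut i = 0;
    while i + RECORD_LEN <= buf.len() {
        let record = match dequeue() {
            Some(r) => r,
            None => break, // queue drained; send what we have
        };
        for v in record.iter() {
            buf[i..i + 2].copy_from_slice(&v.to_le_bytes());
            i += 2;
        }
    }
    i
}
```

With a queue consumer that has a `dequeue()` method returning an `Option`, the call would look like `pack_records(|| c.dequeue(), buf)`. On standard Ethernet with a 1500-byte MTU, a UDP payload over IPv4 of up to 1472 bytes avoids fragmentation, which would hold 147 such records.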

Old rotten patch
diff --git a/Cargo.toml b/Cargo.toml
index 8626796..ccbd7c7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ cortex-m-log = { version = "0.5", features = ["log-integration"] }
 log = "0.4"
 panic-abort = "0.3"
 panic-semihosting = { version = "0.5.2", optional = true }
+heapless = { version = "0.4", features = ["const-fn"] }

 [dependencies.stm32h7]
 path = "../stm32-rs/stm32h7"
diff --git a/src/main.rs b/src/main.rs
index 8ffc57f..9508bda 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,11 +14,11 @@ extern crate log;

 use core::ptr;
 use core::cell::RefCell;
-use core::fmt::Write;
 use core::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 use cortex_m_rt::{entry, exception};
 use stm32h7::stm32h7x3::{self as stm32, Peripherals, CorePeripherals, interrupt};
 use cortex_m::interrupt::Mutex;
+use heapless::{consts::*, spsc::Queue};

 use smoltcp as net;

@@ -514,6 +514,7 @@ macro_rules! create_socket {
     )
 }

+static mut IIRLOG: Queue<[i16; 5], U512> = Queue::new();

 #[entry]
 fn main() -> ! {
@@ -648,11 +649,23 @@ fn main() -> ! {
                 socket.listen(80).unwrap_or_else(|e| warn!("TCP listen error: {:?}", e));
             } else if last != time && socket.can_send() {
                 last = time;
-                let (x0, y0, x1, y1) = unsafe {
-                    (IIR_STATE[0][0], IIR_STATE[0][2], IIR_STATE[1][0], IIR_STATE[1][2]) };
-                writeln!(socket, "t={} x0={:.1} y0={:.1} x1={:.1} y1={:.1}",
-                         time, x0, y0, x1, y1)
-                    .unwrap_or_else(|e| warn!("TCP send error: {:?}", e));
+                let mut c = unsafe { IIRLOG.split().1 };
+                socket.send(|buf| {
+                    let len = buf.len();
+                    let mut i = 0;
+                    while i + 2*4 < len {
+                        match c.dequeue() {
+                            Some(d) => for v in d.iter() {
+                                let v = v.to_le_bytes();
+                                let l = v.len();
+                                buf[i..i + l].copy_from_slice(&v);
+                                i += l;
+                            },
+                            None => break
+                        }
+                    }
+                    (i, ())
+                }).ok().unwrap();
             }
         }
         if !match iface.poll(&mut sockets, net::time::Instant::from_millis(time as i64)) {
@@ -708,6 +721,11 @@ fn SPI1() {
             let txdr = &spi4.txdr as *const _ as *mut u16;
             unsafe { ptr::write_volatile(txdr, d) };
         }
+        let time = TIME.load(Ordering::Relaxed);
+        let mut p = unsafe { IIRLOG.split().0 };
+        p.enqueue(unsafe {
+            [time as i16, IIR_STATE[0][0] as i16, IIR_STATE[0][2] as i16,
+                IIR_STATE[1][0] as i16, IIR_STATE[1][2] as i16] }).ok();
     });
     #[cfg(feature = "bkpt")]
     cortex_m::asm::bkpt();

@jordens
Member Author

jordens commented Jun 24, 2021

Closed by #380
Follow-on improvements in #385, #386, etc.

@jordens jordens closed this as completed Jun 24, 2021