Skip to content

Commit

Permalink
Break submission up into chunks for sending.
Browse files Browse the repository at this point in the history
  • Loading branch information
thebracket committed Jan 13, 2025
1 parent d429ce1 commit a7af216
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions src/rust/lqosd/src/lts2_sys/lts2_client/ingestor/message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl MessageQueue {
}
}

// Submit the actual data
// Build the submission packet
let mut message = IngestSession {
license_key: license_uuid,
node_id: node_id.clone(),
Expand All @@ -234,13 +234,29 @@ impl MessageQueue {
site_retransmits::add_site_retransmits(&mut message, &mut self.site_retransmits);
site_rtt::add_site_rtt(&mut message, &mut self.site_rtt);
site_throughput::add_site_throughput(&mut message, &mut self.site_throughput);
let Ok((_, _, data_to_send)) = (WsMessage::DataDump { data: message }).to_bytes() else {

// Build the submission blob
let Ok(raw_bytes) = serde_cbor::to_vec(&message) else {
warn!("Failed to serialize data message");
return Ok(());
};
if let Err(e) = socket.send(tungstenite::Message::Binary(data_to_send)) {
warn!("Failed to send data message to server: {}", e);
return Ok(());
let compressed_bytes = miniz_oxide::deflate::compress_to_vec(&raw_bytes, 10);

// Divide into chunks. Chunk size is 60k
const CHUNK_SIZE: usize = 60 * 1024;
let message_chunks = compressed_bytes.chunks(CHUNK_SIZE);
let n_chunks = message_chunks.len();

// Submit the chunks
for (i,chunk) in message_chunks.into_iter().enumerate() {
let Ok((_, _, data_to_send)) = (WsMessage::DataDump { chunk: i+1, n_chunks, data: chunk.to_vec() }).to_bytes() else {
warn!("Failed to serialize data message");
return Ok(());
};
if let Err(e) = socket.send(tungstenite::Message::Binary(data_to_send)) {
warn!("Failed to send data message to server: {}", e);
return Ok(());
}
}

// Remote Commands
Expand Down Expand Up @@ -296,7 +312,7 @@ enum WsMessage {
// Request messages
Hello { magic: u32 },
License { license: Uuid },
DataDump { data: IngestSession },
DataDump { chunk: usize, n_chunks: usize, data: Vec<u8> },
RequestRemoteCommands,

// Response messages
Expand Down

0 comments on commit a7af216

Please sign in to comment.