Skip to content

Commit

Permalink
Reworked the NetFlow code to batch packets into single submissions of…
Browse files Browse the repository at this point in the history
… up to 30 packets at a time.
  • Loading branch information
thebracket committed Mar 7, 2024
1 parent 04b0cd4 commit b7d4356
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 56 deletions.
32 changes: 20 additions & 12 deletions src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! Provides tracking and data-services for per-flow data. Includes implementations
//! of netflow protocols.
mod flow_tracker;
mod netflow5;
mod netflow9;
mod flow_tracker;

use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use std::sync::mpsc::{channel, Sender};
pub(crate) use flow_tracker::ALL_FLOWS;
use crate::throughput_tracker::flow_data::{netflow5::Netflow5, netflow9::Netflow9};
pub(crate) use flow_tracker::ALL_FLOWS;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use std::sync::{
mpsc::{channel, Sender},
Arc,
};

trait FlowbeeRecipient {
fn send(&mut self, key: FlowbeeKey, data: FlowbeeData);
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData);
}

// Creates the netflow tracker and returns the sender
Expand All @@ -23,32 +26,37 @@ pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, FlowbeeData)> {
log::info!("Starting the network flow tracker back-end");

// Build the endpoints list
let mut endpoints: Vec<Box<dyn FlowbeeRecipient>> = Vec::new();
let mut endpoints: Vec<Arc<dyn FlowbeeRecipient>> = Vec::new();
if let Some(flow_config) = config.flows {
if let (Some(ip), Some(port), Some(version)) = (flow_config.netflow_ip, flow_config.netflow_port, flow_config.netflow_version)
{
if let (Some(ip), Some(port), Some(version)) = (
flow_config.netflow_ip,
flow_config.netflow_port,
flow_config.netflow_version,
) {
log::info!("Setting up netflow target: {ip}:{port}, version: {version}");
let target = format!("{ip}:{port}", ip = ip, port = port);
match version {
5 => {
let endpoint = Netflow5::new(target).unwrap();
endpoints.push(Box::new(endpoint));
endpoints.push(endpoint);
log::info!("Netflow 5 endpoint added");
}
9 => {
let endpoint = Netflow9::new(target).unwrap();
endpoints.push(Box::new(endpoint));
endpoints.push(endpoint);
log::info!("Netflow 9 endpoint added");
}
_ => log::error!("Unsupported netflow version: {version}"),
}
}

}

// Send to all endpoints upon receipt
while let Ok((key, value)) = rx.recv() {
endpoints.iter_mut().for_each(|f| f.send(key.clone(), value.clone()));
endpoints.iter_mut().for_each(|f| {
log::debug!("Enqueueing flow data for {key:?}");
f.enqueue(key.clone(), value.clone());
});
}
log::info!("Network flow tracker back-end has stopped")
});
Expand Down
91 changes: 71 additions & 20 deletions src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/mod.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,90 @@
//! Support for the Netflow 5 protocol
//! Mostly taken from: https://netflow.caligare.com/netflow_v5.htm
mod protocol;
use std::net::UdpSocket;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use super::FlowbeeRecipient;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
pub(crate) use protocol::*;
use std::{
net::UdpSocket,
sync::{atomic::AtomicU32, Arc, Mutex},
};

pub(crate) struct Netflow5 {
socket: UdpSocket,
sequence: u32,
sequence: AtomicU32,
target: String,
send_queue: Mutex<Vec<(FlowbeeKey, FlowbeeData)>>,
}

impl Netflow5 {
pub(crate) fn new(target: String) -> anyhow::Result<Self> {
pub(crate) fn new(target: String) -> anyhow::Result<Arc<Self>> {
let socket = UdpSocket::bind("0.0.0.0:12212")?;
Ok(Self { socket, sequence: 0, target })
let result = Arc::new(Self {
socket,
sequence: AtomicU32::new(0),
target,
send_queue: Mutex::new(Vec::new()),
});
let thread_result = result.clone();
std::thread::spawn(move || thread_result.queue_handler());
Ok(result)
}
}

impl FlowbeeRecipient for Netflow5 {
fn send(&mut self, key: FlowbeeKey, data: FlowbeeData) {
if let Ok((packet1, packet2)) = to_netflow_5(&key, &data) {
let header = Netflow5Header::new(self.sequence);
let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::<Netflow5Header>()) };
let packet1_bytes = unsafe { std::slice::from_raw_parts(&packet1 as *const _ as *const u8, std::mem::size_of::<Netflow5Record>()) };
let packet2_bytes = unsafe { std::slice::from_raw_parts(&packet2 as *const _ as *const u8, std::mem::size_of::<Netflow5Record>()) };
let mut buffer = Vec::with_capacity(header_bytes.len() + packet1_bytes.len() + packet2_bytes.len());
buffer.extend_from_slice(header_bytes);
buffer.extend_from_slice(packet1_bytes);
buffer.extend_from_slice(packet2_bytes);
fn queue_handler(&self) {
loop {
let mut lock = self.send_queue.lock().unwrap();
if lock.is_empty() {
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}

//log::debug!("Sending netflow packet to {target}", target = self.target);
self.socket.send_to(&buffer, &self.target).unwrap();
let send_chunks = lock.chunks(15);
for to_send in send_chunks {
let num_records = (to_send.len() * 2) as u16;
let sequence = self.sequence.load(std::sync::atomic::Ordering::Relaxed);
let header = Netflow5Header::new(sequence, num_records);
let header_bytes = unsafe {
std::slice::from_raw_parts(
&header as *const _ as *const u8,
std::mem::size_of::<Netflow5Header>(),
)
};

let mut buffer = Vec::with_capacity(
header_bytes.len() + to_send.len() * 2 * std::mem::size_of::<Netflow5Record>(),
);

buffer.extend_from_slice(header_bytes);
for (key, data) in to_send {
if let Ok((packet1, packet2)) = to_netflow_5(key, data) {
let packet1_bytes = unsafe {
std::slice::from_raw_parts(
&packet1 as *const _ as *const u8,
std::mem::size_of::<Netflow5Record>(),
)
};
let packet2_bytes = unsafe {
std::slice::from_raw_parts(
&packet2 as *const _ as *const u8,
std::mem::size_of::<Netflow5Record>(),
)
};
buffer.extend_from_slice(packet1_bytes);
buffer.extend_from_slice(packet2_bytes);
}
}

self.sequence = self.sequence.wrapping_add(2);
self.socket.send_to(&buffer, &self.target).unwrap();
self.sequence.fetch_add(num_records as u32, std::sync::atomic::Ordering::Relaxed);
}
lock.clear();
}
}
}

impl FlowbeeRecipient for Netflow5 {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData) {
let mut lock = self.send_queue.lock().unwrap();
lock.push((key, data));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub(crate) struct Netflow5Header {

impl Netflow5Header {
/// Create a new Netflow 5 header
pub(crate) fn new(flow_sequence: u32) -> Self {
pub(crate) fn new(flow_sequence: u32, num_records: u16) -> Self {
let uptime = time_since_boot().unwrap();

Self {
version: (5u16).to_be(),
count: (2u16).to_be(),
count: num_records.to_be(),
sys_uptime: (uptime.num_milliseconds() as u32).to_be(),
unix_secs: (uptime.num_seconds() as u32).to_be(),
unix_nsecs: 0,
Expand Down
77 changes: 55 additions & 22 deletions src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,73 @@
use std::net::UdpSocket;
use crate::throughput_tracker::flow_data::netflow9::protocol::{
header::Netflow9Header, template_ipv4::template_data_ipv4, template_ipv6::template_data_ipv6,
};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use crate::throughput_tracker::flow_data::netflow9::protocol::{header::Netflow9Header, template_ipv4::template_data_ipv4, template_ipv6::template_data_ipv6};
use std::{net::UdpSocket, sync::{atomic::AtomicU32, Arc, Mutex}};

use self::protocol::to_netflow_9;
use super::FlowbeeRecipient;
mod protocol;

pub(crate) struct Netflow9 {
socket: UdpSocket,
sequence: u32,
sequence: AtomicU32,
target: String,
send_queue: Mutex<Vec<(FlowbeeKey, FlowbeeData)>>,
}

impl Netflow9 {
pub(crate) fn new(target: String) -> anyhow::Result<Self> {
pub(crate) fn new(target: String) -> anyhow::Result<Arc<Self>> {
let socket = UdpSocket::bind("0.0.0.0:12212")?;
Ok(Self { socket, sequence: 0, target })
let result = Arc::new(Self {
socket,
sequence: AtomicU32::new(0),
target,
send_queue: Mutex::new(Vec::new()),
});
let thread_result = result.clone();
std::thread::spawn(move || thread_result.queue_handler());
Ok(result)
}

fn queue_handler(&self) {
loop {
let mut lock = self.send_queue.lock().unwrap();
if lock.is_empty() {
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}

let send_chunks = lock.chunks(14);
for to_send in send_chunks {
let num_records = (to_send.len() * 2) as u16 + 2; // +2 to include templates
let sequence = self.sequence.load(std::sync::atomic::Ordering::Relaxed);
let header = Netflow9Header::new(sequence, num_records);
let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::<Netflow9Header>()) };
let template1 = template_data_ipv4();
let template2 = template_data_ipv6();
let mut buffer = Vec::with_capacity(header_bytes.len() + template1.len() + template2.len() + (num_records as usize) * 140);
buffer.extend_from_slice(header_bytes);
buffer.extend_from_slice(&template1);
buffer.extend_from_slice(&template2);

for (key, data) in to_send {
if let Ok((packet1, packet2)) = to_netflow_9(key, data) {
buffer.extend_from_slice(&packet1);
buffer.extend_from_slice(&packet2);
}
}
self.socket.send_to(&buffer, &self.target).unwrap();
self.sequence.fetch_add(num_records as u32, std::sync::atomic::Ordering::Relaxed);
}
lock.clear();
}

}
}

impl FlowbeeRecipient for Netflow9 {
fn send(&mut self, key: FlowbeeKey, data: FlowbeeData) {
if let Ok((packet1, packet2)) = to_netflow_9(&key, &data) {
let header = Netflow9Header::new(self.sequence, 4);
let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::<Netflow9Header>()) };
let mut buffer = Vec::with_capacity(header_bytes.len() + packet1.len() + packet2.len());
buffer.extend_from_slice(header_bytes);
buffer.extend_from_slice(&template_data_ipv4());
buffer.extend_from_slice(&template_data_ipv6());
buffer.extend_from_slice(&packet1);
buffer.extend_from_slice(&packet2);

log::debug!("Sending netflow9 packet of size {} to {}", buffer.len(), self.target);
self.socket.send_to(&buffer, &self.target).unwrap();

self.sequence = self.sequence.wrapping_add(2);
}
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData) {
let mut lock = self.send_queue.lock().unwrap();
lock.push((key, data));
}
}
}

0 comments on commit b7d4356

Please sign in to comment.