Skip to content

Commit

Permalink
Refactor netflow v9 into readable code, and the IPv4 version is at le…
Browse files Browse the repository at this point in the history
…ast somewhat working now.
  • Loading branch information
thebracket committed Mar 7, 2024
1 parent 10c56f9 commit 34a2ec7
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 276 deletions.
38 changes: 8 additions & 30 deletions src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,37 @@
use std::{net::UdpSocket, time::Instant};

use std::net::UdpSocket;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use crate::throughput_tracker::flow_data::netflow9::protocol::{header::Netflow9Header, template_ipv4::template_data_ipv4, template_ipv6::template_data_ipv6};

use self::protocol::{to_netflow_9, Netflow9Header};

use self::protocol::to_netflow_9;
use super::FlowbeeRecipient;

mod protocol;

pub(crate) struct Netflow9 {
socket: UdpSocket,
sequence: u32,
target: String,
last_sent_template: Option<Instant>,
}

impl Netflow9 {
pub(crate) fn new(target: String) -> anyhow::Result<Self> {
let socket = UdpSocket::bind("0.0.0.0:12212")?;
Ok(Self { socket, sequence: 0, target, last_sent_template: None})
Ok(Self { socket, sequence: 0, target })
}
}

impl FlowbeeRecipient for Netflow9 {
fn send(&mut self, key: FlowbeeKey, data: FlowbeeData) {
let mut needs_template = false;
if let Some(last_sent_template) = self.last_sent_template {
if last_sent_template.elapsed().as_secs() > 60 {
needs_template = true;
}
} else {
needs_template = true;
}

if needs_template {
// Get the header, ipv4 template and ipv6 templates and send them all.
let header = Netflow9Header::new(self.sequence);
let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::<Netflow9Header>()) };
let mut buffer = Vec::with_capacity(header_bytes.len());
buffer.extend_from_slice(header_bytes);
buffer.extend_from_slice(&protocol::template_data_ipv4(self.sequence));
buffer.extend_from_slice(&protocol::template_data_ipv6(self.sequence));
self.socket.send_to(&buffer, &self.target).unwrap();
self.last_sent_template = Some(Instant::now());
}

if let Ok((packet1, packet2)) = to_netflow_9(&key, &data) {
let header = Netflow9Header::new(self.sequence);
let header = Netflow9Header::new(self.sequence, 4);
let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::<Netflow9Header>()) };
let mut buffer = Vec::with_capacity(header_bytes.len() + packet1.len() + packet2.len());
buffer.extend_from_slice(header_bytes);
buffer.extend_from_slice(&template_data_ipv4());
buffer.extend_from_slice(&template_data_ipv6());
buffer.extend_from_slice(&packet1);
buffer.extend_from_slice(&packet2);

//log::debug!("Sending netflow packet to {target}", target = self.target);
log::debug!("Sending netflow9 packet of size {} to {}", buffer.len(), self.target);
self.socket.send_to(&buffer, &self.target).unwrap();

self.sequence = self.sequence.wrapping_add(2);
Expand Down
246 changes: 0 additions & 246 deletions src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::net::IpAddr;

use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use super::field_types::*;

pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: usize, key: &FlowbeeKey, data: &FlowbeeData) -> anyhow::Result<Vec<u8>> {
let src_port = if direction == 0 { key.src_port } else { key.dst_port };
let dst_port = if direction == 0 { key.dst_port } else { key.src_port };

let total_size: u16 = template.iter().map(|(_, size)| size).sum();
let mut result = Vec::with_capacity(total_size as usize);
for (field_type, field_length) in template.iter() {
match (*field_type, *field_length) {
IN_BYTES => encode_u64(data.bytes_sent[direction], &mut result),
IN_PKTS => encode_u64(data.packets_sent[direction], &mut result),
PROTOCOL => result.push(key.ip_protocol),
L4_SRC_PORT => encode_u16(src_port, &mut result),
L4_DST_PORT => encode_u16(dst_port, &mut result),
DST_TOS => result.push(data.tos),
IPV4_SRC_ADDR => encode_ipv4(0, key, &mut result)?,
IPV4_DST_ADDR => encode_ipv4(1, key, &mut result)?,
IPV6_SRC_ADDR => encode_ipv6(0, key, &mut result)?,
IPV6_DST_ADDR => encode_ipv6(1, key, &mut result)?,
_ => anyhow::bail!("Don't know how to encode field type {} yet", field_type),
}
}
Ok(result)
}

fn encode_u64(value: u64, target: &mut Vec<u8>) {
target.extend_from_slice(&value.to_be_bytes());
}

fn encode_u16(value: u16, target: &mut Vec<u8>) {
target.extend_from_slice(&value.to_be_bytes());
}

fn encode_ipv4(direction: usize, key: &FlowbeeKey, target: &mut Vec<u8>) -> anyhow::Result<()> {
let local = key.local_ip.as_ip();
let remote = key.remote_ip.as_ip();
if let (IpAddr::V4(local), IpAddr::V4(remote)) = (local, remote) {
let src_ip = u32::from_ne_bytes(local.octets());
let dst_ip = u32::from_ne_bytes(remote.octets());
if direction == 0 {
target.extend_from_slice(&src_ip.to_be_bytes());
} else {
target.extend_from_slice(&dst_ip.to_be_bytes());
}
} else {
anyhow::bail!("Expected IPv4 addresses, got {:?}", (local, remote));
}
Ok(())
}

fn encode_ipv6(direction: usize, key: &FlowbeeKey, target: &mut Vec<u8>) -> anyhow::Result<()> {
let local = key.local_ip.as_ip();
let remote = key.remote_ip.as_ip();
if let (IpAddr::V6(local), IpAddr::V6(remote)) = (local, remote) {
let src_ip = local.octets();
let dst_ip = remote.octets();
if direction == 0 {
target.extend_from_slice(&src_ip);
} else {
target.extend_from_slice(&dst_ip);
}
} else {
anyhow::bail!("Expected IPv6 addresses, got {:?}", (local, remote));
}
Ok(())
}
Loading

0 comments on commit 34a2ec7

Please sign in to comment.