diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs index 3396c1b4d..76b5d3e00 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs @@ -1,59 +1,37 @@ -use std::{net::UdpSocket, time::Instant}; - +use std::net::UdpSocket; use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use crate::throughput_tracker::flow_data::netflow9::protocol::{header::Netflow9Header, template_ipv4::template_data_ipv4, template_ipv6::template_data_ipv6}; -use self::protocol::{to_netflow_9, Netflow9Header}; - +use self::protocol::to_netflow_9; use super::FlowbeeRecipient; - mod protocol; pub(crate) struct Netflow9 { socket: UdpSocket, sequence: u32, target: String, - last_sent_template: Option, } impl Netflow9 { pub(crate) fn new(target: String) -> anyhow::Result { let socket = UdpSocket::bind("0.0.0.0:12212")?; - Ok(Self { socket, sequence: 0, target, last_sent_template: None}) + Ok(Self { socket, sequence: 0, target }) } } impl FlowbeeRecipient for Netflow9 { fn send(&mut self, key: FlowbeeKey, data: FlowbeeData) { - let mut needs_template = false; - if let Some(last_sent_template) = self.last_sent_template { - if last_sent_template.elapsed().as_secs() > 60 { - needs_template = true; - } - } else { - needs_template = true; - } - - if needs_template { - // Get the header, ipv4 template and ipv6 templates and send them all. - let header = Netflow9Header::new(self.sequence); - let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::()) }; - let mut buffer = Vec::with_capacity(header_bytes.len()); - buffer.extend_from_slice(header_bytes); - buffer.extend_from_slice(&protocol::template_data_ipv4(self.sequence)); - buffer.extend_from_slice(&protocol::template_data_ipv6(self.sequence)); - self.socket.send_to(&buffer, &self.target).unwrap(); - self.last_sent_template = Some(Instant::now()); - } - if let Ok((packet1, packet2)) = to_netflow_9(&key, &data) { - let header = Netflow9Header::new(self.sequence); + let header = Netflow9Header::new(self.sequence, 4); let header_bytes = unsafe { std::slice::from_raw_parts(&header as *const _ as *const u8, std::mem::size_of::()) }; let mut buffer = Vec::with_capacity(header_bytes.len() + packet1.len() + packet2.len()); buffer.extend_from_slice(header_bytes); + buffer.extend_from_slice(&template_data_ipv4()); + buffer.extend_from_slice(&template_data_ipv6()); buffer.extend_from_slice(&packet1); buffer.extend_from_slice(&packet2); - //log::debug!("Sending netflow packet to {target}", target = self.target); + log::debug!("Sending netflow9 packet of size {} to {}", buffer.len(), self.target); self.socket.send_to(&buffer, &self.target).unwrap(); self.sequence = self.sequence.wrapping_add(2); diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol.rs deleted file mode 100644 index 86e51e96b..000000000 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol.rs +++ /dev/null @@ -1,246 +0,0 @@ -//! Protocol definitions for Netflow v9 Data. - -use std::net::IpAddr; - -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; -use lqos_utils::unix_time::time_since_boot; -use nix::sys::time::TimeValLike; - -pub(crate) struct Netflow9Header { - pub(crate) version: u16, - pub(crate) count: u16, - pub(crate) sys_uptime: u32, - pub(crate) unix_secs: u32, - pub(crate) package_sequence: u32, - pub(crate) source_id: u32, -} - -impl Netflow9Header { - /// Create a new Netflow 9 header - pub(crate) fn new(flow_sequence: u32) -> Self { - let uptime = time_since_boot().unwrap(); - - Self { - version: (9u16).to_be(), - count: (2u16).to_be(), - sys_uptime: (uptime.num_milliseconds() as u32).to_be(), - unix_secs: (uptime.num_seconds() as u32).to_be(), - package_sequence: flow_sequence, - source_id: 0, - } - } -} - -fn add_field(bytes: &mut Vec, field_type: u16, field_length: u16) { - bytes.extend_from_slice(field_type.to_be_bytes().as_ref()); - bytes.extend_from_slice(field_length.to_be_bytes().as_ref()); -} - -pub fn template_data_ipv4(sequence: u32) -> Vec { - const FIELDS: [(u16, u16); 8] = [ - (1, 4), // IN_BYTES - (2, 4), // IN_PKTS - (4, 1), // PROTOCOL - (7, 4), // L4_SRC_PORT - (8, 4), // IPV4_SRC_ADDR - (11, 4), // L4_DST_PORT - (12, 4), // IPV4_DST_ADDR - (15, 1), // TOS - ]; - - // Build the header - let mut bytes = Vec::new(); - - // Add the flowset_id, id is zero. (See https://netflow.caligare.com/netflow_v9.htm) - // 16 - bytes.push(0); - bytes.push(0); - - // Add the length of the flowset, 4 bytes - const LENGTH: u16 = 4; // TODO: Fixme - bytes.extend_from_slice(LENGTH.to_be_bytes().as_ref()); - - // Add the TemplateID. We're going to use 256 for IPv4. - const TEMPLATE_ID: u16 = 256; - bytes.extend_from_slice(TEMPLATE_ID.to_be_bytes().as_ref()); - - // Add the number of fields in the template - const FIELD_COUNT: u16 = FIELDS.len() as u16; - bytes.extend_from_slice(FIELD_COUNT.to_be_bytes().as_ref()); - - for (field_type, field_length) in FIELDS.iter() { - add_field(&mut bytes, *field_type, *field_length); - } - - bytes -} - -pub fn template_data_ipv6(sequence: u32) -> Vec { - const FIELDS: [(u16, u16); 8] = [ - (1, 4), // IN_BYTES - (2, 4), // IN_PKTS - (4, 1), // PROTOCOL - (7, 4), // L4_SRC_PORT - (27, 16), // IPV6_SRC_ADDR - (11, 4), // L4_DST_PORT - (28, 16), // IPV6_DST_ADDR - (15, 1), // TOS - ]; - - // Build the header - let mut bytes = Vec::new(); - - // Add the flowset_id, id is zero. (See https://netflow.caligare.com/netflow_v9.htm) - // 16 - bytes.push(0); - bytes.push(0); - - // Add the length of the flowset, 4 bytes - const LENGTH: u16 = 4; // TODO: Fixme - bytes.extend_from_slice(LENGTH.to_be_bytes().as_ref()); - - // Add the TemplateID. We're going to use 257 for IPv6. - const TEMPLATE_ID: u16 = 257; - bytes.extend_from_slice(TEMPLATE_ID.to_be_bytes().as_ref()); - - // Add the number of fields in the template - const FIELD_COUNT: u16 = FIELDS.len() as u16; - bytes.extend_from_slice(FIELD_COUNT.to_be_bytes().as_ref()); - - for (field_type, field_length) in FIELDS.iter() { - add_field(&mut bytes, *field_type, *field_length); - } - - bytes -} - -pub(crate) fn to_netflow_9( - key: &FlowbeeKey, - data: &FlowbeeData, -) -> anyhow::Result<(Vec, Vec)> { - if key.local_ip.is_v4() && key.remote_ip.is_v4() { - // Return IPv4 records - Ok((ipv4_record(key, data, 0)?, ipv4_record(key, data, 1)?)) - } else if (!key.local_ip.is_v4()) && (!key.remote_ip.is_v4()) { - // Return IPv6 records - Ok((ipv6_record(key, data, 0)?, ipv6_record(key, data, 1)?)) - } else { - anyhow::bail!("Mixing IPv4 and IPv6 is not supported"); - } -} - -fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result> { - // Configure IP directions - let local = key.local_ip.as_ip(); - let remote = key.remote_ip.as_ip(); - if let (IpAddr::V4(local), IpAddr::V4(remote)) = (local, remote) { - let src_ip = u32::from_ne_bytes(local.octets()); - let dst_ip = u32::from_ne_bytes(remote.octets()); - - // Build the field values - let mut field_bytes: Vec = Vec::new(); - - // Bytes Sent - field_bytes.extend_from_slice(&data.bytes_sent[direction].to_be_bytes()); - - // Packet Sent - field_bytes.extend_from_slice(&data.packets_sent[direction].to_be_bytes()); - - // Add the protocol - field_bytes.push(key.ip_protocol); - - // Add the source port - field_bytes.extend_from_slice(&key.src_port.to_be_bytes()); - - // Add the source address - if direction == 0 { - field_bytes.extend_from_slice(&src_ip.to_be_bytes()); - } else { - field_bytes.extend_from_slice(&dst_ip.to_be_bytes()); - } - - // Add the destination port - field_bytes.extend_from_slice(&key.dst_port.to_be_bytes()); - - // Add the destination address - if direction == 0 { - field_bytes.extend_from_slice(&dst_ip.to_be_bytes()); - } else { - field_bytes.extend_from_slice(&src_ip.to_be_bytes()); - } - - // Add the TOS - field_bytes.push(0); - - // Build the actual record - let mut bytes = Vec::new(); - // Add the flowset_id. Template ID is 256 - bytes.extend_from_slice(&(256u16).to_be_bytes()); - - // Add the length. Length includes 2 bytes for flowset and 2 bytes for the length field - // itself. That's odd. - bytes.extend_from_slice(&((field_bytes.len() as u16 + 4).to_be_bytes())); - - Ok(bytes) - } else { - anyhow::bail!("IPv6 data in an IPv4 function was a bad idea"); - } -} - -fn ipv6_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result> { - // Configure IP directions - let local = key.local_ip.as_ip(); - let remote = key.remote_ip.as_ip(); - if let (IpAddr::V6(local), IpAddr::V6(remote)) = (local, remote) { - let src_ip = local.octets(); - let dst_ip = remote.octets(); - - // Build the field values - let mut field_bytes: Vec = Vec::new(); - - // Bytes Sent - field_bytes.extend_from_slice(&data.bytes_sent[direction].to_be_bytes()); - - // Packet Sent - field_bytes.extend_from_slice(&data.packets_sent[direction].to_be_bytes()); - - // Add the protocol - field_bytes.push(key.ip_protocol); - - // Add the source port - field_bytes.extend_from_slice(&key.src_port.to_be_bytes()); - - // Add the source address - if direction == 0 { - field_bytes.extend_from_slice(&src_ip); - } else { - field_bytes.extend_from_slice(&dst_ip); - } - - // Add the destination port - field_bytes.extend_from_slice(&key.dst_port.to_be_bytes()); - - // Add the destination address - if direction == 0 { - field_bytes.extend_from_slice(&dst_ip); - } else { - field_bytes.extend_from_slice(&src_ip); - } - - // Add the TOS - field_bytes.push(0); - - // Build the actual record - let mut bytes = Vec::new(); - // Add the flowset_id. Template ID is 257 - bytes.extend_from_slice(&(257u16).to_be_bytes()); - - // Add the length. Length includes 2 bytes for flowset and 2 bytes for the length field - // itself. That's odd. - bytes.extend_from_slice(&((field_bytes.len() as u16 + 4).to_be_bytes())); - - Ok(bytes) - } else { - anyhow::bail!("IPv4 data in an IPv6 function was a bad idea"); - } -} \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs new file mode 100644 index 000000000..2d5054fad --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs @@ -0,0 +1,70 @@ +use std::net::IpAddr; + +use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use super::field_types::*; + +pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: usize, key: &FlowbeeKey, data: &FlowbeeData) -> anyhow::Result> { + let src_port = if direction == 0 { key.src_port } else { key.dst_port }; + let dst_port = if direction == 0 { key.dst_port } else { key.src_port }; + + let total_size: u16 = template.iter().map(|(_, size)| size).sum(); + let mut result = Vec::with_capacity(total_size as usize); + for (field_type, field_length) in template.iter() { + match (*field_type, *field_length) { + IN_BYTES => encode_u64(data.bytes_sent[direction], &mut result), + IN_PKTS => encode_u64(data.packets_sent[direction], &mut result), + PROTOCOL => result.push(key.ip_protocol), + L4_SRC_PORT => encode_u16(src_port, &mut result), + L4_DST_PORT => encode_u16(dst_port, &mut result), + DST_TOS => result.push(data.tos), + IPV4_SRC_ADDR => encode_ipv4(0, key, &mut result)?, + IPV4_DST_ADDR => encode_ipv4(1, key, &mut result)?, + IPV6_SRC_ADDR => encode_ipv6(0, key, &mut result)?, + IPV6_DST_ADDR => encode_ipv6(1, key, &mut result)?, + _ => anyhow::bail!("Don't know how to encode field type {} yet", field_type), + } + } + Ok(result) +} + +fn encode_u64(value: u64, target: &mut Vec) { + target.extend_from_slice(&value.to_be_bytes()); +} + +fn encode_u16(value: u16, target: &mut Vec) { + target.extend_from_slice(&value.to_be_bytes()); +} + +fn encode_ipv4(direction: usize, key: &FlowbeeKey, target: &mut Vec) -> anyhow::Result<()> { + let local = key.local_ip.as_ip(); + let remote = key.remote_ip.as_ip(); + if let (IpAddr::V4(local), IpAddr::V4(remote)) = (local, remote) { + let src_ip = u32::from_ne_bytes(local.octets()); + let dst_ip = u32::from_ne_bytes(remote.octets()); + if direction == 0 { + target.extend_from_slice(&src_ip.to_be_bytes()); + } else { + target.extend_from_slice(&dst_ip.to_be_bytes()); + } + } else { + anyhow::bail!("Expected IPv4 addresses, got {:?}", (local, remote)); + } + Ok(()) +} + +fn encode_ipv6(direction: usize, key: &FlowbeeKey, target: &mut Vec) -> anyhow::Result<()> { + let local = key.local_ip.as_ip(); + let remote = key.remote_ip.as_ip(); + if let (IpAddr::V6(local), IpAddr::V6(remote)) = (local, remote) { + let src_ip = local.octets(); + let dst_ip = remote.octets(); + if direction == 0 { + target.extend_from_slice(&src_ip); + } else { + target.extend_from_slice(&dst_ip); + } + } else { + anyhow::bail!("Expected IPv6 addresses, got {:?}", (local, remote)); + } + Ok(()) +} \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_types.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_types.rs new file mode 100644 index 000000000..9cfae8f5c --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_types.rs @@ -0,0 +1,82 @@ +// Extracted from https://netflow.caligare.com/netflow_v9.htm +#![allow(dead_code)] + +pub(crate) const IN_BYTES:(u16, u16) = (1, 8); +pub(crate) const IN_PKTS:(u16, u16) = (2, 8); +pub(crate) const FLOWS:(u16, u16) = (3, 4); +pub(crate) const PROTOCOL:(u16, u16) = (4, 1); +pub(crate) const SRC_TOS:(u16, u16) = (5, 1); +pub(crate) const TCP_FLAGS:(u16, u16) = (6, 1); +pub(crate) const L4_SRC_PORT:(u16, u16) = (7, 2); +pub(crate) const IPV4_SRC_ADDR:(u16, u16) = (8, 4); +pub(crate) const SRC_MASK:(u16, u16) = (9, 1); +pub(crate) const INPUT_SNMP:(u16, u16) = (10, 2); +pub(crate) const L4_DST_PORT:(u16, u16) = (11, 2); +pub(crate) const IPV4_DST_ADDR:(u16, u16) = (12, 4); +pub(crate) const DST_MASK:(u16, u16) = (13, 1); +pub(crate) const OUTPUT_SNMP:(u16, u16) = (14, 2); +pub(crate) const IPV4_NEXT_HOP:(u16, u16) = (15, 4); +pub(crate) const SRC_AS:(u16, u16) = (16, 2); +pub(crate) const DST_AS:(u16, u16) = (17, 2); +pub(crate) const BGP_IPV4_NEXT_HOP:(u16, u16) = (18, 4); +pub(crate) const MUL_DST_PKTS:(u16, u16) = (19, 4); +pub(crate) const MUL_DST_BYTES:(u16, u16) = (20, 4); +pub(crate) const LAST_SWITCHED:(u16, u16) = (21, 4); +pub(crate) const FIRST_SWITCHED:(u16, u16) = (22, 4); +pub(crate) const OUT_BYTES:(u16, u16) = (23, 4); +pub(crate) const OUT_PKTS:(u16, u16) = (24, 4); +pub(crate) const MIN_PKT_LNGTH:(u16, u16) = (25, 2); +pub(crate) const MAX_PKT_LNGTH:(u16, u16) = (26, 2); +pub(crate) const IPV6_SRC_ADDR:(u16, u16) = (27, 16); +pub(crate) const IPV6_DST_ADDR:(u16, u16) = (28, 16); +pub(crate) const IPV6_SRC_MASK:(u16, u16) = (29, 1); +pub(crate) const IPV6_DST_MASK:(u16, u16) = (30, 1); +pub(crate) const IPV6_FLOW_LABEL:(u16, u16) = (31, 3); +pub(crate) const ICMP_TYPE:(u16, u16) = (32, 2); +pub(crate) const MUL_IGMP_TYPE:(u16, u16) = (33, 1); +pub(crate) const SAMPLING_INTERVAL:(u16, u16) = (34, 4); +pub(crate) const SAMPLING_ALGORITHM:(u16, u16) = (35, 1); +pub(crate) const FLOW_ACTIVE_TIMEOUT:(u16, u16) = (36, 2); +pub(crate) const FLOW_INACTIVE_TIMEOUT:(u16, u16) = (37, 2); +pub(crate) const ENGINE_TYPE:(u16, u16) = (38, 1); +pub(crate) const ENGINE_ID:(u16, u16) = (39, 1); +pub(crate) const TOTAL_BYTES_EXP:(u16, u16) = (40, 4); +pub(crate) const TOTAL_PKTS_EXP:(u16, u16) = (41, 4); +pub(crate) const TOTAL_FLOWS_EXP:(u16, u16) = (42, 4); +pub(crate) const IPV4_SRC_PREFIX:(u16, u16) = (44, 4); +pub(crate) const IPV4_DST_PREFIX:(u16, u16) = (45, 4); +pub(crate) const MPLS_TOP_LABEL_TYPE:(u16, u16) = (46, 1); +pub(crate) const MPLS_TOP_LABEL_IP_ADDR:(u16, u16) = (47, 4); +pub(crate) const FLOW_SAMPLER_ID:(u16, u16) = (48, 1); +pub(crate) const FLOW_SAMPLER_MODE:(u16, u16) = (49, 1); +pub(crate) const FLOW_SAMPLER_RANDOM_INTERVAL:(u16, u16) = (50, 4); +pub(crate) const MIN_TTL:(u16, u16) = (52, 1); +pub(crate) const MAX_TTL:(u16, u16) = (53, 1); +pub(crate) const IPV4_IDENT:(u16, u16) = (54, 2); +pub(crate) const DST_TOS:(u16, u16) = (55, 1); +pub(crate) const IN_SRC_MAC:(u16, u16) = (56, 6); +pub(crate) const OUT_DST_MAC:(u16, u16) = (57, 6); +pub(crate) const SRC_VLAN:(u16, u16) = (58, 2); +pub(crate) const DST_VLAN:(u16, u16) = (59, 2); +pub(crate) const IP_PROTOCOL_VERSION:(u16, u16) = (60, 1); +pub(crate) const DIRECTION:(u16, u16) = (61, 1); +pub(crate) const IPV6_NEXT_HOP:(u16, u16) = (62, 16); +pub(crate) const BPG_IPV6_NEXT_HOP:(u16, u16) = (63, 16); +pub(crate) const IPV6_OPTION_HEADERS:(u16, u16) = (64, 4); +pub(crate) const MPLS_LABEL_1:(u16, u16) = (70, 3); +pub(crate) const MPLS_LABEL_2:(u16, u16) = (71, 3); +pub(crate) const MPLS_LABEL_3:(u16, u16) = (72, 3); +pub(crate) const MPLS_LABEL_4:(u16, u16) = (73, 3); +pub(crate) const MPLS_LABEL_5:(u16, u16) = (74, 3); +pub(crate) const MPLS_LABEL_6:(u16, u16) = (75, 3); +pub(crate) const MPLS_LABEL_7:(u16, u16) = (76, 3); +pub(crate) const MPLS_LABEL_8:(u16, u16) = (77, 3); +pub(crate) const MPLS_LABEL_9:(u16, u16) = (78, 3); +pub(crate) const MPLS_LABEL_10:(u16, u16) = (79, 3); +pub(crate) const IN_DST_MAC:(u16, u16) = (80, 6); +pub(crate) const OUT_SRC_MAC:(u16, u16) = (81, 6); +pub(crate) const IF_NAME:(u16, u16) = (82, 0); +pub(crate) const IF_DESC:(u16, u16) = (83, 0); +pub(crate) const SAMPLER_NAME:(u16, u16) = (84, 0); +pub(crate) const IN_PERMANENT_BYTES:(u16, u16) = (85, 4); +pub(crate) const IN_PERMANENT_PKTS:(u16, u16) = (86, 4); diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/header.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/header.rs new file mode 100644 index 000000000..fec54c27a --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/header.rs @@ -0,0 +1,28 @@ +use lqos_utils::unix_time::time_since_boot; +use nix::sys::time::TimeValLike; + +#[repr(C)] +pub(crate) struct Netflow9Header { + pub(crate) version: u16, + pub(crate) count: u16, + pub(crate) sys_uptime: u32, + pub(crate) unix_secs: u32, + pub(crate) package_sequence: u32, + pub(crate) source_id: u32, +} + +impl Netflow9Header { + /// Create a new Netflow 9 header + pub(crate) fn new(flow_sequence: u32, record_count_including_templates: u16) -> Self { + let uptime = time_since_boot().unwrap(); + + Self { + version: (9u16).to_be(), + count: record_count_including_templates.to_be(), + sys_uptime: (uptime.num_milliseconds() as u32).to_be(), + unix_secs: (uptime.num_seconds() as u32).to_be(), + package_sequence: flow_sequence.to_be(), + source_id: 0, + } + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs new file mode 100644 index 000000000..60fbc3fc6 --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs @@ -0,0 +1,131 @@ +//! Protocol definitions for Netflow v9 Data. +//! Mostly derived from https://netflow.caligare.com/netflow_v9.htm + +use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use std::net::IpAddr; +mod field_types; +use field_types::*; +pub(crate) mod field_encoder; +pub(crate) mod header; +pub(crate) mod template_ipv4; +pub(crate) mod template_ipv6; + +fn add_field(bytes: &mut Vec, field_type: u16, field_length: u16) { + bytes.extend_from_slice(field_type.to_be_bytes().as_ref()); + bytes.extend_from_slice(field_length.to_be_bytes().as_ref()); +} + +pub(crate) fn to_netflow_9( + key: &FlowbeeKey, + data: &FlowbeeData, +) -> anyhow::Result<(Vec, Vec)> { + if key.local_ip.is_v4() && key.remote_ip.is_v4() { + // Return IPv4 records + Ok((ipv4_record(key, data, 0)?, ipv4_record(key, data, 1)?)) + } else if (!key.local_ip.is_v4()) && (!key.remote_ip.is_v4()) { + // Return IPv6 records + Ok((ipv6_record(key, data, 0)?, ipv6_record(key, data, 1)?)) + } else { + anyhow::bail!("Mixing IPv4 and IPv6 is not supported"); + } +} + +fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result> { + let field_bytes = field_encoder::encode_fields_from_template( + &template_ipv4::FIELDS_IPV4, + direction, + key, + data, + )?; + + // Build the actual record + let mut bytes = Vec::new(); + // Add the flowset_id. Template ID is 256 + bytes.extend_from_slice(&(256u16).to_be_bytes()); + + // Add the length. Length includes 2 bytes for flowset and 2 bytes for the length field + // itself. That's odd. + let padding = (field_bytes.len() + 4) % 4; + let size = (bytes.len() + field_bytes.len() + padding + 2) as u16; + bytes.extend_from_slice(&size.to_be_bytes()); + + // Add the data itself + bytes.extend_from_slice(&field_bytes); + + println!("Padding: {}", padding); + println!("IPv4 data {} = {}", bytes.len(), size); + println!("Field bytes was: {}", field_bytes.len()); + + // Pad to 32-bits + for _ in 0..padding { + bytes.push(0); + } + + Ok(bytes) +} + +fn ipv6_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result> { + // Configure IP directions + let local = key.local_ip.as_ip(); + let remote = key.remote_ip.as_ip(); + if let (IpAddr::V6(local), IpAddr::V6(remote)) = (local, remote) { + let src_ip = local.octets(); + let dst_ip = remote.octets(); + + // Build the field values + let mut field_bytes: Vec = Vec::new(); + + // Bytes Sent + field_bytes.extend_from_slice(&data.bytes_sent[direction].to_be_bytes()); + + // Packet Sent + field_bytes.extend_from_slice(&data.packets_sent[direction].to_be_bytes()); + + // Add the protocol + field_bytes.push(key.ip_protocol); + + // Add the source port + field_bytes.extend_from_slice(&key.src_port.to_be_bytes()); + + // Add the source address + if direction == 0 { + field_bytes.extend_from_slice(&src_ip); + } else { + field_bytes.extend_from_slice(&dst_ip); + } + + // Add the destination port + field_bytes.extend_from_slice(&key.dst_port.to_be_bytes()); + + // Add the destination address + if direction == 0 { + field_bytes.extend_from_slice(&dst_ip); + } else { + field_bytes.extend_from_slice(&src_ip); + } + + // Add the TOS + field_bytes.push(0); + + // Build the actual record + let mut bytes = Vec::new(); + // Add the flowset_id. Template ID is 257 + bytes.extend_from_slice(&(257u16).to_be_bytes()); + + // Add the length. Length includes 2 bytes for flowset and 2 bytes for the length field + // itself. That's odd. + bytes.extend_from_slice(&((field_bytes.len() as u16 + 4).to_be_bytes())); + + // Add the data itself + bytes.extend_from_slice(&field_bytes); + + // Pad to 32-bits + while bytes.len() % 4 != 0 { + bytes.push(0); + } + + Ok(bytes) + } else { + anyhow::bail!("IPv4 data in an IPv6 function was a bad idea"); + } +} diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/template_ipv4.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/template_ipv4.rs new file mode 100644 index 000000000..2eaae91ce --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/template_ipv4.rs @@ -0,0 +1,42 @@ +use crate::throughput_tracker::flow_data::netflow9::protocol::*; + +pub(crate) const FIELDS_IPV4: [(u16, u16); 8] = [ + IN_BYTES, + IN_PKTS, + PROTOCOL, + L4_SRC_PORT, + IPV4_SRC_ADDR, + L4_DST_PORT, + IPV4_DST_ADDR, + DST_TOS, +]; + +pub fn template_data_ipv4() -> Vec { + // Build the header + let mut bytes = Vec::new(); + + // Add the flowset_id, id is zero. (See https://netflow.caligare.com/netflow_v9.htm) + // 16 + bytes.push(0); + bytes.push(0); + + // Add the length of the flowset, 4 bytes + const LENGTH: u16 = 8 + (FIELDS_IPV4.len() * 4) as u16; // TODO: Fixme + bytes.extend_from_slice(LENGTH.to_be_bytes().as_ref()); + + // Add the TemplateID. We're going to use 256 for IPv4. + const TEMPLATE_ID: u16 = 256; + bytes.extend_from_slice(TEMPLATE_ID.to_be_bytes().as_ref()); + + // Add the number of fields in the template + const FIELD_COUNT: u16 = FIELDS_IPV4.len() as u16; + bytes.extend_from_slice(FIELD_COUNT.to_be_bytes().as_ref()); + + for (field_type, field_length) in FIELDS_IPV4.iter() { + add_field(&mut bytes, *field_type, *field_length); + } + + println!("Templatev4 Size {} = {}", bytes.len(), 8 + (FIELDS_IPV4.len() * 2)); + + bytes +} diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/template_ipv6.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/template_ipv6.rs new file mode 100644 index 000000000..a27dfba74 --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/template_ipv6.rs @@ -0,0 +1,40 @@ +use crate::throughput_tracker::flow_data::netflow9::protocol::*; + +pub(crate) const FIELDS_IPV6: [(u16, u16); 8] = [ + IN_BYTES, + IN_PKTS, + PROTOCOL, + L4_SRC_PORT, + IPV6_SRC_ADDR, + L4_DST_PORT, + IPV6_DST_ADDR, + DST_TOS, +]; + +pub fn template_data_ipv6() -> Vec { + // Build the header + let mut bytes = Vec::new(); + + // Add the flowset_id, id is zero. (See https://netflow.caligare.com/netflow_v9.htm) + // 16 + bytes.push(0); + bytes.push(0); + + // Add the length of the flowset, 4 bytes + const LENGTH: u16 = 8 + (FIELDS_IPV6.len() * 4) as u16; // TODO: Fixme + bytes.extend_from_slice(LENGTH.to_be_bytes().as_ref()); + + // Add the TemplateID. We're going to use 257 for IPv6. + const TEMPLATE_ID: u16 = 257; + bytes.extend_from_slice(TEMPLATE_ID.to_be_bytes().as_ref()); + + // Add the number of fields in the template + const FIELD_COUNT: u16 = FIELDS_IPV6.len() as u16; + bytes.extend_from_slice(FIELD_COUNT.to_be_bytes().as_ref()); + + for (field_type, field_length) in FIELDS_IPV6.iter() { + add_field(&mut bytes, *field_type, *field_length); + } + + bytes +} \ No newline at end of file