Skip to content

Commit

Permalink
First update of the 'recently finished flows' mechanism for tracking …
Browse files Browse the repository at this point in the history
…the last 5 minutes of data.
  • Loading branch information
thebracket committed Mar 9, 2024
1 parent 79fa1d4 commit 4a8be30
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ impl AsnTable {
fn build_asn_table() -> anyhow::Result<Vec<Ip2AsnRow>> {
let file_path = Self::file_path();

let mut retries = 0;
while retries < 3 {
if file_path.exists() {
break;
if !file_path.exists() {
let mut retries = 0;
while retries < 3 {
if file_path.exists() {
break;
}
Self::download()?;
retries += 1;
}
Self::download()?;
retries += 1;
}

if !file_path.exists() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use super::{get_asn_name_and_country, FlowAnalysis};
use crate::throughput_tracker::flow_data::FlowbeeRecipient;
use dashmap::DashMap;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use once_cell::sync::Lazy;
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, Mutex},
};

struct TimeBuffer {
buffer: Mutex<Vec<TimeEntry>>,
}

struct TimeEntry {
time: u64,
data: (FlowbeeKey, FlowbeeData, FlowAnalysis),
}

impl TimeBuffer {
fn new() -> Self {
Self {
buffer: Mutex::new(Vec::new()),
}
}

fn expire_over_five_minutes(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut buffer = self.buffer.lock().unwrap();
buffer.retain(|v| now - v.time < 300);
}

fn push(&self, entry: TimeEntry) {
let mut buffer = self.buffer.lock().unwrap();
buffer.push(entry);
}

fn country_summary(&self) -> Vec<(String, [u64; 2], [f32; 2])> {
let buffer = self.buffer.lock().unwrap();
let mut my_buffer = buffer
.iter()
.map(|v| {
let (_key, data, analysis) = &v.data;
let (_name, country) = get_asn_name_and_country(analysis.asn_id.0);
let rtt = [
(data.last_rtt[0] / 1000000) as f32,
(data.last_rtt[1] / 1000000) as f32,
];
(country, data.bytes_sent, rtt)
})
.collect::<Vec<(String, [u64; 2], [f32; 2])>>();

// Sort by country
my_buffer.sort_by(|a, b| a.0.cmp(&b.0));

// Summarize by country
let mut country_summary = Vec::new();
let mut last_country = String::new();
let mut total_bytes = [0, 0];
let mut total_rtt = [0.0f64, 0.0f64];
let mut rtt_count = [0, 0];
for (country, bytes, rtt) in my_buffer {
if last_country != country {
if !last_country.is_empty() {
// Store the country
let rtt = [
if total_rtt[0] > 0.0 {
(total_rtt[0] / rtt_count[0] as f64) as f32
} else {
0.0
},
if total_rtt[1] > 0.0 {
(total_rtt[1] / rtt_count[1] as f64) as f32
} else {
0.0
},
];

country_summary.push((last_country, total_bytes, rtt));
}

last_country = country.to_string();
total_bytes = [0, 0];
total_rtt = [0.0, 0.0];
rtt_count = [0, 0];
}
total_bytes[0] += bytes[0];
total_bytes[1] += bytes[1];
total_rtt[0] += rtt[0] as f64;
total_rtt[1] += rtt[1] as f64;
rtt_count[0] += 1;
rtt_count[1] += 1;
}

// Store the last country
let rtt = [
if total_rtt[0] > 0.0 {
(total_rtt[0] / rtt_count[0] as f64) as f32
} else {
0.0
},
if total_rtt[1] > 0.0 {
(total_rtt[1] / rtt_count[1] as f64) as f32
} else {
0.0
},
];

country_summary.push((last_country, total_bytes, rtt));

// Sort by bytes descending
country_summary.sort_by(|a, b| b.1[0].cmp(&a.1[0]));

country_summary
}
}

static RECENT_FLOWS: Lazy<TimeBuffer> = Lazy::new(|| TimeBuffer::new());

pub struct FinishedFlowAnalysis {}

impl FinishedFlowAnalysis {
pub fn new() -> Arc<Self> {
log::debug!("Created Flow Analysis Endpoint");

std::thread::spawn(|| loop {
RECENT_FLOWS.expire_over_five_minutes();
std::thread::sleep(std::time::Duration::from_secs(60 * 5));
});

Arc::new(Self {})
}
}

impl FlowbeeRecipient for FinishedFlowAnalysis {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, analysis: FlowAnalysis) {
log::info!("Finished flow analysis");
RECENT_FLOWS.push(TimeEntry {
time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
data: (key, data, analysis),
});

println!("{:?}", RECENT_FLOWS.country_summary());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use self::asn::AsnTable;
mod asn;
mod protocol;
pub use protocol::FlowProtocol;

use super::AsnId;
mod finished_flows;
pub use finished_flows::FinishedFlowAnalysis;

static ANALYSIS: Lazy<FlowAnalysisSystem> = Lazy::new(|| FlowAnalysisSystem::new());

Expand Down
17 changes: 10 additions & 7 deletions src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod netflow5;
mod netflow9;
mod flow_analysis;

use crate::throughput_tracker::flow_data::{netflow5::Netflow5, netflow9::Netflow9};
use crate::throughput_tracker::flow_data::{flow_analysis::FinishedFlowAnalysis, netflow5::Netflow5, netflow9::Netflow9};
pub(crate) use flow_tracker::{ALL_FLOWS, AsnId};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use std::sync::{
Expand All @@ -17,19 +17,21 @@ pub(crate) use flow_analysis::{setup_flow_analysis, lookup_asn_id, get_asn_name_


trait FlowbeeRecipient {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData);
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, analysis: FlowAnalysis);
}

// Creates the netflow tracker and returns the sender
pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, FlowbeeData)> {
let (tx, rx) = channel::<(FlowbeeKey, FlowbeeData)>();
pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))> {
let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>();
let config = lqos_config::load_config().unwrap();

std::thread::spawn(move || {
log::info!("Starting the network flow tracker back-end");

// Build the endpoints list
let mut endpoints: Vec<Arc<dyn FlowbeeRecipient>> = Vec::new();
endpoints.push(FinishedFlowAnalysis::new());

if let Some(flow_config) = config.flows {
if let (Some(ip), Some(port), Some(version)) = (
flow_config.netflow_ip,
Expand All @@ -53,12 +55,13 @@ pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, FlowbeeData)> {
}
}
}
log::info!("Flow Endpoints: {}", endpoints.len());

// Send to all endpoints upon receipt
while let Ok((key, value)) = rx.recv() {
while let Ok((key, (value, analysis))) = rx.recv() {
endpoints.iter_mut().for_each(|f| {
log::debug!("Enqueueing flow data for {key:?}");
f.enqueue(key.clone(), value.clone());
//log::debug!("Enqueueing flow data for {key:?}");
f.enqueue(key.clone(), value.clone(), analysis.clone());
});
}
log::info!("Network flow tracker back-end has stopped")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Support for the Netflow 5 protocol
//! Mostly taken from: https://netflow.caligare.com/netflow_v5.htm
mod protocol;
use super::FlowbeeRecipient;
use super::{FlowAnalysis, FlowbeeRecipient};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
pub(crate) use protocol::*;
use std::{
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Netflow5 {
}

impl FlowbeeRecipient for Netflow5 {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData) {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, _analysis: FlowAnalysis) {
let mut lock = self.send_queue.lock().unwrap();
lock.push((key, data));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use std::{net::UdpSocket, sync::{atomic::AtomicU32, Arc, Mutex}};

use self::protocol::to_netflow_9;
use super::FlowbeeRecipient;
use super::{FlowAnalysis, FlowbeeRecipient};
mod protocol;

pub(crate) struct Netflow9 {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl Netflow9 {
}

impl FlowbeeRecipient for Netflow9 {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData) {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, _analysis: FlowAnalysis) {
let mut lock = self.send_queue.lock().unwrap();
lock.push((key, data));
}
Expand Down
4 changes: 2 additions & 2 deletions src/rust/lqosd/src/throughput_tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> = Lazy::new(ThroughputTra
/// collection thread that there is fresh data.
pub async fn spawn_throughput_monitor(
long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, FlowbeeData)>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>,
) {
info!("Starting the bandwidth monitor thread.");
let interval_ms = 1000; // 1 second
Expand All @@ -44,7 +44,7 @@ pub async fn spawn_throughput_monitor(
async fn throughput_task(
interval_ms: u64,
long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, FlowbeeData)>
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>
) {
// Obtain the flow timeout from the config, default to 30 seconds
let timeout_seconds = if let Ok(config) = lqos_config::load_config() {
Expand Down
20 changes: 10 additions & 10 deletions src/rust/lqosd/src/throughput_tracker/tracking_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl ThroughputTracker {
&self,
timeout_seconds: u64,
netflow_enabled: bool,
sender: std::sync::mpsc::Sender<(FlowbeeKey, FlowbeeData)>,
sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>,
) {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);

Expand All @@ -182,7 +182,6 @@ impl ThroughputTracker {

// Track the expired keys
let mut expired_keys = Vec::new();


// Track through all the flows
iterate_flows(&mut |key, data| {
Expand All @@ -194,12 +193,7 @@ impl ThroughputTracker {

if data.last_seen < expire {
// This flow has expired. Add it to the list to be cleaned
expired_keys.push(key.clone());

// Send it off to netperf for analysis if we are supporting doing so.
if netflow_enabled {
let _ = sender.send((key.clone(), data.clone()));
}
expired_keys.push(key.clone());
} else {
// We have a valid flow, so it needs to be tracked
if let Some(mut this_flow) = ALL_FLOWS.get_mut(&key) {
Expand All @@ -217,7 +211,6 @@ impl ThroughputTracker {
let flow_analysis = FlowAnalysis::new(&key);

ALL_FLOWS.insert(key.clone(), (data.clone(), flow_analysis));
// TODO: Submit it for analysis
}

// TCP - we have RTT data? 6 is TCP
Expand Down Expand Up @@ -249,7 +242,14 @@ impl ThroughputTracker {
log::warn!("Failed to end flows: {:?}", e);
}
for key in expired_keys {
ALL_FLOWS.remove(&key);
// Send it off to netperf for analysis if we are supporting doing so.
if netflow_enabled {
if let Some(d) = ALL_FLOWS.get(&key) {
let _ = sender.send((key.clone(), (d.0.clone(), d.1.clone())));
}
}

//ALL_FLOWS.remove(&key);
}
}
}
Expand Down

0 comments on commit 4a8be30

Please sign in to comment.