Skip to content

Commit

Permalink
First example of an actual summary report - we can group flow endpoin…
Browse files Browse the repository at this point in the history
…ts in a 5-minute report.
  • Loading branch information
thebracket committed Mar 12, 2024
1 parent 1fb5838 commit 66a19c0
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/rust/lqos_bus/src/bus/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ pub enum BusRequest {

/// Flows by IP Address
FlowsByIp(String),

/// Current Endpoints by Country
CurrentEndpointsByCountry,
}

/// Defines the type of "top" flow being requested
Expand Down
3 changes: 3 additions & 0 deletions src/rust/lqos_bus/src/bus/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,7 @@ pub enum BusResponse {

/// Flows by IP
FlowsByIp(Vec<FlowbeeData>),

/// Current endpoints by country
CurrentEndpointsByCountry(Vec<(String, [u64; 2], [f32; 2])>),
}
12 changes: 12 additions & 0 deletions src/rust/lqos_node_manager/src/flow_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,17 @@ pub async fn top_5_flows(top_n: u32, flow_type: String) -> NoCache<Json<Vec<Flow
_ => Vec::new(),
};

NoCache::new(Json(result))
}

#[get("/api/flows/by_country")]
pub async fn flows_by_country() -> NoCache<Json<Vec<(String, [u64; 2], [f32; 2])>>> {
let responses =
bus_request(vec![BusRequest::CurrentEndpointsByCountry]).await.unwrap();
let result = match &responses[0] {
BusResponse::CurrentEndpointsByCountry(country_summary) => country_summary.to_owned(),
_ => Vec::new(),
};

NoCache::new(Json(result))
}
1 change: 1 addition & 0 deletions src/rust/lqos_node_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ fn rocket() -> _ {
flow_monitor::all_flows_debug_dump,
flow_monitor::count_flows,
flow_monitor::top_5_flows,
flow_monitor::flows_by_country,
],
);

Expand Down
51 changes: 49 additions & 2 deletions src/rust/lqos_node_manager/static/main.html
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ <h5 class="card-title">
<i class='fa fa-arrow-down'></i> Top 10 Downloaders
<button class="btn btn-small btn-success" href="/top10" onclick="showCircuits()">Circuits</button>
<button class="btn btn-small btn-success" href="/top10" onclick="showFlows()">Flows</button>
<button class="btn btn-small btn-success" href="/top10" onclick="showEndpoints()">Geo Endpoints</button>
</h5>
<div id="top10dl" style="display:block;"></div>
<div id="top10flows" style="display: none;"></div>
<div id="top10ep" style="display: none;"></div>
</div>
</div>
</div>
Expand Down Expand Up @@ -347,14 +349,54 @@ <h5 class="card-title"><i class='fa fa-exclamation'></i> Worst 10 RTT</h5>
});
}

function updateTop10Endpoints() {
$.get("/api/flows/by_country", data => {
//console.log(data);
let html = "<table class='table' style='font-size: 8pt'>";
html += "<thead>";
html += "<th>Country</th>";
html += "<th>UL ⬆️</th>";
html += "<th>DL ⬇️</th>";
html += "<th>UL RTT</th>";
html += "<th>DL RTT</th>";
html += "</thead></tbody>";
let i = 0;
while (i < data.length) {
html += "<tr>";
html += "<td>" + data[i][0] + "</td>";
html += "<td>" + scaleNumber(data[i][1][0]) + "</td>";
html += "<td>" + scaleNumber(data[i][1][1]) + "</td>";
html += "<td>" + (data[i][2][0] / 1000000).toFixed(2) + "</td>";
html += "<td>" + (data[i][2][1] / 1000000).toFixed(2) + "</td>";
html += "</tr>";
i += 1;
}
html += "</tbody></table>";
$("#top10ep").html(html);
});
}

let top10view = "circuits";

function showCircuits() {
$("#top10dl").show();
$("#top10flows").hide();
$("#top10ep").hide();
top10view = "circuits";
}

function showFlows() {
$("#top10dl").hide();
$("#top10flows").show();
$("#top10ep").hide();
top10view = "flows";
}

function showEndpoints() {
$("#top10dl").hide();
$("#top10flows").hide();
$("#top10ep").show();
top10view = "endpoints";
}

var rttGraph = new RttHistogram();
Expand All @@ -379,8 +421,13 @@ <h5 class="card-title"><i class='fa fa-exclamation'></i> Worst 10 RTT</h5>
if (tickCount % 5 == 0) {
updateHistogram();
updateWorst10();
updateTop10();
updateTop10Flows();
if (top10view == "circuits") {
updateTop10();
} else if (top10view == "flows") {
updateTop10Flows();
} else {
updateTop10Endpoints();
}
}

if (tickCount % 10 == 0) {
Expand Down
1 change: 1 addition & 0 deletions src/rust/lqosd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ fn handle_bus_requests(
}
BusRequest::TopFlows { n, flow_type } => throughput_tracker::top_flows(*n, *flow_type),
BusRequest::FlowsByIp(ip) => throughput_tracker::flows_by_ip(ip),
BusRequest::CurrentEndpointsByCountry => throughput_tracker::current_endpoints_by_country(),
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use once_cell::sync::Lazy;
use std::sync::{Arc, Mutex};

struct TimeBuffer {
pub struct TimeBuffer {
buffer: Mutex<Vec<TimeEntry>>,
}

Expand Down Expand Up @@ -34,7 +34,7 @@ impl TimeBuffer {
buffer.push(entry);
}

fn country_summary(&self) -> Vec<(String, [u64; 2], [f32; 2])> {
pub fn country_summary(&self) -> Vec<(String, [u64; 2], [f32; 2])> {
let buffer = self.buffer.lock().unwrap();
let mut my_buffer = buffer
.iter()
Expand Down Expand Up @@ -114,7 +114,7 @@ impl TimeBuffer {
}
}

static RECENT_FLOWS: Lazy<TimeBuffer> = Lazy::new(|| TimeBuffer::new());
pub static RECENT_FLOWS: Lazy<TimeBuffer> = Lazy::new(|| TimeBuffer::new());

pub struct FinishedFlowAnalysis {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use protocol::FlowProtocol;
use super::AsnId;
mod finished_flows;
pub use finished_flows::FinishedFlowAnalysis;
pub use finished_flows::RECENT_FLOWS;

static ANALYSIS: Lazy<FlowAnalysisSystem> = Lazy::new(|| FlowAnalysisSystem::new());

Expand Down Expand Up @@ -80,4 +81,4 @@ pub fn get_asn_name_and_country(ip: IpAddr) -> (String, String) {
}
}
(String::new(), String::new())
}
}
2 changes: 1 addition & 1 deletion src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::{
mpsc::{channel, Sender},
Arc,
};
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country, FlowAnalysis};
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country, FlowAnalysis, RECENT_FLOWS};


trait FlowbeeRecipient {
Expand Down
6 changes: 6 additions & 0 deletions src/rust/lqosd/src/throughput_tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,4 +651,10 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
return BusResponse::FlowsByIp(matching_flows);
}
BusResponse::Ack
}

/// Current endpoints by country
pub fn current_endpoints_by_country() -> BusResponse {
let summary = flow_data::RECENT_FLOWS.country_summary();
BusResponse::CurrentEndpointsByCountry(summary)
}
6 changes: 2 additions & 4 deletions src/rust/lqosd/src/throughput_tracker/tracking_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl ThroughputTracker {
pub(crate) fn apply_flow_data(
&self,
timeout_seconds: u64,
netflow_enabled: bool,
_netflow_enabled: bool,
sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>,
) {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -241,9 +241,7 @@ impl ThroughputTracker {
for key in expired_keys.iter() {
// Send it off to netperf for analysis if we are supporting doing so.
if let Some(d) = lock.get(&key) {
if netflow_enabled {
let _ = sender.send((key.clone(), (d.0.clone(), d.1.clone())));
}
let _ = sender.send((key.clone(), (d.0.clone(), d.1.clone())));
}
// Remove the flow from circulation
lock.remove(&key);
Expand Down

0 comments on commit 66a19c0

Please sign in to comment.