Skip to content

Commit

Permalink
Fix netflow port numbers and start/finish (#715)
Browse files Browse the repository at this point in the history
* Fix netflow port numbers and start/finish

* Add unit tests for netflow v5
  • Loading branch information
leoparente authored Apr 8, 2024
1 parent 9301d74 commit 5695659
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
46 changes: 44 additions & 2 deletions src/handlers/flow/test_flows.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <catch2/catch_test_macros.hpp>
#include <catch2/matchers/catch_matchers.hpp>
#include <catch2/catch_test_visor.hpp>
#include <catch2/matchers/catch_matchers.hpp>

#include "FlowInputStream.h"
#include "FlowStreamHandler.h"
Expand Down Expand Up @@ -399,7 +399,49 @@ TEST_CASE("Parse sflow stream with interfaces filter", "[sflow][flow]")
CHECK(j["devices"]["192.168.0.11"]["interfaces"]["37"]["top_out_src_ips_and_port_bytes"][0]["estimate"] == nullptr);
}

TEST_CASE("Parse netflow stream", "[netflow][flow]")
TEST_CASE("Parse netflow v5 stream", "[netflow][flow]")
{

FlowInputStream stream{"netflow-test"};
stream.config_set("flow_type", "netflow");
stream.config_set("pcap_file", "tests/fixtures/nf5.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c};
flow_handler.config_set<visor::Configurable::StringList>("enable", visor::Configurable::StringList({"top_tos"}));

flow_handler.start();
stream.start();
stream.stop();
flow_handler.stop();

auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();

// confirmed with wireshark
CHECK(event_data.num_events->value() == 2);
CHECK(event_data.num_samples->value() == 2);

nlohmann::json j;
flow_handler.metrics()->bucket(0)->to_json(j);

CHECK(j["devices"]["10.0.0.2"]["records_flows"] == 3);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["dst_ips_out"] == 2);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["src_ips_in"] == 1);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["dst_ports_out"] == 3);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["src_ports_in"] == 2);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_src_ips_bytes"][0]["estimate"] == 1720);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_src_ips_packets"][0]["estimate"] == 33);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dscp_bytes"][0]["estimate"] == 1220);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dscp_bytes"][0]["name"] == "CS6");
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_ecn_packets"][0]["estimate"] == 33);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_ecn_packets"][0]["name"] == "Not-ECT");
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dst_ports_bytes"][0]["estimate"] == 1128);
CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dst_ports_bytes"][0]["name"] == "telnet");
}

TEST_CASE("Parse netflow v9 stream", "[netflow][flow]")
{

FlowInputStream stream{"netflow-test"};
Expand Down
24 changes: 12 additions & 12 deletions src/inputs/flow/NetflowData.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ static bool process_netflow_v1(NFSample *sample)
memcpy(flow_sample.dst_ip.data(), &nf1_flow->dest_ip, sizeof(uint32_t));
memcpy(flow_sample.nexthop_ip.data(), &nf1_flow->nexthop_ip, sizeof(uint32_t));

flow_sample.src_port = nf1_flow->src_port;
flow_sample.dst_port = nf1_flow->dest_port;
flow_sample.src_port = be16toh(nf1_flow->src_port);
flow_sample.dst_port = be16toh(nf1_flow->dest_port);

flow_sample.flow_start = nf1_flow->flow_start;
flow_sample.flow_finish = nf1_flow->flow_finish;
flow_sample.flow_start = be32toh(nf1_flow->flow_start);
flow_sample.flow_finish = be32toh(nf1_flow->flow_finish);

flow_sample.if_index_in = be16toh(nf1_flow->if_index_in);
flow_sample.if_index_out = be16toh(nf1_flow->if_index_out);
Expand Down Expand Up @@ -136,11 +136,11 @@ static bool process_netflow_v5(NFSample *sample)
memcpy(flow_sample.dst_ip.data(), &nf5_flow->dest_ip, sizeof(uint32_t));
memcpy(flow_sample.nexthop_ip.data(), &nf5_flow->nexthop_ip, sizeof(uint32_t));

flow_sample.src_port = nf5_flow->src_port;
flow_sample.dst_port = nf5_flow->dest_port;
flow_sample.src_port = be16toh(nf5_flow->src_port);
flow_sample.dst_port = be16toh(nf5_flow->dest_port);

flow_sample.flow_start = nf5_flow->flow_start;
flow_sample.flow_finish = nf5_flow->flow_finish;
flow_sample.flow_start = be32toh(nf5_flow->flow_start);
flow_sample.flow_finish = be32toh(nf5_flow->flow_finish);

flow_sample.if_index_in = be16toh(nf5_flow->if_index_in);
flow_sample.if_index_out = be16toh(nf5_flow->if_index_out);
Expand Down Expand Up @@ -190,11 +190,11 @@ static bool process_netflow_v7(NFSample *sample)
memcpy(flow_sample.dst_ip.data(), &nf7_flow->dest_ip, sizeof(uint32_t));
memcpy(flow_sample.nexthop_ip.data(), &nf7_flow->nexthop_ip, sizeof(uint32_t));

flow_sample.src_port = nf7_flow->src_port;
flow_sample.dst_port = nf7_flow->dest_port;
flow_sample.src_port = be16toh(nf7_flow->src_port);
flow_sample.dst_port = be16toh(nf7_flow->dest_port);

flow_sample.flow_start = nf7_flow->flow_start;
flow_sample.flow_finish = nf7_flow->flow_finish;
flow_sample.flow_start = be32toh(nf7_flow->flow_start);
flow_sample.flow_finish = be32toh(nf7_flow->flow_finish);

flow_sample.if_index_in = be16toh(nf7_flow->if_index_in);
flow_sample.if_index_out = be16toh(nf7_flow->if_index_out);
Expand Down
Binary file added src/tests/fixtures/nf5.pcap
Binary file not shown.

0 comments on commit 5695659

Please sign in to comment.