diff --git a/src/handlers/flow/test_flows.cpp b/src/handlers/flow/test_flows.cpp index a0aeded68..0f6ff3caa 100644 --- a/src/handlers/flow/test_flows.cpp +++ b/src/handlers/flow/test_flows.cpp @@ -1,6 +1,6 @@ #include -#include #include +#include #include "FlowInputStream.h" #include "FlowStreamHandler.h" @@ -399,7 +399,49 @@ TEST_CASE("Parse sflow stream with interfaces filter", "[sflow][flow]") CHECK(j["devices"]["192.168.0.11"]["interfaces"]["37"]["top_out_src_ips_and_port_bytes"][0]["estimate"] == nullptr); } -TEST_CASE("Parse netflow stream", "[netflow][flow]") +TEST_CASE("Parse netflow v5 stream", "[netflow][flow]") +{ + + FlowInputStream stream{"netflow-test"}; + stream.config_set("flow_type", "netflow"); + stream.config_set("pcap_file", "tests/fixtures/nf5.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c}; + flow_handler.config_set("enable", visor::Configurable::StringList({"top_tos"})); + + flow_handler.start(); + stream.start(); + stream.stop(); + flow_handler.stop(); + + auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked(); + + // confirmed with wireshark + CHECK(event_data.num_events->value() == 2); + CHECK(event_data.num_samples->value() == 2); + + nlohmann::json j; + flow_handler.metrics()->bucket(0)->to_json(j); + + CHECK(j["devices"]["10.0.0.2"]["records_flows"] == 3); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["dst_ips_out"] == 2); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["src_ips_in"] == 1); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["dst_ports_out"] == 3); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["src_ports_in"] == 2); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_src_ips_bytes"][0]["estimate"] == 1720); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_src_ips_packets"][0]["estimate"] == 33); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dscp_bytes"][0]["estimate"] == 1220); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dscp_bytes"][0]["name"] == "CS6"); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_ecn_packets"][0]["estimate"] == 33); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_ecn_packets"][0]["name"] == "Not-ECT"); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dst_ports_bytes"][0]["estimate"] == 1128); + CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dst_ports_bytes"][0]["name"] == "telnet"); +} + +TEST_CASE("Parse netflow v9 stream", "[netflow][flow]") { FlowInputStream stream{"netflow-test"}; diff --git a/src/inputs/flow/NetflowData.h b/src/inputs/flow/NetflowData.h index 0eb8424c5..a6e6bfc08 100644 --- a/src/inputs/flow/NetflowData.h +++ b/src/inputs/flow/NetflowData.h @@ -87,11 +87,11 @@ static bool process_netflow_v1(NFSample *sample) memcpy(flow_sample.dst_ip.data(), &nf1_flow->dest_ip, sizeof(uint32_t)); memcpy(flow_sample.nexthop_ip.data(), &nf1_flow->nexthop_ip, sizeof(uint32_t)); - flow_sample.src_port = nf1_flow->src_port; - flow_sample.dst_port = nf1_flow->dest_port; + flow_sample.src_port = be16toh(nf1_flow->src_port); + flow_sample.dst_port = be16toh(nf1_flow->dest_port); - flow_sample.flow_start = nf1_flow->flow_start; - flow_sample.flow_finish = nf1_flow->flow_finish; + flow_sample.flow_start = be32toh(nf1_flow->flow_start); + flow_sample.flow_finish = be32toh(nf1_flow->flow_finish); flow_sample.if_index_in = be16toh(nf1_flow->if_index_in); flow_sample.if_index_out = be16toh(nf1_flow->if_index_out); @@ -136,11 +136,11 @@ static bool process_netflow_v5(NFSample *sample) memcpy(flow_sample.dst_ip.data(), &nf5_flow->dest_ip, sizeof(uint32_t)); memcpy(flow_sample.nexthop_ip.data(), &nf5_flow->nexthop_ip, sizeof(uint32_t)); - flow_sample.src_port = nf5_flow->src_port; - flow_sample.dst_port = nf5_flow->dest_port; + flow_sample.src_port = be16toh(nf5_flow->src_port); + flow_sample.dst_port = be16toh(nf5_flow->dest_port); - flow_sample.flow_start = nf5_flow->flow_start; - flow_sample.flow_finish = nf5_flow->flow_finish; + flow_sample.flow_start = be32toh(nf5_flow->flow_start); + flow_sample.flow_finish = be32toh(nf5_flow->flow_finish); flow_sample.if_index_in = be16toh(nf5_flow->if_index_in); flow_sample.if_index_out = be16toh(nf5_flow->if_index_out); @@ -190,11 +190,11 @@ static bool process_netflow_v7(NFSample *sample) memcpy(flow_sample.dst_ip.data(), &nf7_flow->dest_ip, sizeof(uint32_t)); memcpy(flow_sample.nexthop_ip.data(), &nf7_flow->nexthop_ip, sizeof(uint32_t)); - flow_sample.src_port = nf7_flow->src_port; - flow_sample.dst_port = nf7_flow->dest_port; + flow_sample.src_port = be16toh(nf7_flow->src_port); + flow_sample.dst_port = be16toh(nf7_flow->dest_port); - flow_sample.flow_start = nf7_flow->flow_start; - flow_sample.flow_finish = nf7_flow->flow_finish; + flow_sample.flow_start = be32toh(nf7_flow->flow_start); + flow_sample.flow_finish = be32toh(nf7_flow->flow_finish); flow_sample.if_index_in = be16toh(nf7_flow->if_index_in); flow_sample.if_index_out = be16toh(nf7_flow->if_index_out); diff --git a/src/tests/fixtures/nf5.pcap b/src/tests/fixtures/nf5.pcap new file mode 100644 index 000000000..43d60baaa Binary files /dev/null and b/src/tests/fixtures/nf5.pcap differ