Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROX-25218: Instruct Collector to not aggregate some subnets #1639

Merged
merged 8 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions collector/lib/CollectorConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ BoolEnvVar network_drop_ignored("ROX_NETWORK_DROP_IGNORED", true);
// The default value contains link-local addresses for IPv4 (RFC3927) and IPv6 (RFC2462)
StringListEnvVar ignored_networks("ROX_IGNORE_NETWORKS", std::vector<std::string>({"169.254.0.0/16", "fe80::/10"}));

// Connection endpoints matching a network prefix listed here will never be aggregated.
StringListEnvVar non_aggregated_networks("ROX_NON_AGGREGATED_NETWORKS", std::vector<std::string>());

// If true, set curl to be verbose, adding further logging that might be useful for debugging.
BoolEnvVar set_curl_verbose("ROX_COLLECTOR_SET_CURL_VERBOSE", false);

Expand Down Expand Up @@ -170,20 +173,33 @@ void CollectorConfig::InitCollectorConfig(CollectorArgs* args) {
ignored_l4proto_port_pairs_ = kIgnoredL4ProtoPortPairs;
}

std::for_each(ignored_networks.value().begin(), ignored_networks.value().end(),
[&ignored_networks = this->ignored_networks_](const std::string& str) {
if (str.empty())
return;
for (const std::string& str : ignored_networks.value()) {
if (str.empty())
continue;

std::optional<IPNet> net = IPNet::parse(str);

if (net) {
CLOG(INFO) << "Ignore network : " << *net;
ignored_networks_.emplace_back(std::move(*net));
} else {
CLOG(ERROR) << "Invalid network in ROX_IGNORE_NETWORKS : " << str;
}
}

for (const std::string& str : non_aggregated_networks.value()) {
if (str.empty())
continue;

std::optional<IPNet> net = IPNet::parse(str);
std::optional<IPNet> net = IPNet::parse(str);

if (net) {
CLOG(INFO) << "Ignore network : " << *net;
ignored_networks.emplace_back(std::move(*net));
} else {
CLOG(ERROR) << "Invalid network in ROX_IGNORE_NETWORKS : " << str;
}
});
if (net) {
CLOG(INFO) << "Detail network : " << *net;
non_aggregated_networks_.emplace_back(std::move(*net));
} else {
CLOG(ERROR) << "Invalid network in ROX_NON_AGGREGATED_NETWORKS : " << str;
}
}

if (set_curl_verbose) {
curl_verbose_ = true;
Expand Down
2 changes: 2 additions & 0 deletions collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class CollectorConfig {
bool DisableNetworkFlows() const { return disable_network_flows_; }
const UnorderedSet<L4ProtoPortPair>& IgnoredL4ProtoPortPairs() const { return ignored_l4proto_port_pairs_; }
const std::vector<IPNet>& IgnoredNetworks() const { return ignored_networks_; }
const std::vector<IPNet>& NonAggregatedNetworks() const { return non_aggregated_networks_; }
bool CurlVerbose() const { return curl_verbose_; }
bool EnableAfterglow() const { return enable_afterglow_; }
bool IsCoreDumpEnabled() const;
Expand Down Expand Up @@ -100,6 +101,7 @@ class CollectorConfig {
bool scrape_listen_endpoints_ = false;
UnorderedSet<L4ProtoPortPair> ignored_l4proto_port_pairs_;
std::vector<IPNet> ignored_networks_;
std::vector<IPNet> non_aggregated_networks_;
bool curl_verbose_ = false;

HostConfig host_config_;
Expand Down
1 change: 1 addition & 0 deletions collector/lib/CollectorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void CollectorService::RunForever() {
UnorderedSet<L4ProtoPortPair> ignored_l4proto_port_pairs(config_.IgnoredL4ProtoPortPairs());
conn_tracker->UpdateIgnoredL4ProtoPortPairs(std::move(ignored_l4proto_port_pairs));
conn_tracker->UpdateIgnoredNetworks(config_.IgnoredNetworks());
conn_tracker->UpdateNonAggregatedNetworks(config_.NonAggregatedNetworks());
conn_tracker->EnableExternalIPs(config_.EnableExternalIPs());

auto network_connection_info_service_comm = std::make_shared<NetworkConnectionInfoServiceComm>(config_.Hostname(), config_.grpc_channel);
Expand Down
15 changes: 13 additions & 2 deletions collector/lib/ConnTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,18 @@ IPNet ConnectionTracker::NormalizeAddressNoLock(const Address& address) const {
}

bool private_addr = !address.IsPublic();
bool do_not_aggregate_addr = !non_aggregated_networks_.Find(address).IsNull();

// We want to keep private addresses and explicitely requested ones.
bool keep_addr = private_addr || do_not_aggregate_addr;

const bool* known_private_networks_exists = Lookup(known_private_networks_exists_, address.family());
if (private_addr && (known_private_networks_exists && !*known_private_networks_exists)) {
if (keep_addr && (known_private_networks_exists && !*known_private_networks_exists)) {
return IPNet(address, 0, true);
}

const auto& network = known_ip_networks_.Find(address);
if (private_addr || Contains(known_public_ips_, address)) {
if (keep_addr || Contains(known_public_ips_, address)) {
return IPNet(address, network.bits(), true);
}

Expand Down Expand Up @@ -330,6 +335,12 @@ void ConnectionTracker::UpdateIgnoredNetworks(const std::vector<IPNet>& network_
}
}

void ConnectionTracker::UpdateNonAggregatedNetworks(const std::vector<IPNet>& network_list) {
WITH_LOCK(mutex_) {
non_aggregated_networks_ = NRadixTree(network_list);
}
}

// Increment the stat counter matching the connection's characteristics
inline void ConnectionTracker::IncrementConnectionStats(Connection conn, ConnectionTracker::Stats& stats) const {
auto& direction = conn.is_server() ? stats.inbound : stats.outbound;
Expand Down
2 changes: 2 additions & 0 deletions collector/lib/ConnTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class ConnectionTracker {
void EnableExternalIPs(bool enable) { enable_external_ips_ = enable; }
void UpdateIgnoredL4ProtoPortPairs(UnorderedSet<L4ProtoPortPair>&& ignored_l4proto_port_pairs);
void UpdateIgnoredNetworks(const std::vector<IPNet>& network_list);
void UpdateNonAggregatedNetworks(const std::vector<IPNet>& network_list);

// Emplace a connection into the state ConnMap, or update its timestamp if the supplied timestamp is more recent
// than the stored one.
Expand Down Expand Up @@ -200,6 +201,7 @@ class ConnectionTracker {
UnorderedMap<Address::Family, bool> known_private_networks_exists_;
UnorderedSet<L4ProtoPortPair> ignored_l4proto_port_pairs_;
NRadixTree ignored_networks_;
NRadixTree non_aggregated_networks_;

Stats inserted_connections_counters_ = {};
};
Expand Down
24 changes: 24 additions & 0 deletions collector/test/ConnTrackerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,30 @@ TEST(ConnTrackerTest, TestUpdateIgnoredNetworks) {
EXPECT_TRUE(tracker.FetchConnState().empty());
}

TEST(ConnTrackerTest, TestUpdateNonAggregatedNetworks) {
Endpoint a(Address(192, 168, 1, 10), 9999);
Endpoint b(Address(245, 1, 1, 1), 80);

Connection conn1("xyz", a, b, L4Proto::TCP, false);

int64_t time_micros = 1000;

Connection conn_aggregated("xyz", Endpoint(IPNet(Address(), 0, true), 0), Endpoint(IPNet(Address(255, 255, 255, 255), 0), 80), L4Proto::TCP, false);
Connection conn_detailed("xyz", Endpoint(IPNet(Address(), 0, true), 0), Endpoint(IPNet(Address(245, 1, 1, 1), 0, true), 80), L4Proto::TCP, false);

ConnectionTracker tracker;

tracker.Update({conn1}, {}, time_micros);

auto state = tracker.FetchConnState(true);
EXPECT_THAT(state, UnorderedElementsAre(std::make_pair(conn_aggregated, ConnStatus(time_micros, true))));

tracker.UpdateNonAggregatedNetworks({IPNet(Address(240, 0, 0, 0), 4)});

state = tracker.FetchConnState(true);
EXPECT_THAT(state, UnorderedElementsAre(std::make_pair(conn_detailed, ConnStatus(time_micros, true))));
}

TEST(ConnTrackerTest, TestUpdateNormalized) {
Endpoint a(Address(192, 168, 0, 1), 80);
Endpoint b(Address(192, 168, 1, 10), 9999);
Expand Down
Loading