Skip to content

Commit

Permalink
[release-7.3] Log all incoming connections (#11713)
Browse files Browse the repository at this point in the history
* Log all incoming connections

* Address review comments

* Update FlowTransport.actor.cpp

* Update FlowTransport.actor.cpp

* Refactor

* Format

* initialize for simulation
  • Loading branch information
vishesh authored Oct 17, 2024
1 parent 12c39e7 commit 2b23111
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 3 deletions.
126 changes: 124 additions & 2 deletions fdbrpc/FlowTransport.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@

#include "fdbrpc/FlowTransport.h"
#include "flow/Arena.h"
#include "flow/IThreadPool.h"
#include "flow/Knobs.h"
#include "flow/NetworkAddress.h"
#include "flow/network.h"

#include <cstdint>
#include <fstream>
#include <string>
#include <unordered_map>
#if VALGRIND
#include <memcheck.h>
Expand Down Expand Up @@ -352,8 +357,112 @@ class TransportData {
Future<Void> publicKeyFileWatch;

std::unordered_map<Standalone<StringRef>, PublicKey> publicKeys;

struct ConnectionHistoryEntry {
int64_t time;
NetworkAddress addr;
bool failed;
};
std::deque<ConnectionHistoryEntry> connectionHistory;
Future<Void> connectionHistoryLoggerF;
Reference<IThreadPool> connectionLogWriterThread;
};

struct ConnectionLogWriter : IThreadPoolReceiver {
const std::string baseDir;
std::string fileName;
std::fstream file;

ConnectionLogWriter(const std::string baseDir) : baseDir(baseDir) {}

virtual ~ConnectionLogWriter() {
if (file.is_open())
file.close();
}

struct AppendAction : TypedAction<ConnectionLogWriter, AppendAction> {
std::string localAddr;
std::deque<TransportData::ConnectionHistoryEntry> entries;
AppendAction(std::string localAddr, std::deque<TransportData::ConnectionHistoryEntry>&& entries)
: localAddr(localAddr), entries(std::move(entries)) {}

double getTimeEstimate() const { return 2; }
};

std::string newFileName() const { return baseDir + "fdb-connection-log-" + time_str() + ".csv"; }

void init() { fileName = newFileName(); }

std::string time_str() const { return std::to_string(now()); }

void openOrRoll() {
if (fileName.empty()) {
fileName = newFileName();
}

if (!file.is_open()) {
TraceEvent("OpenConnectionLog").detail("FileName", fileName);
file = std::fstream(fileName, std::ios::in | std::ios::out | std::ios::app);
}

if (!file.is_open()) {
TraceEvent(SevError, "ErrorOpenConnectionLog").detail("FileName", fileName);
throw io_error();
}

if (file.tellg() > 100 * 1024 * 1024 /* 100 MB */) {
file.close();
fileName = newFileName();
TraceEvent("RollConnectionLog").detail("FileName", fileName);
openOrRoll();
}
}

void action(AppendAction& a) {
openOrRoll();

std::string output;
for (const auto& entry : a.entries) {
output += std::to_string(entry.time) + ",";
output += a.localAddr + ",";
output += entry.failed ? "failed," : "success,";
output += entry.addr.toString() + "\n";
}
file << output;
file.flush();
}
};

ACTOR Future<Void> connectionHistoryLogger(TransportData* self) {
if (!FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
return Void();
}

state Future<Void> next = Void();

// One thread ensures async serialized execution on the log file.
if (g_network->isSimulated()) {
self->connectionLogWriterThread = Reference<IThreadPool>(new DummyThreadPool());
} else {
self->connectionLogWriterThread = createGenericThreadPool();
}

self->connectionLogWriterThread->addThread(new ConnectionLogWriter(FLOW_KNOBS->CONNECTION_LOG_DIRECTORY));
loop {
wait(next);
next = delay(FLOW_KNOBS->LOG_CONNECTION_INTERVAL_SECS);
if (self->connectionHistory.size() == 0) {
continue;
}
std::string localAddr = FlowTransport::getGlobalLocalAddress().toString();
auto action = new ConnectionLogWriter::AppendAction(localAddr, std::move(self->connectionHistory));
ASSERT(action != nullptr);
self->connectionLogWriterThread->post(action);
wait(delay(1));
ASSERT(self->connectionHistory.size() == 0);
}
}

ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
state NetworkAddress lastAddress = NetworkAddress();
loop {
Expand Down Expand Up @@ -422,6 +531,8 @@ TransportData::TransportData(uint64_t transportId, int maxWellKnownEndpoints, IP
allowList(allowList == nullptr ? IPAllowList() : *allowList) {
degraded = makeReference<AsyncVar<bool>>(false);
pingLogger = pingLatencyLogger(this);

connectionHistoryLoggerF = connectionHistoryLogger(this);
}

#define CONNECT_PACKET_V0 0x0FDB00A444020001LL
Expand Down Expand Up @@ -1492,10 +1603,17 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
}

ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<IConnection> conn) {
state TransportData::ConnectionHistoryEntry entry;
entry.time = now();
entry.addr = conn->getPeerAddress();
try {
wait(conn->acceptHandshake());
state Promise<Reference<Peer>> onConnected;
state Future<Void> reader = connectionReader(self, conn, Reference<Peer>(), onConnected);
if (FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
entry.failed = false;
self->connectionHistory.push_back(entry);
}
choose {
when(wait(reader)) {
ASSERT(false);
Expand All @@ -1509,17 +1627,21 @@ ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<ICon
throw timed_out();
}
}
return Void();
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent("IncomingConnectionError", conn->getDebugID())
.errorUnsuppressed(e)
.suppressFor(1.0)
.detail("FromAddress", conn->getPeerAddress());
if (FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
entry.failed = true;
self->connectionHistory.push_back(entry);
}
}
conn->close();
return Void();
}

return Void();
}

ACTOR static Future<Void> listen(TransportData* self, NetworkAddress listenAddr) {
Expand Down
5 changes: 4 additions & 1 deletion flow/Knobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( PING_LOGGING_INTERVAL, 3.0 );
init( PING_SKETCH_ACCURACY, 0.1 );
init( LOG_CONNECTION_ATTEMPTS_ENABLED, false );
init( CONNECTION_LOG_DIRECTORY, "" );
init( LOG_CONNECTION_INTERVAL_SECS, 3 );

init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
Expand Down Expand Up @@ -202,7 +205,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
//IAsyncFile
init( INCREMENTAL_DELETE_TRUNCATE_AMOUNT, 5e8 ); //500MB
init( INCREMENTAL_DELETE_INTERVAL, 1.0 ); //every 1 second

//Net2 and FlowTransport
init( MIN_COALESCE_DELAY, 10e-6 ); if( randomize && BUGGIFY ) MIN_COALESCE_DELAY = 0;
init( MAX_COALESCE_DELAY, 20e-6 ); if( randomize && BUGGIFY ) MAX_COALESCE_DELAY = 0;
Expand Down
3 changes: 3 additions & 0 deletions flow/include/flow/Knobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class FlowKnobs : public KnobsImpl<FlowKnobs> {
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
double PING_LOGGING_INTERVAL;
double PING_SKETCH_ACCURACY;
bool LOG_CONNECTION_ATTEMPTS_ENABLED;
int LOG_CONNECTION_INTERVAL_SECS;
std::string CONNECTION_LOG_DIRECTORY;

int TLS_CERT_REFRESH_DELAY_SECONDS;
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
Expand Down

0 comments on commit 2b23111

Please sign in to comment.