refactor: trace aggregator cleanup
Made the code more DRY, which makes it clearer and less error prone.
Previously we had a data race on tracers_ and sinks_ due to a missing
lock: the mutex in question protects tracer/sink registration, not the
actual event dequeue.

This sets up a future where the trace aggregator can be refactored to
write out to a bigger in-memory cache before batching the IO writes.
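A rough sketch of what that future could look like (everything below is hypothetical and not part of this commit; the BatchedTraceWriter name, the WriteTraceToSinks stand-in, and the writer-thread loop are illustrative only):

#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical follow-up: the aggregator thread only dequeues events and
// appends them to an in-memory buffer; a dedicated writer thread drains the
// buffer in batches so slow sinks never block the dequeue loop.
class BatchedTraceWriter {
 public:
  void Push(Trace trace) {
    const std::scoped_lock lock(mutex_);
    pending_.push_back(std::move(trace));
    cv_.notify_one();
  }

  // Runs on a dedicated writer thread. Shutdown handling is omitted for
  // brevity.
  void WriterLoop() {
    while (true) {
      std::unique_lock lock(mutex_);
      cv_.wait(lock, [this] { return !pending_.empty(); });

      // Take the whole backlog in one go so the IO happens as one batch.
      std::vector<Trace> batch;
      batch.swap(pending_);
      lock.unlock();

      for (auto& trace : batch) {
        WriteTraceToSinks(trace);  // hypothetical stand-in for WriteTrace below
      }
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::vector<Trace> pending_;
};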
shuhaowu committed Mar 8, 2024
1 parent cce7512 commit c05c109
Showing 3 changed files with 69 additions and 65 deletions.
3 changes: 3 additions & 0 deletions include/cactus_rt/tracing/trace_aggregator.h
@@ -104,6 +104,9 @@ class TraceAggregator {
 
   void SetupCPUAffinityIfNecessary() const;
 
+  size_t TryDequeueOnceFromAllTracers(Trace& trace) noexcept;
+  void WriteTrace(const Trace& trace) noexcept;
+
   Trace CreateProcessDescriptorPacket() const;
   Trace CreateThreadDescriptorPacket(const ThreadTracer& thread_tracer) const;
   void AddTrackEventPacketToTrace(Trace& trace, const ThreadTracer& thread_tracer, const TrackEventInternal& track_event_internal) const;
129 changes: 65 additions & 64 deletions src/cactus_rt/tracing/trace_aggregator.cc
@@ -99,83 +99,49 @@ quill::Logger* TraceAggregator::Logger() noexcept {
 void TraceAggregator::Run() {
   SetupCPUAffinityIfNecessary();
 
-  // TODO: major refactor required
-
   while (!StopRequested()) {
-    int tracers_with_events = 0;
-    {
-      // This lock is needed as when we are writing packets into sinks, we don't
-      // want new thread tracers and sinks to be created or registered as that
-      // can cause potential data loss. This lock is NOT for dequeueing from the
-      // thread tracer queues, as expected.
-      const std::scoped_lock lock(mutex_);
-      Trace trace;
-
-      for (auto& tracer : tracers_) {
-        TrackEventInternal event;
-        if (!tracer->queue_.try_dequeue(event)) {
-          // No event in this queue.
-          continue;
-        }
-
-        tracers_with_events++;
-        AddTrackEventPacketToTrace(trace, *tracer, event);
-      }
-
-      if (tracers_with_events > 0) {
-        for (auto& sink : sinks_) {
-          if (!sink->Write(trace)) {
-            LOG_WARNING_LIMIT(std::chrono::milliseconds(5000), Logger(), "failed to write trace data to sink, data may be corrupted");
-          }
-        }
-      }
-    }
-
-    // TODO: check if there are any dropped packets since last time we checked
-
-    if (tracers_with_events == 0) {
+    Trace trace;
+    auto num_events = TryDequeueOnceFromAllTracers(trace);
+
+    // TODO: what we should do here is:
+    // 1. While the dequeue is happening, we should check each tracer for errors/full queue problems.
+    // 2. Instead of writing the trace directly, we should put it into a bigger
+    //    memory queue (bigger than the tracer queue), which would be written out
+    //    in another thread. That should reduce the bottlenecks for this loop as
+    //    otherwise this loop would be blocked by the writer.
+
+    if (num_events > 0) {
+      WriteTrace(trace);
+    } else {
       // No events from any tracers! We can sleep and let the queues accumulate a bit.
       // TODO: customize this sleep period
      std::this_thread::sleep_for(10ms);
     }
     // If there are events then we want to write as fast as possible.
   }
 
-  // TODO: likely need to take a large lock here when flushing?
-
-  // Need to empty all the remaining events in the queue
-  // This code is a bit redundant, but works for now. In the long term, this
-  // whole thread probably has to be rewritten to better optimize for throughput
-  // and possibly ordering problems.
+  // When the trace aggregator is requested to stop, there may still be some
+  // packets in the tracer queues. We aim to write out as much as possible.
   // TODO: right now we accumulate a giant Trace message and write it all
-  // together. In the future there should be batch writing.
-  bool has_events = false;
-  Trace trace;
+  // together. In the future there should be batch writing, which may be more
+  // efficient.
+  Trace trace;
+  size_t total_events = 0;
   while (true) {
-    int tracers_with_events = 0;
-
-    for (auto& tracer : tracers_) {
-      TrackEventInternal event;
-      if (!tracer->queue_.try_dequeue(event)) {
-        continue;
-      }
-
-      tracers_with_events++;
-      AddTrackEventPacketToTrace(trace, *tracer, event);
-      has_events = true;
-    }
+    auto num_events = TryDequeueOnceFromAllTracers(trace);
+    total_events += num_events;
 
     // This is a busy loop that pops from the queues. As soon as there are no
     // more events being emitted, it quits. If the queues are being further
    // written to, the data would be lost and it is likely OK.
     //
     // TODO: have a timeout that guarantees the trace aggregator will stop after X seconds.
-    if (tracers_with_events == 0) {
+    if (num_events == 0) {
       break;
     }
   }
 
-  if (has_events) {
-    for (auto& sink : sinks_) {
-      // TODO: errors
-      sink->Write(trace);
-    }
+  if (total_events > 0) {
+    WriteTrace(trace);
   }
 }

@@ -202,6 +168,41 @@ void TraceAggregator::SetupCPUAffinityIfNecessary() const {
   throw std::runtime_error{std::string("cannot set affinity for trace aggregator: ") + std::strerror(errno)};
 }
 
+size_t TraceAggregator::TryDequeueOnceFromAllTracers(Trace& trace) noexcept {
+  // This lock is needed because we are accessing tracers_, which can race
+  // with other threads that are registering a thread tracer. This is NOT for
+  // dequeuing from the thread_tracer queues.
+  const std::scoped_lock lock(mutex_);
+  size_t num_events = 0;
+
+  for (auto& tracer : tracers_) {
+    TrackEventInternal event;
+    if (!tracer->queue_.try_dequeue(event)) {
+      // No event in this queue.
+      continue;
+    }
+
+    num_events++;
+    AddTrackEventPacketToTrace(trace, *tracer, event);
+  }
+
+  return num_events;
+}
+
+void TraceAggregator::WriteTrace(const Trace& trace) noexcept {
+  // This lock is needed because we are accessing sinks_, which can race with
+  // other threads that are registering a sink. This is NOT for dequeuing
+  // from the thread_tracer queues.
+  const std::scoped_lock lock(mutex_);
+
+  // TODO: handle errors better, e.g. by emitting an error signal and calling an error callback.
+  for (auto& sink : sinks_) {
+    if (!sink->Write(trace)) {
+      LOG_WARNING_LIMIT(std::chrono::milliseconds(5000), Logger(), "failed to write trace data to sink, data may be corrupted");
+    }
+  }
+}
+
 Trace TraceAggregator::CreateProcessDescriptorPacket() const {
   // NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks)
   Trace trace;
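Worth noting: a single mutex_ suffices for both helpers above because the registration paths take the same lock. A minimal sketch of that pairing, assuming registration functions along these lines (the actual cactus_rt signatures may differ):

// Sketch only: illustrates the locking discipline, not the exact cactus_rt API.
void TraceAggregator::RegisterThreadTracer(std::shared_ptr<ThreadTracer> tracer) {
  const std::scoped_lock lock(mutex_);  // same mutex_ as TryDequeueOnceFromAllTracers
  tracers_.push_back(std::move(tracer));
}

void TraceAggregator::RegisterSink(std::unique_ptr<Sink> sink) {
  const std::scoped_lock lock(mutex_);  // same mutex_ as WriteTrace
  sinks_.push_back(std::move(sink));
}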
2 changes: 1 addition & 1 deletion tests/tracing/tracing_benchmark.cc
@@ -20,7 +20,7 @@ void BM_ThreadTracerWithSpanEnabledWithCategory(benchmark::State& state) {
   cactus_rt::tracing::EnableTracing();
   cactus_rt::tracing::ThreadTracer thread_tracer("benchmark_tracer", kQueueSize);
   for (auto _ : state) {
-    thread_tracer.WithSpan("EventName", "category");
+    benchmark::DoNotOptimize(thread_tracer.WithSpan("EventName", "category"));
   }
 }
 BENCHMARK(BM_ThreadTracerWithSpanEnabledWithCategory)->Iterations(kQueueSize / 2);
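The change above wraps WithSpan in benchmark::DoNotOptimize so the compiler cannot treat the returned span object as unused and optimize away the work being measured. A self-contained illustration of the same Google Benchmark pattern (standalone example, not from this repository):

#include <benchmark/benchmark.h>

static void BM_Sum(benchmark::State& state) {
  for (auto _ : state) {
    long sum = 0;
    for (int i = 0; i < 1000; i++) {
      sum += i;
    }
    // Without this, the optimizer may see `sum` as dead and delete the inner
    // loop entirely, so the benchmark would measure an empty loop.
    benchmark::DoNotOptimize(sum);
  }
}
BENCHMARK(BM_Sum);
BENCHMARK_MAIN();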