Skip to content

Commit

Permalink
Implemented string interning in trace aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Mar 16, 2024
1 parent 651aa06 commit f1bf8ec
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 58 deletions.
72 changes: 23 additions & 49 deletions examples/tracing_protos_example/synthetic_events.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

#include "trace.pb.h"

using cactus_tracing::vendor::perfetto::protos::InternedData;
using cactus_tracing::vendor::perfetto::protos::ProcessDescriptor;
using cactus_tracing::vendor::perfetto::protos::ThreadDescriptor;
using cactus_tracing::vendor::perfetto::protos::Trace;
using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED;
using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE;
using cactus_tracing::vendor::perfetto::protos::TrackDescriptor;
using cactus_tracing::vendor::perfetto::protos::TrackEvent;
using cactus_tracing::vendor::perfetto::protos::TrackEvent_Type_TYPE_INSTANT;
Expand Down Expand Up @@ -70,16 +72,22 @@ int main() {
auto* packet3 = trace3.add_packet();
packet3->set_timestamp(200);

auto* interned_data1 = new InternedData();
auto* event_name = interned_data1->add_event_names();
event_name->set_iid(1);
event_name->set_name("My special parent");
packet3->set_allocated_interned_data(interned_data1);

auto* track_event1 = new TrackEvent();
track_event1->set_type(TrackEvent_Type_TYPE_SLICE_BEGIN);
track_event1->set_track_uuid(thread_uuid);
track_event1->set_name("My special parent");
track_event1->set_name_iid(1);
packet3->set_allocated_track_event(track_event1);

packet3->set_trusted_packet_sequence_id(trusted_packet_sequence_id);
packet3->set_previous_packet_dropped(true);
packet3->set_first_packet_on_sequence(true);
packet3->set_sequence_flags(TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED);
packet3->set_sequence_flags(TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED | TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE);

Trace trace4;
auto* packet4 = trace4.add_packet();
Expand Down Expand Up @@ -127,66 +135,34 @@ int main() {

packet7->set_trusted_packet_sequence_id(trusted_packet_sequence_id);

// Packets complete, write it into a file!

// Now let's simulate a corruption in the middle of another slide. The idea is:
// 1. There is supposed to be three trace packets between 330 - 430 and 450 - 1000, 1050 - 2000
// 2. The slice begin at 330 got emitted, the slice end at 430 is lost, the slice begin at 450 is lost.
// 3. We detect packet loss occurred, and hence emit a packet loss marker
// 4. Then we emit the slice end at 1000? Could also try emitting 1050.
Trace trace8;
auto* packet8 = trace8.add_packet();
packet8->set_timestamp(330);
packet8->set_timestamp(350);

auto* track_event8 = new TrackEvent();
track_event8->set_type(TrackEvent_Type_TYPE_SLICE_BEGIN);
track_event8->set_track_uuid(thread_uuid);
track_event8->set_name("before packet loss");
track_event8->set_name_iid(1);
packet8->set_allocated_track_event(track_event8);
packet8->set_sequence_flags(TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE);

packet8->set_trusted_packet_sequence_id(trusted_packet_sequence_id);

Trace trace10;
auto* packet10 = trace10.add_packet();
packet10->set_timestamp(1000);

auto* track_event10 = new TrackEvent();
track_event10->set_type(TrackEvent_Type_TYPE_SLICE_END);
track_event10->set_track_uuid(thread_uuid);
packet10->set_allocated_track_event(track_event10);

packet10->set_trusted_packet_sequence_id(trusted_packet_sequence_id);
packet10->set_first_packet_on_sequence(true);
packet10->set_previous_packet_dropped(true);
packet10->set_sequence_flags(TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED);

Trace trace11;
auto* packet11 = trace11.add_packet();
packet11->set_timestamp(1050);

auto* track_event11 = new TrackEvent();
track_event11->set_type(TrackEvent_Type_TYPE_SLICE_BEGIN);
track_event11->set_track_uuid(thread_uuid);
track_event11->set_name("After packet loss");
packet11->set_allocated_track_event(track_event11);

packet11->set_trusted_packet_sequence_id(trusted_packet_sequence_id);

Trace trace12;
auto* packet12 = trace12.add_packet();
packet12->set_timestamp(2000);
Trace trace9;
auto* packet9 = trace9.add_packet();
packet9->set_timestamp(500);

auto* track_event12 = new TrackEvent();
track_event12->set_type(TrackEvent_Type_TYPE_SLICE_END);
track_event12->set_track_uuid(thread_uuid);
packet12->set_allocated_track_event(track_event12);
auto* track_event9 = new TrackEvent();
track_event9->set_type(TrackEvent_Type_TYPE_SLICE_END);
track_event9->set_track_uuid(thread_uuid);
packet9->set_allocated_track_event(track_event9);

packet12->set_trusted_packet_sequence_id(trusted_packet_sequence_id);
packet9->set_trusted_packet_sequence_id(trusted_packet_sequence_id);

{
std::fstream output("build/direct_proto_serialization.perfetto-trace", std::ios::out | std::ios::trunc | std::ios::binary);

const std::array<Trace*, 11> traces{
const std::array<Trace*, 9> traces{
&trace1,
&trace2,
&trace3,
Expand All @@ -195,9 +171,7 @@ int main() {
&trace6,
&trace7,
&trace8,
&trace10,
&trace11,
&trace12,
&trace9,
};

for (const auto* trace : traces) {
Expand Down
75 changes: 72 additions & 3 deletions src/cactus_rt/tracing/trace_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@
#include <cstring>
#include <string>

using cactus_tracing::vendor::perfetto::protos::InternedData;
using cactus_tracing::vendor::perfetto::protos::ProcessDescriptor;
using cactus_tracing::vendor::perfetto::protos::ThreadDescriptor;
using cactus_tracing::vendor::perfetto::protos::Trace;
using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED;
using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE;
using cactus_tracing::vendor::perfetto::protos::TrackDescriptor;
using cactus_tracing::vendor::perfetto::protos::TrackEvent;

using namespace std::chrono_literals;

namespace {
constexpr size_t kMaxInternedStrings = 10000;
}

namespace cactus_rt::tracing {
TraceAggregator::TraceAggregator(std::string process_name, std::vector<size_t> cpu_affinity)
: process_name_(process_name),
Expand Down Expand Up @@ -251,33 +257,96 @@ void TraceAggregator::AddTrackEventPacketToTrace(
const ThreadTracer& thread_tracer,
const TrackEventInternal& track_event_internal
) {
uint32_t packet_sequence_flag = 0;
InternedData* interned_data = nullptr;

// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks)
// Create the base backet
auto* packet = trace.add_packet();
packet->set_timestamp(track_event_internal.timestamp);

// Create the track event
auto* track_event = new TrackEvent();
track_event->set_track_uuid(thread_tracer.track_uuid_);
track_event->set_type(track_event_internal.type);

// Deal with the event name
if (track_event_internal.name != nullptr) {
track_event->set_name(track_event_internal.name);
if (event_name_interner_.Size() < kMaxInternedStrings) {
// Get the interned id
auto [new_event_name, event_name_iid] = event_name_interner_.GetId(track_event_internal.name);

// If this is a never-seen-before event name, we need to emit the interned data into the data stream.
if (new_event_name) {
if (interned_data == nullptr) {
interned_data = new InternedData();
}

auto* event_name = interned_data->add_event_names();
event_name->set_iid(event_name_iid);
event_name->set_name(track_event_internal.name);
}

// Finally set the name_iid
track_event->set_name_iid(event_name_iid);
packet_sequence_flag |= TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE;
} else {
LOG_WARNING_LIMIT(std::chrono::milliseconds(5000), Logger(), "number of unique event names emitted in tracing is exceeding {}, string interning is disabled. trace files may be excessively large", kMaxInternedStrings);
track_event->set_name(track_event_internal.name);
}
}

// Deal with the event category
// TODO: support multiple categories later?
if (track_event_internal.category != nullptr) {
track_event->add_categories(track_event_internal.category);
// A bit of duplication is OK
if (event_category_interner_.Size() < kMaxInternedStrings) {
// Get the interned id
auto [new_event_category, event_category_iid] = event_category_interner_.GetId(track_event_internal.category);

// If this is a never-seen-before event category, we need to emit the interned data into the data stream.
if (new_event_category) {
if (interned_data == nullptr) {
interned_data = new InternedData();
}

auto* event_category = interned_data->add_event_categories();
event_category->set_iid(event_category_iid);
event_category->set_name(track_event_internal.category);
}

// Finally set the category_iid
track_event->add_category_iids(event_category_iid);
packet_sequence_flag |= TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE;
} else {
LOG_WARNING_LIMIT(std::chrono::milliseconds(5000), Logger(), "number of unique event categories emitted in tracing is exceeding {}, string interning is disabled. trace files may be excessively large", kMaxInternedStrings);
track_event->add_categories(track_event_internal.category);
}
}

// Set the track event into the packet and setup packet sequence id
packet->set_allocated_track_event(track_event);
packet->set_trusted_packet_sequence_id(thread_tracer.trusted_packet_sequence_id_);

// Deal with "first packet"
if (sequences_with_first_packet_emitted_.count(thread_tracer.trusted_packet_sequence_id_) == 0) {
sequences_with_first_packet_emitted_.insert(thread_tracer.trusted_packet_sequence_id_);

packet->set_first_packet_on_sequence(true);
packet->set_previous_packet_dropped(true);
packet->set_sequence_flags(TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED); // TODO: may need to OR this with SEQ_NEEDS_INCREMENTAL_STATE if above needs it.
packet_sequence_flag |= TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED;
}

// If interned data exists, add it to the packet
if (interned_data != nullptr) {
packet->set_allocated_interned_data(interned_data);
}

// Write the sequence flag is needed
if (packet_sequence_flag != 0) {
packet->set_sequence_flags(packet_sequence_flag);
}

// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks)
}

Expand Down
8 changes: 2 additions & 6 deletions tests/tracing/string_interner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,14 @@ TEST(StringInternerTest, OutOfScope) {

{
std::string s{"hello1"};
const auto [n, i] = interner.GetId(s.data());
new1 = n;
id1 = i;
std::tie(new1, id1) = interner.GetId(s.data());
s.replace(1, 1, "g");
}

{
char c[8]; // NOLINT(modernize-avoid-c-arrays)
strncpy(c, "hello1", sizeof(c));
const auto [n, i] = interner.GetId(c);
new2 = n;
id2 = i;
std::tie(new2, id2) = interner.GetId(c);
c[1] = 'g';
}

Expand Down

0 comments on commit f1bf8ec

Please sign in to comment.