Skip to content

Commit

Permalink
single threaded test wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Mar 19, 2024
1 parent 607f0d2 commit 050f8c0
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 50 deletions.
11 changes: 5 additions & 6 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class App {
// TODO: investigate into a weak pointer.
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::mutex tracer_mutex_;
std::mutex aggregator_mutex_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand Down Expand Up @@ -113,15 +113,14 @@ class App {
/**
* @brief Starts a new tracing session for the process. Will not start a new
* tracing session if an existing tracing session is in progress. This
* function is not real-time safe. This will not register any output sinks.
* Use App::RegisterTraceSink() to register custom sinks.
* function is not real-time safe.
*/
bool StartTraceSession() noexcept;
bool StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Register a custom trace sink after starting the trace session
*/
void RegisterTraceSink(std::shared_ptr<cactus_rt::tracing::Sink> sink) noexcept;
void RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Stops the tracing session for the process. Will be no-op if tracing
Expand Down Expand Up @@ -159,7 +158,7 @@ class App {
*/
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;

void CreateAndStartTraceAggregator() noexcept;
void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;

void StopTraceAggregator() noexcept;
};
Expand Down
31 changes: 17 additions & 14 deletions src/cactus_rt/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,29 @@ bool App::StartTraceSession(const char* output_filename) noexcept {
return false;
}

CreateAndStartTraceAggregator();
trace_aggregator_->RegisterSink(std::make_unique<FileSink>(output_filename));
CreateAndStartTraceAggregator(std::make_shared<FileSink>(output_filename));
cactus_rt::tracing::EnableTracing();

return true;
}

bool App::StartTraceSession() noexcept {
bool App::StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept {
if (cactus_rt::tracing::IsTracingEnabled()) {
return false;
}

CreateAndStartTraceAggregator();
CreateAndStartTraceAggregator(sink);
cactus_rt::tracing::EnableTracing();

return true;
}

void App::RegisterTraceSink(std::shared_ptr<cactus_rt::tracing::Sink> sink) noexcept {
if (trace_aggregator_ == nullptr) {
return;
}
void App::RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept {
const std::scoped_lock lock(aggregator_mutex_);

trace_aggregator_->RegisterSink(sink);
if (trace_aggregator_ != nullptr) {
trace_aggregator_->RegisterSink(sink);
}
}

bool App::StopTraceSession() noexcept {
Expand All @@ -103,7 +102,7 @@ bool App::StopTraceSession() noexcept {
}

void App::RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept {
const std::scoped_lock lock(tracer_mutex_);
const std::scoped_lock lock(aggregator_mutex_);

thread_tracers_.push_back(thread_tracer);

Expand All @@ -113,7 +112,7 @@ void App::RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tra
}

void App::DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept {
const std::scoped_lock lock(tracer_mutex_);
const std::scoped_lock lock(aggregator_mutex_);

thread_tracers_.remove_if([thread_tracer](const std::shared_ptr<tracing::ThreadTracer>& t) {
return t == thread_tracer;
Expand Down Expand Up @@ -190,8 +189,8 @@ void App::StartQuill() {
quill::start();
}

void App::CreateAndStartTraceAggregator() noexcept {
const std::scoped_lock lock(tracer_mutex_);
void App::CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept {
const std::scoped_lock lock(aggregator_mutex_);

if (trace_aggregator_ != nullptr) {
// TODO: error here
Expand All @@ -203,11 +202,15 @@ void App::CreateAndStartTraceAggregator() noexcept {
trace_aggregator_->RegisterThreadTracer(tracer);
}

if (sink != nullptr) {
trace_aggregator_->RegisterSink(sink);
}

trace_aggregator_->Start();
}

void App::StopTraceAggregator() noexcept {
const std::scoped_lock lock(tracer_mutex_);
const std::scoped_lock lock(aggregator_mutex_);

if (trace_aggregator_ == nullptr) {
// TODO: error here
Expand Down
23 changes: 16 additions & 7 deletions src/cactus_rt/tracing/trace_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using cactus_tracing::vendor::perfetto::protos::ProcessDescriptor;
using cactus_tracing::vendor::perfetto::protos::ThreadDescriptor;
using cactus_tracing::vendor::perfetto::protos::Trace;
using cactus_tracing::vendor::perfetto::protos::TracePacket;
using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED;
using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE;
using cactus_tracing::vendor::perfetto::protos::TrackDescriptor;
Expand Down Expand Up @@ -278,7 +279,7 @@ void TraceAggregator::AddTrackEventPacketToTrace(
// Deal with the event name
if (track_event_internal.name != nullptr) {
if (thread_tracer.event_name_interner_.Size() < kMaxInternedStrings) {
auto [new_event_name, event_name_iid] = thread_tracer.event_category_interner_.GetId(track_event_internal.name);
auto [new_event_name, event_name_iid] = thread_tracer.event_name_interner_.GetId(track_event_internal.name);

// If this is a never-seen-before event name, we need to emit the interned data into the data stream.
if (new_event_name) {
Expand Down Expand Up @@ -365,13 +366,11 @@ void TraceAggregator::AddTrackEventPacketToTrace(

std::optional<Trace> TraceAggregator::CreateInitialInternedDataPacket() const {
Trace trace;
auto* packet = trace.add_packet();
// TODO: is it okay to not have a timestamp?
// packet->set_timestamp(initial_timestamp);

InternedData* interned_data = nullptr;
bool wrote_interned_data = false;

for (const auto& tracer : this->tracers_) {
InternedData* interned_data = nullptr;
for (const auto& [name, name_iid] : tracer->event_name_interner_.Ids()) {
if (interned_data == nullptr) {
interned_data = new InternedData();
Expand All @@ -391,10 +390,20 @@ std::optional<Trace> TraceAggregator::CreateInitialInternedDataPacket() const {
event_category->set_name(category.data());
event_category->set_iid(category_iid);
}

if (interned_data != nullptr) {
wrote_interned_data = true;
TracePacket* packet = trace.add_packet();

// TODO: is it okay to not have a timestamp?
// packet->set_timestamp(initial_timestamp);

packet->set_allocated_interned_data(interned_data);
packet->set_trusted_packet_sequence_id(tracer->trusted_packet_sequence_id_);
}
}

if (interned_data != nullptr) {
packet->set_allocated_interned_data(interned_data);
if (wrote_interned_data) {
return trace;
}

Expand Down
21 changes: 21 additions & 0 deletions tests/tracing/helpers/assert_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,27 @@ void AssertIsTrackEventInstant(
}
}

void AssertIsTrackEventInstant1(
const TracePacket& packet,
uint64_t thread_track_uuid,
uint32_t trusted_packet_sequence_id
) {
ASSERT_TRUE(packet.has_track_event());

const auto& track_event = packet.track_event();
ASSERT_TRUE(track_event.has_type());
ASSERT_EQ(track_event.type(), TrackEvent_Type_TYPE_INSTANT);

ASSERT_EQ(track_event.track_uuid(), thread_track_uuid);

ASSERT_TRUE(packet.has_trusted_packet_sequence_id());
if (trusted_packet_sequence_id == 0) {
ASSERT_TRUE(packet.trusted_packet_sequence_id() > 0);
} else {
ASSERT_EQ(packet.trusted_packet_sequence_id(), trusted_packet_sequence_id);
}
}

void AssertTrackEventDuration(const TracePacket& packet1, const TracePacket& packet2, uint64_t min, uint64_t max) {
ASSERT_TRUE(packet1.has_track_event());
ASSERT_TRUE(packet2.has_track_event());
Expand Down
6 changes: 6 additions & 0 deletions tests/tracing/helpers/assert_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ std::unordered_map<std::string, uint64_t> GetInternedEventCategories(const Trace

void AssertIsTrackEventSliceEnd(const TracePacket& packet, uint64_t thread_track_uuid, uint32_t trusted_packet_sequence_id);

void AssertIsTrackEventInstant1(
const TracePacket& packet,
uint64_t thread_track_uuid,
uint32_t trusted_packet_sequence_id = 0
);

void AssertIsTrackEventInstant(
const TracePacket& packet,
const char* event_name,
Expand Down
3 changes: 1 addition & 2 deletions tests/tracing/multi_threaded_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ class MultiThreadTracingTest : public ::testing::Test {

protected:
void SetUp() override {
app_.StartTraceSession();
app_.RegisterTraceSink(sink_); // TODO: make this registerable before the trace session starts.
app_.StartTraceSession(sink_);
}

void TearDown() override {
Expand Down
91 changes: 70 additions & 21 deletions tests/tracing/single_threaded_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ class SingleThreadTracingTest : public ::testing::Test {
protected:
void SetUp() override {
app_.RegisterThread(regular_thread_);
app_.StartTraceSession(); // TODO: make each test manually start the trace session!

app_.RegisterTraceSink(sink_);

app_.StartTraceSession(sink_); // TODO: make each test manually start the trace session!
app_.Start();
}

Expand Down Expand Up @@ -158,7 +155,7 @@ TEST_F(SingleThreadTracingTest, WithSpanNested) {
ASSERT_GT(inner_event1_iid, 0);
ASSERT_NE(inner_event1_iid, outer_event_iid);

event_categories = GetInternedEventCategories(*packets[2]);
event_categories = GetInternedEventCategories(*packets[3]);
ASSERT_EQ(event_categories.size(), 1);

const auto inner_category_iid = event_categories.at("inner");
Expand Down Expand Up @@ -207,7 +204,7 @@ TEST_F(SingleThreadTracingTest, WithSpanNested) {
// First InnerEvent2, inner packet
AssertIsTrackEventSliceBegin1(*packets[9], thread_track_uuid, sequence_id);

event_names = GetInternedEventNames(*packets[6]);
event_names = GetInternedEventNames(*packets[9]);
ASSERT_EQ(event_names.size(), 1);

const auto inner_event2_iid = event_names.at("InnerEvent2");
Expand All @@ -217,7 +214,7 @@ TEST_F(SingleThreadTracingTest, WithSpanNested) {
ASSERT_NE(inner_event2_iid, inner_event1_iid);
ASSERT_NE(inner_event2_iid, outer_event_iid);

AssertTrackEventHasIid(*packets[6], inner_inner_event2_iid, inner_category_iid);
AssertTrackEventHasIid(*packets[9], inner_event2_iid, inner_category_iid);

AssertIsTrackEventSliceEnd(*packets[10], thread_track_uuid, sequence_id);
AssertTrackEventDuration(*packets[9], *packets[10], 2000000, 20000000);
Expand All @@ -243,7 +240,21 @@ TEST_F(SingleThreadTracingTest, InstantEvent) {
AssertIsThreadTrackDescriptor(*packets[1], kRegularThreadName, process_track_uuid);
auto thread_track_uuid = packets[1]->track_descriptor().uuid();

AssertIsTrackEventInstant(*packets[2], "MyCoolEvent", "instant", thread_track_uuid);
AssertIsTrackEventInstant1(*packets[2], thread_track_uuid);

const auto event_names = GetInternedEventNames(*packets[2]);
ASSERT_EQ(event_names.size(), 1);

const auto event_name_iid = event_names.at("MyCoolEvent");
ASSERT_GT(event_name_iid, 0);

const auto event_categories = GetInternedEventCategories(*packets[2]);
ASSERT_EQ(event_categories.size(), 1);

const auto category_iid = event_categories.at("instant");
ASSERT_GT(category_iid, 0);

AssertTrackEventHasIid(*packets[2], event_name_iid, category_iid);
}

TEST_F(SingleThreadTracingTest, StopTracingAndNoEventsAreRecorded) {
Expand Down Expand Up @@ -272,17 +283,15 @@ TEST_F(SingleThreadTracingTest, RestartTracingStartsNewSession) {
auto packets = GetPacketsFromTraces(traces);
ASSERT_EQ(packets.size(), 4);

auto event1_thread_sequence_id1 = packets[2]->trusted_packet_sequence_id();

regular_thread_->RunOneIteration([](MockRegularThread* self) {
auto span = self->TracerForTest().WithSpan("Event2");
WasteTime(std::chrono::microseconds(1000));
});

// In normal API usage, we always have to re-register sinks after starting a
// trace session, which may not be ideal?? Alternatively could use the short
// hand API to log directly to file...
app_.StartTraceSession();
sink_->Clear(); // clear the sink so we have a fresh start when restarting trace
app_.RegisterTraceSink(sink_); // TODO: make it so that sinks are cached???? Doesn't make sense tho.
sink_->Clear(); // clear the sink so we have a fresh start when restarting trace
app_.StartTraceSession(sink_);

regular_thread_->RunOneIteration([](MockRegularThread* self) {
auto span = self->TracerForTest().WithSpan("Event3");
Expand All @@ -293,21 +302,44 @@ TEST_F(SingleThreadTracingTest, RestartTracingStartsNewSession) {

auto traces2 = sink_->LoggedTraces();
auto packets2 = GetPacketsFromTraces(traces2);
ASSERT_EQ(packets2.size(), 4);
ASSERT_EQ(packets2.size(), 5);

AssertIsProcessTrackDescriptor(*packets2[0], kAppName);
const auto process_track_uuid = packets2[0]->track_descriptor().uuid();

AssertIsThreadTrackDescriptor(*packets2[1], kRegularThreadName, process_track_uuid);
auto thread_track_uuid = packets2[1]->track_descriptor().uuid();

std::cout << "packets2: " << packets2[2]->ShortDebugString() << "\n";

// Event1 is emitted as interned data because that thread is still active and the event name got interned previously.
auto event_names = GetInternedEventNames(*packets2[2]);
ASSERT_EQ(event_names.size(), 1);

auto event1_name_iid = event_names.at("Event1");
ASSERT_GT(event1_name_iid, 0);

auto event1_thread_sequence_id2 = packets2[2]->trusted_packet_sequence_id();

ASSERT_EQ(event1_thread_sequence_id1, event1_thread_sequence_id2);

// Note Event2 is lost as designed
AssertIsTrackEventSliceBegin(*packets2[2], "Event3", nullptr, thread_track_uuid);
auto sequence_id = packets2[2]->trusted_packet_sequence_id();
AssertIsTrackEventSliceBegin1(*packets2[3], thread_track_uuid);
auto sequence_id = packets2[3]->trusted_packet_sequence_id();

ASSERT_EQ(sequence_id, event1_thread_sequence_id2);

event_names = GetInternedEventNames(*packets2[3]);
ASSERT_EQ(event_names.size(), 1);

const auto event3_name_iid = event_names.at("Event3");
ASSERT_GT(event3_name_iid, 0);

AssertTrackEventHasIid(*packets2[3], event3_name_iid, 0);

AssertIsTrackEventSliceEnd(*packets2[3], thread_track_uuid, sequence_id);
AssertIsTrackEventSliceEnd(*packets2[4], thread_track_uuid, sequence_id);

AssertTrackEventDuration(*packets2[2], *packets2[3], 1000000, 10000000);
AssertTrackEventDuration(*packets2[3], *packets2[4], 1000000, 10000000);
}

TEST_F(SingleThreadTracingTest, DynamicallyAddingSinkWillWork) {
Expand All @@ -334,15 +366,32 @@ TEST_F(SingleThreadTracingTest, DynamicallyAddingSinkWillWork) {
auto traces2 = sink2->LoggedTraces();
auto packets2 = GetPacketsFromTraces(traces2);

ASSERT_EQ(packets2.size(), 3);
ASSERT_EQ(packets2.size(), 4);

AssertIsProcessTrackDescriptor(*packets2[0], kAppName);
const auto process_track_uuid = packets2[0]->track_descriptor().uuid();

AssertIsThreadTrackDescriptor(*packets2[1], kRegularThreadName, process_track_uuid);
auto thread_track_uuid = packets2[1]->track_descriptor().uuid();

AssertIsTrackEventInstant(*packets2[2], "Event2", nullptr, thread_track_uuid);
auto event_names = GetInternedEventNames(*packets2[2]);
ASSERT_EQ(event_names.size(), 1);

const auto event1_name_iid = event_names.at("Event1");
ASSERT_GT(event1_name_iid, 0);

auto sequence_id = packets2[2]->trusted_packet_sequence_id();

AssertIsTrackEventInstant1(*packets2[3], thread_track_uuid, sequence_id);

event_names = GetInternedEventNames(*packets2[3]);
ASSERT_EQ(event_names.size(), 1);

const auto event2_name_iid = event_names.at("Event2");
ASSERT_GT(event2_name_iid, 0);
ASSERT_NE(event2_name_iid, event1_name_iid);

AssertTrackEventHasIid(*packets2[3], event2_name_iid, 0);

auto traces = sink_->LoggedTraces();
auto packets = GetPacketsFromTraces(traces);
Expand Down

0 comments on commit 050f8c0

Please sign in to comment.