Skip to content

Commit

Permalink
String interning for tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Mar 19, 2024
1 parent 42c7722 commit a7e657a
Show file tree
Hide file tree
Showing 11 changed files with 565 additions and 98 deletions.
52 changes: 40 additions & 12 deletions examples/tracing_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,19 @@ void WasteTime(std::chrono::microseconds duration) {
class ExampleRTThread : public CyclicThread {
int64_t loop_counter_ = 0;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

// thread_config.tracer_config.trace_sleep = true;
thread_config.tracer_config.trace_wakeup_latency = true;
return thread_config;
}

public:
ExampleRTThread(const char* name, cactus_rt::CyclicThreadConfig config) : CyclicThread(name, config) {}
ExampleRTThread() : CyclicThread("ExampleRTThread", CreateThreadConfig()) {}

int64_t GetLoopCounter() const {
return loop_counter_;
Expand Down Expand Up @@ -60,21 +71,38 @@ class ExampleRTThread : public CyclicThread {
}
};

int main() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);
class SecondRTThread : public CyclicThread {
static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 3'000'000;
thread_config.cpu_affinity = {1};
thread_config.SetFifoScheduler(60);

// thread_config.tracer_config.trace_sleep = true;
thread_config.tracer_config.trace_wakeup_latency = true;
// thread_config.tracer_config.trace_sleep = true;
thread_config.tracer_config.trace_wakeup_latency = true;
return thread_config;
}

public:
SecondRTThread() : CyclicThread("SecondRTThread", CreateThreadConfig()) {}

protected:
bool Loop(int64_t /*now*/) noexcept final {
const auto span = Tracer().WithSpan("Sense");
WasteTime(std::chrono::microseconds(2000));
return false;
}
};

int main() {
cactus_rt::AppConfig app_config;
app_config.tracer_config.trace_aggregator_cpu_affinity = {1};
app_config.tracer_config.trace_aggregator_cpu_affinity = {0}; // doesn't work yet

auto thread = std::make_shared<ExampleRTThread>("ExampleRTThread", thread_config);
auto thread1 = std::make_shared<ExampleRTThread>();
auto thread2 = std::make_shared<SecondRTThread>();
App app("TracingExampleApp", app_config);
app.RegisterThread(thread);
app.RegisterThread(thread1);
app.RegisterThread(thread2);

std::cout << "Testing RT loop for 15 seconds with two trace sessions.\n";

Expand All @@ -97,6 +125,6 @@ int main() {
app.Join();
// Don't need to stop the trace session as the app destructor will take care of it.

std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n";
std::cout << "Number of loops executed: " << thread1->GetLoopCounter() << "\n";
return 0;
}
11 changes: 5 additions & 6 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class App {
// TODO: investigate into a weak pointer.
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::mutex tracer_mutex_;
std::mutex aggregator_mutex_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand Down Expand Up @@ -113,15 +113,14 @@ class App {
/**
* @brief Starts a new tracing session for the process. Will not start a new
* tracing session if an existing tracing session is in progress. This
* function is not real-time safe. This will not register any output sinks.
* Use App::RegisterTraceSink() to register custom sinks.
* function is not real-time safe.
*/
bool StartTraceSession() noexcept;
bool StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Register a custom trace sink after starting the trace session
*/
void RegisterTraceSink(std::shared_ptr<cactus_rt::tracing::Sink> sink) noexcept;
void RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Stops the tracing session for the process. Will be no-op if tracing
Expand Down Expand Up @@ -159,7 +158,7 @@ class App {
*/
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;

void CreateAndStartTraceAggregator() noexcept;
void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;

void StopTraceAggregator() noexcept;
};
Expand Down
8 changes: 8 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "../experimental/lockless/atomic_message.h"
#include "track_event_internal.h"
#include "utils/string_interner.h"

namespace cactus_rt::tracing {
struct EventCountData {
Expand All @@ -35,6 +36,13 @@ class ThreadTracer {

moodycamel::ReaderWriterQueue<TrackEventInternal> queue_;

// The event name interning must be done per thread (per sequence). Thus it is
// stored here. However, this class must NEVER call functions here (other
// than maybe .Size), as the memory allocation can occur. This variable is
// designed to be used by TraceAggregator on the non-real-time path.
utils::StringInterner event_name_interner_;
utils::StringInterner event_category_interner_;

std::string name_;
uint64_t track_uuid_;

Expand Down
28 changes: 27 additions & 1 deletion include/cactus_rt/tracing/trace_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,35 @@ class TraceAggregator {
size_t TryDequeueOnceFromAllTracers(Trace& trace) noexcept;
void WriteTrace(const Trace& trace) noexcept;

/**
* Creates the initial process descriptor packet
*/
Trace CreateProcessDescriptorPacket() const;

/**
* Creates a thread descriptor packet given a thread tracer.
*
* Must be called while a lock is held.
*/
Trace CreateThreadDescriptorPacket(const ThreadTracer& thread_tracer) const;
void AddTrackEventPacketToTrace(Trace& trace, const ThreadTracer& thread_tracer, const TrackEventInternal& track_event_internal);

/**
* Adds the track event packet to an existing trace.
*
* Must be called while a lock is held.
*/
void AddTrackEventPacketToTrace(Trace& trace, ThreadTracer& thread_tracer, const TrackEventInternal& track_event_internal);

/**
* Create the initial interned data packet if a new sink joins.
*
* Must be called while a lock is held.
*
* @param initial_timestamp The initial timestamp of the track, must be before
* all other packets about to be written. Commonly this should be the
* timestamp of the sticky packets.
*/
std::optional<Trace> CreateInitialInternedDataPacket() const;
};
} // namespace cactus_rt::tracing

Expand Down
4 changes: 4 additions & 0 deletions include/cactus_rt/tracing/utils/string_interner.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class StringInterner {
size_t Size() const {
return strings_.size();
};

const std::unordered_map<std::string_view, uint64_t>& Ids() const {
return ids_;
}
};
} // namespace cactus_rt::tracing::utils

Expand Down
31 changes: 17 additions & 14 deletions src/cactus_rt/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,29 @@ bool App::StartTraceSession(const char* output_filename) noexcept {
return false;
}

CreateAndStartTraceAggregator();
trace_aggregator_->RegisterSink(std::make_unique<FileSink>(output_filename));
CreateAndStartTraceAggregator(std::make_shared<FileSink>(output_filename));
cactus_rt::tracing::EnableTracing();

return true;
}

bool App::StartTraceSession() noexcept {
bool App::StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept {
if (cactus_rt::tracing::IsTracingEnabled()) {
return false;
}

CreateAndStartTraceAggregator();
CreateAndStartTraceAggregator(sink);
cactus_rt::tracing::EnableTracing();

return true;
}

void App::RegisterTraceSink(std::shared_ptr<cactus_rt::tracing::Sink> sink) noexcept {
if (trace_aggregator_ == nullptr) {
return;
}
void App::RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept {
const std::scoped_lock lock(aggregator_mutex_);

trace_aggregator_->RegisterSink(sink);
if (trace_aggregator_ != nullptr) {
trace_aggregator_->RegisterSink(sink);
}
}

bool App::StopTraceSession() noexcept {
Expand All @@ -103,7 +102,7 @@ bool App::StopTraceSession() noexcept {
}

void App::RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept {
const std::scoped_lock lock(tracer_mutex_);
const std::scoped_lock lock(aggregator_mutex_);

thread_tracers_.push_back(thread_tracer);

Expand All @@ -113,7 +112,7 @@ void App::RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tra
}

void App::DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept {
const std::scoped_lock lock(tracer_mutex_);
const std::scoped_lock lock(aggregator_mutex_);

thread_tracers_.remove_if([thread_tracer](const std::shared_ptr<tracing::ThreadTracer>& t) {
return t == thread_tracer;
Expand Down Expand Up @@ -190,8 +189,8 @@ void App::StartQuill() {
quill::start();
}

void App::CreateAndStartTraceAggregator() noexcept {
const std::scoped_lock lock(tracer_mutex_);
void App::CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept {
const std::scoped_lock lock(aggregator_mutex_);

if (trace_aggregator_ != nullptr) {
// TODO: error here
Expand All @@ -203,11 +202,15 @@ void App::CreateAndStartTraceAggregator() noexcept {
trace_aggregator_->RegisterThreadTracer(tracer);
}

if (sink != nullptr) {
trace_aggregator_->RegisterSink(sink);
}

trace_aggregator_->Start();
}

void App::StopTraceAggregator() noexcept {
const std::scoped_lock lock(tracer_mutex_);
const std::scoped_lock lock(aggregator_mutex_);

if (trace_aggregator_ == nullptr) {
// TODO: error here
Expand Down
Loading

0 comments on commit a7e657a

Please sign in to comment.