Skip to content

Commit

Permalink
App::CreateThread
Browse files Browse the repository at this point in the history
Instead of letting `Thread` be created outside of the app, we now force
the `Thread` to be created by the `App::CreateThread` factory method.
This allows us to better control lifetime and sets up the stage to
possibly eliminate the weak_ptr situation with the trace aggregator if
the Thread is not longer a shared_ptr.
  • Loading branch information
shuhaowu committed Aug 30, 2024
1 parent 77274f2 commit 78f2fb3
Show file tree
Hide file tree
Showing 21 changed files with 67 additions and 99 deletions.
7 changes: 3 additions & 4 deletions examples/lockless_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ class NonRTThread : public Thread {

int main() {
Context ctx;
auto rt_thread = std::make_shared<RTThread>(ctx);
auto non_rt_thread = std::make_shared<NonRTThread>(ctx);

App app;
app.RegisterThread(rt_thread);
app.RegisterThread(non_rt_thread);

auto rt_thread = app.CreateThread<RTThread>(ctx);
auto non_rt_thread = app.CreateThread<NonRTThread>(ctx);

app.Start();
app.Join();
Expand Down
4 changes: 1 addition & 3 deletions examples/logging_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ int main() {
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

auto thread = std::make_shared<ExampleRTThread>("ExampleRTThread", thread_config);

// Create a cactus_rt app configuration
cactus_rt::AppConfig app_config;

Expand All @@ -54,7 +52,7 @@ int main() {
app_config.logger_config = logging_config;
App app("LoggingExampleApp", app_config);

app.RegisterThread(thread);
auto thread = app.CreateThread<ExampleRTThread>("ExampleRTThread", thread_config);
constexpr unsigned int time = 5;

std::cout << "Testing RT loop for " << time << " seconds.\n";
Expand Down
8 changes: 3 additions & 5 deletions examples/message_passing_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
using cactus_rt::App;

int main() {
auto data_logger = std::make_shared<DataLogger>("build/data.csv");
auto rt_thread = std::make_shared<RtThread>(data_logger);

App app;
app.RegisterThread(data_logger);
app.RegisterThread(rt_thread);

auto data_logger = app.CreateThread<DataLogger>("build/data.csv");
auto rt_thread = app.CreateThread<RtThread>(data_logger);

app.Start();
rt_thread->Join(); // This thread will terminate on its own.
Expand Down
8 changes: 3 additions & 5 deletions examples/mutex_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,10 @@ void ThreadedDemo() {
// into the thread and maintain the object lifetime to this function.
NaiveDoubleBuffer<Data> buf;

auto rt_thread = std::make_shared<RTThread>("RTThread", rt_thread_config, buf);
auto non_rt_thread = std::make_shared<NonRTThread>("NonRTThread", non_rt_thread_config, buf);
App app;
App app;

app.RegisterThread(non_rt_thread);
app.RegisterThread(rt_thread);
auto rt_thread = app.CreateThread<RTThread>("RTThread", rt_thread_config, buf);
auto non_rt_thread = app.CreateThread<NonRTThread>("NonRTThread", non_rt_thread_config, buf);

constexpr unsigned int time = 10;
app.Start();
Expand Down
1 change: 0 additions & 1 deletion examples/ros2/publisher/complex_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ int main(int argc, const char* argv[]) {
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2PublisherThread>(time);
app.RegisterThread(thread);

app.Start();

Expand Down
1 change: 0 additions & 1 deletion examples/ros2/publisher/simple_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ int main(int argc, const char* argv[]) {
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2PublisherThread>(time);
app.RegisterThread(thread);

app.Start();

Expand Down
1 change: 0 additions & 1 deletion examples/ros2/subscriber/complex_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ int main(int argc, const char* argv[]) {
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);
app.RegisterThread(thread);

app.Start();

Expand Down
1 change: 0 additions & 1 deletion examples/ros2/subscriber/simple_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ int main(int argc, const char* argv[]) {
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);
app.RegisterThread(thread);

app.Start();

Expand Down
4 changes: 1 addition & 3 deletions examples/signal_handling_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ int main() {
config.cpu_affinity = std::vector<size_t>{2};
config.SetFifoScheduler(80);

auto thread = std::make_shared<ExampleRTThread>("ExampleRTThread", config);
App app;

app.RegisterThread(thread);
auto thread = app.CreateThread<ExampleRTThread>("ExampleRTThread", config);

// Sets up the signal handlers for SIGINT and SIGTERM (by default).
cactus_rt::SetUpTerminationSignalHandler();
Expand Down
3 changes: 1 addition & 2 deletions examples/simple_deadline_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ int main() {
config.period_ns = 1'000'000;
config.SetDeadlineScheduler(500'000 /* runtime */, 1'000'000 /* deadline*/);

auto thread = std::make_shared<ExampleDeadlineThread>("ExampleRTThread", config);
App app;
auto thread = app.CreateThread<ExampleDeadlineThread>("ExampleRTThread", config);

app.RegisterThread(thread);
constexpr unsigned int time = 5;

std::cout << "Testing RT loop for " << time << " seconds.\n";
Expand Down
8 changes: 2 additions & 6 deletions examples/simple_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,8 @@ int main() {
// We first create cactus_rt App object.
App app;

// We then create a thread object
auto thread = std::make_shared<ExampleRTThread>();

// We then register the thread with the app, which allows the app to start,
// stop, and join the thread via App::Start, App::RequestStop, and App::Join.
app.RegisterThread(thread);
// We then create a thread object.
auto thread = app.CreateThread<ExampleRTThread>();

constexpr unsigned int time = 5;
std::cout << "Testing RT loop for " << time << " seconds.\n";
Expand Down
10 changes: 4 additions & 6 deletions examples/tracing_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class SecondRTThread : public CyclicThread {

protected:
LoopControl Loop(int64_t /*now*/) noexcept final {
const auto span = Tracer().WithSpan("Sense");
WasteTime(std::chrono::microseconds(2000));
return LoopControl::Continue;
}
Expand All @@ -98,11 +97,10 @@ int main() {
cactus_rt::AppConfig app_config;
app_config.tracer_config.trace_aggregator_cpu_affinity = {0}; // doesn't work yet

auto thread1 = std::make_shared<ExampleRTThread>();
auto thread2 = std::make_shared<SecondRTThread>();
App app("TracingExampleApp", app_config);
app.RegisterThread(thread1);
app.RegisterThread(thread2);
App app("TracingExampleApp", app_config);

auto thread1 = app.CreateThread<ExampleRTThread>();
auto thread2 = app.CreateThread<SecondRTThread>();

std::cout << "Testing RT loop for 15 seconds with two trace sessions.\n";

Expand Down
28 changes: 14 additions & 14 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class App {

TracerConfig tracer_config_;

std::vector<std::shared_ptr<Thread>> threads_;
std::shared_ptr<tracing::TraceAggregator> trace_aggregator_; // Must be above threads_ to guarantee destructor order.

std::shared_ptr<tracing::TraceAggregator> trace_aggregator_;
std::vector<std::shared_ptr<Thread>> threads_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand All @@ -62,19 +62,19 @@ class App {
App(App&&) noexcept = delete;
App& operator=(App&&) noexcept = delete;

/**
* @brief Registers a thread to be automatically started by the app. The start
* order of the threads are in the order of registration.
*
* @param thread A shared ptr to a thread.
*/
void RegisterThread(std::shared_ptr<Thread> thread);
template <typename ThreadT, typename... Args>
std::shared_ptr<ThreadT> CreateThread(Args&&... args) {
static_assert(std::is_base_of_v<Thread, ThreadT>, "Must derive from cactus_rt::Thread");
std::shared_ptr<ThreadT> thread = std::make_shared<ThreadT>(std::forward<Args>(args)...);

/**
* @brief Sets up the trace aggregator. Call this before starting the thread
* if you don't want to call RegisterThread and maintain tracing capabilities.
*/
void SetupTraceAggregator(Thread& thread);
Thread* base_thread = thread.get();
base_thread->trace_aggregator_ = trace_aggregator_;
base_thread->created_by_app_ = true;

threads_.push_back(thread);

return thread;
}

/**
* @brief Starts the app by locking the memory and reserving the memory. Also
Expand Down
5 changes: 3 additions & 2 deletions include/cactus_rt/cyclic_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ class CyclicThread : public Thread {
Stop,
};

protected:
/**
* @brief Create a cyclic thread
* @brief Create a cyclic thread.
*
* @param name The thread name
* @param config A cactus_rt::CyclicThreadConfig that specifies configuration parameters for this thread
*/
CyclicThread(std::string name, CyclicThreadConfig config) : Thread(name, config),
period_ns_(config.period_ns) {
}

protected:
void Run() noexcept final;

/**
Expand Down
2 changes: 1 addition & 1 deletion include/cactus_rt/ros2/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class App : public cactus_rt::App {
template <typename ThreadT, typename... Args>
std::shared_ptr<ThreadT> CreateROS2EnabledThread(Args&&... args) {
static_assert(std::is_base_of_v<Ros2ThreadMixin, ThreadT>, "Must derive ROS2 thread from Ros2ThreadMixin");
std::shared_ptr<ThreadT> thread = std::make_shared<ThreadT>(std::forward<Args>(args)...);
std::shared_ptr<ThreadT> thread = CreateThread<ThreadT>(std::forward<Args>(args)...);

thread->SetRos2Adapter(ros2_adapter_);
thread->InitializeForRos2();
Expand Down
22 changes: 9 additions & 13 deletions include/cactus_rt/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ namespace cactus_rt {
/// @private
constexpr size_t kDefaultStackSize = 8 * 1024 * 1024; // 8MB default stack space should be plenty

class App;

class Thread {
friend class App;

bool created_by_app_ = false; // A guard to prevent users to create the thread without using App::CreateThread.

ThreadConfig config_;
std::string name_;
std::vector<size_t> cpu_affinity_;
Expand All @@ -39,11 +45,10 @@ class Thread {
static void* RunThread(void* data);

// Non-owning TraceAggregator pointer. Used only for notifying that the thread
// has started/stopped for tracing purposes. Set by Thread::Start and read at
// the beginning of Thread::RunThread.
// has started/stopped for tracing purposes. Set by App::CreateThread.
std::weak_ptr<tracing::TraceAggregator> trace_aggregator_;

public:
protected:
/**
* Creates a new thread.
*
Expand All @@ -61,6 +66,7 @@ class Thread {
}
}

public:
/**
* Returns the name of the thread
*
Expand Down Expand Up @@ -111,16 +117,6 @@ class Thread {
*/
void Start(int64_t start_monotonic_time_ns);

/**
* @brief Sets the trace_aggregator_ pointer so the thread can notify the
* trace_aggregator_ when it starts. This should only be called by App.
*
* @private
*/
inline void SetTraceAggregator(std::weak_ptr<tracing::TraceAggregator> trace_aggregator) {
trace_aggregator_ = trace_aggregator;
}

protected:
inline quill::Logger* Logger() const { return logger_; }

Expand Down
9 changes: 0 additions & 9 deletions src/cactus_rt/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ using FileSink = cactus_rt::tracing::FileSink;

namespace cactus_rt {

void App::SetupTraceAggregator(Thread& thread) {
thread.SetTraceAggregator(trace_aggregator_);
}

void App::RegisterThread(std::shared_ptr<Thread> thread) {
SetupTraceAggregator(*thread);
threads_.push_back(thread);
}

App::App(std::string name, AppConfig config)
: name_(name),
heap_size_(config.heap_size),
Expand Down
1 change: 0 additions & 1 deletion src/cactus_rt/ros2/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ App::App(
// Must initialize rclcpp before making the Ros2Adapter;
ros2_adapter_ = std::make_shared<Ros2Adapter>(name, ros2_adapter_config);
ros2_executor_thread_ = CreateROS2EnabledThread<Ros2ExecutorThread>();
SetupTraceAggregator(*ros2_executor_thread_);
}

App::~App() {
Expand Down
11 changes: 10 additions & 1 deletion src/cactus_rt/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ namespace cactus_rt {

void* Thread::RunThread(void* data) {
auto* thread = static_cast<Thread*>(data);

if (!thread->created_by_app_) {
throw std::runtime_error(std::string("do not create Thread manually, use App::CreateThread to create thread") + thread->name_);
}

thread->config_.scheduler->SetSchedAttr();

pthread_setname_np(pthread_self(), thread->name_.c_str());
Expand All @@ -25,7 +30,11 @@ void* Thread::RunThread(void* data) {
if (auto trace_aggregator = thread->trace_aggregator_.lock()) {
trace_aggregator->RegisterThreadTracer(thread->tracer_);
} else {
LOG_WARNING(thread->Logger(), "thread {} does not have app_ and tracing is disabled for this thread. Did you call App::RegisterThread?", thread->name_);
LOG_WARNING(
thread->Logger(),
"thread {} does not have app_ and tracing is disabled for this thread. Did the App/Thread go out of scope before the thread is launched?",
thread->name_
);
}

quill::preallocate(); // Pre-allocates thread-local data to avoid the need to allocate on the first log message
Expand Down
Loading

0 comments on commit 78f2fb3

Please sign in to comment.