From 4033dcab09f93ee83c05379ec00c13c92df0a705 Mon Sep 17 00:00:00 2001 From: likepeng Date: Wed, 19 Jun 2024 14:16:36 +0800 Subject: [PATCH 01/13] add running state --- CMakeLists.txt | 2 +- myframe/app.cpp | 46 ++++++++++++++++++++++++++++++++++++++-------- myframe/app.h | 17 +++++++++++++---- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f9576bc..bbc87b7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.10) if (POLICY CMP0091) cmake_policy(SET CMP0091 NEW) endif() -project(myframe VERSION 0.9.3) +project(myframe VERSION 0.9.4) ### option option(MYFRAME_USE_CV "Using conditional variables for thread communication" ON) diff --git a/myframe/app.cpp b/myframe/app.cpp index b8e4039..3871c35 100644 --- a/myframe/app.cpp +++ b/myframe/app.cpp @@ -63,7 +63,7 @@ bool App::Init( int thread_pool_size, int event_conn_size, int warning_msg_size) { - if (!quit_.load()) { + if (state_.load() != kUninitialized) { return true; } @@ -73,10 +73,11 @@ bool App::Init( ret &= poller_->Init(); ret &= worker_ctx_mgr_->Init(warning_msg_size); ret &= ev_conn_mgr_->Init(event_conn_size); + + state_.store(kInitialized); + ret &= StartCommonWorker(thread_pool_size); ret &= StartTimerWorker(); - - quit_.store(false); return ret; } @@ -119,6 +120,10 @@ bool App::LoadServiceFromJsonStr(const std::string& service) { } bool App::LoadServiceFromJson(const Json::Value& service) { + if (state_.load() == kUninitialized) { + LOG(ERROR) << "not init, please call Init() before LoadServiceFromJson()"; + return false; + } if (service.isNull()) { LOG(ERROR) << "parse service json failed, skip"; return false; @@ -265,6 +270,10 @@ bool App::AddActor( const std::string& params, std::shared_ptr actor, const Json::Value& config) { + if (state_.load() == kUninitialized) { + LOG(ERROR) << "not init, please call Init() before AddActor()"; + return false; + } actor->SetInstName(inst_name); actor->SetConfig(config); return CreateActorContext(actor, params); @@ -274,6 +283,10 @@ bool App::AddWorker( const std::string& inst_name, std::shared_ptr worker, const Json::Value& config) { + if (state_.load() == kUninitialized) { + LOG(ERROR) << "not init, please call Init() before AddWorker()"; + return false; + } auto worker_ctx = std::make_shared( shared_from_this(), worker, poller_); worker->SetInstName(inst_name); @@ -300,6 +313,10 @@ bool App::AddWorker( } int App::Send(std::shared_ptr msg) { + if (state_.load() == kUninitialized) { + LOG(ERROR) << "not init, please call Init() before Send()"; + return -1; + } auto conn = ev_conn_mgr_->Alloc(); if (conn == nullptr) { LOG(ERROR) << "alloc conn event failed"; @@ -314,6 +331,10 @@ int App::Send(std::shared_ptr msg) { const std::shared_ptr App::SendRequest( std::shared_ptr msg) { + if (state_.load() == kUninitialized) { + LOG(ERROR) << "not init, please call Init() before SendRequest()"; + return nullptr; + } auto conn = ev_conn_mgr_->Alloc(); if (conn == nullptr) { LOG(ERROR) << "alloc conn event failed"; @@ -326,6 +347,10 @@ const std::shared_ptr App::SendRequest( return resp; } +std::unique_ptr& App::GetModManager() { + return mods_; +} + /** * 创建一个新的actor: * 1. 从ModManager中获得对应模块对象 @@ -732,9 +757,13 @@ void App::ProcessCacheMsg() { } int App::Exec() { + if (state_.load() == kUninitialized) { + LOG(ERROR) << "not init, please call Init() before Exec()"; + return -1; + } int time_wait_ms = 100; std::vector evs; - + state_.store(kRunning); while (worker_ctx_mgr_->WorkerSize()) { /// 检查空闲线程队列是否有空闲线程,如果有就找到一个有消息的actor处理 CheckStopWorkers(); @@ -748,17 +777,18 @@ int App::Exec() { // quit App worker_ctx_mgr_->WaitAllWorkerQuit(); - quit_.store(true); + state_.store(kQuit); LOG(INFO) << "app exit exec"; return 0; } void App::Quit() { // wait worker stop - if (quit_.load()) { - return; + if (state_.load() == kRunning + || state_.load() == kInitialized) { + state_.store(kQuitting); + worker_ctx_mgr_->StopAllWorker(); } - worker_ctx_mgr_->StopAllWorker(); } bool App::HasUserInst(const std::string& name) { diff --git a/myframe/app.h b/myframe/app.h index 73fe073..2a34fe7 100644 --- a/myframe/app.h +++ b/myframe/app.h @@ -44,6 +44,14 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { friend class Actor; public: + enum State : std::uint8_t { + kUninitialized = 0, + kInitialized, + kRunning, + kQuitting, + kQuit, + }; + App(); virtual ~App(); @@ -57,10 +65,10 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { bool LoadServiceFromFile(const std::string& file); - bool LoadServiceFromJson(const Json::Value& service); - bool LoadServiceFromJsonStr(const std::string& service); + bool LoadServiceFromJson(const Json::Value& service); + bool AddActor( const std::string& inst_name, const std::string& params, @@ -77,7 +85,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { const std::shared_ptr SendRequest( std::shared_ptr msg); - std::unique_ptr& GetModManager() { return mods_; } + std::unique_ptr& GetModManager(); int Exec(); @@ -130,8 +138,9 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { stdfs::path lib_dir_; /// node地址 std::string node_addr_; + /// std::atomic warning_msg_size_{10}; - std::atomic_bool quit_{true}; + std::atomic state_{kUninitialized}; std::recursive_mutex local_mtx_; /// 缓存消息列表 struct CacheMsg { From 52416c6a7680fb1c7196b77c0662647b59c3e3fb Mon Sep 17 00:00:00 2001 From: likepeng Date: Wed, 19 Jun 2024 15:28:26 +0800 Subject: [PATCH 02/13] fix: loop dispatch cache msg --- myframe/app.cpp | 65 ++++++++++++++++++++----------------------------- myframe/app.h | 15 +++--------- 2 files changed, 30 insertions(+), 50 deletions(-) diff --git a/myframe/app.cpp b/myframe/app.cpp index 3871c35..bb1ab6c 100644 --- a/myframe/app.cpp +++ b/myframe/app.cpp @@ -400,26 +400,29 @@ bool App::CreateActorContext( std::lock_guard lock(local_mtx_); // 接收缓存中发给自己的消息 for (auto it = cache_msgs_.begin(); it != cache_msgs_.end();) { - if (it->msg->GetDst() == actor_name) { + if ((*it)->GetDst() == actor_name) { LOG(INFO) << actor_name - << " recv msg from cache " << *(it->msg); - DispatchMsg(it->msg); + << " recv msg from cache " << *(it); + DispatchMsg(*it); it = cache_msgs_.erase(it); continue; } ++it; } // 目的地址不存在的暂时放到缓存消息队列 - auto send_list = ctx->GetMailbox()->GetSendList(); - for (auto it = send_list->begin(); it != send_list->end();) { - if (!HasUserInst((*it)->GetDst())) { - LOG(WARNING) << "can't found " << (*it)->GetDst() - << ", cache this msg"; - cache_msgs_.emplace_back(0, *it); - it = send_list->erase(it); - continue; + // 在运行时不再缓存,直接分发 + if (state_.load() != kRunning) { + auto send_list = ctx->GetMailbox()->GetSendList(); + for (auto it = send_list->begin(); it != send_list->end();) { + if (!HasUserInst((*it)->GetDst())) { + LOG(WARNING) << "can't found " << (*it)->GetDst() + << ", cache this msg"; + cache_msgs_.push_back(*it); + it = send_list->erase(it); + continue; + } + ++it; } - ++it; } // 分发目的地址已经存在的消息 DispatchMsg(ctx); @@ -454,16 +457,6 @@ bool App::StartTimerWorker() { return true; } -void App::DispatchMsg(std::list>* msg_list) { - LOG_IF(WARNING, - msg_list->size() > warning_msg_size_.load()) - << " dispatch msg too many"; - for (auto& msg : (*msg_list)) { - DispatchMsg(std::move(msg)); - } - msg_list->clear(); -} - void App::DispatchMsg(std::shared_ptr msg) { std::lock_guard lock(local_mtx_); VLOG(1) << *msg; @@ -508,6 +501,16 @@ void App::DispatchMsg(std::shared_ptr msg) { } } +void App::DispatchMsg(std::list>* msg_list) { + LOG_IF(WARNING, + msg_list->size() > warning_msg_size_.load()) + << " dispatch msg too many"; + for (auto& msg : (*msg_list)) { + DispatchMsg(std::move(msg)); + } + msg_list->clear(); +} + // 将获得的消息分发给其他actor void App::DispatchMsg(std::shared_ptr context) { if (nullptr == context) { @@ -742,20 +745,6 @@ void App::ProcessEvent(const std::vector& evs) { } } -void App::ProcessCacheMsg() { - for (auto it = cache_msgs_.begin(); it != cache_msgs_.end();) { - if (it->search_count >= default_search_count_) { - VLOG(1) << *(it->msg) << " search count: " << it->search_count - << ", dispatch it"; - DispatchMsg(it->msg); - it = cache_msgs_.erase(it); - } else { - it->search_count++; - ++it; - } - } -} - int App::Exec() { if (state_.load() == kUninitialized) { LOG(ERROR) << "not init, please call Init() before Exec()"; @@ -764,13 +753,13 @@ int App::Exec() { int time_wait_ms = 100; std::vector evs; state_.store(kRunning); + /// 处理初始化中缓存消息 + DispatchMsg(&cache_msgs_); while (worker_ctx_mgr_->WorkerSize()) { /// 检查空闲线程队列是否有空闲线程,如果有就找到一个有消息的actor处理 CheckStopWorkers(); /// 等待事件 poller_->Wait(&evs, time_wait_ms); - /// 处理缓存消息 - ProcessCacheMsg(); /// 处理事件 ProcessEvent(evs); } diff --git a/myframe/app.h b/myframe/app.h index 2a34fe7..38d012e 100644 --- a/myframe/app.h +++ b/myframe/app.h @@ -122,11 +122,11 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { /// 通知执行事件 void CheckStopWorkers(); - /// 分发事件 + /// 分发消息 void DispatchMsg(std::shared_ptr msg); void DispatchMsg(std::list>* msg_list); void DispatchMsg(std::shared_ptr context); - void ProcessCacheMsg(); + /// 处理事件 void ProcessEvent(const std::vector& evs); void ProcessWorkerEvent(std::shared_ptr); void ProcessTimerEvent(std::shared_ptr); @@ -143,16 +143,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { std::atomic state_{kUninitialized}; std::recursive_mutex local_mtx_; /// 缓存消息列表 - struct CacheMsg { - CacheMsg(int c, std::shared_ptr m) - : search_count(c) - , msg(std::move(m)) {} - int search_count{0}; - std::shared_ptr msg{nullptr}; - }; - int default_search_count_{3}; - std::vector cache_msgs_; - + std::list> cache_msgs_; /// 模块管理对象 std::unique_ptr mods_; /// poller From d68936abde8022c12c2935d3ff5511d7584d909a Mon Sep 17 00:00:00 2001 From: likepeng Date: Thu, 20 Jun 2024 16:42:43 +0800 Subject: [PATCH 03/13] mailbox add run queue --- examples/example_worker_actor_interactive.cpp | 2 +- examples/example_worker_publish.cpp | 4 ++-- myframe/app.cpp | 17 ++++++++------ myframe/event_conn_manager.cpp | 6 ++++- myframe/mailbox.cpp | 22 +++++++++++++++++-- myframe/mailbox.h | 16 +++++++++++--- myframe/worker_common.cpp | 5 +++-- myframe/worker_context.cpp | 16 -------------- myframe/worker_context.h | 9 -------- myframe/worker_context_manager.cpp | 12 +++++----- 10 files changed, 60 insertions(+), 49 deletions(-) diff --git a/examples/example_worker_actor_interactive.cpp b/examples/example_worker_actor_interactive.cpp index beaa9eb..04d6758 100644 --- a/examples/example_worker_actor_interactive.cpp +++ b/examples/example_worker_actor_interactive.cpp @@ -43,7 +43,7 @@ class ExampleWorkerInteractive : public myframe::Worker { return; } while (1) { - const auto msg = mailbox->PopRecv(); + const auto msg = mailbox->PopRun(); if (msg == nullptr) { break; } diff --git a/examples/example_worker_publish.cpp b/examples/example_worker_publish.cpp index 5a48bf4..75ee1ff 100644 --- a/examples/example_worker_publish.cpp +++ b/examples/example_worker_publish.cpp @@ -22,8 +22,8 @@ class ExampleWorkerPublic : public myframe::Worker { return; } auto mailbox = GetMailbox(); - while (!mailbox->RecvEmpty()) { - const auto msg = mailbox->PopRecv(); + while (!mailbox->RunEmpty()) { + const auto msg = mailbox->PopRun(); // send msg by udp/tcp/zmq/... LOG(INFO) << "public msg " << msg->GetData() << " ..."; } diff --git a/myframe/app.cpp b/myframe/app.cpp index bb1ab6c..943dcd1 100644 --- a/myframe/app.cpp +++ b/myframe/app.cpp @@ -325,6 +325,7 @@ int App::Send(std::shared_ptr msg) { poller_->Add(conn); auto ret = conn->Send(std::move(msg)); poller_->Del(conn); + // 主动释放 ev_conn_mgr_->Release(std::move(conn)); return ret; } @@ -343,7 +344,8 @@ const std::shared_ptr App::SendRequest( poller_->Add(conn); auto resp = conn->SendRequest(std::move(msg)); poller_->Del(conn); - ev_conn_mgr_->Release(std::move(conn)); + // 不需要调用ev_conn_mgr_->Release() + // 系统会主动释放 return resp; } @@ -539,16 +541,17 @@ void App::CheckStopWorkers() { << actor_ctx->GetActor()->GetActorName() << " dispatch msg to " << *worker_ctx; - auto msg_list = actor_ctx->GetMailbox()->GetRecvList(); - if (!msg_list->empty()) { + auto actor_mailbox = actor_ctx->GetMailbox(); + int actor_ctx_recv_sz = actor_mailbox->RecvSize(); + if (!actor_mailbox->RecvEmpty()) { LOG_IF(WARNING, - msg_list->size() > warning_msg_size_.load()) + actor_ctx_recv_sz > warning_msg_size_.load()) << actor_ctx->GetActor()->GetActorName() - << " recv msg size too many: " << msg_list->size(); + << " recv msg size too many: " << actor_ctx_recv_sz; VLOG(1) << "run " << actor_ctx->GetActor()->GetActorName(); - worker_ctx->GetMailbox()->Recv(msg_list); + actor_mailbox->MoveToRun(); VLOG(1) << actor_ctx->GetActor()->GetActorName() - << " has " << worker_ctx->GetMailbox()->RecvSize() + << " has " << actor_ctx_recv_sz << " msg need process"; worker_ctx_mgr_->PopFrontIdleWorker(); auto common_idle_worker = worker_ctx->GetWorker(); diff --git a/myframe/event_conn_manager.cpp b/myframe/event_conn_manager.cpp index 0487114..d467e26 100644 --- a/myframe/event_conn_manager.cpp +++ b/myframe/event_conn_manager.cpp @@ -59,7 +59,9 @@ std::shared_ptr EventConnManager::Alloc() { void EventConnManager::Release(std::shared_ptr ev) { // remove from run_conn - ev_mgr_->Del(ev); + if (!ev_mgr_->Del(ev)) { + return; + } // add to idle_conn std::lock_guard g(mtx_); idle_conn_.push_back(std::move(ev)); @@ -79,6 +81,8 @@ void EventConnManager::Notify( LOG(WARNING) << "event " << ev->GetName() << " need't resp msg"; return; } + // need release immediately + Release(ev); // push msg to event_conn ev->GetMailbox()->Recv(std::move(msg)); // send cmd to event_conn diff --git a/myframe/mailbox.cpp b/myframe/mailbox.cpp index a55d6ee..59afc9b 100644 --- a/myframe/mailbox.cpp +++ b/myframe/mailbox.cpp @@ -88,13 +88,31 @@ const std::shared_ptr Mailbox::PopRecv() { return msg; } +void Mailbox::MoveToRun() { + run_.splice(run_.end(), recv_); +} + +bool Mailbox::RunEmpty() const { + return run_.empty(); +} + +const std::shared_ptr Mailbox::PopRun() { + if (run_.empty()) { + return nullptr; + } + auto msg = run_.front(); + run_.pop_front(); + return msg; +} + std::list>* Mailbox::GetRecvList() { return &recv_; } std::ostream& operator<<(std::ostream& out, const Mailbox& mailbox) { - out << mailbox.Addr() << " recv " << mailbox.RecvSize() << ", " - << " send " << mailbox.SendSize(); + out << mailbox.Addr() << " recv " << mailbox.RecvSize() + << ", send " << mailbox.SendSize() + << ", run " << mailbox.run_.size(); return out; } diff --git a/myframe/mailbox.h b/myframe/mailbox.h index 1729293..ec44b65 100644 --- a/myframe/mailbox.h +++ b/myframe/mailbox.h @@ -16,8 +16,12 @@ namespace myframe { class Msg; class MYFRAME_EXPORT Mailbox final { + friend std::ostream& operator<<(std::ostream&, const Mailbox&); friend class ActorContext; + friend class ActorContextManager; friend class WorkerContext; + friend class WorkerContextManager; + friend class EventConn; friend class EventConnManager; friend class App; @@ -25,7 +29,7 @@ class MYFRAME_EXPORT Mailbox final { /// 邮箱地址 const std::string& Addr() const; - /// 发件箱(适用于worker/actor) + /// 发件箱 int SendSize() const; bool SendEmpty() const; void SendClear(); @@ -38,7 +42,13 @@ class MYFRAME_EXPORT Mailbox final { const std::any& data); void Send(std::list>* msg_list); - /// 收件箱(适用于worker) + /// 信件处理 + void MoveToRun(); + bool RunEmpty() const; + const std::shared_ptr PopRun(); + + private: + /// 收件箱 int RecvSize() const; bool RecvEmpty() const; void RecvClear(); @@ -46,7 +56,6 @@ class MYFRAME_EXPORT Mailbox final { void Recv(std::list>* msg_list); const std::shared_ptr PopRecv(); - private: /// 设置邮箱地址 void SetAddr(const std::string& addr); @@ -56,6 +65,7 @@ class MYFRAME_EXPORT Mailbox final { std::string addr_; std::list> recv_; std::list> send_; + std::list> run_; }; MYFRAME_EXPORT std::ostream& operator<<( diff --git a/myframe/worker_common.cpp b/myframe/worker_common.cpp index bb708ee..15c017c 100644 --- a/myframe/worker_common.cpp +++ b/myframe/worker_common.cpp @@ -42,8 +42,9 @@ int WorkerCommon::Work() { LOG(ERROR) << "context is nullptr"; return -1; } - while (!GetMailbox()->RecvEmpty()) { - ctx->Proc(GetMailbox()->PopRecv()); + auto ctx_mailbox = ctx->GetMailbox(); + while (!ctx_mailbox->RunEmpty()) { + ctx->Proc(ctx_mailbox->PopRun()); } return 0; diff --git a/myframe/worker_context.cpp b/myframe/worker_context.cpp index 074d3ee..d3b9070 100644 --- a/myframe/worker_context.cpp +++ b/myframe/worker_context.cpp @@ -77,22 +77,6 @@ void WorkerContext::ListenThread() { cmd_channel_->SendToMain(CmdChannel::Cmd::kQuit); } -std::size_t WorkerContext::CacheSize() const { - return cache_.size(); -} - -std::list>* WorkerContext::GetCache() { - return &cache_; -} - -void WorkerContext::Cache(std::shared_ptr msg) { - cache_.push_back(std::move(msg)); -} - -void WorkerContext::Cache(std::list>* msg_list) { - cache_.splice(cache_.end(), *msg_list); -} - Mailbox* WorkerContext::GetMailbox() { return &mailbox_; } diff --git a/myframe/worker_context.h b/myframe/worker_context.h index 23130e3..a18a489 100644 --- a/myframe/worker_context.h +++ b/myframe/worker_context.h @@ -55,12 +55,6 @@ class WorkerContext final : public Event { return std::dynamic_pointer_cast(worker_); } - /// recv cache list method - std::size_t CacheSize() const; - std::list>* GetCache(); - void Cache(std::shared_ptr msg); - void Cache(std::list>* msg_list); - /// 线程交互控制flag函数 void SetCtrlOwnerFlag(CtrlOwner owner) { ctrl_owner_ = owner; @@ -90,9 +84,6 @@ class WorkerContext final : public Event { std::shared_ptr worker_; std::weak_ptr app_; - /// recv cache list - std::list> cache_; - /// mailbox Mailbox mailbox_; diff --git a/myframe/worker_context_manager.cpp b/myframe/worker_context_manager.cpp index 4ffd5f8..954f0c6 100644 --- a/myframe/worker_context_manager.cpp +++ b/myframe/worker_context_manager.cpp @@ -139,7 +139,7 @@ void WorkerContextManager::WeakupWorker() { ++it; continue; } - worker_ctx->GetMailbox()->Recv(worker_ctx->GetCache()); + worker_ctx->GetMailbox()->MoveToRun(); it = weakup_workers_ctx_.erase(it); worker_ctx->SetCtrlOwnerFlag(WorkerContext::CtrlOwner::kWorker); worker_ctx->SetWaitMsgQueueFlag(false); @@ -165,10 +165,10 @@ void WorkerContextManager::DispatchWorkerMsg( LOG(WARNING) << worker_name << " unsupport recv msg, drop it"; return; } - worker_ctx->Cache(std::move(msg)); - LOG_IF(WARNING, - worker_ctx->CacheSize() > warning_msg_size_.load()) - << *worker_ctx << " has " << worker_ctx->CacheSize() + worker_ctx->GetMailbox()->Recv(std::move(msg)); + int recv_size = worker_ctx->GetMailbox()->RecvSize(); + LOG_IF(WARNING, recv_size > warning_msg_size_.load()) + << *worker_ctx << " has " << recv_size << " msg not process!!!"; if (worker_ctx->IsInWaitMsgQueue()) { VLOG(1) << *worker_ctx << " already in wait queue, return"; @@ -176,7 +176,7 @@ void WorkerContextManager::DispatchWorkerMsg( } worker_ctx->SetWaitMsgQueueFlag(true); std::unique_lock lk(rw_); - weakup_workers_ctx_.emplace_back(worker_ctx); + weakup_workers_ctx_.push_back(std::move(worker_ctx)); } } // namespace myframe From 1f8e35b2a3b32032ad886cd0f30d90251b8fd80b Mon Sep 17 00:00:00 2001 From: likepeng Date: Thu, 20 Jun 2024 16:50:41 +0800 Subject: [PATCH 04/13] fix compile error --- myframe/app.cpp | 2 +- myframe/worker_context_manager.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/myframe/app.cpp b/myframe/app.cpp index 943dcd1..d864073 100644 --- a/myframe/app.cpp +++ b/myframe/app.cpp @@ -542,7 +542,7 @@ void App::CheckStopWorkers() { << " dispatch msg to " << *worker_ctx; auto actor_mailbox = actor_ctx->GetMailbox(); - int actor_ctx_recv_sz = actor_mailbox->RecvSize(); + std::size_t actor_ctx_recv_sz = actor_mailbox->RecvSize(); if (!actor_mailbox->RecvEmpty()) { LOG_IF(WARNING, actor_ctx_recv_sz > warning_msg_size_.load()) diff --git a/myframe/worker_context_manager.cpp b/myframe/worker_context_manager.cpp index 954f0c6..0276137 100644 --- a/myframe/worker_context_manager.cpp +++ b/myframe/worker_context_manager.cpp @@ -166,7 +166,7 @@ void WorkerContextManager::DispatchWorkerMsg( return; } worker_ctx->GetMailbox()->Recv(std::move(msg)); - int recv_size = worker_ctx->GetMailbox()->RecvSize(); + std::size_t recv_size = worker_ctx->GetMailbox()->RecvSize(); LOG_IF(WARNING, recv_size > warning_msg_size_.load()) << *worker_ctx << " has " << recv_size << " msg not process!!!"; From 7fd0339590d2adcded17b31619c26a076b856e79 Mon Sep 17 00:00:00 2001 From: likepeng Date: Thu, 20 Jun 2024 17:07:53 +0800 Subject: [PATCH 05/13] fix compile error2 --- examples/example_worker_interactive_with_3rd_frame.cpp | 4 ++-- myframe/mailbox.cpp | 6 +++++- myframe/mailbox.h | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/examples/example_worker_interactive_with_3rd_frame.cpp b/examples/example_worker_interactive_with_3rd_frame.cpp index d3a325f..c64e175 100644 --- a/examples/example_worker_interactive_with_3rd_frame.cpp +++ b/examples/example_worker_interactive_with_3rd_frame.cpp @@ -102,8 +102,8 @@ class ExampleWorkerInteractiveWith3rdFrame : public myframe::Worker { recv_run_flag_ = true; } else if (cmd == myframe::CmdChannel::Cmd::kRunWithMsg) { auto mailbox = GetMailbox(); - while (!mailbox->RecvEmpty()) { - const auto msg = mailbox->PopRecv(); + while (!mailbox->RunEmpty()) { + const auto msg = mailbox->PopRun(); // 接收到其它组件消息 LOG(INFO) << "get main " << msg->GetData(); } diff --git a/myframe/mailbox.cpp b/myframe/mailbox.cpp index 59afc9b..ab8c038 100644 --- a/myframe/mailbox.cpp +++ b/myframe/mailbox.cpp @@ -96,6 +96,10 @@ bool Mailbox::RunEmpty() const { return run_.empty(); } +int Mailbox::RunSize() const { + return run_.size(); +} + const std::shared_ptr Mailbox::PopRun() { if (run_.empty()) { return nullptr; @@ -112,7 +116,7 @@ std::list>* Mailbox::GetRecvList() { std::ostream& operator<<(std::ostream& out, const Mailbox& mailbox) { out << mailbox.Addr() << " recv " << mailbox.RecvSize() << ", send " << mailbox.SendSize() - << ", run " << mailbox.run_.size(); + << ", run " << mailbox.RunSize(); return out; } diff --git a/myframe/mailbox.h b/myframe/mailbox.h index ec44b65..5d05f7c 100644 --- a/myframe/mailbox.h +++ b/myframe/mailbox.h @@ -16,7 +16,6 @@ namespace myframe { class Msg; class MYFRAME_EXPORT Mailbox final { - friend std::ostream& operator<<(std::ostream&, const Mailbox&); friend class ActorContext; friend class ActorContextManager; friend class WorkerContext; @@ -45,12 +44,15 @@ class MYFRAME_EXPORT Mailbox final { /// 信件处理 void MoveToRun(); bool RunEmpty() const; + int RunSize() const; const std::shared_ptr PopRun(); - private: /// 收件箱 int RecvSize() const; bool RecvEmpty() const; + + private: + /// 收件箱 void RecvClear(); void Recv(std::shared_ptr msg); void Recv(std::list>* msg_list); From 74fa8615588acc84aa1d8baedeff4367a0205045 Mon Sep 17 00:00:00 2001 From: likepeng Date: Sun, 23 Jun 2024 12:32:09 +0800 Subject: [PATCH 06/13] update 3rd script --- 3rd/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/3rd/CMakeLists.txt b/3rd/CMakeLists.txt index 5f226dd..90b5458 100644 --- a/3rd/CMakeLists.txt +++ b/3rd/CMakeLists.txt @@ -6,8 +6,8 @@ project(myframe_deps VERSION 1.0.0) include(ExternalProject) -set(DEPS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/src) -set(DEPS_DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/pkg) +set(DEPS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/src CACHE PATH "package source dir") +set(DEPS_DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/pkg CACHE PATH "package tgz dir") ExternalProject_Add( gflags From 6c2884c7af79b23c61dcd9793a82b303b0f3995e Mon Sep 17 00:00:00 2001 From: likepeng Date: Thu, 4 Jul 2024 15:23:21 +0800 Subject: [PATCH 07/13] Optimize split string --- myframe/common.cpp | 21 ++++++++++++++------- myframe/common.h | 3 ++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/myframe/common.cpp b/myframe/common.cpp index 96c7c5c..10b98bc 100644 --- a/myframe/common.cpp +++ b/myframe/common.cpp @@ -109,14 +109,21 @@ bool Common::IsAbsolutePath(const std::string& path) { return false; } -std::vector Common::SplitMsgName(const std::string& name) { - std::vector name_list; - std::string item; - std::stringstream ss(name); - while (std::getline(ss, item, '.')) { - name_list.push_back(std::move(item)); +std::vector Common::SplitMsgName(const std::string& name) { + std::vector tokens; + tokens.reserve(3); + size_t name_sz = name.size(); + size_t start_pos = 0; + for (size_t i = 0; i < name_sz; ++i) { + if (name[i] == '.') { + tokens.emplace_back(&name[start_pos], i - start_pos); + start_pos = i + 1; + } + if (i == name_sz - 1) { + tokens.emplace_back(&name[start_pos], i - start_pos + 1); + } } - return name_list; + return tokens; } } // namespace myframe diff --git a/myframe/common.h b/myframe/common.h index d1c4dfc..7fd8b5a 100644 --- a/myframe/common.h +++ b/myframe/common.h @@ -10,6 +10,7 @@ Author: 李柯鹏 #include #include #include +#include #if __has_include() #include namespace stdfs = std::filesystem; @@ -31,7 +32,7 @@ class MYFRAME_EXPORT Common final { static stdfs::path GetAbsolutePath(const std::string& flag_path); static bool IsAbsolutePath(const std::string& path); - static std::vector SplitMsgName(const std::string& name); + static std::vector SplitMsgName(const std::string& name); }; } // namespace myframe From ce8d9cd5fb4b052fe348f02e2a4614c719305745 Mon Sep 17 00:00:00 2001 From: likepeng Date: Thu, 4 Jul 2024 15:32:48 +0800 Subject: [PATCH 08/13] print version in log --- myframe/app.cpp | 5 +++-- myframe/config.h.in | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/myframe/app.cpp b/myframe/app.cpp index d864073..bf9e42e 100644 --- a/myframe/app.cpp +++ b/myframe/app.cpp @@ -51,8 +51,9 @@ App::App() , actor_ctx_mgr_(new ActorContextManager()) , ev_mgr_(new EventManager()) , ev_conn_mgr_(new EventConnManager(ev_mgr_, poller_)) - , worker_ctx_mgr_(new WorkerContextManager(ev_mgr_)) -{} + , worker_ctx_mgr_(new WorkerContextManager(ev_mgr_)) { + LOG(INFO) << "myframe version: " << MYFRAME_VERSION; +} App::~App() { LOG(INFO) << "app deconstruct"; diff --git a/myframe/config.h.in b/myframe/config.h.in index 79bbc74..6bd955e 100644 --- a/myframe/config.h.in +++ b/myframe/config.h.in @@ -6,4 +6,6 @@ Author: 李柯鹏 ****************************************************************************/ #pragma once +#define MYFRAME_VERSION "@PROJECT_VERSION@" + #cmakedefine MYFRAME_USE_CV \ No newline at end of file From b5b967ab03fd1a0c8718fb01504ec6a5cb0404d7 Mon Sep 17 00:00:00 2001 From: likepeng Date: Thu, 4 Jul 2024 16:32:14 +0800 Subject: [PATCH 09/13] Optimize find --- myframe/actor_context_manager.cpp | 6 +++--- myframe/event_manager.cpp | 5 +++-- myframe/event_manager.h | 10 ++++++---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/myframe/actor_context_manager.cpp b/myframe/actor_context_manager.cpp index d9dca96..e48a3cd 100644 --- a/myframe/actor_context_manager.cpp +++ b/myframe/actor_context_manager.cpp @@ -54,12 +54,12 @@ bool ActorContextManager::RegContext(std::shared_ptr ctx) { std::shared_ptr ActorContextManager::GetContext( const std::string& actor_name) { std::shared_lock lk(rw_); - if (ctxs_.find(actor_name) == ctxs_.end()) { + auto p = ctxs_.find(actor_name); + if (p == ctxs_.end()) { LOG(WARNING) << "not found " << actor_name; return nullptr; } - auto ctx = ctxs_[actor_name]; - return ctx; + return p->second; } std::vector ActorContextManager::GetAllActorAddr() { diff --git a/myframe/event_manager.cpp b/myframe/event_manager.cpp index 6db1bb9..97c72aa 100644 --- a/myframe/event_manager.cpp +++ b/myframe/event_manager.cpp @@ -26,10 +26,11 @@ EventManager::~EventManager() { ev_handle_t EventManager::ToHandle(const std::string& name) { std::shared_lock lk(rw_); - if (name_handle_map_.find(name) == name_handle_map_.end()) { + auto p = name_handle_map_.find(name); + if (p == name_handle_map_.end()) { return Event::DEFAULT_EV_HANDLE; } - return name_handle_map_[name]; + return p->second; } bool EventManager::Has(const std::string& name) { diff --git a/myframe/event_manager.h b/myframe/event_manager.h index 91b0b62..da48922 100644 --- a/myframe/event_manager.h +++ b/myframe/event_manager.h @@ -30,20 +30,22 @@ class EventManager final { template std::shared_ptr Get(ev_handle_t h) { std::shared_lock lk(rw_); - if (evs_.find(h) == evs_.end()) { + auto p = evs_.find(h); + if (p == evs_.end()) { return nullptr; } - return std::dynamic_pointer_cast(evs_[h]); + return std::dynamic_pointer_cast(p->second); } template std::shared_ptr Get(const std::string& name) { std::shared_lock lk(rw_); - if (name_handle_map_.find(name) == name_handle_map_.end()) { + auto p = name_handle_map_.find(name); + if (p == name_handle_map_.end()) { return nullptr; } lk.unlock(); - return Get(name_handle_map_[name]); + return Get(p->second); } bool Has(const std::string& name); From e48e60e81721554b651e7e11ba12851c4528b6b9 Mon Sep 17 00:00:00 2001 From: likepeng Date: Mon, 30 Sep 2024 00:39:34 +0800 Subject: [PATCH 10/13] remove install deps --- CMakeLists.txt | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bbc87b7..2eb22f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,6 @@ project(myframe VERSION 0.9.4) ### option option(MYFRAME_USE_CV "Using conditional variables for thread communication" ON) -option(MYFRAME_INSTALL_DEPS "Install deps" OFF) option(MYFRAME_GENERATE_EXAMPLE "Generate example library" ON) option(MYFRAME_GENERATE_TEST "Generate test executable program" ON) @@ -72,15 +71,6 @@ install(PROGRAMS install(DIRECTORY templates DESTINATION .) install(DIRECTORY DESTINATION ${MYFRAME_LOG_DIR}) install(DIRECTORY DESTINATION ${MYFRAME_SERVICE_DIR}) -if (MYFRAME_INSTALL_DEPS) - if (${CMAKE_VERSION} VERSION_GREATER_EQUAL "3.21") - install(IMPORTED_RUNTIME_ARTIFACTS jsoncpp_lib) - install(IMPORTED_RUNTIME_ARTIFACTS glog::glog) - install(IMPORTED_RUNTIME_ARTIFACTS gflags) - else() - message(WARNING "Can not install deps lib, cmake version ${CMAKE_VERSION} < 3.21") - endif() -endif() ### package include(Packing) From 95f71842823bf6481e819743438c1507a2d475f5 Mon Sep 17 00:00:00 2001 From: likepeng Date: Sun, 6 Oct 2024 12:54:43 +0800 Subject: [PATCH 11/13] disable msvc warning 4251 --- CMakeLists.txt | 5 +++++ README.md | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2eb22f7..fc45854 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,6 +25,11 @@ else() message(STATUS "Set default cxx standard 17") endif() add_compile_options("$<$:/source-charset:utf-8>") +if (MSVC) + add_definitions( + -wd4251 + ) +endif (MSVC) ### install path if (CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT) diff --git a/README.md b/README.md index ae12259..18b4db1 100644 --- a/README.md +++ b/README.md @@ -29,11 +29,11 @@ worker自驱动,可以通过消息与actor交互; ```sh # 下载/构建/安装依赖库 cd 3rd -cmake -S . -B build -DCMAKE_INSTALL_PREFIX=../output -cmake --build build -j +cmake -S . -B build -DCMAKE_INSTALL_PREFIX="../output" +cmake --build build -j --config Release # 构建安装 cd .. -cmake -S . -B build -DCMAKE_INSTALL_PREFIX=./output -DCMAKE_PREFIX_PATH=./output +cmake -S . -B build -DCMAKE_INSTALL_PREFIX="./output" -DCMAKE_PREFIX_PATH="./output" cmake --build build -j --config Release --target install ``` From 7b453b6c031bf966a531c7cf8638d376fb70dcfe Mon Sep 17 00:00:00 2001 From: likepeng Date: Sun, 6 Oct 2024 13:25:30 +0800 Subject: [PATCH 12/13] fix msvc warning 4244 --- myframe/worker_timer.cpp | 2 +- test/performance_trans100_fullspeed_test.cpp | 3 ++- test/performance_trans10_cost_test.cpp | 7 ++++--- test/performance_trans1_cost_test.cpp | 5 +++-- test/performance_trans1_fullspeed_test.cpp | 3 ++- test/performance_trans20_fullspeed_test.cpp | 3 ++- 6 files changed, 14 insertions(+), 9 deletions(-) diff --git a/myframe/worker_timer.cpp b/myframe/worker_timer.cpp index 1f420a9..51df19b 100644 --- a/myframe/worker_timer.cpp +++ b/myframe/worker_timer.cpp @@ -19,7 +19,7 @@ namespace myframe { uint64_t TimerManager::GetMonoTimeMs() { auto now = std::chrono::steady_clock::now(); - return now.time_since_epoch().count() / 1e6; + return static_cast(now.time_since_epoch().count() / 1e6); } TimerManager::TimerManager() { diff --git a/test/performance_trans100_fullspeed_test.cpp b/test/performance_trans100_fullspeed_test.cpp index a7ee647..ec6b279 100644 --- a/test/performance_trans100_fullspeed_test.cpp +++ b/test/performance_trans100_fullspeed_test.cpp @@ -67,7 +67,8 @@ class FullSpeed100ActorTransTest : public myframe::Actor { LOG(INFO) << "100 actor fullspeed trans msg avg(cnt/sec): " << avg; std::sort(msg_cnt_per_sec_list_.begin(), msg_cnt_per_sec_list_.end()); LOG(INFO) << "100 actor fullspeed trans msg 99(cnt/sec): " - << msg_cnt_per_sec_list_[msg_cnt_per_sec_list_.size() * 0.99]; + << msg_cnt_per_sec_list_[ + static_cast(msg_cnt_per_sec_list_.size() * 0.99)]; GetApp()->Quit(); } } diff --git a/test/performance_trans10_cost_test.cpp b/test/performance_trans10_cost_test.cpp index 37a24cc..bcb008f 100644 --- a/test/performance_trans10_cost_test.cpp +++ b/test/performance_trans10_cost_test.cpp @@ -75,7 +75,8 @@ class Trans10ActorCostTest : public myframe::Actor { LOG(INFO) << "trans 10 actor avg(us): " << avg; std::sort(cost_us_list_.begin(), cost_us_list_.end()); LOG(INFO) << "trans 10 actor 99(us): " << - cost_us_list_[cost_us_list_.size() * 0.99]; + cost_us_list_[ + static_cast(cost_us_list_.size() * 0.99)]; GetApp()->Quit(); } } @@ -84,14 +85,14 @@ class Trans10ActorCostTest : public myframe::Actor { static bool init_; static std::chrono::high_resolution_clock::time_point total_; static std::chrono::high_resolution_clock::time_point begin_; - static std::vector cost_us_list_; + static std::vector cost_us_list_; int task_num_{0}; std::string msg_; }; bool Trans10ActorCostTest::init_{false}; std::chrono::high_resolution_clock::time_point Trans10ActorCostTest::total_; std::chrono::high_resolution_clock::time_point Trans10ActorCostTest::begin_; -std::vector Trans10ActorCostTest::cost_us_list_; +std::vector Trans10ActorCostTest::cost_us_list_; int main() { diff --git a/test/performance_trans1_cost_test.cpp b/test/performance_trans1_cost_test.cpp index 5f09e39..6dc9350 100644 --- a/test/performance_trans1_cost_test.cpp +++ b/test/performance_trans1_cost_test.cpp @@ -60,7 +60,8 @@ class TransMsgCostTest : public myframe::Actor { LOG(INFO) << "trans 1 actor avg(us): " << avg; std::sort(cost_us_list_.begin(), cost_us_list_.end()); LOG(INFO) << "trans 1 actor 99(us): " << - cost_us_list_[cost_us_list_.size() * 0.99]; + cost_us_list_[ + static_cast(cost_us_list_.size() * 0.99)]; GetApp()->Quit(); } } @@ -69,7 +70,7 @@ class TransMsgCostTest : public myframe::Actor { std::chrono::high_resolution_clock::time_point begin_; std::chrono::high_resolution_clock::time_point last_; std::string msg_; - std::vector cost_us_list_; + std::vector cost_us_list_; }; int main() { diff --git a/test/performance_trans1_fullspeed_test.cpp b/test/performance_trans1_fullspeed_test.cpp index 4877e48..d5ce91b 100644 --- a/test/performance_trans1_fullspeed_test.cpp +++ b/test/performance_trans1_fullspeed_test.cpp @@ -64,7 +64,8 @@ class FullSpeedTransTest : public myframe::Actor { LOG(INFO) << "1 actor fullspeed trans msg avg(cnt/sec): " << avg; std::sort(msg_cnt_per_sec_list_.begin(), msg_cnt_per_sec_list_.end()); LOG(INFO) << "1 actor fullspeed trans msg 99(cnt/sec): " - << msg_cnt_per_sec_list_[msg_cnt_per_sec_list_.size() * 0.99]; + << msg_cnt_per_sec_list_[ + static_cast(msg_cnt_per_sec_list_.size() * 0.99)]; GetApp()->Quit(); } } diff --git a/test/performance_trans20_fullspeed_test.cpp b/test/performance_trans20_fullspeed_test.cpp index 22f14e5..d906bb0 100644 --- a/test/performance_trans20_fullspeed_test.cpp +++ b/test/performance_trans20_fullspeed_test.cpp @@ -66,7 +66,8 @@ class FullSpeed20ActorTransTest : public myframe::Actor { LOG(INFO) << "20 actor fullspeed trans msg avg(cnt/sec): " << avg; std::sort(msg_cnt_per_sec_list_.begin(), msg_cnt_per_sec_list_.end()); LOG(INFO) << "20 actor fullspeed trans msg 99(cnt/sec): " - << msg_cnt_per_sec_list_[msg_cnt_per_sec_list_.size() * 0.99]; + << msg_cnt_per_sec_list_[ + static_cast(msg_cnt_per_sec_list_.size() * 0.99)]; GetApp()->Quit(); } } From 3f315c11225367e480760091fa7da98e6af8aea3 Mon Sep 17 00:00:00 2001 From: likepeng Date: Fri, 6 Dec 2024 22:42:54 +0800 Subject: [PATCH 13/13] macos workflow runs-on macos-latest --- .github/workflows/macos.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index d35edd4..41ad418 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -5,7 +5,7 @@ on: [push, pull_request] jobs: build-macos: name: AppleClang-C++${{matrix.std}}-${{matrix.build_type}} - runs-on: macos-12 + runs-on: macos-latest permissions: actions: read contents: read