Skip to content

Commit

Permalink
actor add run queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
lkpworkspace committed Jan 2, 2025
1 parent 1ee17db commit eee891f
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 7 deletions.
2 changes: 2 additions & 0 deletions examples/example_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ExampleActorConfig : public myframe::Actor {
auto conf = GetConfig();
LOG(INFO) << GetActorName() << " pending queue size "
<< GetMailbox()->GetPendingQueueSize();
LOG(INFO) << GetActorName() << " run queue size "
<< GetMailbox()->GetRunQueueSize();
LOG(INFO) << GetActorName() << " conf " << conf->toStyledString();
return 0;
}
Expand Down
2 changes: 2 additions & 0 deletions examples/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
{
"instance_name":"#1",
"instance_config":{
"pending_queue_size":-1,
"run_queue_size":-1,
"key1":"hello",
"key2":"world"
}
Expand Down
1 change: 1 addition & 0 deletions launcher/conf/sys.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"default_pending_queue_size":-1,
"default_run_queue_size":-1,
"thread_poll_size":4,
"conn_event_size":2,
"warning_msg_size":10
Expand Down
3 changes: 2 additions & 1 deletion launcher/launcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ int main(int argc, char** argv) {
module_args.GetThreadPoolSize(),
module_args.GetConnEventSize(),
module_args.GetWarningMsgSize(),
module_args.GetDefaultPendingQueueSize())) {
module_args.GetDefaultPendingQueueSize(),
module_args.GetDefaultRunQueueSize())) {
LOG(ERROR) << "Init failed";
return -1;
}
Expand Down
4 changes: 4 additions & 0 deletions launcher/module_argument.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ bool ModuleArgument::ParseSysConf(const std::string& sys_conf) {
&& root["default_pending_queue_size"].isInt()) {
default_pending_queue_size_ = root["default_pending_queue_size"].asInt();
}
if (root.isMember("default_run_queue_size")
&& root["default_run_queue_size"].isInt()) {
default_run_queue_size_ = root["default_run_queue_size"].asInt();
}
return true;
}

Expand Down
4 changes: 4 additions & 0 deletions launcher/module_argument.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class ModuleArgument final {
inline int GetDefaultPendingQueueSize() const {
return default_pending_queue_size_;
}
inline int GetDefaultRunQueueSize() const {
return default_run_queue_size_;
}

private:
bool ParseSysConf(const std::string&);
Expand All @@ -39,6 +42,7 @@ class ModuleArgument final {
int conn_event_size_{2};
int warning_msg_size_{10};
int default_pending_queue_size_{-1};
int default_run_queue_size_{-1};
std::string log_dir_;
std::string lib_dir_;
std::string conf_dir_;
Expand Down
5 changes: 5 additions & 0 deletions myframe/actor_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ ActorContext::ActorContext(
actor_->SetContext(this);
mailbox_.SetAddr(actor_->GetActorName());
int pending_queue_size = app->GetDefaultPendingQueueSize();
int run_queue_size = app->GetDefaultRunQueueSize();
auto cfg = actor_->GetConfig();
if (cfg->isMember("pending_queue_size")) {
pending_queue_size = cfg->get("pending_queue_size", -1).asInt();
}
if (cfg->isMember("run_queue_size")) {
run_queue_size = cfg->get("run_queue_size", -1).asInt();
}
mailbox_.SetPendingQueueSize(pending_queue_size);
mailbox_.SetRunQueueSize(run_queue_size);
LOG(INFO) << mailbox_.Addr() << " context create";
}

Expand Down
5 changes: 3 additions & 2 deletions myframe/actor_context_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ class ActorContextManager final {
std::vector<std::string> GetAllActorAddr();
bool HasActor(const std::string& name);

/* 将有消息的actor放入链表 */
void PushContext(std::shared_ptr<ActorContext> ctx);

private:
/* 获得actor名对应的actor */
std::shared_ptr<ActorContext> GetContext(const std::string& actor_name);
/* 将有消息的actor放入链表 */
void PushContext(std::shared_ptr<ActorContext> ctx);
void PrintWaitQueue();

/// 当前注册actor数量
Expand Down
12 changes: 11 additions & 1 deletion myframe/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ bool App::Init(
int thread_pool_size,
int event_conn_size,
int warning_msg_size,
int default_pending_queue_size) {
int default_pending_queue_size,
int default_run_queue_size) {
if (state_.load() != kUninitialized) {
return true;
}
Expand All @@ -73,6 +74,7 @@ bool App::Init(
lib_dir_ = lib_dir;
warning_msg_size_.store(warning_msg_size);
default_pending_queue_size_ = default_pending_queue_size;
default_run_queue_size_ = default_run_queue_size;
ret &= poller_->Init();
ret &= worker_ctx_mgr_->Init(warning_msg_size);
ret &= ev_conn_mgr_->Init(event_conn_size);
Expand Down Expand Up @@ -558,6 +560,10 @@ void App::CheckStopWorkers() {
worker_ctx_mgr_->PopFrontIdleWorker();
auto common_idle_worker = worker_ctx->GetWorker<WorkerCommon>();
common_idle_worker->SetActorContext(actor_ctx);
// 接收队列不空,重新加入等待执行队列
if (!actor_mailbox->RecvEmpty()) {
actor_ctx_mgr_->PushContext(std::move(actor_ctx));
}
worker_ctx->GetCmdChannel()->SendToOwner(CmdChannel::Cmd::kRun);
} else {
LOG(ERROR) << actor_ctx->GetActor()->GetActorName() << " has no msg";
Expand Down Expand Up @@ -821,4 +827,8 @@ int App::GetDefaultPendingQueueSize() const {
return default_pending_queue_size_;
}

int App::GetDefaultRunQueueSize() const {
return default_run_queue_size_;
}

} // namespace myframe
5 changes: 4 additions & 1 deletion myframe/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
int thread_pool_size = 4,
int event_conn_size = 2,
int warning_msg_size = 10,
int default_pending_queue_size = -1);
int default_pending_queue_size = -1,
int default_run_queue_size = -1);

int LoadServiceFromDir(const std::string& path);

Expand Down Expand Up @@ -93,6 +94,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
void Quit();

int GetDefaultPendingQueueSize() const;
int GetDefaultRunQueueSize() const;

private:
bool CreateActorContext(
Expand Down Expand Up @@ -143,6 +145,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
std::string node_addr_;
///
int default_pending_queue_size_{-1};
int default_run_queue_size_{-1};
std::atomic<std::size_t> warning_msg_size_{10};
std::atomic<State> state_{kUninitialized};
std::recursive_mutex local_mtx_;
Expand Down
18 changes: 18 additions & 0 deletions myframe/mailbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ const std::shared_ptr<const Msg> Mailbox::PopRecv() {
}

void Mailbox::MoveToRun() {
if (run_queue_size_ > 0) {
auto it = recv_.begin();
for (size_t i = 0;
i < static_cast<std::size_t>(run_queue_size_) && it != recv_.end();
++i) {
++it;
}
run_.splice(run_.begin(), recv_, recv_.begin(), it);
return;
}
run_.splice(run_.end(), recv_);
}

Expand Down Expand Up @@ -118,6 +128,14 @@ int Mailbox::GetPendingQueueSize() const {
return pending_queue_size_;
}

void Mailbox::SetRunQueueSize(int sz) {
run_queue_size_ = sz;
}

int Mailbox::GetRunQueueSize() const {
return run_queue_size_;
}

std::list<std::shared_ptr<Msg>>* Mailbox::GetRecvList() {
return &recv_;
}
Expand Down
7 changes: 5 additions & 2 deletions myframe/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class MYFRAME_EXPORT Mailbox final {
bool RunEmpty() const;
int RunSize() const;
const std::shared_ptr<const Msg> PopRun();
void SetPendingQueueSize(int sz);
int GetPendingQueueSize() const;
void SetRunQueueSize(int sz);
int GetRunQueueSize() const;

/// 收件箱
int RecvSize() const;
bool RecvEmpty() const;
void SetPendingQueueSize(int sz);
int GetPendingQueueSize() const;

private:
/// 收件箱
Expand All @@ -70,6 +72,7 @@ class MYFRAME_EXPORT Mailbox final {
std::list<std::shared_ptr<Msg>> send_;
std::list<std::shared_ptr<Msg>> run_;
int pending_queue_size_{-1};
int run_queue_size_{-1};
};

MYFRAME_EXPORT std::ostream& operator<<(
Expand Down

0 comments on commit eee891f

Please sign in to comment.