Skip to content

Commit

Permalink
fix: loop dispatch cache msg
Browse files Browse the repository at this point in the history
  • Loading branch information
lkpworkspace committed Jun 19, 2024
1 parent 4033dca commit 52416c6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 50 deletions.
65 changes: 27 additions & 38 deletions myframe/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,26 +400,29 @@ bool App::CreateActorContext(
std::lock_guard<std::recursive_mutex> lock(local_mtx_);
// 接收缓存中发给自己的消息
for (auto it = cache_msgs_.begin(); it != cache_msgs_.end();) {
if (it->msg->GetDst() == actor_name) {
if ((*it)->GetDst() == actor_name) {
LOG(INFO) << actor_name
<< " recv msg from cache " << *(it->msg);
DispatchMsg(it->msg);
<< " recv msg from cache " << *(it);
DispatchMsg(*it);
it = cache_msgs_.erase(it);
continue;
}
++it;
}
// 目的地址不存在的暂时放到缓存消息队列
auto send_list = ctx->GetMailbox()->GetSendList();
for (auto it = send_list->begin(); it != send_list->end();) {
if (!HasUserInst((*it)->GetDst())) {
LOG(WARNING) << "can't found " << (*it)->GetDst()
<< ", cache this msg";
cache_msgs_.emplace_back(0, *it);
it = send_list->erase(it);
continue;
// 在运行时不再缓存,直接分发
if (state_.load() != kRunning) {
auto send_list = ctx->GetMailbox()->GetSendList();
for (auto it = send_list->begin(); it != send_list->end();) {
if (!HasUserInst((*it)->GetDst())) {
LOG(WARNING) << "can't found " << (*it)->GetDst()
<< ", cache this msg";
cache_msgs_.push_back(*it);
it = send_list->erase(it);
continue;
}
++it;
}
++it;
}
// 分发目的地址已经存在的消息
DispatchMsg(ctx);
Expand Down Expand Up @@ -454,16 +457,6 @@ bool App::StartTimerWorker() {
return true;
}

void App::DispatchMsg(std::list<std::shared_ptr<Msg>>* msg_list) {
LOG_IF(WARNING,
msg_list->size() > warning_msg_size_.load())
<< " dispatch msg too many";
for (auto& msg : (*msg_list)) {
DispatchMsg(std::move(msg));
}
msg_list->clear();
}

void App::DispatchMsg(std::shared_ptr<Msg> msg) {
std::lock_guard<std::recursive_mutex> lock(local_mtx_);
VLOG(1) << *msg;
Expand Down Expand Up @@ -508,6 +501,16 @@ void App::DispatchMsg(std::shared_ptr<Msg> msg) {
}
}

void App::DispatchMsg(std::list<std::shared_ptr<Msg>>* msg_list) {
LOG_IF(WARNING,
msg_list->size() > warning_msg_size_.load())
<< " dispatch msg too many";
for (auto& msg : (*msg_list)) {
DispatchMsg(std::move(msg));
}
msg_list->clear();
}

// 将获得的消息分发给其他actor
void App::DispatchMsg(std::shared_ptr<ActorContext> context) {
if (nullptr == context) {
Expand Down Expand Up @@ -742,20 +745,6 @@ void App::ProcessEvent(const std::vector<ev_handle_t>& evs) {
}
}

void App::ProcessCacheMsg() {
for (auto it = cache_msgs_.begin(); it != cache_msgs_.end();) {
if (it->search_count >= default_search_count_) {
VLOG(1) << *(it->msg) << " search count: " << it->search_count
<< ", dispatch it";
DispatchMsg(it->msg);
it = cache_msgs_.erase(it);
} else {
it->search_count++;
++it;
}
}
}

int App::Exec() {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before Exec()";
Expand All @@ -764,13 +753,13 @@ int App::Exec() {
int time_wait_ms = 100;
std::vector<ev_handle_t> evs;
state_.store(kRunning);
/// 处理初始化中缓存消息
DispatchMsg(&cache_msgs_);
while (worker_ctx_mgr_->WorkerSize()) {
/// 检查空闲线程队列是否有空闲线程,如果有就找到一个有消息的actor处理
CheckStopWorkers();
/// 等待事件
poller_->Wait(&evs, time_wait_ms);
/// 处理缓存消息
ProcessCacheMsg();
/// 处理事件
ProcessEvent(evs);
}
Expand Down
15 changes: 3 additions & 12 deletions myframe/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
/// 通知执行事件
void CheckStopWorkers();

/// 分发事件
/// 分发消息
void DispatchMsg(std::shared_ptr<Msg> msg);
void DispatchMsg(std::list<std::shared_ptr<Msg>>* msg_list);
void DispatchMsg(std::shared_ptr<ActorContext> context);
void ProcessCacheMsg();
/// 处理事件
void ProcessEvent(const std::vector<ev_handle_t>& evs);
void ProcessWorkerEvent(std::shared_ptr<WorkerContext>);
void ProcessTimerEvent(std::shared_ptr<WorkerContext>);
Expand All @@ -143,16 +143,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
std::atomic<State> state_{kUninitialized};
std::recursive_mutex local_mtx_;
/// 缓存消息列表
struct CacheMsg {
CacheMsg(int c, std::shared_ptr<Msg> m)
: search_count(c)
, msg(std::move(m)) {}
int search_count{0};
std::shared_ptr<Msg> msg{nullptr};
};
int default_search_count_{3};
std::vector<CacheMsg> cache_msgs_;

std::list<std::shared_ptr<Msg>> cache_msgs_;
/// 模块管理对象
std::unique_ptr<ModManager> mods_;
/// poller
Expand Down

0 comments on commit 52416c6

Please sign in to comment.