Skip to content

Commit

Permalink
Optimize pointer access2
Browse files Browse the repository at this point in the history
  • Loading branch information
lkpworkspace committed Jun 6, 2024
1 parent c272690 commit 2ca2262
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 38 deletions.
2 changes: 1 addition & 1 deletion myframe/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool Actor::Subscribe(const std::string& addr, const std::string& msg_type) {
msg->SetType("SUBSCRIBE");
msg->SetDesc(msg_type);
auto mailbox = ctx_->GetMailbox();
mailbox->Send(addr, msg);
mailbox->Send(addr, std::move(msg));
return true;
}

Expand Down
26 changes: 14 additions & 12 deletions myframe/actor_context_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ void ActorContextManager::DispatchMsg(
return;
}
auto mailbox = ctx->GetMailbox();
mailbox->Recv(msg);
PushContext(ctx);
mailbox->Recv(std::move(msg));
PushContext(std::move(ctx));
}

bool ActorContextManager::RegContext(std::shared_ptr<ActorContext> ctx) {
Expand All @@ -46,7 +46,7 @@ bool ActorContextManager::RegContext(std::shared_ptr<ActorContext> ctx) {
return false;
}
LOG(INFO) << "reg actor " << ctx->GetActor()->GetActorName();
ctxs_[ctx->GetActor()->GetActorName()] = ctx;
ctxs_[ctx->GetActor()->GetActorName()] = std::move(ctx);
return true;
}

Expand Down Expand Up @@ -76,6 +76,9 @@ bool ActorContextManager::HasActor(const std::string& name) {
}

void ActorContextManager::PrintWaitQueue() {
if (!VLOG_IS_ON(1)) {
return;
}
VLOG(1) << "cur wait queue actor:";
auto it = wait_queue_.begin();
while (it != wait_queue_.end()) {
Expand All @@ -94,7 +97,7 @@ std::shared_ptr<ActorContext> ActorContextManager::GetContextWithMsg() {
return nullptr;
}

std::vector<std::shared_ptr<ActorContext>> in_runing_context;
std::list<std::weak_ptr<ActorContext>> in_runing_context;
std::shared_ptr<ActorContext> ret = nullptr;
while (!wait_queue_.empty()) {
if (wait_queue_.front().expired()) {
Expand All @@ -104,20 +107,19 @@ std::shared_ptr<ActorContext> ActorContextManager::GetContextWithMsg() {
auto ctx = wait_queue_.front().lock();
if (ctx->IsRuning()) {
wait_queue_.pop_front();
in_runing_context.push_back(ctx);
VLOG(1) << ctx->GetActor()->GetActorName()
<< " is runing, move to wait queue back";
in_runing_context.push_back(std::move(ctx));
} else {
wait_queue_.pop_front();

ctx->SetRuningFlag(true);
ctx->SetWaitQueueFlag(false);
ret = ctx;
ret.swap(ctx);
break;
}
}
for (std::size_t i = 0; i < in_runing_context.size(); ++i) {
VLOG(1) << in_runing_context[i]->GetActor()->GetActorName()
<< " is runing, move to wait queue back";
wait_queue_.push_back(in_runing_context[i]);
if (!in_runing_context.empty()) {
wait_queue_.splice(wait_queue_.end(), in_runing_context);
}
return ret;
}
Expand All @@ -129,7 +131,7 @@ void ActorContextManager::PushContext(std::shared_ptr<ActorContext> ctx) {
return;
}
ctx->SetWaitQueueFlag(true);
wait_queue_.push_back(ctx);
wait_queue_.push_back(std::move(ctx));
PrintWaitQueue();
}

Expand Down
14 changes: 8 additions & 6 deletions myframe/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ int App::Send(std::shared_ptr<Msg> msg) {
return -1;
}
poller_->Add(conn);
auto ret = conn->Send(msg);
auto ret = conn->Send(std::move(msg));
poller_->Del(conn);
ev_conn_mgr_->Release(conn);
ev_conn_mgr_->Release(std::move(conn));
return ret;
}

Expand All @@ -320,9 +320,9 @@ const std::shared_ptr<const Msg> App::SendRequest(
return nullptr;
}
poller_->Add(conn);
auto resp = conn->SendRequest(msg);
auto resp = conn->SendRequest(std::move(msg));
poller_->Del(conn);
ev_conn_mgr_->Release(conn);
ev_conn_mgr_->Release(std::move(conn));
return resp;
}

Expand Down Expand Up @@ -434,7 +434,7 @@ void App::DispatchMsg(std::list<std::shared_ptr<Msg>>* msg_list) {
msg_list->size() > warning_msg_size_.load())
<< " dispatch msg too many";
for (auto& msg : (*msg_list)) {
DispatchMsg(msg);
DispatchMsg(std::move(msg));
}
msg_list->clear();
}
Expand All @@ -444,7 +444,7 @@ void App::DispatchMsg(std::shared_ptr<Msg> msg) {
VLOG(1) << *msg;
/// 处理框架消息
if (msg->GetDst() == MAIN_ADDR) {
ProcessMain(msg);
ProcessMain(std::move(msg));
return;
}
/// 消息分发
Expand All @@ -463,12 +463,14 @@ void App::DispatchMsg(std::shared_ptr<Msg> msg) {
actor_ctx_mgr_->DispatchMsg(msg);
} else if (name_list[0] == "event") {
if (name_list[1] == "conn") {
// dispatch to event conn
auto handle = ev_mgr_->ToHandle(msg->GetDst());
ev_conn_mgr_->Notify(handle, msg);
} else {
LOG(ERROR) << "Unknown msg " << *msg;
}
} else if (!node_addr_.empty()) {
// dispatch to node
if (node_addr_.substr(0, 5) == "actor") {
actor_ctx_mgr_->DispatchMsg(msg, node_addr_);
} else if (node_addr_.substr(0, 6) == "worker") {
Expand Down
2 changes: 1 addition & 1 deletion myframe/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
struct CacheMsg {
CacheMsg(int c, std::shared_ptr<Msg> m)
: search_count(c)
, msg(m) {}
, msg(std::move(m)) {}
int search_count{0};
std::shared_ptr<Msg> msg{nullptr};
};
Expand Down
2 changes: 1 addition & 1 deletion myframe/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ std::vector<std::string> Common::SplitMsgName(const std::string& name) {
std::string item;
std::stringstream ss(name);
while (std::getline(ss, item, '.')) {
name_list.push_back(item);
name_list.push_back(std::move(item));
}
return name_list;
}
Expand Down
4 changes: 2 additions & 2 deletions myframe/event_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int EventConn::Send(std::shared_ptr<Msg> msg) {
if (msg->GetDst().empty()) {
return -1;
}
mailbox_.Send(msg);
mailbox_.Send(std::move(msg));
cmd_channel_->SendToMain(CmdChannel::Cmd::kRun);
CmdChannel::Cmd cmd;
return cmd_channel_->RecvFromMain(&cmd);
Expand All @@ -60,7 +60,7 @@ const std::shared_ptr<const Msg> EventConn::SendRequest(
if (req->GetDst().empty()) {
return nullptr;
}
mailbox_.Send(req);
mailbox_.Send(std::move(req));
cmd_channel_->SendToMain(CmdChannel::Cmd::kRunWithMsg);
CmdChannel::Cmd cmd;
cmd_channel_->RecvFromMain(&cmd);
Expand Down
6 changes: 3 additions & 3 deletions myframe/event_conn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void EventConnManager::AddEventConn() {
auto conn = std::make_shared<EventConn>(poller_);
std::string name = "event.conn." + std::to_string(conn_sz_);
conn->GetMailbox()->SetAddr(name);
idle_conn_.emplace_back(conn);
idle_conn_.push_back(std::move(conn));
conn_sz_++;
}

Expand All @@ -62,7 +62,7 @@ void EventConnManager::Release(std::shared_ptr<EventConn> ev) {
ev_mgr_->Del(ev);
// add to idle_conn
std::lock_guard<std::mutex> g(mtx_);
idle_conn_.emplace_back(ev);
idle_conn_.push_back(std::move(ev));
}

// call by main frame
Expand All @@ -80,7 +80,7 @@ void EventConnManager::Notify(
return;
}
// push msg to event_conn
ev->GetMailbox()->Recv(msg);
ev->GetMailbox()->Recv(std::move(msg));
// send cmd to event_conn
auto cmd_channel = ev->GetCmdChannel();
cmd_channel->SendToOwner(CmdChannel::Cmd::kIdle);
Expand Down
4 changes: 2 additions & 2 deletions myframe/event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ std::vector<std::shared_ptr<Event>> EventManager::Get(
return tmp_evs;
}

bool EventManager::Add(std::shared_ptr<Event> ev) {
bool EventManager::Add(const std::shared_ptr<Event>& ev) {
auto handle = ev->GetHandle();
std::unique_lock<std::shared_mutex> lk(rw_);
if (evs_.find(handle) != evs_.end()) {
Expand All @@ -64,7 +64,7 @@ bool EventManager::Add(std::shared_ptr<Event> ev) {
return true;
}

bool EventManager::Del(std::shared_ptr<Event> ev) {
bool EventManager::Del(const std::shared_ptr<Event>& ev) {
auto handle = ev->GetHandle();
std::unique_lock<std::shared_mutex> lk(rw_);
if (evs_.find(handle) == evs_.end()) {
Expand Down
4 changes: 2 additions & 2 deletions myframe/event_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class EventManager final {
ev_handle_t ToHandle(const std::string&);

private:
bool Add(std::shared_ptr<Event>);
bool Del(std::shared_ptr<Event>);
bool Add(const std::shared_ptr<Event>&);
bool Del(const std::shared_ptr<Event>&);

std::shared_mutex rw_;
std::unordered_map<std::string, ev_handle_t> name_handle_map_;
Expand Down
8 changes: 4 additions & 4 deletions myframe/mailbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ void Mailbox::SendClear() {
}

void Mailbox::Send(std::shared_ptr<Msg> msg) {
send_.emplace_back(msg);
send_.push_back(std::move(msg));
}

void Mailbox::Send(
const std::string& dst,
std::shared_ptr<Msg> msg) {
msg->SetSrc(addr_);
msg->SetDst(dst);
Send(msg);
Send(std::move(msg));
}

void Mailbox::Send(
const std::string& dst,
const std::any& data) {
auto msg = std::make_shared<Msg>();
msg->SetAnyData(data);
Send(dst, msg);
Send(dst, std::move(msg));
}

void Mailbox::Send(std::list<std::shared_ptr<Msg>>* msg_list) {
Expand All @@ -70,7 +70,7 @@ void Mailbox::RecvClear() {
}

void Mailbox::Recv(std::shared_ptr<Msg> msg) {
recv_.emplace_back(msg);
recv_.push_back(std::move(msg));
}

void Mailbox::Recv(std::list<std::shared_ptr<Msg>>* msg_list) {
Expand Down
2 changes: 1 addition & 1 deletion myframe/worker_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ std::list<std::shared_ptr<Msg>>* WorkerContext::GetCache() {
}

void WorkerContext::Cache(std::shared_ptr<Msg> msg) {
cache_.emplace_back(msg);
cache_.push_back(std::move(msg));
}

void WorkerContext::Cache(std::list<std::shared_ptr<Msg>>* msg_list) {
Expand Down
4 changes: 2 additions & 2 deletions myframe/worker_context_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void WorkerContextManager::PopFrontIdleWorker() {
void WorkerContextManager::PushBackIdleWorker(
std::shared_ptr<WorkerContext> worker) {
std::unique_lock<std::shared_mutex> lk(rw_);
idle_workers_ctx_.emplace_back(worker);
idle_workers_ctx_.push_back(std::move(worker));
}

std::vector<std::string> WorkerContextManager::GetAllUserWorkerAddr() {
Expand Down Expand Up @@ -163,7 +163,7 @@ void WorkerContextManager::DispatchWorkerMsg(
LOG(WARNING) << worker_name << " unsupport recv msg, drop it";
return;
}
worker_ctx->Cache(msg);
worker_ctx->Cache(std::move(msg));
LOG_IF(WARNING,
worker_ctx->CacheSize() > warning_msg_size_.load())
<< *worker_ctx << " has " << worker_ctx->CacheSize()
Expand Down
2 changes: 1 addition & 1 deletion myframe/worker_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void TimerManager::_Dispath(List* cur) {
msg->SetDesc(timer->timer_name_);
msg->SetType("TIMER");
delete begin;
timeout_list_.emplace_back(msg);
timeout_list_.push_back(std::move(msg));
begin = temp;
}
}
Expand Down

0 comments on commit 2ca2262

Please sign in to comment.