Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.9.4 #34

Merged
merged 13 commits into from
Dec 6, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on: [push, pull_request]
jobs:
build-macos:
name: AppleClang-C++${{matrix.std}}-${{matrix.build_type}}
runs-on: macos-12
runs-on: macos-latest
permissions:
actions: read
contents: read
Expand Down
4 changes: 2 additions & 2 deletions 3rd/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ project(myframe_deps VERSION 1.0.0)

include(ExternalProject)

set(DEPS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/src)
set(DEPS_DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/pkg)
set(DEPS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/src CACHE PATH "package source dir")
set(DEPS_DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/pkg CACHE PATH "package tgz dir")

ExternalProject_Add(
gflags
Expand Down
17 changes: 6 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ cmake_minimum_required(VERSION 3.10)
if (POLICY CMP0091)
cmake_policy(SET CMP0091 NEW)
endif()
project(myframe VERSION 0.9.3)
project(myframe VERSION 0.9.4)

### option
option(MYFRAME_USE_CV "Using conditional variables for thread communication" ON)
option(MYFRAME_INSTALL_DEPS "Install deps" OFF)
option(MYFRAME_GENERATE_EXAMPLE "Generate example library" ON)
option(MYFRAME_GENERATE_TEST "Generate test executable program" ON)

Expand All @@ -26,6 +25,11 @@ else()
message(STATUS "Set default cxx standard 17")
endif()
add_compile_options("$<$<CXX_COMPILER_ID:MSVC>:/source-charset:utf-8>")
if (MSVC)
add_definitions(
-wd4251
)
endif (MSVC)

### install path
if (CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)
Expand Down Expand Up @@ -72,15 +76,6 @@ install(PROGRAMS
install(DIRECTORY templates DESTINATION .)
install(DIRECTORY DESTINATION ${MYFRAME_LOG_DIR})
install(DIRECTORY DESTINATION ${MYFRAME_SERVICE_DIR})
if (MYFRAME_INSTALL_DEPS)
if (${CMAKE_VERSION} VERSION_GREATER_EQUAL "3.21")
install(IMPORTED_RUNTIME_ARTIFACTS jsoncpp_lib)
install(IMPORTED_RUNTIME_ARTIFACTS glog::glog)
install(IMPORTED_RUNTIME_ARTIFACTS gflags)
else()
message(WARNING "Can not install deps lib, cmake version ${CMAKE_VERSION} < 3.21")
endif()
endif()

### package
include(Packing)
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ worker自驱动,可以通过消息与actor交互;
```sh
# 下载/构建/安装依赖库
cd 3rd
cmake -S . -B build -DCMAKE_INSTALL_PREFIX=../output
cmake --build build -j
cmake -S . -B build -DCMAKE_INSTALL_PREFIX="../output"
cmake --build build -j --config Release
# 构建安装
cd ..
cmake -S . -B build -DCMAKE_INSTALL_PREFIX=./output -DCMAKE_PREFIX_PATH=./output
cmake -S . -B build -DCMAKE_INSTALL_PREFIX="./output" -DCMAKE_PREFIX_PATH="./output"
cmake --build build -j --config Release --target install
```

Expand Down
2 changes: 1 addition & 1 deletion examples/example_worker_actor_interactive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ExampleWorkerInteractive : public myframe::Worker {
return;
}
while (1) {
const auto msg = mailbox->PopRecv();
const auto msg = mailbox->PopRun();
if (msg == nullptr) {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/example_worker_interactive_with_3rd_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class ExampleWorkerInteractiveWith3rdFrame : public myframe::Worker {
recv_run_flag_ = true;
} else if (cmd == myframe::CmdChannel::Cmd::kRunWithMsg) {
auto mailbox = GetMailbox();
while (!mailbox->RecvEmpty()) {
const auto msg = mailbox->PopRecv();
while (!mailbox->RunEmpty()) {
const auto msg = mailbox->PopRun();
// 接收到其它组件消息
LOG(INFO) << "get main " << msg->GetData();
}
Expand Down
4 changes: 2 additions & 2 deletions examples/example_worker_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class ExampleWorkerPublic : public myframe::Worker {
return;
}
auto mailbox = GetMailbox();
while (!mailbox->RecvEmpty()) {
const auto msg = mailbox->PopRecv();
while (!mailbox->RunEmpty()) {
const auto msg = mailbox->PopRun();
// send msg by udp/tcp/zmq/...
LOG(INFO) << "public msg " << msg->GetData() << " ...";
}
Expand Down
6 changes: 3 additions & 3 deletions myframe/actor_context_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ bool ActorContextManager::RegContext(std::shared_ptr<ActorContext> ctx) {
std::shared_ptr<ActorContext> ActorContextManager::GetContext(
const std::string& actor_name) {
std::shared_lock<std::shared_mutex> lk(rw_);
if (ctxs_.find(actor_name) == ctxs_.end()) {
auto p = ctxs_.find(actor_name);
if (p == ctxs_.end()) {
LOG(WARNING) << "not found " << actor_name;
return nullptr;
}
auto ctx = ctxs_[actor_name];
return ctx;
return p->second;
}

std::vector<std::string> ActorContextManager::GetAllActorAddr() {
Expand Down
133 changes: 78 additions & 55 deletions myframe/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ App::App()
, actor_ctx_mgr_(new ActorContextManager())
, ev_mgr_(new EventManager())
, ev_conn_mgr_(new EventConnManager(ev_mgr_, poller_))
, worker_ctx_mgr_(new WorkerContextManager(ev_mgr_))
{}
, worker_ctx_mgr_(new WorkerContextManager(ev_mgr_)) {
LOG(INFO) << "myframe version: " << MYFRAME_VERSION;
}

App::~App() {
LOG(INFO) << "app deconstruct";
Expand All @@ -63,7 +64,7 @@ bool App::Init(
int thread_pool_size,
int event_conn_size,
int warning_msg_size) {
if (!quit_.load()) {
if (state_.load() != kUninitialized) {
return true;
}

Expand All @@ -73,10 +74,11 @@ bool App::Init(
ret &= poller_->Init();
ret &= worker_ctx_mgr_->Init(warning_msg_size);
ret &= ev_conn_mgr_->Init(event_conn_size);

state_.store(kInitialized);

ret &= StartCommonWorker(thread_pool_size);
ret &= StartTimerWorker();

quit_.store(false);
return ret;
}

Expand Down Expand Up @@ -119,6 +121,10 @@ bool App::LoadServiceFromJsonStr(const std::string& service) {
}

bool App::LoadServiceFromJson(const Json::Value& service) {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before LoadServiceFromJson()";
return false;
}
if (service.isNull()) {
LOG(ERROR) << "parse service json failed, skip";
return false;
Expand Down Expand Up @@ -265,6 +271,10 @@ bool App::AddActor(
const std::string& params,
std::shared_ptr<Actor> actor,
const Json::Value& config) {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before AddActor()";
return false;
}
actor->SetInstName(inst_name);
actor->SetConfig(config);
return CreateActorContext(actor, params);
Expand All @@ -274,6 +284,10 @@ bool App::AddWorker(
const std::string& inst_name,
std::shared_ptr<Worker> worker,
const Json::Value& config) {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before AddWorker()";
return false;
}
auto worker_ctx = std::make_shared<WorkerContext>(
shared_from_this(), worker, poller_);
worker->SetInstName(inst_name);
Expand All @@ -300,6 +314,10 @@ bool App::AddWorker(
}

int App::Send(std::shared_ptr<Msg> msg) {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before Send()";
return -1;
}
auto conn = ev_conn_mgr_->Alloc();
if (conn == nullptr) {
LOG(ERROR) << "alloc conn event failed";
Expand All @@ -308,12 +326,17 @@ int App::Send(std::shared_ptr<Msg> msg) {
poller_->Add(conn);
auto ret = conn->Send(std::move(msg));
poller_->Del(conn);
// 主动释放
ev_conn_mgr_->Release(std::move(conn));
return ret;
}

const std::shared_ptr<const Msg> App::SendRequest(
std::shared_ptr<Msg> msg) {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before SendRequest()";
return nullptr;
}
auto conn = ev_conn_mgr_->Alloc();
if (conn == nullptr) {
LOG(ERROR) << "alloc conn event failed";
Expand All @@ -322,10 +345,15 @@ const std::shared_ptr<const Msg> App::SendRequest(
poller_->Add(conn);
auto resp = conn->SendRequest(std::move(msg));
poller_->Del(conn);
ev_conn_mgr_->Release(std::move(conn));
// 不需要调用ev_conn_mgr_->Release()
// 系统会主动释放
return resp;
}

std::unique_ptr<ModManager>& App::GetModManager() {
return mods_;
}

/**
* 创建一个新的actor:
* 1. 从ModManager中获得对应模块对象
Expand Down Expand Up @@ -375,26 +403,29 @@ bool App::CreateActorContext(
std::lock_guard<std::recursive_mutex> lock(local_mtx_);
// 接收缓存中发给自己的消息
for (auto it = cache_msgs_.begin(); it != cache_msgs_.end();) {
if (it->msg->GetDst() == actor_name) {
if ((*it)->GetDst() == actor_name) {
LOG(INFO) << actor_name
<< " recv msg from cache " << *(it->msg);
DispatchMsg(it->msg);
<< " recv msg from cache " << *(it);
DispatchMsg(*it);
it = cache_msgs_.erase(it);
continue;
}
++it;
}
// 目的地址不存在的暂时放到缓存消息队列
auto send_list = ctx->GetMailbox()->GetSendList();
for (auto it = send_list->begin(); it != send_list->end();) {
if (!HasUserInst((*it)->GetDst())) {
LOG(WARNING) << "can't found " << (*it)->GetDst()
<< ", cache this msg";
cache_msgs_.emplace_back(0, *it);
it = send_list->erase(it);
continue;
// 在运行时不再缓存,直接分发
if (state_.load() != kRunning) {
auto send_list = ctx->GetMailbox()->GetSendList();
for (auto it = send_list->begin(); it != send_list->end();) {
if (!HasUserInst((*it)->GetDst())) {
LOG(WARNING) << "can't found " << (*it)->GetDst()
<< ", cache this msg";
cache_msgs_.push_back(*it);
it = send_list->erase(it);
continue;
}
++it;
}
++it;
}
// 分发目的地址已经存在的消息
DispatchMsg(ctx);
Expand Down Expand Up @@ -429,16 +460,6 @@ bool App::StartTimerWorker() {
return true;
}

void App::DispatchMsg(std::list<std::shared_ptr<Msg>>* msg_list) {
LOG_IF(WARNING,
msg_list->size() > warning_msg_size_.load())
<< " dispatch msg too many";
for (auto& msg : (*msg_list)) {
DispatchMsg(std::move(msg));
}
msg_list->clear();
}

void App::DispatchMsg(std::shared_ptr<Msg> msg) {
std::lock_guard<std::recursive_mutex> lock(local_mtx_);
VLOG(1) << *msg;
Expand Down Expand Up @@ -483,6 +504,16 @@ void App::DispatchMsg(std::shared_ptr<Msg> msg) {
}
}

void App::DispatchMsg(std::list<std::shared_ptr<Msg>>* msg_list) {
LOG_IF(WARNING,
msg_list->size() > warning_msg_size_.load())
<< " dispatch msg too many";
for (auto& msg : (*msg_list)) {
DispatchMsg(std::move(msg));
}
msg_list->clear();
}

// 将获得的消息分发给其他actor
void App::DispatchMsg(std::shared_ptr<ActorContext> context) {
if (nullptr == context) {
Expand Down Expand Up @@ -511,16 +542,17 @@ void App::CheckStopWorkers() {
<< actor_ctx->GetActor()->GetActorName()
<< " dispatch msg to "
<< *worker_ctx;
auto msg_list = actor_ctx->GetMailbox()->GetRecvList();
if (!msg_list->empty()) {
auto actor_mailbox = actor_ctx->GetMailbox();
std::size_t actor_ctx_recv_sz = actor_mailbox->RecvSize();
if (!actor_mailbox->RecvEmpty()) {
LOG_IF(WARNING,
msg_list->size() > warning_msg_size_.load())
actor_ctx_recv_sz > warning_msg_size_.load())
<< actor_ctx->GetActor()->GetActorName()
<< " recv msg size too many: " << msg_list->size();
<< " recv msg size too many: " << actor_ctx_recv_sz;
VLOG(1) << "run " << actor_ctx->GetActor()->GetActorName();
worker_ctx->GetMailbox()->Recv(msg_list);
actor_mailbox->MoveToRun();
VLOG(1) << actor_ctx->GetActor()->GetActorName()
<< " has " << worker_ctx->GetMailbox()->RecvSize()
<< " has " << actor_ctx_recv_sz
<< " msg need process";
worker_ctx_mgr_->PopFrontIdleWorker();
auto common_idle_worker = worker_ctx->GetWorker<WorkerCommon>();
Expand Down Expand Up @@ -717,48 +749,39 @@ void App::ProcessEvent(const std::vector<ev_handle_t>& evs) {
}
}

void App::ProcessCacheMsg() {
for (auto it = cache_msgs_.begin(); it != cache_msgs_.end();) {
if (it->search_count >= default_search_count_) {
VLOG(1) << *(it->msg) << " search count: " << it->search_count
<< ", dispatch it";
DispatchMsg(it->msg);
it = cache_msgs_.erase(it);
} else {
it->search_count++;
++it;
}
}
}

int App::Exec() {
if (state_.load() == kUninitialized) {
LOG(ERROR) << "not init, please call Init() before Exec()";
return -1;
}
int time_wait_ms = 100;
std::vector<ev_handle_t> evs;

state_.store(kRunning);
/// 处理初始化中缓存消息
DispatchMsg(&cache_msgs_);
while (worker_ctx_mgr_->WorkerSize()) {
/// 检查空闲线程队列是否有空闲线程,如果有就找到一个有消息的actor处理
CheckStopWorkers();
/// 等待事件
poller_->Wait(&evs, time_wait_ms);
/// 处理缓存消息
ProcessCacheMsg();
/// 处理事件
ProcessEvent(evs);
}

// quit App
worker_ctx_mgr_->WaitAllWorkerQuit();
quit_.store(true);
state_.store(kQuit);
LOG(INFO) << "app exit exec";
return 0;
}

void App::Quit() {
// wait worker stop
if (quit_.load()) {
return;
if (state_.load() == kRunning
|| state_.load() == kInitialized) {
state_.store(kQuitting);
worker_ctx_mgr_->StopAllWorker();
}
worker_ctx_mgr_->StopAllWorker();
}

bool App::HasUserInst(const std::string& name) {
Expand Down
Loading
Loading