From 9f481ddf96bdb5406ab5f0197f7c02bede757185 Mon Sep 17 00:00:00 2001 From: chenyushuo <297086016@qq.com> Date: Mon, 2 Sep 2024 14:52:21 +0800 Subject: [PATCH] fix in sem --- examples/distributed_simulation/main.py | 5 +- .../distributed_simulation/participant.py | 51 ++- src/agentscope/cpp_server/worker.cc | 345 +++++++++--------- src/agentscope/cpp_server/worker.h | 68 +++- 4 files changed, 246 insertions(+), 223 deletions(-) diff --git a/examples/distributed_simulation/main.py b/examples/distributed_simulation/main.py index 93f5141a9..70f031502 100644 --- a/examples/distributed_simulation/main.py +++ b/examples/distributed_simulation/main.py @@ -102,6 +102,7 @@ def run_main_process( agent_type: str = "random", max_value: int = 100, sleep_time: float = 1.0, + cpp_server: bool = False, ) -> None: """Run main process""" agentscope.init( @@ -127,7 +128,7 @@ def run_main_process( port_id = idx % server_per_host model_id = i % model_per_host host = hosts[host_id] - port = base_port + port_id + port = base_port + port_id if not cpp_server else base_port config_name = f"model_{model_id + 1}" if agent_type == "random": configs.append( @@ -166,7 +167,7 @@ def run_main_process( * participant_per_moderator ], host=hosts[i // moderator_per_host], - port=base_port + server_per_host + i % moderator_per_host, + port=base_port + server_per_host + i % moderator_per_host if not cpp_server else base_port, agent_type=agent_type, max_value=max_value, sleep_time=sleep_time, diff --git a/examples/distributed_simulation/participant.py b/examples/distributed_simulation/participant.py index f017d4de3..8baeeb8b6 100644 --- a/examples/distributed_simulation/participant.py +++ b/examples/distributed_simulation/participant.py @@ -4,6 +4,7 @@ import time import re from typing import Optional, Union, Sequence +import concurrent.futures from loguru import logger @@ -111,9 +112,8 @@ def __init__( ) -> None: super().__init__(name) self.max_value = max_value - if agent_type == "llm": - self.participants = [ - LLMParticipant( + def create_llm_participant(config): + return LLMParticipant( name=config["name"], model_config_name=config["model_config_name"], max_value=max_value, @@ -121,36 +121,35 @@ def __init__( host=config["host"], port=config["port"], ) - for config in part_configs - ] - else: - self.participants = [ - RandomParticipant( - name=config["name"], - max_value=max_value, - sleep_time=sleep_time, - ).to_dist( - host=config["host"], - port=config["port"], - ) - for config in part_configs - ] + + def create_random_participant(config): + return RandomParticipant( + name=config["name"], + max_value=max_value, + sleep_time=sleep_time, + ).to_dist( + host=config["host"], + port=config["port"], + ) + create_participant = {"random": create_random_participant, "llm": create_llm_participant}[agent_type] + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = {executor.submit(create_participant, config) for config in part_configs} + + self.participants = [] + for future in concurrent.futures.as_completed(futures): + result = future.result() + self.participants.append(result) def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg: - results = [] msg = Msg( name="moderator", role="user", content=f"Now give a number between 0 and {self.max_value}.", ) - for p in self.participants: - results.append(p(msg)) - summ = 0 - for r in results: - try: - summ += int(r["content"]) - except Exception as e: - print(e) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = {executor.submit(lambda p: p(msg), p) for p in self.participants} + futures_2 = {executor.submit(lambda r: int(r["content"]), future.result()) for future in concurrent.futures.as_completed(futures)} + summ = sum(future.result() for future in concurrent.futures.as_completed(futures_2)) return Msg( name=self.name, role="assistant", diff --git a/src/agentscope/cpp_server/worker.cc b/src/agentscope/cpp_server/worker.cc index f1cab2ddb..09167477b 100644 --- a/src/agentscope/cpp_server/worker.cc +++ b/src/agentscope/cpp_server/worker.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "worker.h" @@ -45,6 +46,18 @@ string Task::get_result() return _task_result; } +void P(int semid, unsigned short sem_num) +{ + struct sembuf sb = {sem_num, -1, 0}; + semop(semid, &sb, 1); +} + +void V(int semid, unsigned short sem_num) +{ + struct sembuf sb = {sem_num, 1, 0}; + semop(semid, &sb, 1); +} + Worker::Worker( const string &host, const string &port, @@ -53,81 +66,100 @@ Worker::Worker( const unsigned int max_tasks, const unsigned int max_timeout_seconds, const unsigned int num_workers) : _host(host), _port(port), _server_id(server_id), - _num_workers(std::min(std::max(num_workers, 1u), std::thread::hardware_concurrency())), - _pid(getpid()), + _main_worker_pid(getpid()), + _num_workers(std::max(num_workers, 1u)), _worker_id(-1), - _num_calls(0), - _func_call_shm_prefix("/call_" + port + "_"), + _sem_num_per_sem_id(10000), + _call_shm_size(1024), + _max_call_id(calc_max_call_id()), + _small_obj_size(1000), + _small_obj_shm_size(1024), + _call_worker_shm_name("/call_" + port), _func_args_shm_prefix("/args_" + port + "_"), _func_result_shm_prefix("/result_" + port + "_"), _worker_avail_sem_prefix("/avail_" + port + "_"), _func_ready_sem_prefix("/func_" + port + "_"), - _set_result_sem_prefix("/set_result_" + port + "_"), _small_obj_pool_shm_name("/small_obj_pool_shm_" + port), - _small_obj_pool_filename("./logs/small_obj_pool_" + port), - _call_shm_size(1024), - _small_obj_max_num(100000), - _small_obj_size(1000), - _small_obj_shm_size(1024), + _use_logger(calc_use_logger()), _num_tasks(0), _max_tasks(std::max(max_tasks, 1u)), _max_timeout_seconds(std::max(max_timeout_seconds, 1u)) { py::gil_scoped_release release; - char *use_logger = getenv("AGENTSCOPE_USE_CPP_LOGGER"); - if (use_logger != nullptr && std::string(use_logger) == "True") + struct stat info; + if (stat("./logs/", &info) != 0) { - _use_logger = true; + mkdir("./logs", 0755); } - else + + // init call worker shm + _call_worker_shm_fd = shm_open(_call_worker_shm_name.c_str(), O_CREAT | O_RDWR, 0666); + if (_call_worker_shm_fd == -1) { - _use_logger = false; + perror("Error: shm_open in Worker::Worker()"); + kill(_main_worker_pid, SIGINT); } - struct stat info; - if (stat("./logs/", &info) != 0) + ftruncate(_call_worker_shm_fd, _num_workers * _call_shm_size); + void *call_worker_shm = mmap(NULL, _num_workers * _call_shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, _call_worker_shm_fd, 0); + if (call_worker_shm == MAP_FAILED) { - mkdir("./logs", 0755); + perror("Error: mmap in Worker::Worker()"); + kill(_main_worker_pid, SIGINT); } + _call_worker_shm = (char *)call_worker_shm; + + // init small object pool _small_obj_pool_shm_fd = shm_open(_small_obj_pool_shm_name.c_str(), O_CREAT | O_RDWR, 0666); if (_small_obj_pool_shm_fd == -1) { perror("Error: shm_open in create _small_obj_pool_shm_fd"); - exit(1); + kill(_main_worker_pid, SIGINT); } - ftruncate(_small_obj_pool_shm_fd, _small_obj_max_num * _small_obj_shm_size); - _small_obj_pool_shm = mmap(NULL, _small_obj_max_num * _small_obj_shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, _small_obj_pool_shm_fd, 0); + ftruncate(_small_obj_pool_shm_fd, _max_call_id * _small_obj_shm_size); + _small_obj_pool_shm = mmap(NULL, _max_call_id * _small_obj_shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, _small_obj_pool_shm_fd, 0); if (_small_obj_pool_shm == MAP_FAILED) { perror("Error: mmap in _small_obj_pool_shm"); - exit(1); + kill(_main_worker_pid, SIGINT); } - memset(_small_obj_pool_shm, 0, _small_obj_max_num * _small_obj_shm_size); - _small_obj_pool_fd = open(_small_obj_pool_filename.c_str(), O_RDWR | O_CREAT, 0644); - if (_small_obj_pool_fd == -1) + memset(_small_obj_pool_shm, 0, _max_call_id * _small_obj_shm_size); + for (auto i = 0u; i < _max_call_id; i++) + { + _call_id_pool.push(i); + } + + // init call semaphores + string filename = "./logs/" + _port + ".log"; + int fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd == -1) { - perror("Error: open in _small_obj_pool_fd"); - exit(1); + perror("Error: Failed to open file"); + kill(_main_worker_pid, SIGINT); } - ftruncate(_small_obj_pool_fd, _small_obj_max_num * _small_obj_size); - for (int i = 0; i < _num_workers; i++) + close(fd); + unsigned short *_sem_values = new unsigned short[_sem_num_per_sem_id](); + for (int i = 0; i * _sem_num_per_sem_id < _max_call_id; i++) { - string shm_name = _func_call_shm_prefix + to_string(i); - int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666); - if (shm_fd == -1) + key_t key = ftok(filename.c_str(), i); + if (key == -1) { - perror("Error: shm_open in Worker::Worker()"); - exit(1); + perror("Error: ftok in Worker::Worker()"); + kill(_main_worker_pid, SIGINT); } - ftruncate(shm_fd, _call_shm_size); - void *shm = mmap(NULL, _call_shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); - if (shm == MAP_FAILED) + int semid = semget(key, _sem_num_per_sem_id, 0666 | IPC_CREAT); + if (semid == -1) { - perror("Error: mmap in Worker::Worker()"); - exit(1); + perror("Error: semget in Worker::Worker()"); + kill(_main_worker_pid, SIGINT); } - _worker_shm_fds.push_back(shm_fd); - _worker_shms.push_back((char *)shm); + semctl(semid, 0, SETALL, _sem_values); + _call_sem_ids.push_back(semid); + } + delete[] _sem_values; + // launch workers + for (auto i = 0u; i < _num_workers; i++) + { string worker_avail_sem_name = _worker_avail_sem_prefix + to_string(i); string func_ready_sem_name = _func_ready_sem_prefix + to_string(i); sem_t *worker_avail_sem = sem_open(worker_avail_sem_name.c_str(), O_CREAT, 0666, 0); @@ -148,18 +180,17 @@ Worker::Worker( if (fd == -1) { perror("Error: Failed to open file"); - exit(1); + kill(_main_worker_pid, SIGINT); } if (dup2(fd, STDOUT_FILENO) == -1 || dup2(fd, STDERR_FILENO) == -1) { perror("Error: Failed to redirect stdout/stderr"); - exit(1); + kill(_main_worker_pid, SIGINT); } close(fd); - _pid = getpid(); _worker_id = i; - char *shm_ptr = (char *)shm; + char *shm_ptr = _call_worker_shm + i * _call_shm_size; sem_post(worker_avail_sem); while (true) { @@ -224,28 +255,27 @@ Worker::Worker( work.detach(); sem_post(worker_avail_sem); } - exit(0); + kill(_main_worker_pid, SIGINT); } else if (pid < 0) { perror("Error: fork failed in Worker::Worker()"); - exit(1); + kill(_main_worker_pid, SIGINT); } } } Worker::~Worker() // for main process to release resources { - for (auto pid : _worker_pids) - { - kill(pid, SIGINT); - waitpid(pid, NULL, 0); - } + // release call_worker_shm + close(_call_worker_shm_fd); + munmap(_call_worker_shm, _num_workers * _call_shm_size); + + // release small object pool close(_small_obj_pool_shm_fd); - munmap(_small_obj_pool_shm, _small_obj_max_num * _small_obj_shm_size); - shm_unlink(_small_obj_pool_shm_name.c_str()); - close(_small_obj_pool_fd); - remove(_small_obj_pool_filename.c_str()); + munmap(_small_obj_pool_shm, _max_call_id * _small_obj_shm_size); + + // release worker semaphores for (auto iter : _worker_semaphores) { sem_t *worker_avail_sem = iter.first; @@ -253,43 +283,51 @@ Worker::~Worker() // for main process to release resources sem_close(func_ready_sem); sem_close(worker_avail_sem); } - for (auto shm : _worker_shms) - { - munmap(shm, _call_shm_size); - } - for (auto fd : _worker_shm_fds) - { - close(fd); - } - for (int i = 0; i < _num_workers; i++) - { - string shm_name = _func_call_shm_prefix + to_string(i); - shm_unlink(shm_name.c_str()); - string worker_avail_sem_name = _worker_avail_sem_prefix + to_string(i); - string func_ready_sem_name = _func_ready_sem_prefix + to_string(i); - sem_unlink(worker_avail_sem_name.c_str()); - sem_unlink(func_ready_sem_name.c_str()); - } - for (int call_id = 0; call_id < _num_calls; call_id++) + + if (_main_worker_pid == getpid()) { - for (auto prefix : {_func_args_shm_prefix, _func_result_shm_prefix}) + for (auto pid : _worker_pids) { - string shm_name = prefix + to_string(call_id); - int shm_fd = shm_open(shm_name.c_str(), O_RDONLY, 0666); - if (shm_fd != -1) - { - close(shm_fd); - shm_unlink(shm_name.c_str()); - } + kill(pid, SIGINT); + waitpid(pid, NULL, 0); + } + + shm_unlink(_call_worker_shm_name.c_str()); + shm_unlink(_small_obj_pool_shm_name.c_str()); + // release call semaphores + for (auto semid : _call_sem_ids) + { + semctl(semid, 0, IPC_RMID); } - string set_result_name = _set_result_sem_prefix + to_string(call_id); - sem_t *set_result_sem = sem_open(set_result_name.c_str(), 0); - if (set_result_sem != SEM_FAILED) + + // release worker semaphores + for (auto i = 0u; i < _worker_semaphores.size(); i++) { - sem_close(set_result_sem); - sem_unlink(set_result_name.c_str()); + string worker_avail_sem_name = _worker_avail_sem_prefix + to_string(i); + string func_ready_sem_name = _func_ready_sem_prefix + to_string(i); + sem_unlink(worker_avail_sem_name.c_str()); + sem_unlink(func_ready_sem_name.c_str()); + } + + // release large object shm + for (auto call_id = 0u; call_id < _max_call_id; call_id++) + { + for (auto prefix : {_func_args_shm_prefix, _func_result_shm_prefix}) + { + string shm_name = prefix + to_string(call_id); + int shm_fd = shm_open(shm_name.c_str(), O_RDONLY, 0666); + if (shm_fd != -1) + { + close(shm_fd); + shm_unlink(shm_name.c_str()); + } + } } } + else + { + kill(_main_worker_pid, SIGINT); + } } int Worker::find_avail_worker_id() @@ -298,7 +336,7 @@ int Worker::find_avail_worker_id() std::mt19937 gen(rd()); std::uniform_int_distribution<> dis(0, _num_workers - 1); int i; - for (int cnt = 0; cnt < 4 * _num_workers; cnt++) + for (auto cnt = 0u; cnt < 4 * _num_workers; cnt++) { i = dis(gen); if (sem_trywait(_worker_semaphores[i].first) == 0) @@ -314,30 +352,24 @@ int Worker::find_avail_worker_id() int Worker::get_call_id() { - unique_lock lock(_mutex); - int call_id = _num_calls; - _num_calls++; - string set_result_name = _set_result_sem_prefix + to_string(call_id); - sem_t *set_result_sem = sem_open(set_result_name.c_str(), O_CREAT, 0666, 0); - if (set_result_sem == SEM_FAILED) - { - perror(("Error: sem_open in get_call_id" + set_result_name).c_str()); - exit(1); - } - sem_close(set_result_sem); + unique_lock lock(_call_id_mutex); + _call_id_cv.wait(lock, [this] + { return !_call_id_pool.empty(); }); + int call_id = _call_id_pool.front(); + _call_id_pool.pop(); return call_id; } string Worker::get_content(const string &prefix, const int call_id) { - int pool_idx = call_id % _small_obj_max_num; - char *small_obj_shm = (char *)_small_obj_pool_shm + pool_idx * _small_obj_shm_size; + char *small_obj_shm = (char *)_small_obj_pool_shm + call_id * _small_obj_shm_size; int *occupied = (int *)small_obj_shm; - logger("get_content 0: occupied = " + to_string(*occupied) + " call_id = " + to_string(call_id) + " " + to_string(*(int *)(small_obj_shm + sizeof(int)))); - if (*occupied && *(int *)(small_obj_shm + sizeof(int)) == call_id) + logger("get_content 0: occupied = " + to_string(*occupied) + " call_id = " + to_string(call_id) + " " + to_string(*(int *)(small_obj_shm)) + " " + to_string(*(int *)(small_obj_shm + sizeof(int)))); + if (*occupied) { - int content_size = *(int *)(small_obj_shm + sizeof(int) * 2); - string result(small_obj_shm + sizeof(int) * 3, small_obj_shm + sizeof(int) * 3 + content_size); + int content_size = *(int *)(small_obj_shm + sizeof(int)); + string result(small_obj_shm + sizeof(int) * 2, small_obj_shm + sizeof(int) * 2 + content_size); + logger("get_content in pool " + to_string(call_id) + " [" + result + "]"); *occupied = false; return result; } @@ -346,14 +378,14 @@ string Worker::get_content(const string &prefix, const int call_id) if (shm_fd == -1) { perror(("Error: shm_open in get_content: " + shm_name).c_str()); - exit(1); + kill(_main_worker_pid, SIGINT); } struct stat shm_stat; if (fstat(shm_fd, &shm_stat) == -1) { close(shm_fd); perror(("Error: fstat in get_content: " + shm_name).c_str()); - exit(1); + kill(_main_worker_pid, SIGINT); } auto shm_size = shm_stat.st_size; logger("get_content 1: shm_name = " + shm_name + " shm_size = " + to_string(shm_size)); @@ -361,7 +393,7 @@ string Worker::get_content(const string &prefix, const int call_id) if (shm == MAP_FAILED) { perror(("Error: mmap in get_content: " + shm_name).c_str()); - exit(1); + kill(_main_worker_pid, SIGINT); } int content_size = *(int *)shm; string content((char *)shm + sizeof(int), (char *)shm + sizeof(int) + content_size); @@ -374,41 +406,14 @@ string Worker::get_content(const string &prefix, const int call_id) void Worker::set_content(const string &prefix, const int call_id, const string &content) { + logger("set_content: " + to_string(content.size()) + " content = [" + content + "]"); if (content.size() <= _small_obj_size) { - int pool_idx = call_id % _small_obj_max_num; - char *small_obj_shm = (char *)_small_obj_pool_shm + pool_idx * _small_obj_shm_size; - int *occupied = (int *)small_obj_shm; - struct flock lock; - lock.l_type = F_WRLCK; - lock.l_whence = SEEK_SET; - lock.l_start = pool_idx * _small_obj_shm_size; - lock.l_len = _small_obj_shm_size; - for (bool running = true; running; std::this_thread::sleep_for(std::chrono::milliseconds(1))) - { - if (*occupied == false) - { - if (fcntl(_small_obj_pool_fd, F_SETLKW, &lock) == -1) - { - perror(("Error: fcntl in set_content (lock): " + prefix + to_string(call_id)).c_str()); - exit(1); - } - if (*occupied == false) - { - *occupied = true; - running = false; - } - lock.l_type = F_UNLCK; - if (fcntl(_small_obj_pool_fd, F_SETLK, &lock) == -1) - { - perror(("Error: fcntl in set_content (unlock): " + prefix + to_string(call_id)).c_str()); - exit(1); - } - } - } - *(int *)(small_obj_shm + sizeof(int)) = call_id; - *(int *)(small_obj_shm + sizeof(int) * 2) = content.size(); - memcpy(small_obj_shm + sizeof(int) * 3, content.c_str(), content.size()); + logger("set_content in pool "); + char *small_obj_shm = (char *)_small_obj_pool_shm + call_id * _small_obj_shm_size; + *(int *)small_obj_shm = true; + *(int *)(small_obj_shm + sizeof(int)) = content.size(); + memcpy(small_obj_shm + sizeof(int) * 2, content.c_str(), content.size()); return; } string shm_name = prefix + to_string(call_id); @@ -416,7 +421,7 @@ void Worker::set_content(const string &prefix, const int call_id, const string & if (shm_fd == -1) { perror(("Error: shm_open in set_content: " + shm_name).c_str()); - exit(1); + kill(_main_worker_pid, SIGINT); } logger("set_content: " + shm_name + " content = [" + content + "]"); int shm_size = content.size() + sizeof(int); @@ -425,7 +430,7 @@ void Worker::set_content(const string &prefix, const int call_id, const string & if (shm == MAP_FAILED) { perror(("Error: mmap in set_content: " + shm_name).c_str()); - exit(1); + kill(_main_worker_pid, SIGINT); } *(int *)shm = (int)content.size(); memcpy((char *)shm + sizeof(int), content.c_str(), content.size()); @@ -446,39 +451,20 @@ void Worker::set_args_repr(const int call_id, const string &args_repr) string Worker::get_result(const int call_id) { - string set_result_name = _set_result_sem_prefix + to_string(call_id); - logger("get_result start: set_result_name = " + set_result_name); - sem_t *set_result_sem = sem_open(set_result_name.c_str(), 0); - if (set_result_sem == SEM_FAILED) + P(_call_sem_ids[call_id / _sem_num_per_sem_id], call_id % _sem_num_per_sem_id); + string result = get_content(_func_result_shm_prefix, call_id); { - perror(("Error: sem_open in get_result: " + set_result_name).c_str()); - exit(1); + unique_lock lock(_call_id_mutex); + _call_id_pool.push(call_id); + _call_id_cv.notify_one(); } - sem_wait(set_result_sem); - - string result = get_content(_func_result_shm_prefix, call_id); - logger("get_result 2: set_result_name = " + set_result_name + " result = [" + result + "]"); - - sem_close(set_result_sem); - sem_unlink(set_result_name.c_str()); return result; } void Worker::set_result(const int call_id, const string &result) { - string set_result_name = _set_result_sem_prefix + to_string(call_id); - logger("set_result 1: set_result_name = " + set_result_name); - sem_t *set_result_sem = sem_open(set_result_name.c_str(), 0); - if (set_result_sem == SEM_FAILED) - { - perror(("Error: sem_open in set_result: " + set_result_name).c_str()); - exit(1); - } - set_content(_func_result_shm_prefix, call_id, result); - - sem_post(set_result_sem); - sem_close(set_result_sem); + V(_call_sem_ids[call_id / _sem_num_per_sem_id], call_id % _sem_num_per_sem_id); } int Worker::get_worker_id_by_agent_id(const string &agent_id) @@ -496,18 +482,16 @@ int Worker::get_worker_id_by_agent_id(const string &agent_id) pair Worker::get_task_id_and_callback_id() { - bool remove_flag = false; if (_tasks_head_mutex.try_lock()) { // remove front - remove_flag = true; - while (!_tasks.empty() && (_tasks.size() >= _max_tasks || is_timeout(_tasks.front().first))) + while (!_tasks.empty() && _tasks.front().second->_task_finished && (_tasks.size() >= _max_tasks || is_timeout(_tasks.front().first))) { + std::cout << "takes " << _tasks.front().second->task_id() << std::endl; _tasks.pop_front(); } _tasks_head_mutex.unlock(); } - logger("get_task_id_and_callback_id 0: remove_flag = " + to_string(remove_flag)); unique_lock lock(_tasks_tail_mutex); int task_id = _num_tasks; _num_tasks++; @@ -538,7 +522,7 @@ pair Worker::get_task_result(const int task_id) int first_task_id = first_task.second->task_id(); int idx = task_id - first_task_id; logger("get_task_result 2: task_id = " + to_string(task_id) + " idx = " + to_string(idx)); - if (0 <= idx && idx < _tasks.size()) + if (0 <= idx && idx < (int)_tasks.size()) { string result_str = _tasks[idx].second->get_result(); logger("get_task_result 3: task_id = " + to_string(task_id) + " idx = " + to_string(idx) + " result_str = [" + result_str + "]"); @@ -560,8 +544,8 @@ int Worker::call_worker_func(const int worker_id, const function_ids func_id, co sem_wait(_worker_semaphores[worker_id].first); } int call_id = get_call_id(); - *(int *)_worker_shms[worker_id] = call_id; - *(int *)(_worker_shms[worker_id] + sizeof(int)) = func_id; + *(int *)(_call_worker_shm + worker_id * _call_shm_size) = call_id; + *(int *)(_call_worker_shm + worker_id * _call_shm_size + sizeof(int)) = func_id; logger("call_worker_func 1: " + to_string(func_id) + " call_id = " + to_string(call_id)); if (args != nullptr) { @@ -593,6 +577,7 @@ string Worker::call_create_agent(const string &agent_id, const string &agent_ini unique_lock lock(_agent_id_map_mutex); _agent_id_map.insert(std::make_pair(agent_id, worker_id)); } + logger("call_create_agent 3: " + agent_id + " call_id = " + to_string(call_id) + " worker_id = " + to_string(worker_id) + " result = [" + result + "]"); return result; } @@ -662,7 +647,7 @@ string Worker::call_delete_all_agents() vector call_id_list; { unique_lock lock(_agent_id_map_mutex); - for (int worker_id = 0; worker_id < _num_workers; worker_id++) + for (auto worker_id = 0u; worker_id < _num_workers; worker_id++) { int call_id = call_worker_func(worker_id, function_ids::delete_all_agents, nullptr); call_id_list.push_back(call_id); @@ -739,7 +724,7 @@ string Worker::call_get_agent_list() vector call_id_list; { shared_lock lock(_agent_id_map_mutex); - for (int worker_id = 0; worker_id < _num_workers; worker_id++) + for (auto worker_id = 0u; worker_id < _num_workers; worker_id++) { int call_id = call_worker_func(worker_id, function_ids::get_agent_list, nullptr); call_id_list.push_back(call_id); @@ -779,7 +764,7 @@ string Worker::call_set_model_configs(const string &model_configs) vector call_id_list; ModelConfigsArgs args; args.set_model_configs(model_configs); - for (int i = 0; i < _num_workers; i++) + for (auto i = 0u; i < _num_workers; i++) { int call_id = call_worker_func(i, function_ids::set_model_configs, &args); call_id_list.push_back(call_id); @@ -980,10 +965,10 @@ string Worker::call_server_info() void Worker::server_info_worker(const int call_id) { py::gil_scoped_acquire acquire; - py::object process = py::module::import("psutil").attr("Process")(_pid); + py::object process = py::module::import("psutil").attr("Process")(_main_worker_pid); double cpu_info = process.attr("cpu_percent")("interval"_a = 1).cast(); double mem_info = process.attr("memory_info")().attr("rss").cast() / (1 << 20); - py::dict result("pid"_a = _pid, "id"_a = _server_id, "cpu"_a = cpu_info, "mem"_a = mem_info); + py::dict result("pid"_a = _main_worker_pid, "id"_a = _server_id, "cpu"_a = cpu_info, "mem"_a = mem_info); string result_str = py::module::import("json").attr("dumps")(result).cast(); set_result(call_id, result_str); } \ No newline at end of file diff --git a/src/agentscope/cpp_server/worker.h b/src/agentscope/cpp_server/worker.h index 8a087f5b5..50983c72d 100644 --- a/src/agentscope/cpp_server/worker.h +++ b/src/agentscope/cpp_server/worker.h @@ -11,8 +11,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -21,6 +23,7 @@ #include "worker_args.pb.h" using std::deque; +using std::queue; using std::make_pair; using std::pair; using std::string; @@ -60,32 +63,35 @@ class Worker const string _host; const string _port; const string _server_id; + const int _main_worker_pid; const unsigned int _num_workers; - int _pid; int _worker_id; - int _num_calls; - mutex _mutex; vector _worker_pids; - vector _worker_shm_fds; - vector _worker_shms; - vector> _worker_semaphores; - const string _func_call_shm_prefix; + + const int _sem_num_per_sem_id; + const unsigned int _call_shm_size; + const unsigned int _max_call_id; + const unsigned int _small_obj_size; + const unsigned int _small_obj_shm_size; + + const string _call_worker_shm_name; const string _func_args_shm_prefix; const string _func_result_shm_prefix; const string _worker_avail_sem_prefix; const string _func_ready_sem_prefix; - const string _set_result_sem_prefix; const string _small_obj_pool_shm_name; - const string _small_obj_pool_filename; + + vector _call_sem_ids; + int _call_worker_shm_fd; + char *_call_worker_shm; + vector> _worker_semaphores; int _small_obj_pool_shm_fd; - int _small_obj_pool_fd; void *_small_obj_pool_shm; - const unsigned int _call_shm_size; - const unsigned int _small_obj_max_num; - const unsigned int _small_obj_size; - const unsigned int _small_obj_shm_size; + mutex _call_id_mutex; + condition_variable _call_id_cv; + queue _call_id_pool; - bool _use_logger; + const bool _use_logger; mutex _logger_mutex; unordered_map _agent_id_map; // map agent id to worker id @@ -124,6 +130,38 @@ class Worker void set_result(const int call_id, const string &result); int get_worker_id_by_agent_id(const string &agent_id); + static const unsigned int _default_max_call_id = 10000; + static unsigned int calc_max_call_id() + { + char *max_call_id = getenv("AGENTSCOPE_MAX_CALL_ID"); + if (max_call_id != nullptr) + { + try { + return std::stoi(max_call_id); + } catch (const std::invalid_argument&) { + return _default_max_call_id; + } catch (const std::out_of_range&) { + return _default_max_call_id; + } + } + else + { + return _default_max_call_id; + } + } + static bool calc_use_logger() + { + char *use_logger = getenv("AGENTSCOPE_USE_CPP_LOGGER"); + if (use_logger != nullptr && std::string(use_logger) == "True") + { + return true; + } + else + { + return false; + } + } + inline long long get_current_timestamp() { auto now = std::chrono::system_clock::now();