Skip to content

Commit

Permalink
Make the dispatcher more resilient against certain overuse patterns.
Browse files Browse the repository at this point in the history
  • Loading branch information
drolbr committed Apr 6, 2023
1 parent 68f2c21 commit b201bb3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 89 deletions.
27 changes: 19 additions & 8 deletions src/overpass_api/dispatch/dispatcher_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "../../template_db/dispatcher.h"
#include "../../template_db/dispatcher_client.h"

#include <sys/time.h>
#include <sys/resource.h>

#include <cstring>
#include <iostream>
#include <sstream>
Expand Down Expand Up @@ -440,21 +443,29 @@ int main(int argc, char* argv[])

try
{
rlimit max_open_files;
int result = getrlimit(RLIMIT_NOFILE, &max_open_files);
if (result == -1)
{
std::cerr<<"getrlimit(RLIMIT_NOFILE, ..) failed: "<<errno<<' '<<strerror(errno)<<'\n';
return errno;
}

Logger logger(db_dir);
Default_Dispatcher_Logger disp_logger(logger);
if (max_allowed_space <= 0)
max_allowed_space = areas ? area_settings().total_available_space : osm_base_settings().total_available_space;
if (max_allowed_time_units <= 0)
max_allowed_time_units = areas ? area_settings().total_available_time_units
: osm_base_settings().total_available_time_units;
Dispatcher dispatcher
(areas ? area_settings().shared_name : osm_base_settings().shared_name,
"", db_dir + (areas ? "areas_shadow" : "osm_base_shadow"), db_dir,
areas ? area_settings().max_num_processes : osm_base_settings().max_num_processes,
areas ? area_settings().purge_timeout : osm_base_settings().purge_timeout,
max_allowed_space,
max_allowed_time_units,
files_to_manage, &disp_logger);
Dispatcher dispatcher(
areas ? area_settings().shared_name : osm_base_settings().shared_name,
"", db_dir + (areas ? "areas_shadow" : "osm_base_shadow"), db_dir,
areas ? area_settings().max_num_processes : osm_base_settings().max_num_processes,
max_open_files.rlim_cur > 256 ? max_open_files.rlim_cur - 64 : max_open_files.rlim_cur*3/4,
areas ? area_settings().purge_timeout : osm_base_settings().purge_timeout,
max_allowed_space, max_allowed_time_units,
files_to_manage, &disp_logger);

if (rate_limit > -1)
dispatcher.set_rate_limit(rate_limit);
Expand Down
57 changes: 25 additions & 32 deletions src/template_db/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@
#include <vector>


Dispatcher_Socket::Dispatcher_Socket
(const std::string& dispatcher_share_name,
const std::string& shadow_name_,
const std::string& db_dir_,
uint max_num_reading_processes)
: socket("", max_num_reading_processes)
Dispatcher_Socket::Dispatcher_Socket(
const std::string& dispatcher_share_name, const std::string& db_dir_,
uint max_num_reading_processes, uint max_num_socket_clients)
: socket("", max_num_reading_processes), open_socket_limit(max_num_socket_clients)
{
signal(SIGPIPE, SIG_IGN);

Expand All @@ -65,22 +63,20 @@ Dispatcher_Socket::~Dispatcher_Socket()

void Dispatcher_Socket::look_for_a_new_connection(Connection_Per_Pid_Map& connection_per_pid)
{
struct sockaddr_un sockaddr_un_dummy;
uint sockaddr_un_dummy_size = sizeof(sockaddr_un_dummy);
int socket_fd = accept(socket.descriptor(), (sockaddr*)&sockaddr_un_dummy,
(socklen_t*)&sockaddr_un_dummy_size);
if (socket_fd == -1)
if (started_connections.size() + connection_per_pid.base_map().size() < open_socket_limit)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
throw File_Error
(errno, "(socket)", "Dispatcher_Server::6");
}
else
{
if (fcntl(socket_fd, F_SETFL, O_RDWR|O_NONBLOCK) == -1)
throw File_Error
(errno, "(socket)", "Dispatcher_Server::7");
started_connections.push_back(socket_fd);
int socket_fd = accept(socket.descriptor(), NULL, NULL);
if (socket_fd == -1)
{
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EMFILE)
throw File_Error(errno, "(socket)", "Dispatcher_Server::6");
}
else
{
if (fcntl(socket_fd, F_SETFL, O_RDWR|O_NONBLOCK) == -1)
throw File_Error(errno, "(socket)", "Dispatcher_Server::7");
started_connections.push_back(socket_fd);
}
}

// associate to a new connection the pid of the sender
Expand Down Expand Up @@ -369,17 +365,14 @@ void Global_Resource_Planner::purge(Connection_Per_Pid_Map& connection_per_pid)
}


Dispatcher::Dispatcher
(std::string dispatcher_share_name_,
std::string index_share_name,
std::string shadow_name_,
std::string db_dir_,
uint max_num_reading_processes_, uint purge_timeout_,
uint64 total_available_space_,
uint64 total_available_time_units_,
const std::vector< File_Properties* >& controlled_files_,
Dispatcher_Logger* logger_)
: socket(dispatcher_share_name_, shadow_name_, db_dir_, max_num_reading_processes_),
Dispatcher::Dispatcher(
std::string dispatcher_share_name_,
std::string index_share_name, std::string shadow_name_, std::string db_dir_,
uint max_num_reading_processes_, uint max_num_socket_clients, uint purge_timeout_,
uint64 total_available_space_, uint64 total_available_time_units_,
const std::vector< File_Properties* >& controlled_files_,
Dispatcher_Logger* logger_)
: socket(dispatcher_share_name_, db_dir_, max_num_reading_processes_, max_num_socket_clients),
transaction_insulator(db_dir_, controlled_files_),
shadow_name(shadow_name_),
dispatcher_share_name(dispatcher_share_name_),
Expand Down
13 changes: 6 additions & 7 deletions src/template_db/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,9 @@ class Global_Resource_Planner
class Dispatcher_Socket
{
public:
Dispatcher_Socket(const std::string& dispatcher_share_name,
const std::string& shadow_name_,
const std::string& db_dir_,
uint max_num_reading_processes_);
Dispatcher_Socket(
const std::string& dispatcher_share_name, const std::string& db_dir_,
uint max_num_reading_processes_, uint max_num_socket_clients);
~Dispatcher_Socket();

void look_for_a_new_connection(Connection_Per_Pid_Map& connection_per_pid);
Expand All @@ -158,6 +157,7 @@ class Dispatcher_Socket
Unix_Socket socket;
std::string socket_name;
std::vector< int > started_connections;
uint open_socket_limit;
};


Expand Down Expand Up @@ -202,9 +202,8 @@ class Dispatcher
std::string index_share_name,
std::string shadow_name,
std::string db_dir,
uint max_num_reading_processes, uint purge_timeout,
uint64 total_available_space,
uint64 total_available_time_units,
uint max_num_reading_processes, uint max_num_socket_clients, uint purge_timeout,
uint64 total_available_space, uint64 total_available_time_units,
const std::vector< File_Properties* >& controlled_files,
Dispatcher_Logger* logger = 0);

Expand Down
84 changes: 42 additions & 42 deletions src/template_db/dispatcher.test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ int main(int argc, char* args[])
file_properties.push_back(&test_file_2);
file_properties.push_back(&test_file_3);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
}
catch (File_Error e)
{
Expand All @@ -496,8 +496,8 @@ int main(int argc, char* args[])
file_properties.push_back(&test_file_2);
file_properties.push_back(&test_file_3);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
}

if ((test_to_execute == "") || (test_to_execute == "3"))
Expand All @@ -507,8 +507,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
}

if ((test_to_execute == "") || (test_to_execute == "4"))
Expand All @@ -518,8 +518,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand All @@ -539,8 +539,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(495);
dispatcher.write_start(480);
try
Expand Down Expand Up @@ -569,8 +569,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand All @@ -591,8 +591,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand Down Expand Up @@ -621,8 +621,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand Down Expand Up @@ -659,8 +659,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
put_elem(0, 1, test_file);
dispatcher.write_commit(0);
Expand All @@ -676,8 +676,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
put_elem(0, 1, test_file);
dispatcher.write_commit(0);
Expand All @@ -696,8 +696,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
put_elem(0, 1, test_file);
dispatcher.write_commit(0);
Expand All @@ -719,8 +719,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand Down Expand Up @@ -749,8 +749,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
dispatcher.request_read_and_idx(640, 180, 512*1024*1024, 640);
dispatcher.read_idx_finished(640);
Expand Down Expand Up @@ -797,8 +797,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
dispatcher.request_read_and_idx(640, 180, 512*1024*1024, 640);
dispatcher.read_idx_finished(640);
Expand Down Expand Up @@ -830,8 +830,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand Down Expand Up @@ -871,8 +871,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
put_elem(0, 1, test_file);
dispatcher.write_commit(0);
Expand All @@ -897,8 +897,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
{
Nonsynced_Transaction transaction(true, true, BASE_DIRECTORY, "");
Expand Down Expand Up @@ -946,8 +946,8 @@ int main(int argc, char* args[])
std::vector< File_Properties* > file_properties;
file_properties.push_back(&test_file);
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
put_elem(0, 1, test_file);
dispatcher.write_commit(0);
Expand Down Expand Up @@ -977,8 +977,8 @@ int main(int argc, char* args[])
try
{
Dispatcher dispatcher("/", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
}
catch (File_Error e)
{
Expand Down Expand Up @@ -1013,8 +1013,8 @@ int main(int argc, char* args[])
try
{
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
dispatcher.write_start(480);
put_elem(0, 1, test_file);
dispatcher.write_commit(0);
Expand Down Expand Up @@ -1042,8 +1042,8 @@ int main(int argc, char* args[])
try
{
Dispatcher dispatcher("osm3s_share_test", "osm3s_index_share_test",
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 180, 1024*1024*1024, 1024*1024, file_properties);
BASE_DIRECTORY + "test-shadow", BASE_DIRECTORY,
5, 500, 180, 1024*1024*1024, 1024*1024, file_properties);
}
catch (File_Error e)
{
Expand Down

0 comments on commit b201bb3

Please sign in to comment.