Skip to content

Commit

Permalink
Use io_context instead of the deprecated io_service
Browse files Browse the repository at this point in the history
- User boost::asio::bind_executor instead of the deprecated io_service::wrap()

Re ECFLOW-1973
  • Loading branch information
marcosbento committed Sep 3, 2024
1 parent 57fde12 commit 7c077d9
Show file tree
Hide file tree
Showing 33 changed files with 122 additions and 120 deletions.
12 changes: 6 additions & 6 deletions docs/internal_team_notes/src/Server/doc/disk_io.ddoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ instead of requiring a type (i.e. task must inherit from process), provides more
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service io_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
Expand All @@ -88,21 +88,21 @@ public:

/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
: work_( io_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
&io_ ) );
}
}

/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
io_.stop();

// Suppress all exceptions.
try
Expand All @@ -125,7 +125,7 @@ public:
--available_;

// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
io_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}

Expand Down Expand Up @@ -319,4 +319,4 @@ private:
bool m_terminate;
boost::asio::io_service *m_ioService;
boost::thread *m_serviceThread;
}
}
10 changes: 5 additions & 5 deletions libs/base/src/ecflow/base/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
/// If we do not have a timeout, it will hang indefinitely

/// Constructor starts the asynchronous connect operation.
Client::Client(boost::asio::io_service& io_service,
Client::Client(boost::asio::io_context& io,
Cmd_ptr cmd_ptr,
const std::string& host,
const std::string& port,
int timeout)
: stopped_(false),
host_(host),
port_(port),
connection_(io_service),
deadline_(io_service),
connection_(io),
deadline_(io),
timeout_(timeout) {
/// Avoid sending a NULL request to the server
if (!cmd_ptr.get())
Expand All @@ -60,7 +60,7 @@ Client::Client(boost::asio::io_service& io_service,

// Host name resolution is performed using a resolver, where host and service
// names(or ports) are looked up and converted into one or more end points
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver resolver(io);
boost::asio::ip::tcp::resolver::query query(host_, port_);
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

Expand Down Expand Up @@ -299,7 +299,7 @@ void Client::handle_read(const boost::system::error_code& e) {
throw std::runtime_error(ss.str());
}

// Since we are not starting a new operation the io_service will run out of
// Since we are not starting a new operation the io_context will run out of
// work to do and the Client will exit.
}

Expand Down
2 changes: 1 addition & 1 deletion libs/base/src/ecflow/base/Client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class Client {
public:
/// Constructor starts the asynchronous connect operation.
Client(boost::asio::io_service& io_service,
Client(boost::asio::io_context& io,
Cmd_ptr cmd_ptr,
const std::string& host,
const std::string& port,
Expand Down
2 changes: 1 addition & 1 deletion libs/base/src/ecflow/base/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ connection::~connection() {
#endif
}

connection::connection(boost::asio::io_service& io_service) : socket_(io_service) {
connection::connection(boost::asio::io_context& io) : socket_(io) {
#ifdef DEBUG_CONNECTION
if (Ecf::server())
std::cout << "SERVER: Connection::connection\n";
Expand Down
2 changes: 1 addition & 1 deletion libs/base/src/ecflow/base/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class connection {
public:
~connection();

explicit connection(boost::asio::io_service& io_service);
explicit connection(boost::asio::io_context& io);
boost::asio::ip::tcp::socket& socket() { return socket_; }
boost::asio::ip::tcp::socket& socket_ll() { return socket_; }

Expand Down
10 changes: 5 additions & 5 deletions libs/base/src/ecflow/base/SslClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/// If we do not have a timeout, it will hang indefinitely

/// Constructor starts the asynchronous connect operation.
SslClient::SslClient(boost::asio::io_service& io_service,
SslClient::SslClient(boost::asio::io_context& io,
boost::asio::ssl::context& context,
Cmd_ptr cmd_ptr,
const std::string& host,
Expand All @@ -36,8 +36,8 @@ SslClient::SslClient(boost::asio::io_service& io_service,
: stopped_(false),
host_(host),
port_(port),
connection_(io_service, context),
deadline_(io_service),
connection_(io, context),
deadline_(io),
timeout_(timeout) {
/// Avoid sending a NULL request to the server
if (!cmd_ptr.get())
Expand All @@ -60,7 +60,7 @@ SslClient::SslClient(boost::asio::io_service& io_service,

// Host name resolution is performed using a resolver, where host and service
// names(or ports) are looked up and converted into one or more end points
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver resolver(io);
boost::asio::ip::tcp::resolver::query query(host_, port_);
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

Expand Down Expand Up @@ -331,7 +331,7 @@ void SslClient::handle_read(const boost::system::error_code& e) {
throw std::runtime_error(ss.str());
}

// Since we are not starting a new operation the io_service will run out of
// Since we are not starting a new operation the io_context will run out of
// work to do and the Client will exit.
}

Expand Down
2 changes: 1 addition & 1 deletion libs/base/src/ecflow/base/SslClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
class SslClient {
public:
/// Constructor starts the asynchronous connect operation.
SslClient(boost::asio::io_service& io_service,
SslClient(boost::asio::io_context& io,
boost::asio::ssl::context& context,
Cmd_ptr cmd_ptr,
const std::string& host,
Expand Down
10 changes: 5 additions & 5 deletions libs/base/src/ecflow/base/cts/user/PlugCmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,25 +173,25 @@ STC_Cmd_ptr PlugCmd::doHandleRequest(AbstractServer* as) const {
// Server is acting like a client, Send MoveCmd to another server
// The source should end up being copied, when sent to remote server
ServerReply server_reply;
boost::asio::io_service io_service;
boost::asio::io_context io;
#ifdef ECF_OPENSSL
if (!as->ssl().empty()) {
ecf::Openssl openssl;
if (!openssl.enable_no_throw(host, port, as->ssl()))
throw std::runtime_error("PlugCmd::doHandleRequest Could not enable ssl for " + as->ssl());
openssl.init_for_client();

SslClient theClient(io_service, openssl.context(), cts_cmd, host, port);
io_service.run();
SslClient theClient(io, openssl.context(), cts_cmd, host, port);
io.run();
theClient.handle_server_response(server_reply, false /* debug */);
if (server_reply.client_request_failed()) {
throw std::runtime_error(server_reply.error_msg());
}
}
else {
#endif
Client theClient(io_service, cts_cmd, host, port);
io_service.run();
Client theClient(io, cts_cmd, host, port);
io.run();
theClient.handle_server_response(server_reply, false /* debug */);
if (server_reply.client_request_failed()) {
throw std::runtime_error(server_reply.error_msg());
Expand Down
4 changes: 2 additions & 2 deletions libs/base/src/ecflow/base/ssl_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ ssl_connection::~ssl_connection() {
#endif
}

ssl_connection::ssl_connection(boost::asio::io_service& io_service, boost::asio::ssl::context& context)
: socket_(io_service, context) {
ssl_connection::ssl_connection(boost::asio::io_context& io, boost::asio::ssl::context& context)
: socket_(io, context) {
#ifdef DEBUG_CONNECTION
if (Ecf::server())
std::cout << "SERVER: ssl_connection::ssl_connection\n";
Expand Down
2 changes: 1 addition & 1 deletion libs/base/src/ecflow/base/ssl_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ssl_connection {
public:
~ssl_connection();

ssl_connection(boost::asio::io_service& io_service, boost::asio::ssl::context& context);
ssl_connection(boost::asio::io_context& io, boost::asio::ssl::context& context);
bool verify_certificate(bool preverified, boost::asio::ssl::verify_context& ctx);

/// Get the underlying socket. Used for making a connection or for accepting
Expand Down
20 changes: 10 additions & 10 deletions libs/client/src/ecflow/client/ClientInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ int ClientInvoker::do_invoke_cmd(Cmd_ptr cts_cmd) const {
int no_of_tries = connection_attempts_;
while (no_of_tries > 0) {
try {
/// *** Each call to io_service.run(); is a *REQUEST* to the server ***
/// *** Each call to io_context::run(); is a *REQUEST* to the server ***
/// *** Hence we *MUST* clear the server_reply before each call *******
/// *** Found during zombie test. i.e when blocking, we were responding to previous, reply, since
/// server_reply was not being reset *Note* server_reply_.client_handle_ is kept until the next call
Expand All @@ -392,26 +392,26 @@ int ClientInvoker::do_invoke_cmd(Cmd_ptr cts_cmd) const {
<< ")<<<" << endl;
}

boost::asio::io_service io_service;
boost::asio::io_context io;
#ifdef ECF_OPENSSL
if (clientEnv_.ssl()) {

clientEnv_.openssl().init_for_client();

SslClient theClient(io_service,
SslClient theClient(io,
clientEnv_.openssl().context(),
cts_cmd,
clientEnv_.host(),
clientEnv_.port(),
clientEnv_.connect_timeout());
{
#ifdef DEBUG_PERF
ecf::ScopedDurationTimer my_timer(" io_service.run()");
ecf::ScopedDurationTimer my_timer(" io.run()");
#endif
io_service.run();
io.run();
}
if (clientEnv_.debug())
cout << TimeStamp::now() << "ClientInvoker: >>> After: io_service.run() <<<" << endl;
cout << TimeStamp::now() << "ClientInvoker: >>> After: io_context::run() <<<" << endl;

/// Let see how the server responded if at all.
try {
Expand All @@ -429,15 +429,15 @@ int ClientInvoker::do_invoke_cmd(Cmd_ptr cts_cmd) const {
else {
#endif
Client theClient(
io_service, cts_cmd, clientEnv_.host(), clientEnv_.port(), clientEnv_.connect_timeout());
io, cts_cmd, clientEnv_.host(), clientEnv_.port(), clientEnv_.connect_timeout());
{
#ifdef DEBUG_PERF
ecf::ScopedDurationTimer my_timer(" io_service.run()");
ecf::ScopedDurationTimer my_timer(" io.run()");
#endif
io_service.run();
io.run();
}
if (clientEnv_.debug())
cout << TimeStamp::now() << "ClientInvoker: >>> After: io_service.run() <<<" << endl;
cout << TimeStamp::now() << "ClientInvoker: >>> After: io_context::run() <<<" << endl;

/// Let see how the server responded if at all.
try {
Expand Down
14 changes: 7 additions & 7 deletions libs/server/src/ecflow/server/BaseServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ using namespace std;
using namespace ecf;

/// Constructor opens the acceptor and starts waiting for the first incoming connection.
BaseServer::BaseServer(boost::asio::io_service& io_service, ServerEnvironment& serverEnv)
: io_service_(io_service),
signals_(io_service),
BaseServer::BaseServer(boost::asio::io_context& io, ServerEnvironment& serverEnv)
: io_(io),
signals_(io),
defs_(Defs::create()), // ECFLOW-182
traverser_(this, io_service, serverEnv),
checkPtSaver_(this, io_service, &serverEnv),
traverser_(this, io, serverEnv),
checkPtSaver_(this, io, &serverEnv),
serverState_(SState::HALTED),
serverEnv_(serverEnv) {
if (serverEnv_.debug())
Expand Down Expand Up @@ -498,9 +498,9 @@ bool BaseServer::debug() const {
}

void BaseServer::sigterm_signal_handler() {
if (io_service_.stopped()) {
if (io_.stopped()) {
if (serverEnv_.debug())
cout << "-->BaseServer::sigterm_signal_handler(): io_service is stopped returning " << endl;
cout << "-->BaseServer::sigterm_signal_handler(): io_context has stopped returning " << endl;
return;
}

Expand Down
6 changes: 3 additions & 3 deletions libs/server/src/ecflow/server/BaseServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class BaseServer : public AbstractServer {
public:
/// Constructor opens the acceptor and starts waiting for the first incoming
/// connection.
explicit BaseServer(boost::asio::io_service& io_service, ServerEnvironment&);
explicit BaseServer(boost::asio::io_context& io, ServerEnvironment&);
~BaseServer() override;

void handle_terminate();
Expand Down Expand Up @@ -88,8 +88,8 @@ class BaseServer : public AbstractServer {
void sigterm_signal_handler();

protected:
/// The io_service used to perform asynchronous operations.
boost::asio::io_service& io_service_;
/// The io_context used to perform asynchronous operations.
boost::asio::io_context& io_;

/// The signal_set is used to register for automatic check pointing
boost::asio::signal_set signals_;
Expand Down
12 changes: 6 additions & 6 deletions libs/server/src/ecflow/server/CheckPtSaver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
using namespace ecf;

//-------------------------------------------------------------------------------------
CheckPtSaver::CheckPtSaver(BaseServer* s, boost::asio::io_service& io, const ServerEnvironment* serverEnv)
CheckPtSaver::CheckPtSaver(BaseServer* s, boost::asio::io_context& io, const ServerEnvironment* serverEnv)
: server_(s),
timer_(io, boost::posix_time::seconds(0)),
firstTime_(true),
Expand Down Expand Up @@ -71,13 +71,13 @@ void CheckPtSaver::start() {
if (firstTime_) {
firstTime_ = false;
timer_.expires_from_now(boost::posix_time::seconds(serverEnv_->checkPtInterval()));
timer_.async_wait(
server_->io_service_.wrap([this](const boost::system::error_code& error) { periodicSaveCheckPt(error); }));
timer_.async_wait(boost::asio::bind_executor(
server_->io_, [this](const boost::system::error_code& error) { periodicSaveCheckPt(error); }));
}
}

void CheckPtSaver::stop() { // The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run() call
// operations. Once all operations have finished the io_context::run() call
// will exit.
#ifdef DEBUG_CHECKPT
std::cout << " CheckPtSaver::stop() check_mode: " << serverEnv_->check_mode_str()
Expand Down Expand Up @@ -174,8 +174,8 @@ void CheckPtSaver::periodicSaveCheckPt(const boost::system::error_code& error) {

/// Appears that expires_from_now is more accurate then expires_at
timer_.expires_from_now(boost::posix_time::seconds(serverEnv_->checkPtInterval()));
timer_.async_wait(
server_->io_service_.wrap([this](const boost::system::error_code& error) { periodicSaveCheckPt(error); }));
timer_.async_wait(boost::asio::bind_executor(
server_->io_, [this](const boost::system::error_code& error) { periodicSaveCheckPt(error); }));
}

void CheckPtSaver::doSave() const {
Expand Down
2 changes: 1 addition & 1 deletion libs/server/src/ecflow/server/CheckPtSaver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class BaseServer;

class CheckPtSaver {
public:
CheckPtSaver(BaseServer* s, boost::asio::io_service& io, const ServerEnvironment*);
CheckPtSaver(BaseServer* s, boost::asio::io_context& io, const ServerEnvironment*);
// Disable copy (and move) semantics
CheckPtSaver(const CheckPtSaver&) = delete;
const CheckPtSaver& operator=(const CheckPtSaver&) = delete;
Expand Down
5 changes: 3 additions & 2 deletions libs/server/src/ecflow/server/NodeTreeTraverser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ using namespace boost::posix_time;
// Hence we poll every second, and check it against the minute boundary
// ************************************************************************

NodeTreeTraverser::NodeTreeTraverser(BaseServer* s, boost::asio::io_service& io, const ServerEnvironment& serverEnv)
NodeTreeTraverser::NodeTreeTraverser(BaseServer* s, boost::asio::io_context& io, const ServerEnvironment& serverEnv)
: server_(s),
serverEnv_(serverEnv),
timer_(io, boost::posix_time::seconds(0)),
Expand Down Expand Up @@ -284,7 +284,8 @@ void NodeTreeTraverser::start_timer() {
/// Appears that expires_from_now is more accurate then expires_at i.e timer_.expires_at( timer_.expires_at() +
/// boost::posix_time::seconds( poll_at ) );
timer_.expires_from_now(boost::posix_time::seconds(1));
timer_.async_wait(server_->io_service_.wrap([this](const boost::system::error_code& error) { traverse(error); }));
timer_.async_wait(
boost::asio::bind_executor(server_->io_, [this](const boost::system::error_code& error) { traverse(error); }));
}

void NodeTreeTraverser::traverse(const boost::system::error_code& error) {
Expand Down
2 changes: 1 addition & 1 deletion libs/server/src/ecflow/server/NodeTreeTraverser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ServerEnvironment;

class NodeTreeTraverser {
public:
NodeTreeTraverser(BaseServer* s, boost::asio::io_service& io, const ServerEnvironment& serverEnv);
NodeTreeTraverser(BaseServer* s, boost::asio::io_context& io, const ServerEnvironment& serverEnv);
// Disable copy (and move) semantics
NodeTreeTraverser(const NodeTreeTraverser&) = delete;
const NodeTreeTraverser& operator=(const NodeTreeTraverser&) = delete;
Expand Down
Loading

0 comments on commit 7c077d9

Please sign in to comment.