diff --git a/docs/client_api/api/alter.rst b/docs/client_api/api/alter.rst index 909f1305a..6495c1e41 100644 --- a/docs/client_api/api/alter.rst +++ b/docs/client_api/api/alter.rst @@ -20,7 +20,7 @@ alter For change: [ variable | clock_type | clock_gain | clock_date | clock_sync | event | meter | label | trigger | complete | repeat | limit_max | limit_value | defstatus | late | time | - today, aviso, mirror ] + today | aviso | mirror ] *NOTE* If the clock is changed, then the suite will need to be re-queued in order for the change to take effect fully. For add: @@ -47,6 +47,9 @@ alter * for mirror, "--listener '{ \"event\": \"mars\", \"request\": { \"class\": "od" } }' --url http://aviso/ --schema /path/to/schema --polling 60" + For both aviso and mirror, the special value "reload" can be used to force reloading the configuration. + n.b. This is typically useful after updating variables used to configure these kind of attributes. + Usage: ecflow_client --alter=add variable GLOBAL "value" / # add server variable diff --git a/docs/glossary.rst b/docs/glossary.rst index 57f92e111..5c721dfc5 100644 --- a/docs/glossary.rst +++ b/docs/glossary.rst @@ -82,9 +82,15 @@ the user. Each aviso attribute implies that a background thread is spawned whenever - the associated :term:`node` is (re)queued. This background thread is - responsible for polling the Aviso server, and periodically processing the - latest notifications. + the associated :term:`node` is (re)queued. This independent background thread, + responsible for polling the Aviso server and periodically processing the latest notifications, + uses the configuriguration available when the associated task is queued. + + .. note:: + + If any variables provinding the configuration are updated, the Aviso configuration + can be reloaded (without unqueuing the Task) by issuing an Alter change command with + the value :code:`reload` to the relevant Aviso attribute. The authentication credentials file is expected to be in JSON format, following the `ECMWF Web API `_: @@ -1424,9 +1430,16 @@ (empty string), which effectively disables Authentication Each mirror attribute implies that a background thread is spawned whenever - the ecFlow server is :term:`running`. This background thread is - responsible for polling the remote ecFlow server, and periodically - synchronise node status. + the ecFlow server is :term:`running` (i.e. when the server is shutdown or halted the + thread is terminated and the mirroring process is completely stopped). + This independent background thread, responsible for polling the remote ecFlow server and periodically + synchronise node status, uses the configuration available when the server is restarted. + + .. note:: + + If any variables provinding the configuration are updated, the Mirror configuration can be + reloaded (without restarting the Server) by issuing an Alter change command with the value + :code:`reload` to the relevant attributes. The authentication credentials file is expected to be in JSON, according to the following format: diff --git a/libs/base/src/ecflow/base/cts/user/AlterCmd.cpp b/libs/base/src/ecflow/base/cts/user/AlterCmd.cpp index 517c736cc..c10f6ab47 100644 --- a/libs/base/src/ecflow/base/cts/user/AlterCmd.cpp +++ b/libs/base/src/ecflow/base/cts/user/AlterCmd.cpp @@ -613,7 +613,7 @@ const char* AlterCmd::desc() { " For change:\n" " [ variable | clock_type | clock_gain | clock_date | clock_sync | event | meter | label |\n" " trigger | complete | repeat | limit_max | limit_value | defstatus | late | time |\n" - " today, aviso, mirror ]\n" + " today | aviso | mirror ]\n" " *NOTE* If the clock is changed, then the suite will need to be re-queued in order for\n" " the change to take effect fully.\n" " For add:\n" @@ -640,6 +640,9 @@ const char* AlterCmd::desc() { " * for mirror, \"--listener '{ \\\"event\\\": \\\"mars\\\", \\\"request\\\": { \\\"class\\\": \"od\" } }'\n" " --url http://aviso/ --schema /path/to/schema --polling 60\"\n" "\n" + "For both aviso and mirror, the special value \"reload\" can be used to force reloading the configuration.\n" + " n.b. This is typically useful after updating variables used to configure these kind of attributes.\n" + "\n" "Usage:\n\n" " ecflow_client --alter=add variable GLOBAL \"value\" / # add server variable\n" " ecflow_client --alter=add variable FRED \"value\" /path/to/node # add node variable\n" diff --git a/libs/core/src/ecflow/core/Log.cpp b/libs/core/src/ecflow/core/Log.cpp index e742b4926..4ebdae6e1 100644 --- a/libs/core/src/ecflow/core/Log.cpp +++ b/libs/core/src/ecflow/core/Log.cpp @@ -35,8 +35,9 @@ void Log::create(const std::string& filename) { } void Log::destroy() { - if (instance_) + if (instance_) { instance_->flush(); + } delete instance_; instance_ = nullptr; @@ -52,11 +53,9 @@ void Log::create_logimpl() { } bool Log::log(Log::LogType lt, const std::string& message) { - create_logimpl(); + std::lock_guard lock(mx_); - // if (!logImpl_->log_open_error().empty()) { - // cerr << "Log::log: " << message << "\n"; - // } + create_logimpl(); if (!logImpl_->log(lt, message)) { // handle write failure and Get the failure reason. This will delete logImpl_ & recreate @@ -70,11 +69,9 @@ bool Log::log(Log::LogType lt, const std::string& message) { } bool Log::log_no_newline(Log::LogType lt, const std::string& message) { - create_logimpl(); + std::lock_guard lock(mx_); - // if (!logImpl_->log_open_error().empty()) { - // cerr << "Log::log_no_newline : " << message << "\n"; - // } + create_logimpl(); if (!logImpl_->log_no_newline(lt, message)) { // handle write failure and Get the failure reason. This will delete logImpl_ & recreate @@ -88,11 +85,9 @@ bool Log::log_no_newline(Log::LogType lt, const std::string& message) { } bool Log::append(const std::string& message) { - create_logimpl(); + std::lock_guard lock(mx_); - // if (!logImpl_->log_open_error().empty()) { - // cerr << "Log::append : " << message << "\n"; - // } + create_logimpl(); if (!logImpl_->append(message)) { // handle write failure and Get the failure reason. This will delete logImpl_ & recreate @@ -106,26 +101,36 @@ bool Log::append(const std::string& message) { } void Log::cache_time_stamp() { + std::lock_guard lock(mx_); + create_logimpl(); logImpl_->create_time_stamp(); } const std::string& Log::get_cached_time_stamp() const { + std::lock_guard lock(mx_); + return (logImpl_) ? logImpl_->get_cached_time_stamp() : Str::EMPTY(); } void Log::flush() { + std::lock_guard lock(mx_); + // will close ofstream and force data to be written to disk. // Forcing writing to physical medium can't be guaranteed though! logImpl_.reset(); } void Log::flush_only() { - if (logImpl_) + std::lock_guard lock(mx_); + + if (logImpl_) { logImpl_->flush(); + } } void Log::clear() { + std::lock_guard lock(mx_); flush(); // Open and truncate the file. @@ -136,6 +141,8 @@ void Log::clear() { } void Log::new_path(const std::string& the_new_path) { + std::lock_guard lock(mx_); + check_new_path(the_new_path); // flush and close log file @@ -174,6 +181,8 @@ void Log::check_new_path(const std::string& new_path) { } std::string Log::path() const { + std::lock_guard lock(mx_); + if (!fileName_.empty() && fileName_[0] == '/') { // Path is absolute return as is return fileName_; @@ -185,6 +194,8 @@ std::string Log::path() const { } std::string Log::contents(int get_last_n_lines) { + std::lock_guard lock(mx_); + if (get_last_n_lines == 0) { return string(); } @@ -200,6 +211,8 @@ std::string Log::contents(int get_last_n_lines) { } std::string Log::handle_write_failure() { + std::lock_guard lock(mx_); + std::string msg = logImpl_->log_open_error(); if (msg.empty()) { msg += "\nFailed to write to log file: "; @@ -213,22 +226,20 @@ std::string Log::handle_write_failure() { logImpl_.reset(); create_logimpl(); - if (logImpl_->log_open_error().empty()) + if (logImpl_->log_open_error().empty()) { msg += "\nAttempting to close/reopen log file."; - else + } + else { msg += "\nAttempting to close/reopen log file did not work!"; + } - if (LogToCout::ok()) + if (LogToCout::ok()) { Indentor::indent(cout) << msg << '\n'; + } return msg; } bool log(Log::LogType lt, const std::string& message) { - // For debug of simulator enable this - // if (LogToCout::ok()) { - // Indentor::indent(cout) << message << '\n'; - // } - if (Log::instance()) { return Log::instance()->log(lt, message); } @@ -367,8 +378,9 @@ bool LogImpl::do_log(Log::LogType lt, const std::string& message, bool newline) // XXX:[HH:MM:SS D.M.YYYY] chd:fullname [+additional information] // XXX:[HH:MM:SS D.M.YYYY] -- [+additional information] - if (time_stamp_.empty() || lt == Log::ERR || lt == Log::WAR || lt == Log::DBG) + if (time_stamp_.empty() || lt == Log::ERR || lt == Log::WAR || lt == Log::DBG) { create_time_stamp(); + } // re-use memory allocated to log_type_and_time_stamp_ log_type_and_time_stamp_.clear(); @@ -377,8 +389,9 @@ bool LogImpl::do_log(Log::LogType lt, const std::string& message, bool newline) if (message.find("\n") == std::string::npos) { file_ << log_type_and_time_stamp_ << message; - if (newline) + if (newline) { file_ << '\n'; + } } else { // If message has \n then split into multiple lines diff --git a/libs/core/src/ecflow/core/Log.hpp b/libs/core/src/ecflow/core/Log.hpp index bf6f228a9..b88c66897 100644 --- a/libs/core/src/ecflow/core/Log.hpp +++ b/libs/core/src/ecflow/core/Log.hpp @@ -35,6 +35,7 @@ #include #include +#include #include #include @@ -121,6 +122,8 @@ class Log { std::unique_ptr logImpl_; std::string fileName_; std::string log_error_; + + mutable std::recursive_mutex mx_; }; // Flush log on destruction diff --git a/libs/node/src/ecflow/node/AvisoAttr.cpp b/libs/node/src/ecflow/node/AvisoAttr.cpp index a5c5eb369..321aebac5 100644 --- a/libs/node/src/ecflow/node/AvisoAttr.cpp +++ b/libs/node/src/ecflow/node/AvisoAttr.cpp @@ -93,6 +93,14 @@ void AvisoAttr::reset() { } } +void AvisoAttr::reload() { + if (controller_) { + state_change_no_ = Ecf::incr_state_change_no(); + finish(); + start(); + } +} + bool AvisoAttr::isFree() const { if (controller_ == nullptr) { diff --git a/libs/node/src/ecflow/node/AvisoAttr.hpp b/libs/node/src/ecflow/node/AvisoAttr.hpp index 4c8e33245..aa11e392b 100644 --- a/libs/node/src/ecflow/node/AvisoAttr.hpp +++ b/libs/node/src/ecflow/node/AvisoAttr.hpp @@ -56,6 +56,8 @@ class AvisoAttr { static constexpr const char* default_polling = "%ECF_AVISO_POLLING%"; static constexpr const char* default_auth = "%ECF_AVISO_AUTH%"; + static constexpr const char* reload_option_value = "reload"; + static bool is_valid_name(const std::string& name); /** @@ -98,8 +100,19 @@ class AvisoAttr { bool why(std::string& theReasonWhy) const; + /** + * Initialises the Aviso procedure, which effectively starts the background polling mechanism. + * Typically, called when traversing the tree -- does nothing if Aviso service is already set up. + */ void reset(); + /** + * Restarts the Aviso procedure, which effectively stops before restarting the background polling mechanism. + * Typicallly, called explicitly via Alter command -- forces the reinitialisation of the Aviso service, + * guaranteeing that parameters, given as ECF variables, are reevaluated. + */ + void reload(); + [[nodiscard]] bool isFree() const; void start() const; diff --git a/libs/node/src/ecflow/node/MirrorAttr.cpp b/libs/node/src/ecflow/node/MirrorAttr.cpp index bce3abbcd..4cfedfb0e 100644 --- a/libs/node/src/ecflow/node/MirrorAttr.cpp +++ b/libs/node/src/ecflow/node/MirrorAttr.cpp @@ -74,7 +74,16 @@ void MirrorAttr::reset() { start_controller(); } +void MirrorAttr::reload() { + if (controller_) { + state_change_no_ = Ecf::incr_state_change_no(); + stop_controller(); + start_controller(); + } +} + void MirrorAttr::finish() { + state_change_no_ = Ecf::incr_state_change_no(); stop_controller(); } @@ -190,7 +199,7 @@ std::string MirrorAttr::resolve_cfg(const std::string& value, } void MirrorAttr::start_controller() { - if (controller_ == nullptr) { + if (!controller_) { // Resolve variables in configuration // In the case of the 'remote_host', we have to resolve the configuration @@ -217,7 +226,8 @@ void MirrorAttr::start_controller() { SLOG(D, "MirrorAttr: start polling Mirror attribute '" << absolute_name() << "', from " << remote_path_ << " @ " - << remote_host << ':' << remote_port << ")"); + << remote_host << ':' << remote_port << ") using polling: " + << polling << " s"); std::uint32_t polling_value; try { @@ -247,7 +257,7 @@ void MirrorAttr::start_controller() { } void MirrorAttr::stop_controller() { - if (controller_ != nullptr) { + if (controller_) { SLOG(D, "MirrorAttr: finishing polling for Mirror attribute \"" << parent_->absNodePath() << ":" << name_ << "\", from host: " << remote_host_ diff --git a/libs/node/src/ecflow/node/MirrorAttr.hpp b/libs/node/src/ecflow/node/MirrorAttr.hpp index 45786fa7f..199bc73a3 100644 --- a/libs/node/src/ecflow/node/MirrorAttr.hpp +++ b/libs/node/src/ecflow/node/MirrorAttr.hpp @@ -59,6 +59,8 @@ class MirrorAttr { static constexpr const char* fallback_polling = "120"; static constexpr const char* fallback_remote_auth = ""; + static constexpr const char* reload_option_value = "reload"; + static bool is_valid_name(const std::string& name); /** @@ -104,8 +106,15 @@ class MirrorAttr { /** * Initialises the Mirror procedure, which effectively starts the background polling mechanism. + * Typically, called when traversing the tree -- does nothing if Mirror service is already set up. */ void reset(); + /** + * Restarts the Mirror procedure, which effectively stops before restarting the background polling mechanism. + * Typicallly, called explicitly via Alter command -- forces the reinitialisation of the Mirror service, + * guaranteeing that parameters, given as ECF variables, are reevaluated. + */ + void reload(); void finish(); /** diff --git a/libs/node/src/ecflow/node/NodeChange.cpp b/libs/node/src/ecflow/node/NodeChange.cpp index 8abbe8e4d..6f31e9c4f 100644 --- a/libs/node/src/ecflow/node/NodeChange.cpp +++ b/libs/node/src/ecflow/node/NodeChange.cpp @@ -172,7 +172,11 @@ void Node::changeAviso(const std::string& name, const std::string& value) { throw std::runtime_error("Node::changeAviso: Could not find aviso " + name); } - *found = AvisoParser::parse_aviso_line(value, name); + if (value == AvisoAttr::reload_option_value) { + found->reload(); + } else { + *found = AvisoParser::parse_aviso_line(value, name); + } state_change_no_ = Ecf::incr_state_change_no(); } @@ -201,11 +205,15 @@ void Node::changeMirror(const std::string& name, const std::string& value) { throw std::runtime_error("Node::changeMirror: Could not find mirror " + name); } - auto attr = MirrorParser::parse_mirror_line(value, name, this); + if (value == MirrorAttr::reload_option_value) { + found->reload(); + } else { + auto attr = MirrorParser::parse_mirror_line(value, name, this); - // The following delete/add enforces the reconfiguration of the attribute backend thread - this->deleteMirror(name); // delete the mirror if it exists (to avoid duplicates - this->addMirror(attr); + // The following delete/add enforces the reconfiguration of the attribute backend thread + this->deleteMirror(name); // delete the mirror if it exists (to avoid duplicates + this->addMirror(attr); + } state_change_no_ = Ecf::incr_state_change_no(); } diff --git a/libs/server/src/ecflow/server/BaseServer.cpp b/libs/server/src/ecflow/server/BaseServer.cpp index 40370a826..3ce8e9fe7 100644 --- a/libs/server/src/ecflow/server/BaseServer.cpp +++ b/libs/server/src/ecflow/server/BaseServer.cpp @@ -323,8 +323,9 @@ void BaseServer::shutdown() { /// RUNNING yes yes yes yes /// SHUTDOWN yes yes no yes /// HALTED yes no no no - if (serverEnv_.debug()) + if (serverEnv_.debug()) { cout << " BaseServer::shutdown. Stop Scheduling new jobs only" << endl; + } // Stop server from creating new jobs. Don't stop the checkPtSaver_ since // the jobs communication with server can still change state. Which we want @@ -336,6 +337,9 @@ void BaseServer::shutdown() { // If we go from HALTED --> SHUTDOWN, then check pointing needs to be enabled checkPtSaver_.start(); + // Stop all Mirror/Aviso attributes (i.e. background threads are stopped) + ecf::visit_all(*defs_, ShutdownDefs{}); + // Will update defs as well to stop job scheduling set_server_state(SState::SHUTDOWN); } @@ -345,10 +349,11 @@ void BaseServer::halted() { /// RUNNING yes yes yes yes /// SHUTDOWN yes yes no yes /// HALTED yes no no no - if (serverEnv_.debug()) + if (serverEnv_.debug()) { cout << " BaseServer::halted. Stop Scheduling new jobs *and* block task communication. Stop check pointing. " "Only accept user request" << endl; + } // Stop server from creating new jobs. i.e Job scheduling. traverser_.stop(); @@ -360,6 +365,7 @@ void BaseServer::halted() { // Added after discussion with Axel. checkPtSaver_.stop(); + // Stop all Mirror/Aviso attributes (i.e. background threads are stopped) ecf::visit_all(*defs_, ShutdownDefs{}); // Stop the task communication with server. Hence nodes can be stuck @@ -374,8 +380,9 @@ void BaseServer::restart() { /// RUNNING yes yes yes yes /// SHUTDOWN yes yes no yes /// HALTED yes no no no - if (serverEnv_.debug()) + if (serverEnv_.debug()) { std::cout << " BaseServer::restart" << endl; + } // The server state *MUST* be set, *before* traverser_.start(), since that can kick off job traversal. // Job Scheduling can only be done under RUNNING state, hence must be before traverser_.start();