diff --git a/libs/node/CMakeLists.txt b/libs/node/CMakeLists.txt index c75451c69..ad13d7558 100644 --- a/libs/node/CMakeLists.txt +++ b/libs/node/CMakeLists.txt @@ -47,6 +47,7 @@ set(srcs src/ecflow/node/NodeState.hpp src/ecflow/node/NodeStats.hpp src/ecflow/node/NodeTreeVisitor.hpp + src/ecflow/node/Operations.hpp src/ecflow/node/ResolveExternsVisitor.hpp src/ecflow/node/ServerState.hpp src/ecflow/node/Signal.hpp diff --git a/libs/node/src/ecflow/node/AvisoAttr.cpp b/libs/node/src/ecflow/node/AvisoAttr.cpp index 4e01e0b61..ac1265c11 100644 --- a/libs/node/src/ecflow/node/AvisoAttr.cpp +++ b/libs/node/src/ecflow/node/AvisoAttr.cpp @@ -136,6 +136,15 @@ void AvisoAttr::start() const { } auto polling = boost::lexical_cast(aviso_polling); + start_controller(aviso_path, aviso_listener, aviso_url, aviso_schema, polling); +} + +void AvisoAttr::start_controller(const std::string& aviso_path, + const std::string& aviso_listener, + const std::string& aviso_url, + const std::string& aviso_schema, + std::uint32_t polling) const { + // Controller -- start up the Aviso controller, and subscribe the Aviso listener controller_ = std::make_shared(); controller_->subscribe(ecf::service::aviso::AvisoRequest::make_listen_start( @@ -145,16 +154,22 @@ void AvisoAttr::start() const { controller_->start(); } +void AvisoAttr::stop_controller(const std::string& aviso_path) const { + if (controller_ != nullptr) { + controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path)); + + // Controller -- shutdown up the Aviso controller + controller_->stop(); + controller_ = nullptr; + } +} + void AvisoAttr::finish() const { using namespace ecf; LOG(Log::DBG, Message("**** Unsubscribe Aviso attribute (name: ", name_, ", listener: ", listener_, ")")); std::string aviso_path = path(); - controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path)); - - // Controller -- shutdown up the Aviso controller - controller_->stop(); - controller_ = nullptr; + stop_controller(aviso_path); } } // namespace ecf diff --git a/libs/node/src/ecflow/node/AvisoAttr.hpp b/libs/node/src/ecflow/node/AvisoAttr.hpp index bd99857b8..b2d13b9cf 100644 --- a/libs/node/src/ecflow/node/AvisoAttr.hpp +++ b/libs/node/src/ecflow/node/AvisoAttr.hpp @@ -69,6 +69,7 @@ class AvisoAttr { AvisoAttr& operator=(const AvisoAttr& rhs) = default; + [[nodiscard]] inline Node* parent() const { return parent_; } [[nodiscard]] inline const std::string& name() const { return name_; } [[nodiscard]] inline const std::string& listener() const { return listener_; } [[nodiscard]] inline const std::string& url() const { return url_; } @@ -95,6 +96,13 @@ class AvisoAttr { friend void serialize(Archive& ar, AvisoAttr& aviso, std::uint32_t version); private: + void start_controller(const std::string& aviso_path, + const std::string& aviso_listener, + const std::string& aviso_url, + const std::string& aviso_schema, + std::uint32_t polling) const; + void stop_controller(const std::string& aviso_path) const; + Node* parent_{nullptr}; // only ever used on the server side, to access parent Node variables path_t parent_path_; name_t name_; diff --git a/libs/node/src/ecflow/node/Defs.cpp b/libs/node/src/ecflow/node/Defs.cpp index 4373d80ce..827fc964d 100644 --- a/libs/node/src/ecflow/node/Defs.cpp +++ b/libs/node/src/ecflow/node/Defs.cpp @@ -123,12 +123,6 @@ Defs::~Defs() { ExprDuplicate reclaim_cloned_ast_memory; } -void Defs::poke() { - for (const auto& suite : suiteVec_) { - suite->poke(); - } -} - void Defs::handle_migration() { // Fix up any migration issues. Remove when in Bolgna, and ecflow4 no longer used for (const auto& s : suiteVec_) { @@ -412,7 +406,7 @@ bool Defs::verification(std::string& errorMsg) const { suite_ptr Defs::add_suite(const std::string& name) { if (findSuite(name).get()) { std::stringstream ss; - ss << "Add Suite failed: A Suite of name '" << name << "' already exist"; + ss << "Add Suite failed: A Suite of name '" << name << "' already exists"; throw std::runtime_error(ss.str()); } suite_ptr the_suite = Suite::create(name); @@ -423,7 +417,7 @@ suite_ptr Defs::add_suite(const std::string& name) { void Defs::addSuite(const suite_ptr& s, size_t position) { if (findSuite(s->name()).get()) { std::stringstream ss; - ss << "Add Suite failed: A Suite of name '" << s->name() << "' already exist"; + ss << "Add Suite failed: A Suite of name '" << s->name() << "' already exists"; throw std::runtime_error(ss.str()); } add_suite_only(s, position); diff --git a/libs/node/src/ecflow/node/Defs.hpp b/libs/node/src/ecflow/node/Defs.hpp index f09ca8414..588a1ca16 100644 --- a/libs/node/src/ecflow/node/Defs.hpp +++ b/libs/node/src/ecflow/node/Defs.hpp @@ -36,6 +36,7 @@ #include "ecflow/node/Flag.hpp" #include "ecflow/node/NodeFwd.hpp" #include "ecflow/node/ServerState.hpp" +#include "ecflow/node/Suite.hpp" namespace cereal { class access; @@ -56,8 +57,6 @@ class Defs { ~Defs(); - void poke(); - void copy_defs_state_only(const defs_ptr& defs); // needed when creating defs for client handles bool operator==(const Defs& rhs) const; void print(std::string&) const; diff --git a/libs/node/src/ecflow/node/Family.cpp b/libs/node/src/ecflow/node/Family.cpp index 087b11ec8..48e22f2db 100644 --- a/libs/node/src/ecflow/node/Family.cpp +++ b/libs/node/src/ecflow/node/Family.cpp @@ -49,10 +49,6 @@ Family::~Family() { delete fam_gen_variables_; } -void Family::poke() { - NodeContainer::poke(); -} - family_ptr Family::create(const std::string& name, bool check) { return std::make_shared(name, check); } diff --git a/libs/node/src/ecflow/node/Family.hpp b/libs/node/src/ecflow/node/Family.hpp index ed8b7fd45..2650fe5b0 100644 --- a/libs/node/src/ecflow/node/Family.hpp +++ b/libs/node/src/ecflow/node/Family.hpp @@ -25,8 +25,6 @@ class Family final : public NodeContainer { ~Family() override; - void poke() override; - static family_ptr create(const std::string& name, bool check = true); static family_ptr create_me(const std::string& name); // python api, to pick correct init function diff --git a/libs/node/src/ecflow/node/Jobs.cpp b/libs/node/src/ecflow/node/Jobs.cpp index e2662d9ff..bbeb211b2 100644 --- a/libs/node/src/ecflow/node/Jobs.cpp +++ b/libs/node/src/ecflow/node/Jobs.cpp @@ -14,6 +14,7 @@ #include "ecflow/core/Log.hpp" #include "ecflow/node/Defs.hpp" #include "ecflow/node/JobsParam.hpp" +#include "ecflow/node/Operations.hpp" #include "ecflow/node/Signal.hpp" #include "ecflow/node/Suite.hpp" #include "ecflow/node/SuiteChanged.hpp" @@ -58,7 +59,7 @@ bool Jobs::generate(JobsParam& jobsParam) const { for (const suite_ptr& suite : suites) { // SuiteChanged moved into Suite::resolveDependencies. // This ensures the fast path and when suite are not begun we save a ctor/dtor call - suite->poke(); + ecf::visit(*suite, ActivateAll{}); (void)suite->resolveDependencies(jobsParam); } } diff --git a/libs/node/src/ecflow/node/MirrorAttr.cpp b/libs/node/src/ecflow/node/MirrorAttr.cpp index eebc67c93..ac746bb38 100644 --- a/libs/node/src/ecflow/node/MirrorAttr.cpp +++ b/libs/node/src/ecflow/node/MirrorAttr.cpp @@ -42,38 +42,10 @@ MirrorAttr::MirrorAttr(Node* parent, } MirrorAttr::~MirrorAttr() { - if (controller_ != nullptr) { - controller_->stop(); - controller_.reset(); - } -} - -void MirrorAttr::poke() { - ALOG(D, "**** Check Mirror attribute (name: " << name_ << ")"); - - start_controller(); - - // Task associated with Attribute is free when any notification is found - auto notifications = controller_->poll_notifications(remote_path_); - - if (notifications.empty()) { - // No notifications, nothing to do... - return; - } - - // Notifications found -- Node state to be updated - ALOG(D, "MirrorAttr::isFree: found notifications for Mirror attribute (name: " << name_ << ")"); - - auto latest_state = static_cast(notifications.back().status); - parent_->setStateOnly(latest_state, true); - parent_->handleStateChange(); + stop_controller(); } bool MirrorAttr::why(std::string& theReasonWhy) const { - if (isFree()) { - return false; - } - theReasonWhy += ecf::Message(" is a Mirror of ", remote_path(), " at '", remote_host(), ":", remote_port(), "'"); return true; } @@ -88,21 +60,31 @@ void MirrorAttr::reset() { start_controller(); } -bool MirrorAttr::isFree() const { +void MirrorAttr::finish() { - LOG(Log::MSG, "**** Check Mirror attribute (name: " << name_ << ")"); + ALOG(D, + "MirrorAttr::reset: start polling for Mirror attribute (name: " << name_ << ", host: " << remote_host_ + << ", port: " << remote_port_ << ")"); + + stop_controller(); +} + +void MirrorAttr::mirror() { + ALOG(D, "**** Check Mirror attribute (name: " << name_ << ")"); start_controller(); - return true; -} + // Task associated with Attribute is free when any notification is found + if (auto notifications = controller_->poll_notifications(remote_path_); !notifications.empty()) { + // Notifications found -- Node state to be updated + ALOG(D, "MirrorAttr::isFree: found notifications for Mirror attribute (name: " << name_ << ")"); -void MirrorAttr::start() const { - // Nothing do do... -} + auto latest_state = static_cast(notifications.back().status); + parent_->setStateOnly(latest_state, true); + parent_->handleStateChange(); + } -void MirrorAttr::finish() const { - // Nothing do do... + // No notifications, nothing to do... } void MirrorAttr::start_controller() const { @@ -129,4 +111,15 @@ void MirrorAttr::start_controller() const { } } +void MirrorAttr::stop_controller() const { + if (controller_ != nullptr) { + ALOG(D, + "MirrorAttr::reset: stop polling for Mirror attribute (name: " << name_ << ", host: " << remote_host_ + << ", port: " << remote_port_ << ")"); + + controller_->stop(); + controller_.reset(); + } +} + } // namespace ecf diff --git a/libs/node/src/ecflow/node/MirrorAttr.hpp b/libs/node/src/ecflow/node/MirrorAttr.hpp index 91e98601e..4d9c3454b 100644 --- a/libs/node/src/ecflow/node/MirrorAttr.hpp +++ b/libs/node/src/ecflow/node/MirrorAttr.hpp @@ -64,8 +64,6 @@ class MirrorAttr { MirrorAttr& operator=(const MirrorAttr& rhs) = default; - void poke(); - [[nodiscard]] inline const std::string& name() const { return name_; } [[nodiscard]] inline const std::string& remote_path() const { return remote_path_; } [[nodiscard]] inline const std::string& remote_host() const { return remote_host_; } @@ -78,18 +76,23 @@ class MirrorAttr { bool why(std::string& theReasonWhy) const; + /** + * Initialises the Mirror procedure, which effectively starts the background polling mechanism. + */ void reset(); + void finish(); - [[nodiscard]] bool isFree() const; - - void start() const; - void finish() const; + /** + * Check if state changes were detected by the background polling mechanism, and if so, reflect it on the Node. + */ + void mirror(); template friend void serialize(Archive& ar, MirrorAttr& aviso, std::uint32_t version); private: void start_controller() const; + void stop_controller() const; Node* parent_{nullptr}; // only ever used on the server side, to update parent Node state name_t name_; diff --git a/libs/node/src/ecflow/node/Node.cpp b/libs/node/src/ecflow/node/Node.cpp index 087449fa8..fe4365e24 100644 --- a/libs/node/src/ecflow/node/Node.cpp +++ b/libs/node/src/ecflow/node/Node.cpp @@ -209,12 +209,6 @@ Node& Node::operator=(const Node& rhs) { Node::~Node() = default; -void Node::poke() { - for (auto& mirror : mirrors_) { - mirror.poke(); - } -} - bool Node::isParentSuspended() const { Node* theParent = parent(); if (theParent) { @@ -758,6 +752,11 @@ bool Node::resolveDependencies(JobsParam& jobsParam) { return false; } + if (!mirrors_.empty()) { + // In case mirror attributes are configured, the node is never free (i.e. will not be run) + return false; + } + if (!timeDependenciesFree()) { #ifdef DEBUG_DEPENDENCIES const Calendar& calendar = suite()->calendar(); diff --git a/libs/node/src/ecflow/node/Node.hpp b/libs/node/src/ecflow/node/Node.hpp index 36de1a720..ad6138271 100644 --- a/libs/node/src/ecflow/node/Node.hpp +++ b/libs/node/src/ecflow/node/Node.hpp @@ -73,8 +73,6 @@ class Node : public std::enable_shared_from_this { Node(const Node& rhs); virtual ~Node(); - virtual void poke(); - virtual bool check_defaults() const; // parse string and create suite || family || task || alias. Can return a NULL node_ptr() for errors @@ -397,7 +395,11 @@ class Node : public std::enable_shared_from_this { const std::vector& dates() const { return dates_; } const std::vector& days() const { return days_; } const std::vector& crons() const { return crons_; } + + std::vector& avisos() { return avisos_; } const std::vector& avisos() const { return avisos_; } + + std::vector& mirrors() { return mirrors_; } const std::vector& mirrors() const { return mirrors_; } const std::vector& verifys() const; diff --git a/libs/node/src/ecflow/node/NodeContainer.cpp b/libs/node/src/ecflow/node/NodeContainer.cpp index f68529275..8363d3f3e 100644 --- a/libs/node/src/ecflow/node/NodeContainer.cpp +++ b/libs/node/src/ecflow/node/NodeContainer.cpp @@ -83,12 +83,6 @@ NodeContainer& NodeContainer::operator=(const NodeContainer& rhs) { NodeContainer::~NodeContainer() = default; -void NodeContainer::poke() { - for (const auto& n : nodes_) { - n->poke(); - } -} - bool NodeContainer::check_defaults() const { if (order_state_change_no_ != 0) throw std::runtime_error("NodeContainer::check_defaults(): order_state_change_no_ != 0"); diff --git a/libs/node/src/ecflow/node/NodeContainer.hpp b/libs/node/src/ecflow/node/NodeContainer.hpp index 9c947555a..f2efdfe30 100644 --- a/libs/node/src/ecflow/node/NodeContainer.hpp +++ b/libs/node/src/ecflow/node/NodeContainer.hpp @@ -29,8 +29,6 @@ class NodeContainer : public Node { NodeContainer(); ~NodeContainer() override; - void poke() override; - bool check_defaults() const override; void accept(ecf::NodeTreeVisitor&) override; @@ -67,6 +65,7 @@ class NodeContainer : public Node { void addFamily(const family_ptr&, size_t position = std::numeric_limits::max()); void add_child(const node_ptr&, size_t position = std::numeric_limits::max()); + const std::vector& children() const { return nodes_; } void immediateChildren(std::vector&) const override; void allChildren(std::vector&) const override; diff --git a/libs/node/src/ecflow/node/Operations.hpp b/libs/node/src/ecflow/node/Operations.hpp new file mode 100644 index 000000000..946592348 --- /dev/null +++ b/libs/node/src/ecflow/node/Operations.hpp @@ -0,0 +1,171 @@ +/* + * Copyright 2009- ECMWF. + * + * This software is licensed under the terms of the Apache Licence version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation + * nor does it submit to any jurisdiction. + */ + +#ifndef ecflow_node_Operations_HPP +#define ecflow_node_Operations_HPP + +#include "ecflow/node/Alias.hpp" +#include "ecflow/node/AvisoAttr.hpp" +#include "ecflow/node/Defs.hpp" +#include "ecflow/node/Family.hpp" +#include "ecflow/node/MirrorAttr.hpp" +#include "ecflow/node/Suite.hpp" +#include "ecflow/node/Task.hpp" + +namespace ecf { + +/** + * BootstrapDefs, traverses the Node tree when the the server (re)starts or when a new suite is loaded, + * and is used to bootstrap all required nodes and attributes. + */ +struct BootstrapDefs +{ + inline void operator()(AvisoAttr& attr) const { + if (attr.parent()->state() == NState::QUEUED) { + attr.start(); + } + } + inline void operator()(MirrorAttr& attr) const { attr.mirror(); } + + template + void operator()(T&& t) const { /* Nothing to do... */ } +}; + +/** + * ShutdownDefs, traverses the Node tree when the the server shutsdown/halts,, + * and is used to shutdown all required nodes and attributes. + */ +struct ShutdownDefs +{ + inline void operator()(AvisoAttr& attr) const { attr.finish(); } + inline void operator()(MirrorAttr& attr) const { attr.finish(); } + + template + void operator()(T&& t) const { /* Nothing to do... */ } +}; + +/** + * ActivateAll, traverses the Node tree periodicablly, and effectively triggers the synchronization between the + * main and background threads. + */ +struct ActivateAll +{ + inline void operator()(MirrorAttr& attr) const { attr.mirror(); } + + template + void operator()(T&& t) const { /* Nothing to do... */ } +}; + +namespace detail { + +template +void visit_all(const std::vector>& all, V&& visitor) { + for (auto& item : all) { + visit(*item, std::forward(visitor)); + } +} + +template +void visit_attrs(std::vector& all, V&& visitor) { + for (auto& i : all) { + visit(i, std::forward(visitor)); + } +} + +template +struct Visitor +{ + template + void operator()(V&& v) { + v(item_); + } + + I& item_; +}; + +template <> +struct Visitor +{ + template + void operator()(V&& v) { + v(task_); + visit_attrs(task_.avisos(), std::forward(v)); + visit_attrs(task_.mirrors(), std::forward(v)); + } + + Task& task_; +}; + +template <> +struct Visitor +{ + template + void operator()(V&& v) { + v(family_); + visit_all(family_.children(), std::forward(v)); + } + + Family& family_; +}; + +template <> +struct Visitor +{ + template + void operator()(V&& v) { + if (auto* family_ptr = dynamic_cast(&node_)) { + visit(*family_ptr, std::forward(v)); + } + else if (auto* task_ptr = dynamic_cast(&node_)) { + visit(*task_ptr, std::forward(v)); + } + if (auto* alias_ptr = dynamic_cast(&node_)) { + visit(*alias_ptr, std::forward(v)); + } + } + + Node& node_; +}; + +template <> +struct Visitor +{ + template + void operator()(V&& v) { + v(*this); + visit_all(suite_.children(), std::forward(v)); + } + + Suite& suite_; +}; + +template <> +struct Visitor +{ + template + void operator()(V&& v) { + v(*this); + + visit_all(defs_.suiteVec(), std::forward(v)); + } + + Defs& defs_; +}; + +} // namespace detail + +template +void visit(I& item, V&& visitor) { + detail::Visitor{item}(std::forward(visitor)); +} + +} // namespace ecf + +#endif // ecflow_node_Operations_HPP diff --git a/libs/node/src/ecflow/node/Submittable.cpp b/libs/node/src/ecflow/node/Submittable.cpp index 0d37c020d..8f615b9db 100644 --- a/libs/node/src/ecflow/node/Submittable.cpp +++ b/libs/node/src/ecflow/node/Submittable.cpp @@ -68,10 +68,6 @@ Submittable::~Submittable() { delete sub_gen_variables_; } -void Submittable::poke() { - Node::poke(); -} - bool Submittable::check_defaults() const { if (tryNo_ != 0) throw std::runtime_error("Submittable::check_defaults(): tryNo_ != 0"); diff --git a/libs/node/src/ecflow/node/Submittable.hpp b/libs/node/src/ecflow/node/Submittable.hpp index 2010154b8..e04380f07 100644 --- a/libs/node/src/ecflow/node/Submittable.hpp +++ b/libs/node/src/ecflow/node/Submittable.hpp @@ -35,8 +35,6 @@ class Submittable : public Node { public: ~Submittable() override; - void poke() override; - bool check_defaults() const override; /// Initialise the task. will set the state to NState::ACTIVE diff --git a/libs/node/src/ecflow/node/Suite.cpp b/libs/node/src/ecflow/node/Suite.cpp index ddb898ffe..c1aaca538 100644 --- a/libs/node/src/ecflow/node/Suite.cpp +++ b/libs/node/src/ecflow/node/Suite.cpp @@ -80,10 +80,6 @@ Suite::~Suite() { delete suite_gen_variables_; } -void Suite::poke() { - NodeContainer::poke(); -} - suite_ptr Suite::create(const std::string& name, bool check) { return std::make_shared(name, check); } diff --git a/libs/node/src/ecflow/node/Suite.hpp b/libs/node/src/ecflow/node/Suite.hpp index bc02fd6fe..a7a30d82d 100644 --- a/libs/node/src/ecflow/node/Suite.hpp +++ b/libs/node/src/ecflow/node/Suite.hpp @@ -31,8 +31,6 @@ class Suite final : public NodeContainer { ~Suite() override; - void poke() override; - static suite_ptr create(const std::string& name, bool check = true); static suite_ptr create_me(const std::string& name); // python api, to pick correct init function diff --git a/libs/node/src/ecflow/node/Task.cpp b/libs/node/src/ecflow/node/Task.cpp index 9959ac5b2..ff15d6d2f 100644 --- a/libs/node/src/ecflow/node/Task.cpp +++ b/libs/node/src/ecflow/node/Task.cpp @@ -85,10 +85,6 @@ Task::~Task() { } } -void Task::poke() { - Submittable::poke(); -} - task_ptr Task::create(const std::string& name, bool check) { return std::make_shared(name, check); } diff --git a/libs/node/src/ecflow/node/Task.hpp b/libs/node/src/ecflow/node/Task.hpp index 297658e0a..af35ebe26 100644 --- a/libs/node/src/ecflow/node/Task.hpp +++ b/libs/node/src/ecflow/node/Task.hpp @@ -22,8 +22,6 @@ class Task final : public Submittable { node_ptr clone() const override; ~Task() override; - void poke() override; - bool check_defaults() const override; static task_ptr create(const std::string& name, bool check = true); diff --git a/libs/server/src/ecflow/server/BaseServer.cpp b/libs/server/src/ecflow/server/BaseServer.cpp index 352bb3ba1..0b6065d57 100644 --- a/libs/server/src/ecflow/server/BaseServer.cpp +++ b/libs/server/src/ecflow/server/BaseServer.cpp @@ -22,6 +22,7 @@ #include "ecflow/core/Version.hpp" #include "ecflow/node/Defs.hpp" #include "ecflow/node/ExprDuplicate.hpp" +#include "ecflow/node/Operations.hpp" #include "ecflow/node/System.hpp" #include "ecflow/server/ServerEnvironment.hpp" #include "ecflow/service/Registry.hpp" @@ -251,6 +252,10 @@ void BaseServer::updateDefs(defs_ptr defs, bool force) { defs_->set_most_significant_state(); LOG_ASSERT(defs_->server().jobSubmissionInterval() != 0, ""); + + if (serverState_ == SState::RUNNING) { + ecf::visit(*defs_, BootstrapDefs{}); + } } void BaseServer::clear_defs() { @@ -355,6 +360,8 @@ void BaseServer::halted() { // Added after discussion with Axel. checkPtSaver_.stop(); + ecf::visit(*defs_, ShutdownDefs{}); + // Stop the task communication with server. Hence nodes can be stuck // in submitted/active states. Task based command will continue attempting, // communication with the server for up to 24hrs. @@ -380,8 +387,7 @@ void BaseServer::restart() { traverser_.start(); checkPtSaver_.start(); - // Bootstrap defs - defs_->poke(); + ecf::visit(*defs_, BootstrapDefs{}); } void BaseServer::traverse_node_tree_and_job_generate(const boost::posix_time::ptime& time_now,