Skip to content

Commit

Permalink
Refactor custom traversal mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosbento committed May 16, 2024
1 parent fcd01c1 commit da078d8
Show file tree
Hide file tree
Showing 22 changed files with 263 additions and 102 deletions.
1 change: 1 addition & 0 deletions libs/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ set(srcs
src/ecflow/node/NodeState.hpp
src/ecflow/node/NodeStats.hpp
src/ecflow/node/NodeTreeVisitor.hpp
src/ecflow/node/Operations.hpp
src/ecflow/node/ResolveExternsVisitor.hpp
src/ecflow/node/ServerState.hpp
src/ecflow/node/Signal.hpp
Expand Down
25 changes: 20 additions & 5 deletions libs/node/src/ecflow/node/AvisoAttr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ void AvisoAttr::start() const {
}
auto polling = boost::lexical_cast<std::uint32_t>(aviso_polling);

start_controller(aviso_path, aviso_listener, aviso_url, aviso_schema, polling);
}

void AvisoAttr::start_controller(const std::string& aviso_path,
const std::string& aviso_listener,
const std::string& aviso_url,
const std::string& aviso_schema,
std::uint32_t polling) const {

// Controller -- start up the Aviso controller, and subscribe the Aviso listener
controller_ = std::make_shared<controller_t>();
controller_->subscribe(ecf::service::aviso::AvisoRequest::make_listen_start(
Expand All @@ -145,16 +154,22 @@ void AvisoAttr::start() const {
controller_->start();
}

void AvisoAttr::stop_controller(const std::string& aviso_path) const {
if (controller_ != nullptr) {
controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path));

// Controller -- shutdown up the Aviso controller
controller_->stop();
controller_ = nullptr;
}
}

void AvisoAttr::finish() const {
using namespace ecf;
LOG(Log::DBG, Message("**** Unsubscribe Aviso attribute (name: ", name_, ", listener: ", listener_, ")"));

std::string aviso_path = path();
controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path));

// Controller -- shutdown up the Aviso controller
controller_->stop();
controller_ = nullptr;
stop_controller(aviso_path);
}

} // namespace ecf
8 changes: 8 additions & 0 deletions libs/node/src/ecflow/node/AvisoAttr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class AvisoAttr {

AvisoAttr& operator=(const AvisoAttr& rhs) = default;

[[nodiscard]] inline Node* parent() const { return parent_; }
[[nodiscard]] inline const std::string& name() const { return name_; }
[[nodiscard]] inline const std::string& listener() const { return listener_; }
[[nodiscard]] inline const std::string& url() const { return url_; }
Expand All @@ -95,6 +96,13 @@ class AvisoAttr {
friend void serialize(Archive& ar, AvisoAttr& aviso, std::uint32_t version);

private:
void start_controller(const std::string& aviso_path,
const std::string& aviso_listener,
const std::string& aviso_url,
const std::string& aviso_schema,
std::uint32_t polling) const;
void stop_controller(const std::string& aviso_path) const;

Node* parent_{nullptr}; // only ever used on the server side, to access parent Node variables
path_t parent_path_;
name_t name_;
Expand Down
10 changes: 2 additions & 8 deletions libs/node/src/ecflow/node/Defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,6 @@ Defs::~Defs() {
ExprDuplicate reclaim_cloned_ast_memory;
}

void Defs::poke() {
for (const auto& suite : suiteVec_) {
suite->poke();
}
}

void Defs::handle_migration() {
// Fix up any migration issues. Remove when in Bolgna, and ecflow4 no longer used
for (const auto& s : suiteVec_) {
Expand Down Expand Up @@ -412,7 +406,7 @@ bool Defs::verification(std::string& errorMsg) const {
suite_ptr Defs::add_suite(const std::string& name) {
if (findSuite(name).get()) {
std::stringstream ss;
ss << "Add Suite failed: A Suite of name '" << name << "' already exist";
ss << "Add Suite failed: A Suite of name '" << name << "' already exists";
throw std::runtime_error(ss.str());
}
suite_ptr the_suite = Suite::create(name);
Expand All @@ -423,7 +417,7 @@ suite_ptr Defs::add_suite(const std::string& name) {
void Defs::addSuite(const suite_ptr& s, size_t position) {
if (findSuite(s->name()).get()) {
std::stringstream ss;
ss << "Add Suite failed: A Suite of name '" << s->name() << "' already exist";
ss << "Add Suite failed: A Suite of name '" << s->name() << "' already exists";
throw std::runtime_error(ss.str());
}
add_suite_only(s, position);
Expand Down
3 changes: 1 addition & 2 deletions libs/node/src/ecflow/node/Defs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "ecflow/node/Flag.hpp"
#include "ecflow/node/NodeFwd.hpp"
#include "ecflow/node/ServerState.hpp"
#include "ecflow/node/Suite.hpp"

namespace cereal {
class access;
Expand All @@ -56,8 +57,6 @@ class Defs {

~Defs();

void poke();

void copy_defs_state_only(const defs_ptr& defs); // needed when creating defs for client handles
bool operator==(const Defs& rhs) const;
void print(std::string&) const;
Expand Down
4 changes: 0 additions & 4 deletions libs/node/src/ecflow/node/Family.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ Family::~Family() {
delete fam_gen_variables_;
}

void Family::poke() {
NodeContainer::poke();
}

family_ptr Family::create(const std::string& name, bool check) {
return std::make_shared<Family>(name, check);
}
Expand Down
2 changes: 0 additions & 2 deletions libs/node/src/ecflow/node/Family.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class Family final : public NodeContainer {

~Family() override;

void poke() override;

static family_ptr create(const std::string& name, bool check = true);
static family_ptr create_me(const std::string& name); // python api, to pick correct init function

Expand Down
3 changes: 2 additions & 1 deletion libs/node/src/ecflow/node/Jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "ecflow/core/Log.hpp"
#include "ecflow/node/Defs.hpp"
#include "ecflow/node/JobsParam.hpp"
#include "ecflow/node/Operations.hpp"
#include "ecflow/node/Signal.hpp"
#include "ecflow/node/Suite.hpp"
#include "ecflow/node/SuiteChanged.hpp"
Expand Down Expand Up @@ -58,7 +59,7 @@ bool Jobs::generate(JobsParam& jobsParam) const {
for (const suite_ptr& suite : suites) {
// SuiteChanged moved into Suite::resolveDependencies.
// This ensures the fast path and when suite are not begun we save a ctor/dtor call
suite->poke();
ecf::visit(*suite, ActivateAll{});
(void)suite->resolveDependencies(jobsParam);
}
}
Expand Down
69 changes: 31 additions & 38 deletions libs/node/src/ecflow/node/MirrorAttr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,38 +42,10 @@ MirrorAttr::MirrorAttr(Node* parent,
}

MirrorAttr::~MirrorAttr() {
if (controller_ != nullptr) {
controller_->stop();
controller_.reset();
}
}

void MirrorAttr::poke() {
ALOG(D, "**** Check Mirror attribute (name: " << name_ << ")");

start_controller();

// Task associated with Attribute is free when any notification is found
auto notifications = controller_->poll_notifications(remote_path_);

if (notifications.empty()) {
// No notifications, nothing to do...
return;
}

// Notifications found -- Node state to be updated
ALOG(D, "MirrorAttr::isFree: found notifications for Mirror attribute (name: " << name_ << ")");

auto latest_state = static_cast<NState::State>(notifications.back().status);
parent_->setStateOnly(latest_state, true);
parent_->handleStateChange();
stop_controller();
}

bool MirrorAttr::why(std::string& theReasonWhy) const {
if (isFree()) {
return false;
}

theReasonWhy += ecf::Message(" is a Mirror of ", remote_path(), " at '", remote_host(), ":", remote_port(), "'");
return true;
}
Expand All @@ -88,21 +60,31 @@ void MirrorAttr::reset() {
start_controller();
}

bool MirrorAttr::isFree() const {
void MirrorAttr::finish() {

LOG(Log::MSG, "**** Check Mirror attribute (name: " << name_ << ")");
ALOG(D,
"MirrorAttr::reset: start polling for Mirror attribute (name: " << name_ << ", host: " << remote_host_
<< ", port: " << remote_port_ << ")");

stop_controller();
}

void MirrorAttr::mirror() {
ALOG(D, "**** Check Mirror attribute (name: " << name_ << ")");

start_controller();

return true;
}
// Task associated with Attribute is free when any notification is found
if (auto notifications = controller_->poll_notifications(remote_path_); !notifications.empty()) {
// Notifications found -- Node state to be updated
ALOG(D, "MirrorAttr::isFree: found notifications for Mirror attribute (name: " << name_ << ")");

void MirrorAttr::start() const {
// Nothing do do...
}
auto latest_state = static_cast<NState::State>(notifications.back().status);
parent_->setStateOnly(latest_state, true);
parent_->handleStateChange();
}

void MirrorAttr::finish() const {
// Nothing do do...
// No notifications, nothing to do...
}

void MirrorAttr::start_controller() const {
Expand All @@ -129,4 +111,15 @@ void MirrorAttr::start_controller() const {
}
}

void MirrorAttr::stop_controller() const {
if (controller_ != nullptr) {
ALOG(D,
"MirrorAttr::reset: stop polling for Mirror attribute (name: " << name_ << ", host: " << remote_host_
<< ", port: " << remote_port_ << ")");

controller_->stop();
controller_.reset();
}
}

} // namespace ecf
15 changes: 9 additions & 6 deletions libs/node/src/ecflow/node/MirrorAttr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class MirrorAttr {

MirrorAttr& operator=(const MirrorAttr& rhs) = default;

void poke();

[[nodiscard]] inline const std::string& name() const { return name_; }
[[nodiscard]] inline const std::string& remote_path() const { return remote_path_; }
[[nodiscard]] inline const std::string& remote_host() const { return remote_host_; }
Expand All @@ -78,18 +76,23 @@ class MirrorAttr {

bool why(std::string& theReasonWhy) const;

/**
* Initialises the Mirror procedure, which effectively starts the background polling mechanism.
*/
void reset();
void finish();

[[nodiscard]] bool isFree() const;

void start() const;
void finish() const;
/**
* Check if state changes were detected by the background polling mechanism, and if so, reflect it on the Node.
*/
void mirror();

template <class Archive>
friend void serialize(Archive& ar, MirrorAttr& aviso, std::uint32_t version);

private:
void start_controller() const;
void stop_controller() const;

Node* parent_{nullptr}; // only ever used on the server side, to update parent Node state
name_t name_;
Expand Down
11 changes: 5 additions & 6 deletions libs/node/src/ecflow/node/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,6 @@ Node& Node::operator=(const Node& rhs) {

Node::~Node() = default;

void Node::poke() {
for (auto& mirror : mirrors_) {
mirror.poke();
}
}

bool Node::isParentSuspended() const {
Node* theParent = parent();
if (theParent) {
Expand Down Expand Up @@ -758,6 +752,11 @@ bool Node::resolveDependencies(JobsParam& jobsParam) {
return false;
}

if (!mirrors_.empty()) {
// In case mirror attributes are configured, the node is never free (i.e. will not be run)
return false;
}

if (!timeDependenciesFree()) {
#ifdef DEBUG_DEPENDENCIES
const Calendar& calendar = suite()->calendar();
Expand Down
6 changes: 4 additions & 2 deletions libs/node/src/ecflow/node/Node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class Node : public std::enable_shared_from_this<Node> {
Node(const Node& rhs);
virtual ~Node();

virtual void poke();

virtual bool check_defaults() const;

// parse string and create suite || family || task || alias. Can return a NULL node_ptr() for errors
Expand Down Expand Up @@ -397,7 +395,11 @@ class Node : public std::enable_shared_from_this<Node> {
const std::vector<DateAttr>& dates() const { return dates_; }
const std::vector<DayAttr>& days() const { return days_; }
const std::vector<ecf::CronAttr>& crons() const { return crons_; }

std::vector<ecf::AvisoAttr>& avisos() { return avisos_; }
const std::vector<ecf::AvisoAttr>& avisos() const { return avisos_; }

std::vector<ecf::MirrorAttr>& mirrors() { return mirrors_; }
const std::vector<ecf::MirrorAttr>& mirrors() const { return mirrors_; }

const std::vector<VerifyAttr>& verifys() const;
Expand Down
6 changes: 0 additions & 6 deletions libs/node/src/ecflow/node/NodeContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ NodeContainer& NodeContainer::operator=(const NodeContainer& rhs) {

NodeContainer::~NodeContainer() = default;

void NodeContainer::poke() {
for (const auto& n : nodes_) {
n->poke();
}
}

bool NodeContainer::check_defaults() const {
if (order_state_change_no_ != 0)
throw std::runtime_error("NodeContainer::check_defaults(): order_state_change_no_ != 0");
Expand Down
3 changes: 1 addition & 2 deletions libs/node/src/ecflow/node/NodeContainer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class NodeContainer : public Node {
NodeContainer();
~NodeContainer() override;

void poke() override;

bool check_defaults() const override;

void accept(ecf::NodeTreeVisitor&) override;
Expand Down Expand Up @@ -67,6 +65,7 @@ class NodeContainer : public Node {
void addFamily(const family_ptr&, size_t position = std::numeric_limits<std::size_t>::max());
void add_child(const node_ptr&, size_t position = std::numeric_limits<std::size_t>::max());

const std::vector<node_ptr>& children() const { return nodes_; }
void immediateChildren(std::vector<node_ptr>&) const override;
void allChildren(std::vector<node_ptr>&) const override;

Expand Down
Loading

0 comments on commit da078d8

Please sign in to comment.