diff --git a/Viewer/ecflowUI/images/icon_no_access.svg b/Viewer/ecflowUI/images/icon_no_access.svg
new file mode 100644
index 000000000..d6b27c5b1
--- /dev/null
+++ b/Viewer/ecflowUI/images/icon_no_access.svg
@@ -0,0 +1,10 @@
+
+
diff --git a/Viewer/ecflowUI/src/VAvisoAttr.cpp b/Viewer/ecflowUI/src/VAvisoAttr.cpp
index dd79e3493..cdd0a6898 100644
--- a/Viewer/ecflowUI/src/VAvisoAttr.cpp
+++ b/Viewer/ecflowUI/src/VAvisoAttr.cpp
@@ -42,7 +42,7 @@ QString VAvisoAttrType::toolTip(QStringList d) const {
t += "Revision: " + d[RevisionIndex] + "
";
t += "Auth: " + d[AuthIndex];
if (auto& reason = d[ReasonIndex]; !reason.isEmpty()) {
- t += "
Reason: " + d[ReasonIndex];
+ t += "
Reason: " + d[ReasonIndex] + "";
}
}
return t;
diff --git a/Viewer/ecflowUI/src/VIcon.cpp b/Viewer/ecflowUI/src/VIcon.cpp
index 11c066f71..7d82c1893 100644
--- a/Viewer/ecflowUI/src/VIcon.cpp
+++ b/Viewer/ecflowUI/src/VIcon.cpp
@@ -139,6 +139,12 @@ class VCheckpointErrorIcon : public VIcon {
bool show(VNode*) override;
};
+class VRemoteErrorIcon : public VIcon {
+public:
+ explicit VRemoteErrorIcon(const std::string& name) : VIcon(name) {}
+ bool show(VNode*) override;
+};
+
//==========================================================
//
// Create VIcon instances
@@ -162,6 +168,7 @@ static VRestoredIcon restoredIcon("restored");
static VSlowJobCreationIcon slowJobCreationIcon("slow_job");
static VNoLogIcon noLog("no_log");
static VCheckpointErrorIcon noCheckptIcon("checkpt_err");
+static VRemoteErrorIcon remoteErrorIcon("remote_err");
//==========================================================
//
@@ -557,3 +564,11 @@ bool VCheckpointErrorIcon::show(VNode* n) {
return n->isFlagSet(ecf::Flag::CHECKPT_ERROR);
}
+
+bool VRemoteErrorIcon::show(VNode* n) {
+ if (!n || n->isServer()) {
+ return false;
+ }
+
+ return n->isFlagSet(ecf::Flag::REMOTE_ERROR);
+}
diff --git a/Viewer/ecflowUI/src/VMirrorAttr.cpp b/Viewer/ecflowUI/src/VMirrorAttr.cpp
index 13c6f678d..5b4a1ee33 100644
--- a/Viewer/ecflowUI/src/VMirrorAttr.cpp
+++ b/Viewer/ecflowUI/src/VMirrorAttr.cpp
@@ -39,7 +39,7 @@ QString VMirrorAttrType::toolTip(QStringList d) const {
t += "SSL: " + d[SslIndex] + "
";
t += "Auth: " + d[AuthIndex];
if (const auto& reason = d[ReasonIndex]; !reason.isEmpty()) {
- t += "
Reason: " + d[ReasonIndex];
+ t += "
Reason: " + d[ReasonIndex] + "";
}
}
return t;
diff --git a/Viewer/ecflowUI/src/viewer.qrc b/Viewer/ecflowUI/src/viewer.qrc
index a38b257d3..3a0dd87f9 100644
--- a/Viewer/ecflowUI/src/viewer.qrc
+++ b/Viewer/ecflowUI/src/viewer.qrc
@@ -14,6 +14,7 @@
../images/icon_archived.svg
../images/icon_calendar.svg
../images/icon_checkpt_err.svg
+ ../images/icon_no_access.svg
../images/icon_clock.svg
../images/icon_clock_free.svg
../images/icon_complete.svg
diff --git a/libs/core/src/ecflow/core/Ecf.cpp b/libs/core/src/ecflow/core/Ecf.cpp
index b42e20658..4aabeb762 100644
--- a/libs/core/src/ecflow/core/Ecf.cpp
+++ b/libs/core/src/ecflow/core/Ecf.cpp
@@ -13,8 +13,8 @@
bool Ecf::server_ = false;
bool Ecf::debug_equality_ = false;
unsigned int Ecf::debug_level_ = 0;
-unsigned int Ecf::state_change_no_ = 0;
-unsigned int Ecf::modify_change_no_ = 0;
+thread_local Ecf::atomic_counter_t Ecf::state_change_no_ = 0;
+thread_local Ecf::atomic_counter_t Ecf::modify_change_no_ = 0;
bool DebugEquality::ignore_server_variables_ = false;
const char* Ecf::SERVER_NAME() {
@@ -59,7 +59,7 @@ const std::string& Ecf::CHECK_CMD() {
return CHECK_CMD;
}
-// -remote has been removed form firefox, since version 39
+//-remote has been removed from firefox, since version 39
//-remote openfile(file) -> -file
//-remote openurl(url) -> -url
//-remote openurl(url,new-window) -> -new-window
@@ -80,15 +80,14 @@ const std::string& Ecf::URL() {
return URL;
}
-unsigned int Ecf::incr_state_change_no() {
+Ecf::counter_t Ecf::incr_state_change_no() {
if (server_) {
return ++state_change_no_;
}
return state_change_no_;
}
-unsigned int Ecf::incr_modify_change_no() {
-
+Ecf::counter_t Ecf::incr_modify_change_no() {
if (server_) {
return ++modify_change_no_;
}
diff --git a/libs/core/src/ecflow/core/Ecf.hpp b/libs/core/src/ecflow/core/Ecf.hpp
index d0d391cd5..69f94f6ff 100644
--- a/libs/core/src/ecflow/core/Ecf.hpp
+++ b/libs/core/src/ecflow/core/Ecf.hpp
@@ -15,20 +15,31 @@
/// \brief Provides globals used by server for determining change
///
+#include
#include
-// class Ecf: This class is used in the server to determine incremental changes
-// to the data model. Each Node/attribute stores a state change no
-// When ever there is a change, we increment local state change
-// number with this global.
-// When making large scale changes, ie nodes added or deleted we use modify change no
-// Note: The client will need to at some point copy over the full defs
-// at this point the state change no add modify number is also copied.
-// The client passes these two number back to server, the server then
-// uses these two numbers to determine what's changed.
-//
+/**
+ * This class holds *global data*, and is used in the server to determine incremental changes to the data model.
+ *
+ * This data is composed of two parts:
+ * - `state_change_no_`: leads the number of node state changes on the currently loaded defs
+ * - `modify_change_no_`: leads the number of structural changes to the currently loaded defs
+ *
+ * Each Node/Attribute stores a local `state_change_no_`, and whenever there is a change, this local number is
+ * assigned based on the incremented value of the `global state_change_no_`.
+ *
+ * When making large scale changes (e.g. adding or deleting nodes), we increment the global `modify_change_no_`.
+ *
+ * The synchronization strategy is such that when the client eventually copies over the full defs, the state_change_no_
+ * and modify_change_no_ are also copied. In future synchronization attempts, the client includes these two numbers in
+ * the sync request, and thus allows the server to determine what has changed.
+ */
+
class Ecf {
public:
+ using counter_t = unsigned int;
+ using atomic_counter_t = std::atomic;
+
// Disable default construction
Ecf() = delete;
// Disable copy (and move) semantics
@@ -36,14 +47,14 @@ class Ecf {
const Ecf& operator=(const Ecf&) = delete;
/// Increment and then return state change no
- static unsigned int incr_state_change_no();
- static unsigned int state_change_no() { return state_change_no_; }
- static void set_state_change_no(unsigned int x) { state_change_no_ = x; }
+ static counter_t incr_state_change_no();
+ static counter_t state_change_no() { return state_change_no_; }
+ static void set_state_change_no(counter_t x) { state_change_no_ = x; }
/// The modify_change_no_ is used for node addition and deletion and re-ordering
- static unsigned int incr_modify_change_no();
- static unsigned int modify_change_no() { return modify_change_no_; }
- static void set_modify_change_no(unsigned int x) { modify_change_no_ = x; }
+ static counter_t incr_modify_change_no();
+ static counter_t modify_change_no() { return modify_change_no_; }
+ static void set_modify_change_no(counter_t x) { modify_change_no_ = x; }
/// Returns true if we are on the server side.
/// Only in server side do we increment state/modify numbers
@@ -80,8 +91,8 @@ class Ecf {
static bool server_;
static bool debug_equality_;
static unsigned int debug_level_;
- static unsigned int state_change_no_;
- static unsigned int modify_change_no_;
+ static thread_local atomic_counter_t state_change_no_;
+ static thread_local atomic_counter_t modify_change_no_;
};
/// Make sure the Ecf number don't change
diff --git a/libs/core/src/ecflow/core/Serialization.hpp b/libs/core/src/ecflow/core/Serialization.hpp
index 7fa6d0b50..daeb07b07 100644
--- a/libs/core/src/ecflow/core/Serialization.hpp
+++ b/libs/core/src/ecflow/core/Serialization.hpp
@@ -65,11 +65,14 @@ void save_as_string(std::string& outbound_data, const T& t) {
}
outbound_data = archive_stream.str();
- //std::cout << "*** save_as_string: " << outbound_data << "\n";
+ std::cout << "*** save_as_string: " << outbound_data << "\n";
}
template
void restore_from_string(const std::string& archive_data, T& restored) {
+
+ std::cout << "*** restore_from_string: " << archive_data << "\n";
+
std::istringstream archive_stream(archive_data);
cereal::JSONInputArchive iarchive(archive_stream); // Create an input archive
iarchive(restored); // Read the data from the archive
diff --git a/libs/node/src/ecflow/node/AvisoAttr.cpp b/libs/node/src/ecflow/node/AvisoAttr.cpp
index 01f5a588a..eaa083c71 100644
--- a/libs/node/src/ecflow/node/AvisoAttr.cpp
+++ b/libs/node/src/ecflow/node/AvisoAttr.cpp
@@ -16,6 +16,7 @@
#include "ecflow/core/Message.hpp"
#include "ecflow/core/exceptions/Exceptions.hpp"
#include "ecflow/node/Node.hpp"
+#include "ecflow/node/Operations.hpp"
namespace ecf {
@@ -48,6 +49,13 @@ AvisoAttr::AvisoAttr(Node* parent,
}
}
+AvisoAttr AvisoAttr::make_detached() const {
+ AvisoAttr detached = *this;
+ detached.parent_ = nullptr;
+ detached.controller_ = nullptr;
+ return detached;
+}
+
void AvisoAttr::set_listener(std::string_view listener) {
state_change_no_ = Ecf::incr_state_change_no();
@@ -108,12 +116,29 @@ bool AvisoAttr::isFree() const {
return a.configuration.revision() < b.configuration.revision();
});
- // (b) update the revision, in the listener configuration
- this->revision_ = max->configuration.revision();
- ALOG(D, "AvisoAttr: " << aviso_path << " updated revision to " << this->revision_);
state_change_no_ = Ecf::incr_state_change_no();
- return true;
+ // (b) update the revision, in the listener configuration
+ if (max->notification.success()) {
+ ALOG(D, "AvisoAttr::isFree: " << aviso_path << " updated revision to " << this->revision_);
+ this->revision_ = max->configuration.revision();
+ parent_->flag().clear(Flag::REMOTE_ERROR);
+ parent_->flag().set_state_change_no(state_change_no_);
+ reason_ = "";
+
+ for (auto* parent = parent_; parent; parent = parent->parent()) {
+ parent->set_state_change_no(state_change_no_);
+ }
+ return max->notification.match().has_value();
+ }
+ else {
+ parent_->flag().set(Flag::REMOTE_ERROR);
+ parent_->flag().set_state_change_no(state_change_no_);
+ reason_ = max->notification.reason();
+
+ ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); });
+ return false;
+ }
}
void AvisoAttr::start() const {
@@ -157,17 +182,21 @@ void AvisoAttr::start_controller(const std::string& aviso_path,
std::uint32_t polling,
const std::string& aviso_auth) const {
- // Controller -- start up the Aviso controller, and subscribe the Aviso listener
- controller_ = std::make_shared();
- controller_->subscribe(ecf::service::aviso::AvisoRequest::make_listen_start(
- aviso_path, aviso_listener, aviso_url, aviso_schema, polling, revision_, aviso_auth));
- // Controller -- effectively start the Aviso listener
- // n.b. this must be done after subscribing in the controller, so that the polling interval is set
- controller_->start();
+ if (!controller_) {
+ // Controller -- start up the Aviso controller, and subscribe the Aviso listener
+ controller_ = std::make_shared();
+ controller_->subscribe(ecf::service::aviso::AvisoRequest::make_listen_start(
+ aviso_path, aviso_listener, aviso_url, aviso_schema, polling, revision_, aviso_auth));
+ // Controller -- effectively start the Aviso listener
+ // n.b. this must be done after subscribing in the controller, so that the polling interval is set
+ controller_->start();
+ }
}
void AvisoAttr::stop_controller(const std::string& aviso_path) const {
if (controller_ != nullptr) {
+ ALOG(D, "AvisoAttr: finishing polling for Aviso attribute (" << parent_path_ << ":" << name_ << ")");
+
controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path));
// Controller -- shutdown up the Aviso controller
@@ -178,7 +207,6 @@ void AvisoAttr::stop_controller(const std::string& aviso_path) const {
void AvisoAttr::finish() const {
using namespace ecf;
- LOG(Log::DBG, Message("AvisoAttr: unsubscribe Aviso attribute (name: ", name_, ", listener: ", listener_, ")"));
std::string aviso_path = path();
stop_controller(aviso_path);
diff --git a/libs/node/src/ecflow/node/AvisoAttr.hpp b/libs/node/src/ecflow/node/AvisoAttr.hpp
index 5a0331fd4..046bab818 100644
--- a/libs/node/src/ecflow/node/AvisoAttr.hpp
+++ b/libs/node/src/ecflow/node/AvisoAttr.hpp
@@ -78,6 +78,8 @@ class AvisoAttr {
AvisoAttr& operator=(const AvisoAttr& rhs) = default;
+ [[nodiscard]] AvisoAttr make_detached() const;
+
[[nodiscard]] inline Node* parent() const { return parent_; }
[[nodiscard]] inline const std::string& name() const { return name_; }
[[nodiscard]] inline const std::string& listener() const { return listener_; }
@@ -124,7 +126,7 @@ class AvisoAttr {
polling_t polling_;
auth_t auth_;
- reason_t reason_;
+ mutable reason_t reason_{};
// The following are mutable as they are modified by the const method isFree()
mutable revision_t revision_;
diff --git a/libs/node/src/ecflow/node/Flag.cpp b/libs/node/src/ecflow/node/Flag.cpp
index 03cebb1cb..baf5777a0 100644
--- a/libs/node/src/ecflow/node/Flag.cpp
+++ b/libs/node/src/ecflow/node/Flag.cpp
@@ -19,16 +19,14 @@
namespace ecf {
void Flag::set(Flag::Type flag) {
- if (!is_set(flag)) {
- // minimize changes to state_change_no_
+ if (!is_set(flag)) { // minimize changes to state_change_no_
flag_ |= (1 << flag);
state_change_no_ = Ecf::incr_state_change_no();
}
}
void Flag::clear(Flag::Type flag) {
- if (is_set(flag)) {
- // minimize changes to state_change_no_
+ if (is_set(flag)) { // minimize changes to state_change_no_
flag_ &= ~(1 << flag);
state_change_no_ = Ecf::incr_state_change_no();
}
@@ -41,7 +39,7 @@ void Flag::reset() {
std::vector Flag::list() {
std::vector ret;
- ret.reserve(24);
+ ret.reserve(25);
ret.push_back(Flag::FORCE_ABORT);
ret.push_back(Flag::USER_EDIT);
ret.push_back(Flag::TASK_ABORTED);
@@ -66,18 +64,20 @@ std::vector Flag::list() {
ret.push_back(Flag::ECF_SIGTERM);
ret.push_back(Flag::LOG_ERROR);
ret.push_back(Flag::CHECKPT_ERROR);
+ ret.push_back(Flag::REMOTE_ERROR);
return ret;
}
-constexpr std::array Flag::array() {
- return std::array{Flag::FORCE_ABORT, Flag::USER_EDIT, Flag::TASK_ABORTED,
- Flag::EDIT_FAILED, Flag::JOBCMD_FAILED, Flag::KILLCMD_FAILED,
- Flag::STATUSCMD_FAILED, Flag::NO_SCRIPT, Flag::KILLED,
- Flag::STATUS, Flag::LATE, Flag::MESSAGE,
- Flag::BYRULE, Flag::QUEUELIMIT, Flag::WAIT,
- Flag::LOCKED, Flag::ZOMBIE, Flag::NO_REQUE_IF_SINGLE_TIME_DEP,
- Flag::ARCHIVED, Flag::RESTORED, Flag::THRESHOLD,
- Flag::ECF_SIGTERM, Flag::LOG_ERROR, Flag::CHECKPT_ERROR};
+constexpr std::array Flag::array() {
+ return std::array{Flag::FORCE_ABORT, Flag::USER_EDIT, Flag::TASK_ABORTED,
+ Flag::EDIT_FAILED, Flag::JOBCMD_FAILED, Flag::KILLCMD_FAILED,
+ Flag::STATUSCMD_FAILED, Flag::NO_SCRIPT, Flag::KILLED,
+ Flag::STATUS, Flag::LATE, Flag::MESSAGE,
+ Flag::BYRULE, Flag::QUEUELIMIT, Flag::WAIT,
+ Flag::LOCKED, Flag::ZOMBIE, Flag::NO_REQUE_IF_SINGLE_TIME_DEP,
+ Flag::ARCHIVED, Flag::RESTORED, Flag::THRESHOLD,
+ Flag::ECF_SIGTERM, Flag::LOG_ERROR, Flag::CHECKPT_ERROR,
+ Flag::REMOTE_ERROR};
}
std::string Flag::enum_to_string(Flag::Type flag) {
@@ -155,6 +155,9 @@ std::string Flag::enum_to_string(Flag::Type flag) {
case Flag::STATUS:
return "status";
break;
+ case Flag::REMOTE_ERROR:
+ return "remote_error";
+ break;
case Flag::NOT_SET:
return "not_set";
break;
@@ -238,6 +241,9 @@ const char* Flag::enum_to_char_star(Flag::Type flag) {
case Flag::CHECKPT_ERROR:
return "checkpt_error";
break;
+ case Flag::REMOTE_ERROR:
+ return "remote_error";
+ break;
case Flag::NOT_SET:
return "not_set";
break;
@@ -297,11 +303,13 @@ Flag::Type Flag::string_to_flag_type(const std::string& s) {
return Flag::LOG_ERROR;
if (s == "checkpt_error")
return Flag::CHECKPT_ERROR;
+ if (s == "remote_error")
+ return Flag::REMOTE_ERROR;
return Flag::NOT_SET;
}
void Flag::valid_flag_type(std::vector& vec) {
- vec.reserve(24);
+ vec.reserve(25);
vec.emplace_back("force_aborted");
vec.emplace_back("user_edit");
vec.emplace_back("task_aborted");
@@ -326,6 +334,7 @@ void Flag::valid_flag_type(std::vector& vec) {
vec.emplace_back("sigterm");
vec.emplace_back("log_error");
vec.emplace_back("checkpt_error");
+ vec.emplace_back("remote_error");
}
std::string Flag::to_string() const {
@@ -335,8 +344,8 @@ std::string Flag::to_string() const {
}
void Flag::write(std::string& ret) const {
- bool added = false;
- std::array flag_list = Flag::array();
+ bool added = false;
+ auto flag_list = Flag::array();
for (auto& i : flag_list) {
if (is_set(i)) {
if (added)
diff --git a/libs/node/src/ecflow/node/Flag.hpp b/libs/node/src/ecflow/node/Flag.hpp
index 7561e3d77..8e0d6adde 100644
--- a/libs/node/src/ecflow/node/Flag.hpp
+++ b/libs/node/src/ecflow/node/Flag.hpp
@@ -43,36 +43,92 @@ namespace ecf {
class Flag {
public:
+ using underlying_type_t = int;
+ static_assert(sizeof(underlying_type_t) >= 4, "Flag's underlying type must have at least 4 bytes");
+
Flag() = default;
/// The BYRULE is used to distinguish between tasks that have RUN and completed
/// and those that have completed by complete expression.
enum Type {
- FORCE_ABORT = 0, // Node* do not run when try_no > ECF_TRIES, and task aborted by user
- USER_EDIT = 1, // task
- TASK_ABORTED = 2, // task*
- EDIT_FAILED = 3, // task*
- JOBCMD_FAILED = 4, // task*
- NO_SCRIPT = 5, // task*
- KILLED = 6, // task* do not run when try_no > ECF_TRIES, and task killed by user
- LATE = 7, // Node attribute,
- MESSAGE = 8, // Node
- BYRULE = 9, // Node*, set if node is set to complete by complete trigger expression
- QUEUELIMIT = 10, // Node ( NOT USED currently)
- WAIT = 11, // task* set when waiting for trigger expression in client command
- LOCKED = 12, // Server ( NOT USED currently)
- ZOMBIE = 13, // task* Set/cleared but never queried by GUI
- NO_REQUE_IF_SINGLE_TIME_DEP = 14, //
- ARCHIVED = 15, // Container*
- RESTORED = 16, // Container*, Avoid re-archiving node that is restored, until it is re-queued again
- THRESHOLD = 17, // Job threshold exceeded.(slow disk,large includes/huge scripts,overloaded machine,server)
- ECF_SIGTERM = 18, // Record on defs that server received SIGTERM signal, main used in test
- NOT_SET = 19,
- LOG_ERROR = 20, // Error in opening or writing to the log file
- CHECKPT_ERROR = 21, // Error in saving checkpoint file
- KILLCMD_FAILED = 22, // task*
- STATUSCMD_FAILED = 23, // task*
- STATUS = 24 // task*
+
+ // Node* do not run when try_no > ECF_TRIES, and task aborted by user
+ FORCE_ABORT = 0,
+
+ // task
+ USER_EDIT = 1,
+
+ // task*
+ TASK_ABORTED = 2,
+
+ // task*
+ EDIT_FAILED = 3,
+
+ // task*
+ JOBCMD_FAILED = 4,
+
+ // task*
+ NO_SCRIPT = 5,
+
+ // task* do not run when try_no > ECF_TRIES, and task killed by user
+ KILLED = 6,
+
+ // Node attribute,
+ LATE = 7,
+
+ // Node
+ MESSAGE = 8,
+
+ // Node*, set if node is set to complete by complete trigger expression
+ BYRULE = 9,
+
+ // Node ( NOT USED currently)
+ QUEUELIMIT = 10,
+
+ // task* set when waiting for trigger expression in client command
+ WAIT = 11,
+
+ // Server ( NOT USED currently)
+ LOCKED = 12,
+
+ // task* Set/cleared but never queried by GUI
+ ZOMBIE = 13,
+
+ //
+ NO_REQUE_IF_SINGLE_TIME_DEP = 14,
+
+ // Container
+ ARCHIVED = 15,
+
+ // Container, Avoid re-archiving node that is restored, until re-queued again
+ RESTORED = 16,
+
+ // Job threshold exceeded, slow disk, large includes/huge scripts,overloaded machine,server)
+ THRESHOLD = 17,
+
+ // Record on defs that server received SIGTERM signal, main used in test
+ ECF_SIGTERM = 18,
+
+ //
+ NOT_SET = 19,
+
+ // Error in opening or writing to the log file
+ LOG_ERROR = 20,
+
+ // Error in saving checkpoint file
+ CHECKPT_ERROR = 21,
+
+ // task*
+ KILLCMD_FAILED = 22,
+
+ // task*
+ STATUSCMD_FAILED = 23,
+
+ // task*
+ STATUS = 24,
+
+ // Error connecting to remote source
+ REMOTE_ERROR = 25
};
bool operator==(const Flag& rhs) const { return flag_ == rhs.flag_; }
@@ -85,7 +141,7 @@ class Flag {
void reset();
int flag() const { return flag_; }
- void set_flag(int f) { flag_ = f; }
+ void set_flag(underlying_type_t f) { flag_ = f; }
void set_flag(const std::string& flags); // these are comma separated
/// returns a comma separated list of all flags set
@@ -97,11 +153,12 @@ class Flag {
static const char* enum_to_char_star(Flag::Type flag);
/// Used to determine change in state relative to client
+ void set_state_change_no(unsigned int n) { state_change_no_ = n; }
unsigned int state_change_no() const { return state_change_no_; }
/// returns the list of all flag types
static std::vector list();
- static constexpr std::array array();
+ static constexpr std::array array();
/// Converts from string to flag types.
static Flag::Type string_to_flag_type(const std::string& s);
@@ -110,7 +167,7 @@ class Flag {
static void valid_flag_type(std::vector& vec);
private:
- int flag_{0};
+ underlying_type_t flag_{0};
unsigned int state_change_no_{0}; // *not* persisted, only used on server side
friend class cereal::access;
diff --git a/libs/node/src/ecflow/node/Jobs.cpp b/libs/node/src/ecflow/node/Jobs.cpp
index bbeb211b2..3f79dcb7e 100644
--- a/libs/node/src/ecflow/node/Jobs.cpp
+++ b/libs/node/src/ecflow/node/Jobs.cpp
@@ -59,7 +59,7 @@ bool Jobs::generate(JobsParam& jobsParam) const {
for (const suite_ptr& suite : suites) {
// SuiteChanged moved into Suite::resolveDependencies.
// This ensures the fast path and when suite are not begun we save a ctor/dtor call
- ecf::visit(*suite, ActivateAll{});
+ ecf::visit_all(*suite, ActivateAll{});
(void)suite->resolveDependencies(jobsParam);
}
}
diff --git a/libs/node/src/ecflow/node/Memento.hpp b/libs/node/src/ecflow/node/Memento.hpp
index 350ee7fba..ff97e692e 100644
--- a/libs/node/src/ecflow/node/Memento.hpp
+++ b/libs/node/src/ecflow/node/Memento.hpp
@@ -44,11 +44,11 @@
#include "ecflow/attribute/ZombieAttr.hpp"
#include "ecflow/node/Alias.hpp"
#include "ecflow/node/AvisoAttr.hpp"
-#include "ecflow/node/MirrorAttr.hpp"
#include "ecflow/node/Defs.hpp"
#include "ecflow/node/Expression.hpp"
#include "ecflow/node/Family.hpp"
#include "ecflow/node/Limit.hpp"
+#include "ecflow/node/MirrorAttr.hpp"
#include "ecflow/node/Suite.hpp"
#include "ecflow/node/Task.hpp"
@@ -656,7 +656,7 @@ class NodeCronMemento : public Memento {
class NodeAvisoMemento : public Memento {
public:
NodeAvisoMemento() = default;
- explicit NodeAvisoMemento(const ecf::AvisoAttr& a) : aviso_(a) {}
+ explicit NodeAvisoMemento(const ecf::AvisoAttr& a) : aviso_(a.make_detached()) {}
private:
void do_incremental_node_sync(Node* n, std::vector& aspects, bool f) const override {
@@ -674,7 +674,7 @@ class NodeAvisoMemento : public Memento {
class NodeMirrorMemento : public Memento {
public:
NodeMirrorMemento() = default;
- explicit NodeMirrorMemento(const ecf::MirrorAttr& a) : mirror_(a) {}
+ explicit NodeMirrorMemento(const ecf::MirrorAttr& a) : mirror_(a.make_detached()) {}
private:
void do_incremental_node_sync(Node* n, std::vector& aspects, bool f) const override {
diff --git a/libs/node/src/ecflow/node/MirrorAttr.cpp b/libs/node/src/ecflow/node/MirrorAttr.cpp
index adc743373..da4ebf798 100644
--- a/libs/node/src/ecflow/node/MirrorAttr.cpp
+++ b/libs/node/src/ecflow/node/MirrorAttr.cpp
@@ -16,6 +16,7 @@
#include "ecflow/core/Message.hpp"
#include "ecflow/core/exceptions/Exceptions.hpp"
#include "ecflow/node/Node.hpp"
+#include "ecflow/node/Operations.hpp"
namespace ecf {
@@ -51,6 +52,13 @@ MirrorAttr::~MirrorAttr() {
stop_controller();
}
+[[nodiscard]] MirrorAttr MirrorAttr::make_detached() const {
+ MirrorAttr clone{*this};
+ clone.parent_ = nullptr;
+ clone.controller_ = nullptr;
+ return clone;
+}
+
std::string MirrorAttr::absolute_name() const {
return parent_->absNodePath() + ':' + name_;
}
@@ -76,12 +84,34 @@ void MirrorAttr::mirror() {
// Task associated with Attribute is free when any notification is found
if (auto notifications = controller_->poll_notifications(remote_path_); !notifications.empty()) {
- // Notifications found -- Node state to be updated
- ALOG(D, "MirrorAttr::isFree: found notifications for Mirror attribute (name: " << name_ << ")");
- auto latest_state = static_cast(notifications.back().status);
- parent_->setStateOnly(latest_state, true);
- parent_->handleStateChange();
+ // Update the 'local' state change number
+ state_change_no_ = Ecf::incr_state_change_no();
+
+ // Notifications found -- Node state to be updated or error to be reported
+ if (auto& notification = notifications.back(); notification.success) {
+ ALOG(D, "MirrorAttr: Updating Mirror attribute (name: " << name_ << ") to state " << notification.status);
+ auto latest_state = static_cast(notification.status);
+ reason_ = "";
+ parent_->flag().clear(Flag::REMOTE_ERROR);
+ parent_->flag().set_state_change_no(state_change_no_);
+ parent_->setStateOnly(latest_state, true);
+ }
+ else {
+ ALOG(D,
+ "MirrorAttr: Failure detected on Mirror attribute (name: " << name_ << ") due to "
+ << notification.path);
+ reason_ = notification.reason();
+ parent_->flag().set(Flag::REMOTE_ERROR);
+ parent_->flag().set_state_change_no(state_change_no_);
+ parent_->setStateOnly(NState::UNKNOWN, true);
+ }
+
+ // Propagate the 'local' state change number to all parents
+ ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); });
+ }
+ else {
+ ALOG(D, "MirrorAttr: No notifications found for Mirror attribute (name: " << name_ << ")");
}
// No notifications, nothing to do...
@@ -115,7 +145,10 @@ void MirrorAttr::start_controller() const {
void MirrorAttr::stop_controller() const {
if (controller_ != nullptr) {
- ALOG(D, "MirrorAttr: stop polling Mirror attribute '" << absolute_name() << "'");
+ ALOG(D,
+ "MirrorAttr: finishing polling for Mirror attribute \"" << parent_->absNodePath() << ":" << name_
+ << "\", from host: " << remote_host_
+ << ", port: " << remote_port_ << ")");
controller_->stop();
controller_.reset();
diff --git a/libs/node/src/ecflow/node/MirrorAttr.hpp b/libs/node/src/ecflow/node/MirrorAttr.hpp
index 1ca57f0a5..016885a11 100644
--- a/libs/node/src/ecflow/node/MirrorAttr.hpp
+++ b/libs/node/src/ecflow/node/MirrorAttr.hpp
@@ -76,6 +76,8 @@ class MirrorAttr {
MirrorAttr& operator=(const MirrorAttr& rhs) = default;
+ [[nodiscard]] MirrorAttr make_detached() const;
+
[[nodiscard]] inline const std::string& name() const { return name_; }
[[nodiscard]] std::string absolute_name() const;
diff --git a/libs/node/src/ecflow/node/Node.hpp b/libs/node/src/ecflow/node/Node.hpp
index ad6138271..e13d8e2ce 100644
--- a/libs/node/src/ecflow/node/Node.hpp
+++ b/libs/node/src/ecflow/node/Node.hpp
@@ -713,6 +713,9 @@ class Node : public std::enable_shared_from_this {
virtual boost::posix_time::time_duration sum_runtime() { return sc_rt_; }
+ void set_state_change_no(unsigned int x) { state_change_no_ = x; }
+ unsigned int state_change_no() const { return state_change_no_; }
+
protected:
void set_runtime(const boost::posix_time::time_duration& rt) { sc_rt_ = rt; }
diff --git a/libs/node/src/ecflow/node/NodeMemento.cpp b/libs/node/src/ecflow/node/NodeMemento.cpp
index 50b335c28..7ae42ce73 100644
--- a/libs/node/src/ecflow/node/NodeMemento.cpp
+++ b/libs/node/src/ecflow/node/NodeMemento.cpp
@@ -166,6 +166,8 @@ void Node::incremental_changes(DefsDelta& changes, compound_memento_ptr& comp) c
if (late_)
comp->add(std::make_shared(*late_));
+ comp->add(std::make_shared(flag_));
+
changes.add(comp);
return;
}
diff --git a/libs/node/src/ecflow/node/Operations.hpp b/libs/node/src/ecflow/node/Operations.hpp
index 946592348..46a38abad 100644
--- a/libs/node/src/ecflow/node/Operations.hpp
+++ b/libs/node/src/ecflow/node/Operations.hpp
@@ -68,19 +68,19 @@ namespace detail {
template
void visit_all(const std::vector>& all, V&& visitor) {
for (auto& item : all) {
- visit(*item, std::forward(visitor));
+ visit_all(*item, std::forward(visitor));
}
}
template
void visit_attrs(std::vector& all, V&& visitor) {
for (auto& i : all) {
- visit(i, std::forward(visitor));
+ visit_all(i, std::forward(visitor));
}
}
template
-struct Visitor
+struct VisitorAll
{
template
void operator()(V&& v) {
@@ -91,7 +91,7 @@ struct Visitor
};
template <>
-struct Visitor
+struct VisitorAll
{
template
void operator()(V&& v) {
@@ -104,7 +104,7 @@ struct Visitor
};
template <>
-struct Visitor
+struct VisitorAll
{
template
void operator()(V&& v) {
@@ -116,18 +116,18 @@ struct Visitor
};
template <>
-struct Visitor
+struct VisitorAll
{
template
void operator()(V&& v) {
if (auto* family_ptr = dynamic_cast(&node_)) {
- visit(*family_ptr, std::forward(v));
+ visit_all(*family_ptr, std::forward(v));
}
else if (auto* task_ptr = dynamic_cast(&node_)) {
- visit(*task_ptr, std::forward(v));
+ visit_all(*task_ptr, std::forward(v));
}
if (auto* alias_ptr = dynamic_cast(&node_)) {
- visit(*alias_ptr, std::forward(v));
+ visit_all(*alias_ptr, std::forward(v));
}
}
@@ -135,7 +135,7 @@ struct Visitor
};
template <>
-struct Visitor
+struct VisitorAll
{
template
void operator()(V&& v) {
@@ -147,7 +147,7 @@ struct Visitor
};
template <>
-struct Visitor
+struct VisitorAll
{
template
void operator()(V&& v) {
@@ -159,11 +159,46 @@ struct Visitor
Defs& defs_;
};
+template
+struct VisitorParent
+{
+ template
+ void operator()(V&& v) {
+ v(item_);
+ if (auto* parent = item_.parent(); parent) {
+ visit_parents(*parent, std::forward(v));
+ }
+ }
+
+ I& item_;
+};
+
} // namespace detail
+/**
+ * Traverses all nodes downwards in the tree, including the given item and all its children.
+ *
+ * @tparam V - Visitor type
+ * @tparam I - Item type
+ * @param item - The 'root' item to traverse
+ * @param visitor - The visitor to apply to each item
+ */
+template
+void visit_all(I& item, V&& visitor) {
+ detail::VisitorAll{item}(std::forward(visitor));
+}
+
+/**
+ * Traverses nodes upwards in the tree, including the given item and all its parents.
+ *
+ * @tparam V - Visitor type
+ * @tparam I - Item type
+ * @param item - The 'leaf' item to traverse
+ * @param visitor - The visitor to apply to each item
+ */
template
-void visit(I& item, V&& visitor) {
- detail::Visitor{item}(std::forward(visitor));
+void visit_parents(I& item, V&& visitor) {
+ detail::VisitorParent{item}(std::forward(visitor));
}
} // namespace ecf
diff --git a/libs/node/src/ecflow/node/Suite.cpp b/libs/node/src/ecflow/node/Suite.cpp
index c1aaca538..ed1cbaf49 100644
--- a/libs/node/src/ecflow/node/Suite.cpp
+++ b/libs/node/src/ecflow/node/Suite.cpp
@@ -61,7 +61,6 @@ Suite& Suite::operator=(const Suite& rhs) {
clock_end_attr_ = std::make_shared(*rhs.clock_end_attr_);
cal_ = rhs.cal_;
- state_change_no_ = 0;
modify_change_no_ = Ecf::incr_modify_change_no();
begun_change_no_ = 0;
calendar_change_no_ = 0;
@@ -93,8 +92,6 @@ bool Suite::check_defaults() const {
throw std::runtime_error("Suite::check_defaults(): defs_ != nullptr");
if (begun_ != false)
throw std::runtime_error("Suite::check_defaults(): begun_ != false");
- if (state_change_no_ != 0)
- throw std::runtime_error("Suite::check_defaults(): state_change_no_ != 0");
if (modify_change_no_ != 0)
throw std::runtime_error("Suite::check_defaults(): modify_change_no_ != 0 ");
if (begun_change_no_ != 0)
@@ -558,13 +555,6 @@ bool Suite::checkInvariants(std::string& errorMsg) const {
}
if (Ecf::server()) {
- if (state_change_no_ > Ecf::state_change_no()) {
- std::stringstream ss;
- ss << "Suite::checkInvariants: suite_change_no(" << state_change_no_ << ") > Ecf::state_change_no("
- << Ecf::state_change_no() << ")\n";
- errorMsg += ss.str();
- return false;
- }
if (begun_change_no_ > Ecf::state_change_no()) {
std::stringstream ss;
ss << "Suite::checkInvariants: begun_change_no_(" << begun_change_no_ << ") > Ecf::state_change_no("
diff --git a/libs/node/src/ecflow/node/Suite.hpp b/libs/node/src/ecflow/node/Suite.hpp
index a7a30d82d..806ab215b 100644
--- a/libs/node/src/ecflow/node/Suite.hpp
+++ b/libs/node/src/ecflow/node/Suite.hpp
@@ -96,8 +96,6 @@ class Suite final : public NodeContainer {
NodeContainer::set_memento(m, aspects, f);
}
- void set_state_change_no(unsigned int x) { state_change_no_ = x; }
- unsigned int state_change_no() const { return state_change_no_; }
void set_modify_change_no(unsigned int x) { modify_change_no_ = x; }
unsigned int modify_change_no() const { return modify_change_no_; }
@@ -119,7 +117,6 @@ class Suite final : public NodeContainer {
clock_ptr clockAttr_;
clock_ptr clock_end_attr_; // *NOT* persisted, used by simulator only
ecf::Calendar cal_; // *Only* persisted since used by the why() on client side
- unsigned int state_change_no_{0}; // no need to persist
unsigned int modify_change_no_{0}; // no need to persist
unsigned int begun_change_no_{0}; // no need to persist, record changes to begun_. Needed for SSyncCmd
unsigned int calendar_change_no_{0}; // no need to persist,
diff --git a/libs/server/src/ecflow/server/BaseServer.cpp b/libs/server/src/ecflow/server/BaseServer.cpp
index 0b6065d57..aba30aedd 100644
--- a/libs/server/src/ecflow/server/BaseServer.cpp
+++ b/libs/server/src/ecflow/server/BaseServer.cpp
@@ -254,7 +254,7 @@ void BaseServer::updateDefs(defs_ptr defs, bool force) {
LOG_ASSERT(defs_->server().jobSubmissionInterval() != 0, "");
if (serverState_ == SState::RUNNING) {
- ecf::visit(*defs_, BootstrapDefs{});
+ ecf::visit_all(*defs_, BootstrapDefs{});
}
}
@@ -360,7 +360,7 @@ void BaseServer::halted() {
// Added after discussion with Axel.
checkPtSaver_.stop();
- ecf::visit(*defs_, ShutdownDefs{});
+ ecf::visit_all(*defs_, ShutdownDefs{});
// Stop the task communication with server. Hence nodes can be stuck
// in submitted/active states. Task based command will continue attempting,
@@ -387,7 +387,7 @@ void BaseServer::restart() {
traverser_.start();
checkPtSaver_.start();
- ecf::visit(*defs_, BootstrapDefs{});
+ ecf::visit_all(*defs_, BootstrapDefs{});
}
void BaseServer::traverse_node_tree_and_job_generate(const boost::posix_time::ptime& time_now,
diff --git a/libs/service/src/ecflow/service/aviso/Aviso.cpp b/libs/service/src/ecflow/service/aviso/Aviso.cpp
index a3f40a6f3..80fdf706e 100644
--- a/libs/service/src/ecflow/service/aviso/Aviso.cpp
+++ b/libs/service/src/ecflow/service/aviso/Aviso.cpp
@@ -54,7 +54,11 @@ std::ostream& operator<<(std::ostream& os, const AvisoRequest& request) {
/* Notification */
std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification) {
- os << notification.key() << " = " << notification.value() << " (revision: " << notification.revision() << ")";
+ os << "success: " << notification.success();
+ if (const auto& match = notification.match(); match) {
+ auto& m = match.value();
+ os << m.key() << " = " << m.value() << " (revision: " << m.revision() << ")";
+ }
return os;
}
@@ -84,24 +88,19 @@ ConfiguredListener ConfiguredListener::make_configured_listener(const AvisoReque
throw std::runtime_error("Failed to load listener schema: " + std::string(e.what()));
}
- std::string address = listen_request.address();
- std::string path = listen_request.path();
- std::string event = data["event"];
- uint32_t polling = listen_request.polling();
- uint64_t revision = listen_request.revision();
+ std::string address = listen_request.address();
+ std::string path = listen_request.path();
+ std::string event = data["event"];
+ uint32_t polling = listen_request.polling();
+ uint64_t revision = listen_request.revision();
const auto& listener = schema.get_listener(event);
if (!listener) {
throw std::runtime_error("Listener not found");
}
- ConfiguredListener configured{aviso::etcd::Address{address},
- path,
- listener->name(),
- listener->base(),
- listener->stem(),
- polling,
- revision};
+ ConfiguredListener configured{
+ aviso::etcd::Address{address}, path, listener->name(), listener->base(), listener->stem(), polling, revision};
ALOG(I,
"Aviso: configured with: " << path << " for " << event << " at " << address << " with revision " << revision);
@@ -255,7 +254,7 @@ ConfiguredListener::accepts(const std::string& key, const std::string& value, ui
if (applicable) {
AvisoNotification notification{key, value, revision};
for (const auto& [k, v] : actual_parameters) {
- notification.add_parameter(k, v);
+ notification.match()->add_parameter(k, v);
}
ALOG(D, "Aviso: Match [✓] --> " << key << " = " << value << " (revision: " << revision << ")");
return notification;
diff --git a/libs/service/src/ecflow/service/aviso/Aviso.hpp b/libs/service/src/ecflow/service/aviso/Aviso.hpp
index 9d36add20..304f5324a 100644
--- a/libs/service/src/ecflow/service/aviso/Aviso.hpp
+++ b/libs/service/src/ecflow/service/aviso/Aviso.hpp
@@ -100,33 +100,51 @@ std::ostream& operator<<(std::ostream& os, const AvisoRequest& request);
*
* */
class AvisoNotification {
+public:
+ struct Match
+ {
+ Match(std::string_view key, std::string_view value, uint64_t revision)
+ : key_{key},
+ value_{value},
+ revision_{revision},
+ parameters_{} {}
+
+ std::string_view key() const { return key_; }
+ std::string_view value() const { return value_; }
+ uint64_t revision() const { return revision_; }
+
+ void add_parameter(const std::string& parameter, const std::string& value) {
+ parameters_.emplace_back(parameter, value);
+ }
+
+ std::string key_;
+ std::string value_;
+ uint64_t revision_;
+ std::vector> parameters_{};
+ };
+
public:
AvisoNotification() = default;
+ AvisoNotification(std::string_view reason) : success_{false}, match_{}, reason_{reason} {}
AvisoNotification(std::string_view key, std::string_view value, uint64_t revision)
- : key_{key},
- value_{value},
- revision_{revision} {}
+ : success_{true},
+ match_{std::make_optional(key, value, revision)} {}
- std::string_view key() const { return key_; }
- std::string_view value() const { return value_; }
- uint64_t revision() const { return revision_; }
-
- void add_parameter(const std::string& parameter, const std::string& value) {
- parameters_.emplace_back(parameter, value);
- }
- std::vector> parameters() const { return parameters_; }
+ [[nodiscard]] bool success() const { return success_; }
+ [[nodiscard]] std::optional match() const { return match_; }
+ [[nodiscard]] std::string reason() const { return reason_; }
private:
- std::string key_{};
- std::string value_{};
- uint64_t revision_{0};
- std::vector> parameters_{};
+ bool success_{true};
+ std::optional match_{};
+ std::string reason_{};
};
std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification);
/**
- * A Listener represents an Aviso Listener, loaded from the Schema with placeholders as part of the `base` and `stem`.
+ * A Listener represents an Aviso Listener, loaded from the Schema with placeholders as part of the `base` and
+ * `stem`.
*/
class Listener {
public:
diff --git a/libs/service/src/ecflow/service/aviso/AvisoService.cpp b/libs/service/src/ecflow/service/aviso/AvisoService.cpp
index 350890067..cbabd846e 100644
--- a/libs/service/src/ecflow/service/aviso/AvisoService.cpp
+++ b/libs/service/src/ecflow/service/aviso/AvisoService.cpp
@@ -17,7 +17,7 @@ namespace ecf::service::aviso {
namespace {
-std::string get_authentication_credential(const std::string& auth_file) {
+std::string load_authentication_credential(const std::string& auth_file) {
std::ifstream file(auth_file);
if (!file.is_open()) {
throw std::runtime_error("Unable to open file: " + auth_file);
@@ -87,8 +87,18 @@ void AvisoService::operator()(const std::chrono::system_clock::time_point& now)
<< entry.prefix() << ", rev: " << entry.listener().revision() << ")");
// Poll notifications on the key prefix
- auto updated_keys = client.poll(entry.prefix(), entry.listener().revision() + 1);
+ std::vector> updated_keys;
+ try {
+ updated_keys = client.poll(entry.prefix(), entry.listener().revision() + 1);
+ }
+ catch (const std::exception& e) {
+ notification_t n{std::string{entry.path()}, entry.listener(), AvisoNotification{e.what()}};
+ notify_(n); // Notification regarding failure to contact the server
+ return;
+ }
+
+ auto matched = false;
// Pass updated keys to the listener
for (auto&& [key, value] : updated_keys) {
if (key == "latest_revision") {
@@ -102,9 +112,15 @@ void AvisoService::operator()(const std::chrono::system_clock::time_point& now)
if (auto notification = entry.listener().accepts(key, value, entry.listener().revision());
notification) {
notification_t n{std::string{entry.path()}, entry.listener(), *notification};
- notify_(n);
+ notify_(n); // Notification regarding a successful match
+ matched = true;
}
}
+
+ if (!matched) {
+ notification_t n{std::string{entry.path()}, entry.listener(), AvisoNotification{}};
+ notify_(n); // Notification regarding no match
+ }
}
}
}
@@ -121,7 +137,7 @@ void AvisoService::register_listener(const AvisoRequest& listen) {
auto& inserted = listeners_.emplace_back(listener);
if (auto auth = listen.auth(); !auth.empty()) {
- inserted.auth_token = get_authentication_credential(listen.auth());
+ inserted.auth_token = load_authentication_credential(listen.auth());
}
}
diff --git a/libs/service/src/ecflow/service/aviso/AvisoService.hpp b/libs/service/src/ecflow/service/aviso/AvisoService.hpp
index 7127bdcab..f22270a6a 100644
--- a/libs/service/src/ecflow/service/aviso/AvisoService.hpp
+++ b/libs/service/src/ecflow/service/aviso/AvisoService.hpp
@@ -89,7 +89,6 @@ class AvisoService {
private:
void register_listener(const AvisoRequest& request);
-// void register_listener(const listener_t& listener);
void unregister_listener(const std::string& unlisten_path);
executor::PeriodicTaskExecutor> executor_;
diff --git a/libs/service/src/ecflow/service/aviso/etcd/Client.cpp b/libs/service/src/ecflow/service/aviso/etcd/Client.cpp
index 9faa5653f..4a5de9d1f 100644
--- a/libs/service/src/ecflow/service/aviso/etcd/Client.cpp
+++ b/libs/service/src/ecflow/service/aviso/etcd/Client.cpp
@@ -12,6 +12,8 @@
#include
+#include "ecflow/core/Message.hpp"
+
#if defined(ECF_OPENSSL)
#include
#if OPENSSL_VERSION_NUMBER < 0x1010100fL
@@ -54,7 +56,6 @@ Client::Client(Address address) : impl_(std::make_unique(address))
Client::Client(Address address, std::string auth_token)
: impl_(std::make_unique(std::move(address), std::move(auth_token))) {
- std::cout << OPENSSL_VERSION_NUMBER << std::endl;
}
Client::~Client() = default;
@@ -65,7 +66,7 @@ std::vector> Client::poll(std::string_view k
httplib::Headers headers;
if (!impl_->auth_token_.empty()) {
- ALOG(D, "EtcdClient: using authorization token " << impl_->auth_token_);
+ ALOG(D, "EtcdClient: using authorization token");
headers.emplace("Authorization", "Bearer " + impl_->auth_token_);
}
@@ -78,13 +79,11 @@ std::vector> Client::poll(std::string_view k
httplib::Result result = impl_->client_.Post(endpoint_path, headers, request_body, content_type);
if (!result) {
- ALOG(E, "EtcdClient " << result.error());
- return std::vector>{};
+ throw std::runtime_error(Message("EtcdClient: Unable to retrieve result, due to ", result.error()).str());
}
if (result.value().status != 200) {
- ALOG(E, "EtcdClient: status code " << result.value().status << " received");
- return std::vector>{};
+ throw std::runtime_error(Message("EtcdClient: Failed to poll, due to ", result.value().reason).str());
}
json response_body;
@@ -92,8 +91,7 @@ std::vector> Client::poll(std::string_view k
response_body = json::parse(std::begin(result.value().body), std::end(result.value().body));
}
catch (const json::parse_error& e) {
- ALOG(E, "EtcdClient: Failed to parse response: " << e.what());
- return std::vector>{};
+ throw std::runtime_error(Message("EtcdClient: Unable to parse response, due to ", e.what()).str());
}
std::vector> entries;
@@ -111,13 +109,13 @@ std::vector> Client::poll(std::string_view k
auto value = make_content_from(v);
if (key.raw() != key_prefix) {
- ALOG(D, "Received key+value: " << key.raw() << "+" << value.raw());
+ ALOG(D, "EtcdClient: Received key+value: " << key.raw() << "+" << value.raw());
entries.emplace_back(key.raw(), value.raw());
}
}
}
else {
- ALOG(D, "EtcdClient: No new key+value");
+ ALOG(D, "EtcdClient: No new key+value found");
}
return entries;
diff --git a/libs/service/src/ecflow/service/mirror/MirrorService.cpp b/libs/service/src/ecflow/service/mirror/MirrorService.cpp
index de88af850..695b640b8 100644
--- a/libs/service/src/ecflow/service/mirror/MirrorService.cpp
+++ b/libs/service/src/ecflow/service/mirror/MirrorService.cpp
@@ -10,11 +10,11 @@
#include "ecflow/service/mirror/MirrorService.hpp"
-#include
#include
#include
#include "ecflow/client/ClientInvoker.hpp"
+#include "ecflow/core/Message.hpp"
#include "ecflow/core/PasswordEncryption.hpp"
#include "ecflow/node/Defs.hpp"
#include "ecflow/node/Node.hpp"
@@ -25,10 +25,10 @@ namespace ecf::service::mirror {
namespace {
-std::pair load_auth_credentials(const std::filesystem::path& auth_file) {
+std::pair load_auth_credentials(const std::string& auth_file) {
std::ifstream file(auth_file);
if (!file.is_open()) {
- throw std::runtime_error("Unable to open file: " + auth_file.string());
+ throw std::runtime_error("Unable to open file: " + auth_file);
}
std::string username, password;
@@ -80,23 +80,22 @@ int MirrorClient::get_node_status(const std::string& remote_host,
if (!impl_->defs_) {
ALOG(E, "MirrorClient: unable to sync with remote defs");
- return NState::UNKNOWN;
+ throw std::runtime_error("MirrorClient: Failed to sync with remote defs");
}
auto node = impl_->defs_->findAbsNode(node_path);
if (!node) {
- ALOG(E, "MirrorClient: requested node (" << node_path << ") not found in remote defs");
- return NState::UNKNOWN;
+ throw std::runtime_error(
+ Message("MirrorClient: Unable to find requested node (", node_path, ") in remote remote defs").str());
}
auto state = node->state();
- ALOG(D, "MirrorClient: found node (" << node_path << "), with status " << state);
+ ALOG(D, "MirrorClient: found node (" << node_path << "), with state " << state);
return state;
}
catch (std::exception& e) {
- ALOG(W, "MirrorClient: failure to sync, due to: " << e.what());
- return NState::UNKNOWN;
+ throw std::runtime_error(Message("MirrorClient: failure to sync remote defs, due to: ", e.what()));
}
}
@@ -148,20 +147,26 @@ void MirrorService::operator()(const std::chrono::system_clock::time_point& now)
auto remote_pass = entry.remote_password_;
// Collect the latest remote status
- auto latest_status =
- mirror_.get_node_status(remote_host, remote_port, remote_path, ssl, remote_user, remote_pass);
-
- ALOG(D, "MirrorService: Notifying remote node state: " << latest_status);
-
- MirrorNotification notification{remote_path, latest_status};
- notify_(notification);
+ try {
+ auto latest_status =
+ mirror_.get_node_status(remote_host, remote_port, remote_path, ssl, remote_user, remote_pass);
+
+ ALOG(D, "MirrorService: Notifying remote node state: " << latest_status);
+ MirrorNotification notification{true, remote_path, "", latest_status};
+ notify_(notification);
+ }
+ catch (std::runtime_error& e) {
+ ALOG(W, "MirrorService: Failed to sync with remote node: " << e.what());
+ MirrorNotification notification{false, remote_path, e.what(), -1};
+ notify_(notification);
+ }
}
}
}
void MirrorService::register_listener(const MirrorRequest& request) {
ALOG(D, "MirrorService: Registering Mirror: {" << request.path << "}");
- Entry& inserted = listeners_.emplace_back(Entry{request});
+ Entry& inserted = listeners_.emplace_back(Entry{request, "", ""});
if (!request.auth.empty()) {
ALOG(D, "MirrorService: Loading auth {" << request.auth << "}");
try {
@@ -182,6 +187,7 @@ MirrorController::MirrorController()
ALOG(D, "MirrorController: forcing server to traverse the defs");
TheOneServer::server().increment_job_generation_count();
},
- [this]() { return this->get_subscriptions(); }} {};
+ [this]() { return this->get_subscriptions(); }} {
+}
} // namespace ecf::service::mirror
diff --git a/libs/service/src/ecflow/service/mirror/MirrorService.hpp b/libs/service/src/ecflow/service/mirror/MirrorService.hpp
index 2e80f9e1a..0ead6d6de 100644
--- a/libs/service/src/ecflow/service/mirror/MirrorService.hpp
+++ b/libs/service/src/ecflow/service/mirror/MirrorService.hpp
@@ -57,8 +57,12 @@ struct MirrorConfiguration
};
struct MirrorNotification
{
+ bool success;
std::string path;
+ std::string failure_reason;
int status;
+
+ std::string reason() const { return failure_reason.substr(0, failure_reason.find_first_of("\n")); }
};
inline std::ostream& operator<<(std::ostream& os, const MirrorNotification& n) {
diff --git a/share/ecflow/etc/ecflowview_icon_conf.json b/share/ecflow/etc/ecflowview_icon_conf.json
index 8128d5efb..b664713f9 100644
--- a/share/ecflow/etc/ecflowview_icon_conf.json
+++ b/share/ecflow/etc/ecflowview_icon_conf.json
@@ -131,6 +131,14 @@
"icon" : {
"default" : "icon_checkpt_err.svg"
}
+ },
+ "remote_err" : {
+ "label" : "Remote error",
+ "tooltip" : "Error accessing remote resource",
+ "shortDesc" : "Error accessing remote resource",
+ "icon" : {
+ "default" : "icon_remote_err.svg"
}
+ }
}
}