diff --git a/libs/node/src/ecflow/node/AvisoAttr.cpp b/libs/node/src/ecflow/node/AvisoAttr.cpp index eaa083c71..1295a31db 100644 --- a/libs/node/src/ecflow/node/AvisoAttr.cpp +++ b/libs/node/src/ecflow/node/AvisoAttr.cpp @@ -14,6 +14,7 @@ #include "ecflow/core/Ecf.hpp" #include "ecflow/core/Message.hpp" +#include "ecflow/core/Overload.hpp" #include "ecflow/core/exceptions/Exceptions.hpp" #include "ecflow/node/Node.hpp" #include "ecflow/node/Operations.hpp" @@ -93,8 +94,6 @@ void AvisoAttr::reset() { } bool AvisoAttr::isFree() const { - std::string aviso_path = path(); - ALOG(D, "AvisoAttr: check Aviso attribute (name: " << name_ << ", listener: " << listener_ << ") is free"); if (controller_ == nullptr) { @@ -102,7 +101,7 @@ bool AvisoAttr::isFree() const { } // Task associated with Attribute is free when any notification is found - auto notifications = controller_->poll_notifications(aviso_path); + auto notifications = controller_->poll_notifications(this->path()); if (notifications.empty()) { // No notifications, nothing to do -- task continues to wait @@ -112,33 +111,39 @@ bool AvisoAttr::isFree() const { // Notifications found -- task can continue // (a) get the latest revision - auto max = std::max_element(notifications.begin(), notifications.end(), [](const auto& a, const auto& b) { - return a.configuration.revision() < b.configuration.revision(); - }); + auto& back = notifications.back(); state_change_no_ = Ecf::incr_state_change_no(); // (b) update the revision, in the listener configuration - if (max->notification.success()) { - ALOG(D, "AvisoAttr::isFree: " << aviso_path << " updated revision to " << this->revision_); - this->revision_ = max->configuration.revision(); - parent_->flag().clear(Flag::REMOTE_ERROR); - parent_->flag().set_state_change_no(state_change_no_); - reason_ = ""; - - for (auto* parent = parent_; parent; parent = parent->parent()) { - parent->set_state_change_no(state_change_no_); - } - return max->notification.match().has_value(); - } - else { - parent_->flag().set(Flag::REMOTE_ERROR); - parent_->flag().set_state_change_no(state_change_no_); - reason_ = max->notification.reason(); - - ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); }); - return false; - } + return std::visit( + ecf::overload{ + [this](const ecf::service::aviso::NotificationPackage& response) { + ALOG(D, "AvisoAttr::isFree: " << this->path() << " updated revision to " << this->revision_); + this->revision_ = response.configuration.revision(); + parent_->flag().clear(Flag::REMOTE_ERROR); + parent_->flag().set_state_change_no(state_change_no_); + reason_ = ""; + + ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); }); + return true; + }, + [this](const ecf::service::aviso::AvisoNoMatch& response) { + parent_->flag().clear(Flag::REMOTE_ERROR); + parent_->flag().set_state_change_no(state_change_no_); + reason_ = ""; + return false; + }, + [this](const ecf::service::aviso::AvisoError& response) { + parent_->flag().set(Flag::REMOTE_ERROR); + parent_->flag().set_state_change_no(state_change_no_); + reason_ = response.reason(); + + ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); }); + return false; + }}, + back); } void AvisoAttr::start() const { @@ -185,8 +190,8 @@ void AvisoAttr::start_controller(const std::string& aviso_path, if (!controller_) { // Controller -- start up the Aviso controller, and subscribe the Aviso listener controller_ = std::make_shared(); - controller_->subscribe(ecf::service::aviso::AvisoRequest::make_listen_start( - aviso_path, aviso_listener, aviso_url, aviso_schema, polling, revision_, aviso_auth)); + controller_->subscribe(ecf::service::aviso::AvisoSubscribe{ + aviso_path, aviso_listener, aviso_url, aviso_schema, polling, revision_, aviso_auth}); // Controller -- effectively start the Aviso listener // n.b. this must be done after subscribing in the controller, so that the polling interval is set controller_->start(); @@ -197,7 +202,7 @@ void AvisoAttr::stop_controller(const std::string& aviso_path) const { if (controller_ != nullptr) { ALOG(D, "AvisoAttr: finishing polling for Aviso attribute (" << parent_path_ << ":" << name_ << ")"); - controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path)); + controller_->subscribe(ecf::service::aviso::AvisoUnsubscribe{aviso_path}); // Controller -- shutdown up the Aviso controller controller_->stop(); diff --git a/libs/node/src/ecflow/node/MirrorAttr.cpp b/libs/node/src/ecflow/node/MirrorAttr.cpp index da4ebf798..9a56a96a7 100644 --- a/libs/node/src/ecflow/node/MirrorAttr.cpp +++ b/libs/node/src/ecflow/node/MirrorAttr.cpp @@ -14,6 +14,7 @@ #include "ecflow/core/Ecf.hpp" #include "ecflow/core/Message.hpp" +#include "ecflow/core/Overload.hpp" #include "ecflow/core/exceptions/Exceptions.hpp" #include "ecflow/node/Node.hpp" #include "ecflow/node/Operations.hpp" @@ -89,23 +90,26 @@ void MirrorAttr::mirror() { state_change_no_ = Ecf::incr_state_change_no(); // Notifications found -- Node state to be updated or error to be reported - if (auto& notification = notifications.back(); notification.success) { - ALOG(D, "MirrorAttr: Updating Mirror attribute (name: " << name_ << ") to state " << notification.status); - auto latest_state = static_cast(notification.status); - reason_ = ""; - parent_->flag().clear(Flag::REMOTE_ERROR); - parent_->flag().set_state_change_no(state_change_no_); - parent_->setStateOnly(latest_state, true); - } - else { - ALOG(D, - "MirrorAttr: Failure detected on Mirror attribute (name: " << name_ << ") due to " - << notification.path); - reason_ = notification.reason(); - parent_->flag().set(Flag::REMOTE_ERROR); - parent_->flag().set_state_change_no(state_change_no_); - parent_->setStateOnly(NState::UNKNOWN, true); - } + std::visit(ecf::overload{[this](const service::mirror::MirrorNotification& notification) { + ALOG(D, + "MirrorAttr: Updating Mirror attribute (name: " << name_ << ") to state " + << notification.status); + auto latest_state = static_cast(notification.status); + reason_ = ""; + parent_->flag().clear(Flag::REMOTE_ERROR); + parent_->flag().set_state_change_no(state_change_no_); + parent_->setStateOnly(latest_state, true); + }, + [this](const service::mirror::MirrorError& error) { + ALOG(D, + "MirrorAttr: Failure detected on Mirror attribute (name: " + << name_ << ") due to " << error.reason()); + reason_ = error.reason(); + parent_->flag().set(Flag::REMOTE_ERROR); + parent_->flag().set_state_change_no(state_change_no_); + parent_->setStateOnly(NState::UNKNOWN, true); + }}, + notifications.back()); // Propagate the 'local' state change number to all parents ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); }); diff --git a/libs/service/src/ecflow/service/Controller.hpp b/libs/service/src/ecflow/service/Controller.hpp index 6cf641d6c..ec9520904 100644 --- a/libs/service/src/ecflow/service/Controller.hpp +++ b/libs/service/src/ecflow/service/Controller.hpp @@ -23,11 +23,11 @@ namespace ecf::service { template class Controller { public: - using subscription_t = typename Service::subscription_t; + using service_t = Service; + using subscription_t = typename service_t::subscription_t; using subscriptions_t = std::vector; using notification_t = typename Service::notification_t; using notifications_t = std::vector; - using service_t = Service; template explicit Controller(Args&&... args) : running_{std::forward(args)...} {} @@ -40,12 +40,6 @@ class Controller { subscriptions_.push_back(s); } - void unsubscribe(const subscription_t& s) { - ALOG(D, "Controller: unsubscribe " << s); - std::scoped_lock lock(subscribe_); - subscriptions_.push_back(s); - } - notifications_t poll_notifications(const std::string& name) { ALOG(D, "Controller: poll_notifications for " << name); @@ -78,12 +72,13 @@ class Controller { std::scoped_lock lock(notify_); - const auto& key = notification.path; - if (auto found = notifications_.find(key); found != notifications_.end()) { - found->second.push_back(notification); - } - else { - notifications_[key] = {notification}; + if (const auto& key = service_t::key(notification); key.has_value()) { + if (auto found = notifications_.find(key.value()); found != notifications_.end()) { + found->second.push_back(notification); + } + else { + notifications_[key.value()] = {notification}; + } } } diff --git a/libs/service/src/ecflow/service/aviso/Aviso.cpp b/libs/service/src/ecflow/service/aviso/Aviso.cpp index 80fdf706e..2c5f130d5 100644 --- a/libs/service/src/ecflow/service/aviso/Aviso.cpp +++ b/libs/service/src/ecflow/service/aviso/Aviso.cpp @@ -19,46 +19,67 @@ #include +#include "ecflow/core/Overload.hpp" #include "ecflow/service/Log.hpp" namespace ecf::service::aviso { -/* std::variant visitor utility */ +/* AvisoSubscribe */ -template -struct Overload : Ts... -{ - using Ts::operator()...; -}; +std::ostream& operator<<(std::ostream& os, const AvisoSubscribe& request) { + os << "AvisoSubscribe{"; + os << "path: " << request.path(); + os << ", listener_cfg: " << request.listener_cfg(); + os << ", address: " << request.address(); + os << ", schema: " << request.schema(); + os << ", polling: " << request.polling(); + os << ", revision: " << request.revision(); + os << "}"; + return os; +} -template -Overload(Ts...) -> Overload; // required in C++17, no longer required in C++20 +/* AvisoUnsubscribe */ + +std::ostream& operator<<(std::ostream& os, const AvisoUnsubscribe& request) { + os << "AvisoUnsubscribe{"; + os << "path: " << request.path(); + os << "}"; + return os; +} /* AvisoRequest */ std::ostream& operator<<(std::ostream& os, const AvisoRequest& request) { - os << "ListenRequest{"; - os << "start: " << request.is_start(); - os << ", path: " << request.path(); - if (request.is_start()) { - os << ", listener_cfg: " << request.listener_cfg(); - os << ", address: " << request.address(); - os << ", schema: " << request.schema(); - os << ", polling: " << request.polling(); - os << ", revision: " << request.revision(); - } - os << "}"; + std::visit( + ecf::overload{[&os](const AvisoSubscribe& r) { os << r; }, [&os](const AvisoUnsubscribe& r) { os << r; }}, + request); return os; } -/* Notification */ +/* AvisoNotification */ std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification) { - os << "success: " << notification.success(); - if (const auto& match = notification.match(); match) { - auto& m = match.value(); - os << m.key() << " = " << m.value() << " (revision: " << m.revision() << ")"; - } + os << "AvisoNotification{"; + os << "key: " << notification.key(); + os << ", value: " << notification.value(); + os << ", revision: " << notification.revision(); + os << "}"; + return os; +} + +/* AvisoNoMatch */ + +std::ostream& operator<<(std::ostream& os, const AvisoNoMatch& no_match) { + os << "AvisoNoMatch{}"; + return os; +} + +/* AvisoError */ + +std::ostream& operator<<(std::ostream& os, const AvisoError& error) { + os << "AvisoError{"; + os << "reason: " << error.reason(); + os << "}"; return os; } @@ -68,7 +89,7 @@ std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification /* ConfiguredListener */ -ConfiguredListener ConfiguredListener::make_configured_listener(const AvisoRequest& listen_request) { +ConfiguredListener ConfiguredListener::make_configured_listener(const AvisoSubscribe& listen_request) { using json = nlohmann::ordered_json; json data; @@ -235,13 +256,13 @@ ConfiguredListener::accepts(const std::string& key, const std::string& value, ui auto actual = v; auto validate = - Overload{[&actual](const std::string& expected) { return expected == actual; }, - [&actual](std::int64_t expected) { return std::to_string(expected) == actual; }, - [&actual](const std::vector& valid) { - return std::any_of(valid.begin(), valid.end(), [&actual](const auto& expected) { - return expected == actual; - }); - }}; + ecf::overload{[&actual](const std::string& expected) { return expected == actual; }, + [&actual](std::int64_t expected) { return std::to_string(expected) == actual; }, + [&actual](const std::vector& valid) { + return std::any_of(valid.begin(), valid.end(), [&actual](const auto& expected) { + return expected == actual; + }); + }}; applicable = std::visit(validate, found->second); if (!applicable) { @@ -254,7 +275,7 @@ ConfiguredListener::accepts(const std::string& key, const std::string& value, ui if (applicable) { AvisoNotification notification{key, value, revision}; for (const auto& [k, v] : actual_parameters) { - notification.match()->add_parameter(k, v); + notification.add_parameter(k, v); } ALOG(D, "Aviso: Match [✓] --> " << key << " = " << value << " (revision: " << revision << ")"); return notification; diff --git a/libs/service/src/ecflow/service/aviso/Aviso.hpp b/libs/service/src/ecflow/service/aviso/Aviso.hpp index 304f5324a..c2f90c68e 100644 --- a/libs/service/src/ecflow/service/aviso/Aviso.hpp +++ b/libs/service/src/ecflow/service/aviso/Aviso.hpp @@ -25,28 +25,30 @@ namespace ecf::service::aviso { /** * - * An AvisoRequest is a request to start or stop listening for notifications based a given configuration. + * An AvisoSubscribe is a request to start listening for notifications based a given configuration. * * This class holds information used to fully configure an Aviso Listener, including: * - the listener configuration, * - the address of the ETCD server, * - the schema used to specify the notifications - * */ -class AvisoRequest { + * + */ +class AvisoSubscribe { public: - static AvisoRequest make_listen_start(std::string_view path, - std::string_view listener_cfg, - std::string_view address, - std::string_view schema, - uint32_t polling, - uint64_t revision, - std::string_view auth) { - return AvisoRequest{true, path, listener_cfg, address, schema, polling, revision, auth}; - } - - static AvisoRequest make_listen_finish(std::string_view path) { return AvisoRequest{false, path}; } - - bool is_start() const { return start_; } + explicit AvisoSubscribe(std::string_view path, + std::string_view listener_cfg, + std::string_view address, + std::string_view schema, + uint32_t polling, + uint64_t revision, + std::string_view auth) + : path_{path}, + listener_cfg_{listener_cfg}, + address_{address}, + schema_{schema}, + polling_{polling}, + revision_{revision}, + auth_{auth} {} const std::string& path() const { return path_; } const std::string& listener_cfg() const { return listener_cfg_; } @@ -56,33 +58,9 @@ class AvisoRequest { uint64_t revision() const { return revision_; } const std::string& auth() const { return auth_; } -private: - explicit AvisoRequest(bool start, std::string_view path) - : start_{start}, - path_{path}, - listener_cfg_{}, - address_{}, - schema_{}, - revision_{0}, - auth_{} {} - explicit AvisoRequest(bool start, - std::string_view path, - std::string_view listener_cfg, - std::string_view address, - std::string_view schema, - uint32_t polling, - uint64_t revision, - std::string_view auth) - : start_{start}, - path_{path}, - listener_cfg_{listener_cfg}, - address_{address}, - schema_{schema}, - polling_{polling}, - revision_{revision}, - auth_{auth} {} + friend std::ostream& operator<<(std::ostream&, const AvisoSubscribe&); - bool start_; +private: std::string path_; std::string listener_cfg_; std::string address_; @@ -92,55 +70,90 @@ class AvisoRequest { std::string auth_; }; +/** + * + * An AvisoUnsubscribe is a request to stop listening for notifications. + * + */ +class AvisoUnsubscribe { +public: + explicit AvisoUnsubscribe(std::string_view path) : path_{path} {} + + const std::string& path() const { return path_; } + + friend std::ostream& operator<<(std::ostream& os, const AvisoUnsubscribe& request); + +private: + std::string path_; +}; + +/** + * + * An AvisoRequest allows to either subscribe or unsubscribe to notifications. + * + * n.b. This request is sent from the main thread to the worker thread. + * + */ +using AvisoRequest = std::variant; + std::ostream& operator<<(std::ostream& os, const AvisoRequest& request); /** * - * A AvisoNotification represents a notification of a change in a key-value pair in ETCD. + * An AvisoNotification represents a notification of a match found related to a key-value pair in ETCD. * - * */ + */ class AvisoNotification { -public: - struct Match - { - Match(std::string_view key, std::string_view value, uint64_t revision) - : key_{key}, - value_{value}, - revision_{revision}, - parameters_{} {} - - std::string_view key() const { return key_; } - std::string_view value() const { return value_; } - uint64_t revision() const { return revision_; } - - void add_parameter(const std::string& parameter, const std::string& value) { - parameters_.emplace_back(parameter, value); - } - - std::string key_; - std::string value_; - uint64_t revision_; - std::vector> parameters_{}; - }; - public: AvisoNotification() = default; - AvisoNotification(std::string_view reason) : success_{false}, match_{}, reason_{reason} {} AvisoNotification(std::string_view key, std::string_view value, uint64_t revision) - : success_{true}, - match_{std::make_optional(key, value, revision)} {} + : key_{key}, + value_{value}, + revision_{revision}, + parameters_{} {} + + std::string_view key() const { return key_; } + std::string_view value() const { return value_; } + uint64_t revision() const { return revision_; } + + void add_parameter(const std::string& parameter, const std::string& value) { + parameters_.emplace_back(parameter, value); + } - [[nodiscard]] bool success() const { return success_; } - [[nodiscard]] std::optional match() const { return match_; } - [[nodiscard]] std::string reason() const { return reason_; } + friend std::ostream& operator<<(std::ostream&, const AvisoNotification&); private: - bool success_{true}; - std::optional match_{}; - std::string reason_{}; + std::string key_; + std::string value_; + uint64_t revision_; + std::vector> parameters_{}; +}; + +/** + * + * An AvisoNoMatch represents a notification of successfull polling without any match found. + * + */ +class AvisoNoMatch { + friend std::ostream& operator<<(std::ostream&, const AvisoNoMatch&); }; -std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification); +/** + * + * An AvisoError represents a notification of an error found during polling. + * + */ +class AvisoError { +public: + explicit AvisoError(std::string_view reason) : reason_{reason} {} + + [[nodiscard]] std::string_view reason() const { return reason_; } + + friend std::ostream& operator<<(std::ostream&, const AvisoError&); + +private: + std::string reason_; +}; /** * A Listener represents an Aviso Listener, loaded from the Schema with placeholders as part of the `base` and @@ -181,7 +194,7 @@ class ConfiguredListener : private Listener { * @param request The AvisoRequest to create the ConfiguredListener from. * @return The ConfiguredListener. */ - static ConfiguredListener make_configured_listener(const AvisoRequest& request); + static ConfiguredListener make_configured_listener(const AvisoSubscribe& request); public: ConfiguredListener(ecf::service::aviso::etcd::Address address, @@ -215,6 +228,8 @@ class ConfiguredListener : private Listener { std::optional accepts(const std::string& key, const std::string& value, uint64_t revision) const; + friend std::ostream& operator<<(std::ostream& os, const ConfiguredListener& listener); + private: std::string path_; ecf::service::aviso::etcd::Address address_; @@ -226,8 +241,6 @@ class ConfiguredListener : private Listener { std::unordered_map parameters_ = {}; }; -std::ostream& operator<<(std::ostream& os, const ConfiguredListener& listener); - /** * A ListenerSchema is the specification of available Listeners. * diff --git a/libs/service/src/ecflow/service/aviso/AvisoService.cpp b/libs/service/src/ecflow/service/aviso/AvisoService.cpp index cbabd846e..b5c7f097e 100644 --- a/libs/service/src/ecflow/service/aviso/AvisoService.cpp +++ b/libs/service/src/ecflow/service/aviso/AvisoService.cpp @@ -10,6 +10,7 @@ #include "ecflow/service/aviso/AvisoService.hpp" +#include "ecflow/core/Overload.hpp" #include "ecflow/service/Registry.hpp" #include "ecflow/service/aviso/etcd/Client.hpp" @@ -32,18 +33,34 @@ std::string load_authentication_credential(const std::string& auth_file) { } // namespace +std::ostream& operator<<(std::ostream& os, const AvisoResponse& r) { + std::visit(ecf::overload{[&os](const NotificationPackage& p) { os << p; }, + [&os](const AvisoNoMatch& a) { os << a; }, + [&os](const AvisoError& a) { os << a; }}, + r); + return os; +} + +std::optional AvisoService::key(const AvisoResponse& notification) { + return std::visit(ecf::overload{[](const NotificationPackage& p) { + return std::make_optional(p.path); + }, + [](const AvisoNoMatch&) { return std::optional{}; }, + [](const AvisoError&) { return std::optional{}; }}, + notification); +} + void AvisoService::start() { // Update list of listeners auto new_subscriptions = subscribe_(); for (auto&& subscription : new_subscriptions) { - if (subscription.is_start()) { - register_listener(subscription); - } - else { - unregister_listener(subscription.path()); - } + std::visit(ecf::overload{[this](const AvisoSubscribe& subscription) { this->register_listener(subscription); }, + [this](const AvisoUnsubscribe& subscription) { + this->unregister_listener(subscription.path()); + }}, + subscription); } // Start polling... @@ -68,12 +85,11 @@ void AvisoService::operator()(const std::chrono::system_clock::time_point& now) auto new_subscriptions = subscribe_(); for (auto&& subscription : new_subscriptions) { - if (subscription.is_start()) { - register_listener(subscription); - } - else { - unregister_listener(subscription.path()); - } + std::visit(ecf::overload{[this](const AvisoSubscribe& subscription) { this->register_listener(subscription); }, + [this](const AvisoUnsubscribe& subscription) { + this->unregister_listener(subscription.path()); + }}, + subscription); } // Check notification for each listener @@ -93,8 +109,7 @@ void AvisoService::operator()(const std::chrono::system_clock::time_point& now) updated_keys = client.poll(entry.prefix(), entry.listener().revision() + 1); } catch (const std::exception& e) { - notification_t n{std::string{entry.path()}, entry.listener(), AvisoNotification{e.what()}}; - notify_(n); // Notification regarding failure to contact the server + notify_(AvisoError(e.what())); // Notification regarding failure to contact the server return; } @@ -111,21 +126,21 @@ void AvisoService::operator()(const std::chrono::system_clock::time_point& now) if (auto notification = entry.listener().accepts(key, value, entry.listener().revision()); notification) { - notification_t n{std::string{entry.path()}, entry.listener(), *notification}; + NotificationPackage n{ + std::string{entry.path()}, entry.listener(), *notification}; notify_(n); // Notification regarding a successful match matched = true; } } if (!matched) { - notification_t n{std::string{entry.path()}, entry.listener(), AvisoNotification{}}; - notify_(n); // Notification regarding no match + notify_(AvisoNoMatch{}); // Notification regarding no match } } } } -void AvisoService::register_listener(const AvisoRequest& listen) { +void AvisoService::register_listener(const AvisoSubscribe& listen) { auto listener = ConfiguredListener::make_configured_listener(listen); auto address = listener.address(); auto key_prefix = listener.prefix(); diff --git a/libs/service/src/ecflow/service/aviso/AvisoService.hpp b/libs/service/src/ecflow/service/aviso/AvisoService.hpp index f22270a6a..2cee91286 100644 --- a/libs/service/src/ecflow/service/aviso/AvisoService.hpp +++ b/libs/service/src/ecflow/service/aviso/AvisoService.hpp @@ -12,6 +12,7 @@ #define ecflow_service_AvisoService_HPP #include +#include #include #include #include @@ -32,11 +33,16 @@ struct NotificationPackage Notification notification; }; +using AvisoResponse = + std::variant, AvisoNoMatch, AvisoError>; + template inline std::ostream& operator<<(std::ostream& os, const NotificationPackage& p) { return os << "NotificationPackage{" << p.path << ", " << p.configuration << ", " << p.notification << "}"; } +std::ostream& operator<<(std::ostream& os, const AvisoResponse& r); + class AvisoService { public: using address_t = aviso::etcd::Address; @@ -45,7 +51,7 @@ class AvisoService { using listener_t = ConfiguredListener; using revision_t = int64_t; - using notification_t = NotificationPackage; + using notification_t = AvisoResponse; using subscription_t = AvisoRequest; using subscriptions_t = std::vector; @@ -70,6 +76,8 @@ class AvisoService { using notify_callback_t = std::function; using subscribe_callback_t = std::function; + static std::optional key(const notification_t& notification); + AvisoService(notify_callback_t notify, subscribe_callback_t subscribe) : executor_{[this](const std::chrono::system_clock::time_point& now) { this->operator()(now); }}, listeners_{}, @@ -88,7 +96,7 @@ class AvisoService { void operator()(const std::chrono::system_clock::time_point& now); private: - void register_listener(const AvisoRequest& request); + void register_listener(const AvisoSubscribe& request); void unregister_listener(const std::string& unlisten_path); executor::PeriodicTaskExecutor> executor_; @@ -111,7 +119,6 @@ class AvisoController : private Controller { using base_t::stop; using base_t::subscribe; using base_t::terminate; - using base_t::unsubscribe; }; } // namespace ecf::service::aviso diff --git a/libs/service/src/ecflow/service/mirror/MirrorService.cpp b/libs/service/src/ecflow/service/mirror/MirrorService.cpp index 695b640b8..f4ad32b33 100644 --- a/libs/service/src/ecflow/service/mirror/MirrorService.cpp +++ b/libs/service/src/ecflow/service/mirror/MirrorService.cpp @@ -15,6 +15,7 @@ #include "ecflow/client/ClientInvoker.hpp" #include "ecflow/core/Message.hpp" +#include "ecflow/core/Overload.hpp" #include "ecflow/core/PasswordEncryption.hpp" #include "ecflow/node/Defs.hpp" #include "ecflow/node/Node.hpp" @@ -41,12 +42,22 @@ std::pair load_auth_credentials(const std::string& aut } // namespace +/* MirrorClient */ + struct MirrorClient::Impl { std::shared_ptr defs_; ClientInvoker invoker_; }; +std::optional MirrorService::key(const MirrorService::notification_t& notification) { + return std::visit(ecf::overload{[](const service::mirror::MirrorNotification& notification) { + return std::make_optional(notification.path); + }, + [](const service::mirror::MirrorError&) { return std::optional{}; }}, + notification); +} + MirrorClient::MirrorClient() : impl_(std::make_unique()) { } @@ -99,6 +110,42 @@ int MirrorClient::get_node_status(const std::string& remote_host, } } +/* MirrorRequest */ + +std::ostream& operator<<(std::ostream& os, const MirrorRequest& r) { + os << "MirrorRequest{"; + os << "path=" << r.path << ", "; + os << "host=" << r.host << ", "; + os << "port=" << r.port << ", "; + os << "polling=" << r.polling << ", "; + os << "ssl=" << r.ssl << ", "; + os << "auth=" << r.auth << "}"; + return os; +} + +/* MirrorNotification */ + +std::ostream& operator<<(std::ostream& os, const MirrorNotification& n) { + os << "MirrorNotification{" << n.path << ", " << n.status << ", " << n.reason() << "}"; + return os; +} + +/* MirrorError */ + +std::ostream& operator<<(std::ostream& os, const MirrorError& n) { + os << "MirrorError{reason = " << n.reason() << "}"; + return os; +} + +/* MirrorResponse */ + +std::ostream& operator<<(std::ostream& os, const MirrorResponse& r) { + std::visit([&os](const auto& v) { os << v; }, r); + return os; +} + +/* MirrorService */ + void MirrorService::start() { // Update list of listeners diff --git a/libs/service/src/ecflow/service/mirror/MirrorService.hpp b/libs/service/src/ecflow/service/mirror/MirrorService.hpp index 0ead6d6de..b68b1e4b2 100644 --- a/libs/service/src/ecflow/service/mirror/MirrorService.hpp +++ b/libs/service/src/ecflow/service/mirror/MirrorService.hpp @@ -33,10 +33,14 @@ class MirrorClient { private: struct Impl; - mutable std::unique_ptr impl_; }; +/** + * + * A Mirror Request is a request to start listening for state notifications from remote ecFlow server. + * + */ class MirrorRequest { public: std::string path; @@ -45,16 +49,15 @@ class MirrorRequest { std::uint32_t polling; bool ssl; std::string auth; -}; -inline std::ostream& operator<<(std::ostream& os, const MirrorRequest& r) { - return os << "MirrorRequest{}"; -} - -struct MirrorConfiguration -{ - std::string path; + friend std::ostream& operator<<(std::ostream&, const MirrorRequest&); }; + +/** + * + * A MirrorNotification is a notification of a state change in a remote ecFlow server. + * + */ struct MirrorNotification { bool success; @@ -63,15 +66,45 @@ struct MirrorNotification int status; std::string reason() const { return failure_reason.substr(0, failure_reason.find_first_of("\n")); } + + friend std::ostream& operator<<(std::ostream&, const MirrorNotification&); +}; + +/** + * + * A MirrorError is a notification that an error occurred when contacting a remote ecFlow server. + * + */ +class MirrorError { +public: + explicit MirrorError(std::string_view reason) : reason_{reason.substr(0, reason.find_first_of("\n"))} {} + + const std::string& reason() const { return reason_; } + + friend std::ostream& operator<<(std::ostream&, const MirrorError&); + +private: + std::string reason_; }; -inline std::ostream& operator<<(std::ostream& os, const MirrorNotification& n) { - return os << "MirrorNotification{}"; -} +/** + * + * A MirrorResponse is notification of either state change or an error. + * + */ +using MirrorResponse = std::variant; +std::ostream& operator<<(std::ostream&, const MirrorResponse&); + +/** + * + * The MirrorService is a service (i.e. an object that executes a background worker thread) + * that listens for state changes in remote ecFlow servers. + * + */ class MirrorService { public: - using notification_t = MirrorNotification; + using notification_t = MirrorResponse; using subscription_t = MirrorRequest; using subscriptions_t = std::vector; @@ -86,6 +119,8 @@ class MirrorService { using notify_callback_t = std::function; using subscribe_callback_t = std::function; + static std::optional key(const notification_t& notification); + MirrorService(notify_callback_t notify, subscribe_callback_t subscribe) : executor_{[this](const std::chrono::system_clock::time_point& now) { this->operator()(now); }}, listeners_{}, @@ -116,6 +151,12 @@ class MirrorService { MirrorClient mirror_; }; +/** + * + * The MirrorController is a controller for the MirrorService, + * and essentially defines the handlers for subscription and notification. + * + */ class MirrorController : public Controller { public: using base_t = Controller; @@ -127,7 +168,6 @@ class MirrorController : public Controller { using base_t::stop; using base_t::subscribe; using base_t::terminate; - using base_t::unsubscribe; }; } // namespace ecf::service::mirror