Skip to content

Commit

Permalink
Split Request/Response requests between main & worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosbento committed May 29, 2024
1 parent 66002d7 commit cc241ff
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 209 deletions.
63 changes: 34 additions & 29 deletions libs/node/src/ecflow/node/AvisoAttr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "ecflow/core/Ecf.hpp"
#include "ecflow/core/Message.hpp"
#include "ecflow/core/Overload.hpp"
#include "ecflow/core/exceptions/Exceptions.hpp"
#include "ecflow/node/Node.hpp"
#include "ecflow/node/Operations.hpp"
Expand Down Expand Up @@ -93,16 +94,14 @@ void AvisoAttr::reset() {
}

bool AvisoAttr::isFree() const {
std::string aviso_path = path();

ALOG(D, "AvisoAttr: check Aviso attribute (name: " << name_ << ", listener: " << listener_ << ") is free");

if (controller_ == nullptr) {
return false;
}

// Task associated with Attribute is free when any notification is found
auto notifications = controller_->poll_notifications(aviso_path);
auto notifications = controller_->poll_notifications(this->path());

if (notifications.empty()) {
// No notifications, nothing to do -- task continues to wait
Expand All @@ -112,33 +111,39 @@ bool AvisoAttr::isFree() const {
// Notifications found -- task can continue

// (a) get the latest revision
auto max = std::max_element(notifications.begin(), notifications.end(), [](const auto& a, const auto& b) {
return a.configuration.revision() < b.configuration.revision();
});
auto& back = notifications.back();

state_change_no_ = Ecf::incr_state_change_no();

// (b) update the revision, in the listener configuration
if (max->notification.success()) {
ALOG(D, "AvisoAttr::isFree: " << aviso_path << " updated revision to " << this->revision_);
this->revision_ = max->configuration.revision();
parent_->flag().clear(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
reason_ = "";

for (auto* parent = parent_; parent; parent = parent->parent()) {
parent->set_state_change_no(state_change_no_);
}
return max->notification.match().has_value();
}
else {
parent_->flag().set(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
reason_ = max->notification.reason();

ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); });
return false;
}
return std::visit(
ecf::overload{
[this](const ecf::service::aviso::NotificationPackage<service::aviso::ConfiguredListener,
service::aviso::AvisoNotification>& response) {
ALOG(D, "AvisoAttr::isFree: " << this->path() << " updated revision to " << this->revision_);
this->revision_ = response.configuration.revision();
parent_->flag().clear(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
reason_ = "";

ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); });
return true;
},
[this](const ecf::service::aviso::AvisoNoMatch& response) {
parent_->flag().clear(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
reason_ = "";
return false;
},
[this](const ecf::service::aviso::AvisoError& response) {
parent_->flag().set(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
reason_ = response.reason();

ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); });
return false;
}},
back);
}

void AvisoAttr::start() const {
Expand Down Expand Up @@ -185,8 +190,8 @@ void AvisoAttr::start_controller(const std::string& aviso_path,
if (!controller_) {
// Controller -- start up the Aviso controller, and subscribe the Aviso listener
controller_ = std::make_shared<controller_t>();
controller_->subscribe(ecf::service::aviso::AvisoRequest::make_listen_start(
aviso_path, aviso_listener, aviso_url, aviso_schema, polling, revision_, aviso_auth));
controller_->subscribe(ecf::service::aviso::AvisoSubscribe{
aviso_path, aviso_listener, aviso_url, aviso_schema, polling, revision_, aviso_auth});
// Controller -- effectively start the Aviso listener
// n.b. this must be done after subscribing in the controller, so that the polling interval is set
controller_->start();
Expand All @@ -197,7 +202,7 @@ void AvisoAttr::stop_controller(const std::string& aviso_path) const {
if (controller_ != nullptr) {
ALOG(D, "AvisoAttr: finishing polling for Aviso attribute (" << parent_path_ << ":" << name_ << ")");

controller_->unsubscribe(ecf::service::aviso::AvisoRequest::make_listen_finish(aviso_path));
controller_->subscribe(ecf::service::aviso::AvisoUnsubscribe{aviso_path});

// Controller -- shutdown up the Aviso controller
controller_->stop();
Expand Down
38 changes: 21 additions & 17 deletions libs/node/src/ecflow/node/MirrorAttr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "ecflow/core/Ecf.hpp"
#include "ecflow/core/Message.hpp"
#include "ecflow/core/Overload.hpp"
#include "ecflow/core/exceptions/Exceptions.hpp"
#include "ecflow/node/Node.hpp"
#include "ecflow/node/Operations.hpp"
Expand Down Expand Up @@ -89,23 +90,26 @@ void MirrorAttr::mirror() {
state_change_no_ = Ecf::incr_state_change_no();

// Notifications found -- Node state to be updated or error to be reported
if (auto& notification = notifications.back(); notification.success) {
ALOG(D, "MirrorAttr: Updating Mirror attribute (name: " << name_ << ") to state " << notification.status);
auto latest_state = static_cast<NState::State>(notification.status);
reason_ = "";
parent_->flag().clear(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
parent_->setStateOnly(latest_state, true);
}
else {
ALOG(D,
"MirrorAttr: Failure detected on Mirror attribute (name: " << name_ << ") due to "
<< notification.path);
reason_ = notification.reason();
parent_->flag().set(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
parent_->setStateOnly(NState::UNKNOWN, true);
}
std::visit(ecf::overload{[this](const service::mirror::MirrorNotification& notification) {
ALOG(D,
"MirrorAttr: Updating Mirror attribute (name: " << name_ << ") to state "
<< notification.status);
auto latest_state = static_cast<NState::State>(notification.status);
reason_ = "";
parent_->flag().clear(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
parent_->setStateOnly(latest_state, true);
},
[this](const service::mirror::MirrorError& error) {
ALOG(D,
"MirrorAttr: Failure detected on Mirror attribute (name: "
<< name_ << ") due to " << error.reason());
reason_ = error.reason();
parent_->flag().set(Flag::REMOTE_ERROR);
parent_->flag().set_state_change_no(state_change_no_);
parent_->setStateOnly(NState::UNKNOWN, true);
}},
notifications.back());

// Propagate the 'local' state change number to all parents
ecf::visit_parents(*parent_, [n = this->state_change_no_](Node& node) { node.set_state_change_no(n); });
Expand Down
23 changes: 9 additions & 14 deletions libs/service/src/ecflow/service/Controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ namespace ecf::service {
template <typename Service>
class Controller {
public:
using subscription_t = typename Service::subscription_t;
using service_t = Service;
using subscription_t = typename service_t::subscription_t;
using subscriptions_t = std::vector<subscription_t>;
using notification_t = typename Service::notification_t;
using notifications_t = std::vector<notification_t>;
using service_t = Service;

template <typename... Args>
explicit Controller(Args&&... args) : running_{std::forward<Args>(args)...} {}
Expand All @@ -40,12 +40,6 @@ class Controller {
subscriptions_.push_back(s);
}

void unsubscribe(const subscription_t& s) {
ALOG(D, "Controller: unsubscribe " << s);
std::scoped_lock lock(subscribe_);
subscriptions_.push_back(s);
}

notifications_t poll_notifications(const std::string& name) {
ALOG(D, "Controller: poll_notifications for " << name);

Expand Down Expand Up @@ -78,12 +72,13 @@ class Controller {

std::scoped_lock lock(notify_);

const auto& key = notification.path;
if (auto found = notifications_.find(key); found != notifications_.end()) {
found->second.push_back(notification);
}
else {
notifications_[key] = {notification};
if (const auto& key = service_t::key(notification); key.has_value()) {
if (auto found = notifications_.find(key.value()); found != notifications_.end()) {
found->second.push_back(notification);
}
else {
notifications_[key.value()] = {notification};
}
}
}

Expand Down
89 changes: 55 additions & 34 deletions libs/service/src/ecflow/service/aviso/Aviso.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,67 @@

#include <nlohmann/json.hpp>

#include "ecflow/core/Overload.hpp"
#include "ecflow/service/Log.hpp"

namespace ecf::service::aviso {

/* std::variant visitor utility */
/* AvisoSubscribe */

template <typename... Ts>
struct Overload : Ts...
{
using Ts::operator()...;
};
std::ostream& operator<<(std::ostream& os, const AvisoSubscribe& request) {
os << "AvisoSubscribe{";
os << "path: " << request.path();
os << ", listener_cfg: " << request.listener_cfg();
os << ", address: " << request.address();
os << ", schema: " << request.schema();
os << ", polling: " << request.polling();
os << ", revision: " << request.revision();
os << "}";
return os;
}

template <class... Ts>
Overload(Ts...) -> Overload<Ts...>; // required in C++17, no longer required in C++20
/* AvisoUnsubscribe */

std::ostream& operator<<(std::ostream& os, const AvisoUnsubscribe& request) {
os << "AvisoUnsubscribe{";
os << "path: " << request.path();
os << "}";
return os;
}

/* AvisoRequest */

std::ostream& operator<<(std::ostream& os, const AvisoRequest& request) {
os << "ListenRequest{";
os << "start: " << request.is_start();
os << ", path: " << request.path();
if (request.is_start()) {
os << ", listener_cfg: " << request.listener_cfg();
os << ", address: " << request.address();
os << ", schema: " << request.schema();
os << ", polling: " << request.polling();
os << ", revision: " << request.revision();
}
os << "}";
std::visit(
ecf::overload{[&os](const AvisoSubscribe& r) { os << r; }, [&os](const AvisoUnsubscribe& r) { os << r; }},
request);
return os;
}

/* Notification */
/* AvisoNotification */

std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification) {
os << "success: " << notification.success();
if (const auto& match = notification.match(); match) {
auto& m = match.value();
os << m.key() << " = " << m.value() << " (revision: " << m.revision() << ")";
}
os << "AvisoNotification{";
os << "key: " << notification.key();
os << ", value: " << notification.value();
os << ", revision: " << notification.revision();
os << "}";
return os;
}

/* AvisoNoMatch */

std::ostream& operator<<(std::ostream& os, const AvisoNoMatch& no_match) {
os << "AvisoNoMatch{}";
return os;
}

/* AvisoError */

std::ostream& operator<<(std::ostream& os, const AvisoError& error) {
os << "AvisoError{";
os << "reason: " << error.reason();
os << "}";
return os;
}

Expand All @@ -68,7 +89,7 @@ std::ostream& operator<<(std::ostream& os, const AvisoNotification& notification

/* ConfiguredListener */

ConfiguredListener ConfiguredListener::make_configured_listener(const AvisoRequest& listen_request) {
ConfiguredListener ConfiguredListener::make_configured_listener(const AvisoSubscribe& listen_request) {
using json = nlohmann::ordered_json;

json data;
Expand Down Expand Up @@ -235,13 +256,13 @@ ConfiguredListener::accepts(const std::string& key, const std::string& value, ui
auto actual = v;

auto validate =
Overload{[&actual](const std::string& expected) { return expected == actual; },
[&actual](std::int64_t expected) { return std::to_string(expected) == actual; },
[&actual](const std::vector<std::string>& valid) {
return std::any_of(valid.begin(), valid.end(), [&actual](const auto& expected) {
return expected == actual;
});
}};
ecf::overload{[&actual](const std::string& expected) { return expected == actual; },
[&actual](std::int64_t expected) { return std::to_string(expected) == actual; },
[&actual](const std::vector<std::string>& valid) {
return std::any_of(valid.begin(), valid.end(), [&actual](const auto& expected) {
return expected == actual;
});
}};

applicable = std::visit(validate, found->second);
if (!applicable) {
Expand All @@ -254,7 +275,7 @@ ConfiguredListener::accepts(const std::string& key, const std::string& value, ui
if (applicable) {
AvisoNotification notification{key, value, revision};
for (const auto& [k, v] : actual_parameters) {
notification.match()->add_parameter(k, v);
notification.add_parameter(k, v);
}
ALOG(D, "Aviso: Match [✓] --> <Notification> " << key << " = " << value << " (revision: " << revision << ")");
return notification;
Expand Down
Loading

0 comments on commit cc241ff

Please sign in to comment.