Skip to content

Commit

Permalink
Separate Mirror related classes
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosbento committed May 29, 2024
1 parent cc241ff commit 3dc26fb
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 174 deletions.
4 changes: 4 additions & 0 deletions libs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ set(srcs
service/src/ecflow/service/aviso/etcd/Client.hpp
service/src/ecflow/service/aviso/etcd/Range.hpp
service/src/ecflow/service/executor/PeriodicTaskExecutor.hpp
service/src/ecflow/service/mirror/Mirror.hpp
service/src/ecflow/service/mirror/MirrorClient.hpp
service/src/ecflow/service/mirror/MirrorService.hpp
service/src/ecflow/service/Controller.hpp
service/src/ecflow/service/Log.hpp
Expand All @@ -515,6 +517,8 @@ set(srcs
service/src/ecflow/service/aviso/AvisoService.cpp
service/src/ecflow/service/aviso/etcd/Range.cpp
service/src/ecflow/service/aviso/etcd/Client.cpp
service/src/ecflow/service/mirror/Mirror.cpp
service/src/ecflow/service/mirror/MirrorClient.cpp
service/src/ecflow/service/mirror/MirrorService.cpp
service/src/ecflow/service/Controller.cpp
service/src/ecflow/service/Registry.cpp
Expand Down
49 changes: 49 additions & 0 deletions libs/service/src/ecflow/service/mirror/Mirror.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2009- ECMWF.
*
* This software is licensed under the terms of the Apache Licence version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation
* nor does it submit to any jurisdiction.
*/

#include "ecflow/service/mirror/Mirror.hpp"

namespace ecf::service::mirror {

/* MirrorRequest */

std::ostream& operator<<(std::ostream& os, const MirrorRequest& r) {
os << "MirrorRequest{";
os << "path=" << r.path << ", ";
os << "host=" << r.host << ", ";
os << "port=" << r.port << ", ";
os << "polling=" << r.polling << ", ";
os << "ssl=" << r.ssl << ", ";
os << "auth=" << r.auth << "}";
return os;
}

/* MirrorNotification */

std::ostream& operator<<(std::ostream& os, const MirrorNotification& n) {
os << "MirrorNotification{" << n.path << ", " << n.status << ", " << n.reason() << "}";
return os;
}

/* MirrorError */

std::ostream& operator<<(std::ostream& os, const MirrorError& n) {
os << "MirrorError{reason = " << n.reason() << "}";
return os;
}

/* MirrorResponse */

std::ostream& operator<<(std::ostream& os, const MirrorResponse& r) {
std::visit([&os](const auto& v) { os << v; }, r);
return os;
}

} // namespace ecf::service::mirror
85 changes: 85 additions & 0 deletions libs/service/src/ecflow/service/mirror/Mirror.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2009- ECMWF.
*
* This software is licensed under the terms of the Apache Licence version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation
* nor does it submit to any jurisdiction.
*/

#ifndef ecflow_service_mirror_Mirror_HPP
#define ecflow_service_mirror_Mirror_HPP

#include <string>
#include <string_view>

#include "ecflow/service/Controller.hpp"
#include "ecflow/service/executor/PeriodicTaskExecutor.hpp"
#include "ecflow/service/mirror/MirrorClient.hpp"

namespace ecf::service::mirror {

/**
*
* A MirrorRequest is a request to start listening for state notifications from remote ecFlow server.
*
*/
class MirrorRequest {
public:
std::string path;
std::string host;
std::string port;
std::uint32_t polling;
bool ssl;
std::string auth;

friend std::ostream& operator<<(std::ostream&, const MirrorRequest&);
};

/**
*
* A MirrorNotification is a notification of a state change in a remote ecFlow server.
*
*/
struct MirrorNotification
{
bool success;
std::string path;
std::string failure_reason;
int status;

std::string reason() const { return failure_reason.substr(0, failure_reason.find_first_of("\n")); }

friend std::ostream& operator<<(std::ostream&, const MirrorNotification&);
};

/**
*
* A MirrorError is a notification that an error occurred when contacting a remote ecFlow server.
*
*/
class MirrorError {
public:
explicit MirrorError(std::string_view reason) : reason_{reason.substr(0, reason.find_first_of("\n"))} {}

const std::string& reason() const { return reason_; }

friend std::ostream& operator<<(std::ostream&, const MirrorError&);

private:
std::string reason_;
};

/**
*
* A MirrorResponse is a notification of either state change or an error.
*
*/
using MirrorResponse = std::variant<MirrorNotification, MirrorError>;

std::ostream& operator<<(std::ostream&, const MirrorResponse&);

} // namespace ecf::service::mirror

#endif /* ecflow_service_mirror_Mirror_HPP */
81 changes: 81 additions & 0 deletions libs/service/src/ecflow/service/mirror/MirrorClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2009- ECMWF.
*
* This software is licensed under the terms of the Apache Licence version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation
* nor does it submit to any jurisdiction.
*/

#include "ecflow/service/mirror/MirrorClient.hpp"

#include "ecflow/client/ClientInvoker.hpp"
#include "ecflow/core/Message.hpp"
#include "ecflow/core/PasswordEncryption.hpp"
#include "ecflow/node/Defs.hpp"
#include "ecflow/node/Node.hpp"
#include "ecflow/service/Log.hpp"

namespace ecf::service::mirror {
/* MirrorClient */

struct MirrorClient::Impl
{
std::shared_ptr<Defs> defs_;
ClientInvoker invoker_;
};

MirrorClient::MirrorClient() : impl_(std::make_unique<Impl>()) {
}

MirrorClient::~MirrorClient() = default;

int MirrorClient::get_node_status(const std::string& remote_host,
const std::string& remote_port,
const std::string& node_path,
bool ssl,
const std::string& remote_username,
const std::string& remote_password) const {
ALOG(D, "MirrorClient: Accessing " << remote_host << ":" << remote_port << ", path=" << node_path);
ALOG(D, "MirrorClient: Authentication Credentials: " << remote_username << ":" << remote_password);

try {
impl_ = std::make_unique<Impl>();
impl_->invoker_.set_host_port(remote_host, remote_port);
if (ssl) {
impl_->invoker_.enable_ssl();
}
if (!remote_username.empty()) {
impl_->invoker_.set_user_name(remote_username);
}
if (!remote_password.empty()) {
// Extremely important: the password actually needs to be encrypted before being set in the invoker!
impl_->invoker_.set_password(PasswordEncryption::encrypt(remote_password, remote_username));
}

ALOG(D, "MirrorClient: retrieving the latest defs");
impl_->invoker_.sync(impl_->defs_);

if (!impl_->defs_) {
ALOG(E, "MirrorClient: unable to sync with remote defs");
throw std::runtime_error("MirrorClient: Failed to sync with remote defs");
}

auto node = impl_->defs_->findAbsNode(node_path);

if (!node) {
throw std::runtime_error(
Message("MirrorClient: Unable to find requested node (", node_path, ") in remote remote defs").str());
}

auto state = node->state();
ALOG(D, "MirrorClient: found node (" << node_path << "), with state " << state);
return state;
}
catch (std::exception& e) {
throw std::runtime_error(Message("MirrorClient: failure to sync remote defs, due to: ", e.what()));
}
}

} // namespace ecf::service::mirror
38 changes: 38 additions & 0 deletions libs/service/src/ecflow/service/mirror/MirrorClient.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2009- ECMWF.
*
* This software is licensed under the terms of the Apache Licence version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation
* nor does it submit to any jurisdiction.
*/

#ifndef ecflow_service_mirror_MirrorClient_HPP
#define ecflow_service_mirror_MirrorClient_HPP

#include <memory>
#include <string>

namespace ecf::service::mirror {

class MirrorClient {
public:
MirrorClient();
~MirrorClient();

int get_node_status(const std::string& remote_host,
const std::string& remote_port,
const std::string& node_path,
bool ssl,
const std::string& remote_username,
const std::string& remote_password) const;

private:
struct Impl;
mutable std::unique_ptr<Impl> impl_;
};

} // namespace ecf::service::mirror

#endif /* ecflow_service_mirror_MirrorClient_HPP */
96 changes: 1 addition & 95 deletions libs/service/src/ecflow/service/mirror/MirrorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,7 @@ std::pair<std::string, std::string> load_auth_credentials(const std::string& aut

} // namespace

/* MirrorClient */

struct MirrorClient::Impl
{
std::shared_ptr<Defs> defs_;
ClientInvoker invoker_;
};
/* MirrorService */

std::optional<std::string> MirrorService::key(const MirrorService::notification_t& notification) {
return std::visit(ecf::overload{[](const service::mirror::MirrorNotification& notification) {
Expand All @@ -58,94 +52,6 @@ std::optional<std::string> MirrorService::key(const MirrorService::notification_
notification);
}

MirrorClient::MirrorClient() : impl_(std::make_unique<Impl>()) {
}

MirrorClient::~MirrorClient() = default;

int MirrorClient::get_node_status(const std::string& remote_host,
const std::string& remote_port,
const std::string& node_path,
bool ssl,
const std::string& remote_username,
const std::string& remote_password) const {
ALOG(D, "MirrorClient: Accessing " << remote_host << ":" << remote_port << ", path=" << node_path);
ALOG(D, "MirrorClient: Authentication Credentials: " << remote_username << ":" << remote_password);

try {
impl_ = std::make_unique<Impl>();
impl_->invoker_.set_host_port(remote_host, remote_port);
if (ssl) {
impl_->invoker_.enable_ssl();
}
if (!remote_username.empty()) {
impl_->invoker_.set_user_name(remote_username);
}
if (!remote_password.empty()) {
// Extremely important: the password actually needs to be encrypted before being set in the invoker!
impl_->invoker_.set_password(PasswordEncryption::encrypt(remote_password, remote_username));
}

ALOG(D, "MirrorClient: retrieving the latest defs");
impl_->invoker_.sync(impl_->defs_);

if (!impl_->defs_) {
ALOG(E, "MirrorClient: unable to sync with remote defs");
throw std::runtime_error("MirrorClient: Failed to sync with remote defs");
}

auto node = impl_->defs_->findAbsNode(node_path);

if (!node) {
throw std::runtime_error(
Message("MirrorClient: Unable to find requested node (", node_path, ") in remote remote defs").str());
}

auto state = node->state();
ALOG(D, "MirrorClient: found node (" << node_path << "), with state " << state);
return state;
}
catch (std::exception& e) {
throw std::runtime_error(Message("MirrorClient: failure to sync remote defs, due to: ", e.what()));
}
}

/* MirrorRequest */

std::ostream& operator<<(std::ostream& os, const MirrorRequest& r) {
os << "MirrorRequest{";
os << "path=" << r.path << ", ";
os << "host=" << r.host << ", ";
os << "port=" << r.port << ", ";
os << "polling=" << r.polling << ", ";
os << "ssl=" << r.ssl << ", ";
os << "auth=" << r.auth << "}";
return os;
}

/* MirrorNotification */

std::ostream& operator<<(std::ostream& os, const MirrorNotification& n) {
os << "MirrorNotification{" << n.path << ", " << n.status << ", " << n.reason() << "}";
return os;
}

/* MirrorError */

std::ostream& operator<<(std::ostream& os, const MirrorError& n) {
os << "MirrorError{reason = " << n.reason() << "}";
return os;
}

/* MirrorResponse */

std::ostream& operator<<(std::ostream& os, const MirrorResponse& r) {
std::visit([&os](const auto& v) { os << v; }, r);
return os;
}

/* MirrorService */

void MirrorService::start() {

// Update list of listeners
Expand Down
Loading

0 comments on commit 3dc26fb

Please sign in to comment.