Skip to content

Commit

Permalink
Ensure single thread replying to HTTP request
Browse files Browse the repository at this point in the history
Re ECFLOW-1957
  • Loading branch information
marcosbento committed Sep 16, 2024
1 parent 78028c2 commit df548c9
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 38 deletions.
26 changes: 19 additions & 7 deletions libs/base/src/ecflow/base/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
#include "ecflow/core/Converter.hpp"

HttpClient::HttpClient(Cmd_ptr cmd_ptr, const std::string& host, const std::string& port, int timeout)
: stopped_(false),
host_(host),
: host_(host),
port_(port),
client_(host, ecf::convert_to<int>(port)) {

Expand All @@ -34,16 +33,29 @@ void HttpClient::run() {
std::string outbound;
ecf::save_as_string(outbound, outbound_request_);

auto result = client_.Post("/v1/ecflow", outbound, "application/json");
auto response = result.value();

ecf::restore_from_string(response.body, inbound_response_);
auto result = client_.Post("/v1/ecflow", outbound, "application/json");
if (result) {
auto response = result.value();
ecf::restore_from_string(response.body, inbound_response_);
}
else {
status_ = result.error();
reason_ = httplib::to_string(status_);
}
};

bool HttpClient::handle_server_response(ServerReply& server_reply, bool debug) const {
if (debug) {
std::cout << " Client::handle_server_response" << std::endl;
}
server_reply.set_host_port(host_, port_); // client context, needed by some commands, ie. SServerLoadCmd
return inbound_response_.handle_server_response(server_reply, outbound_request_.get_cmd(), debug);

if (status_ == httplib::Error::Success) {
return inbound_response_.handle_server_response(server_reply, outbound_request_.get_cmd(), debug);
}
else {
std::stringstream ss;
ss << "HttpClient::handle_server_response: Error: " << status_ << " " << reason_;
throw std::runtime_error(ss.str());
}
}
12 changes: 6 additions & 6 deletions libs/base/src/ecflow/base/HttpClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
#include "ecflow/base/ServerToClientResponse.hpp"

///
/// \brief This class acts as the client part. ( in client/server architecture)
///
/// \note The plug command can move a node to another server hence the server
/// itself will NEED to ACT as a client. This is why client lives in Base and
/// not the Client project
/// \brief This class acts as an HTTP client
///

class HttpClient {
Expand All @@ -38,10 +34,14 @@ class HttpClient {
bool handle_server_response(ServerReply&, bool debug) const;

private:
bool stopped_;
std::string host_; /// the servers name
std::string port_; /// the port on the server
httplib::Client client_;

httplib::Response response_;
httplib::Error status_ = httplib::Error::Success;
std::string reason_ = "";

ClientToServerRequest outbound_request_; /// The request we will send to the server
ServerToClientResponse inbound_response_; /// The response we get back from the server
};
Expand Down
18 changes: 2 additions & 16 deletions libs/core/src/ecflow/core/Ecf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
bool Ecf::server_ = false;
bool Ecf::debug_equality_ = false;
unsigned int Ecf::debug_level_ = 0;
thread_local Ecf::atomic_counter_t Ecf::state_change_no_ = 0;
thread_local Ecf::atomic_counter_t Ecf::modify_change_no_ = 0;
Ecf::atomic_counter_t Ecf::state_change_no_ = 0;
Ecf::atomic_counter_t Ecf::modify_change_no_ = 0;
bool DebugEquality::ignore_server_variables_ = false;

const char* Ecf::SERVER_NAME() {
Expand Down Expand Up @@ -80,20 +80,6 @@ const std::string& Ecf::URL() {
return URL;
}

Ecf::counter_t Ecf::incr_state_change_no() {
if (server_) {
return ++state_change_no_;
}
return state_change_no_;
}

Ecf::counter_t Ecf::incr_modify_change_no() {
if (server_) {
return ++modify_change_no_;
}
return modify_change_no_;
}

// =======================================================

EcfPreserveChangeNo::EcfPreserveChangeNo()
Expand Down
46 changes: 38 additions & 8 deletions libs/core/src/ecflow/core/Ecf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
///

#include <atomic>
#include <iostream>
#include <string>

/**
Expand Down Expand Up @@ -47,14 +48,43 @@ class Ecf {
const Ecf& operator=(const Ecf&) = delete;

/// Increment and then return state change no
static counter_t incr_state_change_no();
static counter_t state_change_no() { return state_change_no_; }
static void set_state_change_no(counter_t x) { state_change_no_ = x; }
static counter_t incr_state_change_no() {
if (server_) {
++state_change_no_;
}
std::string location = server_ ? "server" : "client";
std::cout << "incrementing state_change_no_ (" << location << ") = " << state_change_no_ << std::endl;

return state_change_no_;
}
static counter_t state_change_no() {
std::cout << "accessing state_change_no_ = " << state_change_no_ << std::endl;
return state_change_no_;
}
static void set_state_change_no(counter_t x) {
state_change_no_ = x;
std::cout << "setting state_change_no_ = " << x << std::endl;
}

/// The modify_change_no_ is used for node addition and deletion and re-ordering
static counter_t incr_modify_change_no();
static counter_t modify_change_no() { return modify_change_no_; }
static void set_modify_change_no(counter_t x) { modify_change_no_ = x; }
static counter_t incr_modify_change_no() {
if (server_) {
++modify_change_no_;
}

std::string location = server_ ? "server" : "client";
std::cout << "incrementing modify_change_no_ (" << location << ") = " << modify_change_no_ << std::endl;

return modify_change_no_;
}
static counter_t modify_change_no() {
std::cout << "accessing modify_change_no_ = " << modify_change_no_ << std::endl;
return modify_change_no_;
}
static void set_modify_change_no(counter_t x) {
modify_change_no_ = x;
std::cout << "setting modify_change_no_ = " << modify_change_no_ << std::endl;
}

/// Returns true if we are on the server side.
/// Only in server side do we increment state/modify numbers
Expand Down Expand Up @@ -91,8 +121,8 @@ class Ecf {
static bool server_;
static bool debug_equality_;
static unsigned int debug_level_;
static thread_local atomic_counter_t state_change_no_;
static thread_local atomic_counter_t modify_change_no_;
static atomic_counter_t state_change_no_;
static atomic_counter_t modify_change_no_;
};

/// Make sure the Ecf number don't change
Expand Down
1 change: 1 addition & 0 deletions libs/rest/test/TestApiV1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define CPPHTTPLIB_OPENSSL_SUPPORT
#endif

#define CPPHTTPLIB_THREAD_POOL_COUNT 1
#include <httplib.h>

#include <boost/test/unit_test.hpp>
Expand Down
15 changes: 14 additions & 1 deletion libs/server/src/ecflow/server/Server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#ifndef ecflow_server_Server_HPP
#define ecflow_server_Server_HPP

#define CPPHTTPLIB_THREAD_POOL_COUNT 1

#include <httplib.h>

#include "ecflow/base/stc/PreAllocatedReply.hpp"
Expand Down Expand Up @@ -67,7 +69,7 @@ class HttpServer : public BaseServer {
{
// See what kind of message we got from the client
if (serverEnv_.debug()) {
std::cout << " TcpBaseServer::handle_request : client request " << inbound_request << std::endl;
std::cout << " HTTPServer::handle_request : client request " << inbound_request << std::endl;
}

try {
Expand All @@ -87,8 +89,19 @@ class HttpServer : public BaseServer {
std::string outbound;
ecf::save_as_string(outbound, outbound_response);

outbound_response.cleanup();

// 4) ship response
response.set_content(outbound, "text/plain");

if (inbound_request.terminateRequest()) {
if (serverEnv_.debug())
std::cout << " <-- HttpServer::handle_terminate_request exiting server via terminate() port "
<< serverEnv_.port() << std::endl;
// terminate();

// TODO: Terminate the server!
}
});

try {
Expand Down

0 comments on commit df548c9

Please sign in to comment.