Skip to content

Commit

Permalink
feat: Integrate new webserver (#1722)
Browse files Browse the repository at this point in the history
For #919.
The new web server is not using dosguard yet. It will be fixed by a
separate PR.
  • Loading branch information
kuznetsss authored Nov 21, 2024
1 parent fc3ba07 commit c77154a
Show file tree
Hide file tree
Showing 90 changed files with 4,020 additions and 674 deletions.
3 changes: 2 additions & 1 deletion docs/examples/config/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
// send a reply for each request whenever it is ready.
"parallel_requests_limit": 10, // Optional parameter, used only if "processing_strategy" is "parallel". It limits the number of requests for one client connection processed in parallel. Infinite if not specified.
// Max number of responses to queue up before sent successfully. If a client's waiting queue is too long, the server will close the connection.
"ws_max_sending_queue_size": 1500
"ws_max_sending_queue_size": 1500,
"__ng_web_server": false // Use ng web server. This is a temporary setting which will be deleted after switching to ng web server
},
// Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet.
"graceful_period": 10.0,
Expand Down
4 changes: 3 additions & 1 deletion src/app/CliArgs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ CliArgs::parse(int argc, char const* argv[])
("help,h", "print help message and exit")
("version,v", "print version and exit")
("conf,c", po::value<std::string>()->default_value(defaultConfigPath), "configuration file")
("ng-web-server,w", "Use ng-web-server")
;
// clang-format on
po::positional_options_description positional;
Expand All @@ -64,7 +65,8 @@ CliArgs::parse(int argc, char const* argv[])
}

auto configPath = parsed["conf"].as<std::string>();
return Action{Action::Run{std::move(configPath)}};
return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0}
};
}

} // namespace app
7 changes: 3 additions & 4 deletions src/app/CliArgs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ class CliArgs {
public:
/** @brief Run action. */
struct Run {
/** @brief Configuration file path. */
std::string configPath;
std::string configPath; ///< Configuration file path.
bool useNgWebServer; ///< Whether to use a ng web server
};

/** @brief Exit action. */
struct Exit {
/** @brief Exit code. */
int exitCode;
int exitCode; ///< Exit code.
};

/**
Expand Down
98 changes: 97 additions & 1 deletion src/app/ClioApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,39 @@
#include "etl/NetworkValidatedLedgers.hpp"
#include "feed/SubscriptionManager.hpp"
#include "rpc/Counters.hpp"
#include "rpc/Errors.hpp"
#include "rpc/RPCEngine.hpp"
#include "rpc/WorkQueue.hpp"
#include "rpc/common/impl/HandlerProvider.hpp"
#include "util/Assert.hpp"
#include "util/build/Build.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Http.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "web/AdminVerificationStrategy.hpp"
#include "web/RPCServerHandler.hpp"
#include "web/Server.hpp"
#include "web/SubscriptionContextInterface.hpp"
#include "web/dosguard/DOSGuard.hpp"
#include "web/dosguard/IntervalSweepHandler.hpp"
#include "web/dosguard/WhitelistHandler.hpp"
#include "web/ng/Connection.hpp"
#include "web/ng/RPCServerHandler.hpp"
#include "web/ng/Request.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/Server.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/http/status.hpp>

#include <cstdint>
#include <cstdlib>
#include <exception>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

namespace app {
Expand Down Expand Up @@ -79,7 +93,7 @@ ClioApplication::ClioApplication(util::Config const& config) : config_(config),
}

int
ClioApplication::run()
ClioApplication::run(bool const useNgWebServer)
{
auto const threads = config_.valueOr("io_threads", 2);
if (threads <= 0) {
Expand Down Expand Up @@ -126,9 +140,91 @@ ClioApplication::run()
auto const rpcEngine =
RPCEngineType::make_RPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider);

if (useNgWebServer or config_.valueOr("server.__ng_web_server", false)) {
web::ng::RPCServerHandler<RPCEngineType, etl::ETLService> handler{config_, backend, rpcEngine, etl};

auto expectedAdminVerifier = web::make_AdminVerificationStrategy(config_);
if (not expectedAdminVerifier.has_value()) {
LOG(util::LogService::error()) << "Error creating admin verifier: " << expectedAdminVerifier.error();
return EXIT_FAILURE;
}
auto const adminVerifier = std::move(expectedAdminVerifier).value();

auto httpServer = web::ng::make_Server(config_, ioc);

if (not httpServer.has_value()) {
LOG(util::LogService::error()) << "Error creating web server: " << httpServer.error();
return EXIT_FAILURE;
}

httpServer->onGet(
"/metrics",
[adminVerifier](
web::ng::Request const& request,
web::ng::ConnectionMetadata& connectionMetadata,
web::SubscriptionContextPtr,
boost::asio::yield_context
) -> web::ng::Response {
auto const maybeHttpRequest = request.asHttpRequest();
ASSERT(maybeHttpRequest.has_value(), "Got not a http request in Get");
auto const& httpRequest = maybeHttpRequest->get();

// FIXME(#1702): Using veb server thread to handle prometheus request. Better to post on work queue.
auto maybeResponse = util::prometheus::handlePrometheusRequest(
httpRequest, adminVerifier->isAdmin(httpRequest, connectionMetadata.ip())
);
ASSERT(maybeResponse.has_value(), "Got unexpected request for Prometheus");
return web::ng::Response{std::move(maybeResponse).value(), request};
}
);

util::Logger webServerLog{"WebServer"};
auto onRequest = [adminVerifier, &webServerLog, &handler](
web::ng::Request const& request,
web::ng::ConnectionMetadata& connectionMetadata,
web::SubscriptionContextPtr subscriptionContext,
boost::asio::yield_context yield
) -> web::ng::Response {
LOG(webServerLog.info()) << connectionMetadata.tag()
<< "Received request from ip = " << connectionMetadata.ip()
<< " - posting to WorkQueue";

connectionMetadata.setIsAdmin([&adminVerifier, &request, &connectionMetadata]() {
return adminVerifier->isAdmin(request.httpHeaders(), connectionMetadata.ip());
});

try {
return handler(request, connectionMetadata, std::move(subscriptionContext), yield);
} catch (std::exception const&) {
return web::ng::Response{
boost::beast::http::status::internal_server_error,
rpc::makeError(rpc::RippledError::rpcINTERNAL),
request
};
}
};

httpServer->onPost("/", onRequest);
httpServer->onWs(onRequest);

auto const maybeError = httpServer->run();
if (maybeError.has_value()) {
LOG(util::LogService::error()) << "Error starting web server: " << *maybeError;
return EXIT_FAILURE;
}

// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope
// Calls destructors on all resources, and destructs in order
start(ioc, threads);

return EXIT_SUCCESS;
}

// Init the web server
auto handler =
std::make_shared<web::RPCServerHandler<RPCEngineType, etl::ETLService>>(config_, backend, rpcEngine, etl);

auto const httpServer = web::make_HttpServer(config_, ioc, dosGuard, handler);

// Blocks until stopped.
Expand Down
4 changes: 3 additions & 1 deletion src/app/ClioApplication.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ class ClioApplication {
/**
* @brief Run the application
*
* @param useNgWebServer Whether to use the new web server
*
* @return exit code
*/
int
run();
run(bool useNgWebServer);
};

} // namespace app
5 changes: 3 additions & 2 deletions src/feed/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

#pragma once

#include "web/interface/ConnectionBase.hpp"
#include "web/SubscriptionContextInterface.hpp"

#include <memory>

namespace feed {
using Subscriber = web::ConnectionBase;

using Subscriber = web::SubscriptionContextInterface;
using SubscriberPtr = Subscriber*;
using SubscriberSharedPtr = std::shared_ptr<Subscriber>;

Expand Down
6 changes: 2 additions & 4 deletions src/feed/impl/ProposedTransactionFeed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ProposedTransactionFeed::sub(SubscriberSharedPtr const& subscriber)
if (added) {
LOG(logger_.info()) << subscriber->tag() << "Subscribed tx_proposed";
++subAllCount_.get();
subscriber->onDisconnect.connect([this](SubscriberPtr connection) { unsubInternal(connection); });
subscriber->onDisconnect([this](SubscriberPtr connection) { unsubInternal(connection); });
}
}

Expand All @@ -73,9 +73,7 @@ ProposedTransactionFeed::sub(ripple::AccountID const& account, SubscriberSharedP
if (added) {
LOG(logger_.info()) << subscriber->tag() << "Subscribed accounts_proposed " << account;
++subAccountCount_.get();
subscriber->onDisconnect.connect([this, account](SubscriberPtr connection) {
unsubInternal(account, connection);
});
subscriber->onDisconnect([this, account](SubscriberPtr connection) { unsubInternal(account, connection); });
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/feed/impl/SingleFeedBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ SingleFeedBase::sub(SubscriberSharedPtr const& subscriber)
if (added) {
LOG(logger_.info()) << subscriber->tag() << "Subscribed " << name_;
++subCount_.get();
subscriber->onDisconnect.connect([this](SubscriberPtr connectionDisconnecting) {
subscriber->onDisconnect([this](SubscriberPtr connectionDisconnecting) {
unsubInternal(connectionDisconnecting);
});
};
Expand Down
1 change: 1 addition & 0 deletions src/feed/impl/TrackableSignalMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <boost/signals2.hpp>

#include <concepts>
#include <cstddef>
#include <functional>
#include <memory>
Expand Down
22 changes: 10 additions & 12 deletions src/feed/impl/TransactionFeed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ namespace feed::impl {
void
TransactionFeed::TransactionSlot::operator()(AllVersionTransactionsType const& allVersionMsgs) const
{
if (auto connection = connectionWeakPtr.lock(); connection) {
if (auto connection = subscriptionContextWeakPtr.lock(); connection) {
// Check if this connection already sent
if (feed.get().notified_.contains(connection.get()))
return;

feed.get().notified_.insert(connection.get());

if (connection->apiSubVersion < 2u) {
if (connection->apiSubversion() < 2u) {
connection->send(allVersionMsgs[0]);
return;
}
Expand All @@ -75,7 +75,7 @@ TransactionFeed::sub(SubscriberSharedPtr const& subscriber)
if (added) {
LOG(logger_.info()) << subscriber->tag() << "Subscribed transactions";
++subAllCount_.get();
subscriber->onDisconnect.connect([this](SubscriberPtr connection) { unsubInternal(connection); });
subscriber->onDisconnect([this](SubscriberPtr connection) { unsubInternal(connection); });
}
}

Expand All @@ -86,18 +86,16 @@ TransactionFeed::sub(ripple::AccountID const& account, SubscriberSharedPtr const
if (added) {
LOG(logger_.info()) << subscriber->tag() << "Subscribed account " << account;
++subAccountCount_.get();
subscriber->onDisconnect.connect([this, account](SubscriberPtr connection) {
unsubInternal(account, connection);
});
subscriber->onDisconnect([this, account](SubscriberPtr connection) { unsubInternal(account, connection); });
}
}

void
TransactionFeed::subProposed(SubscriberSharedPtr const& subscriber)
{
auto const added = txProposedsignal_.connectTrackableSlot(subscriber, TransactionSlot(*this, subscriber));
auto const added = txProposedSignal_.connectTrackableSlot(subscriber, TransactionSlot(*this, subscriber));
if (added) {
subscriber->onDisconnect.connect([this](SubscriberPtr connection) { unsubProposedInternal(connection); });
subscriber->onDisconnect([this](SubscriberPtr connection) { unsubProposedInternal(connection); });
}
}

Expand All @@ -107,7 +105,7 @@ TransactionFeed::subProposed(ripple::AccountID const& account, SubscriberSharedP
auto const added =
accountProposedSignal_.connectTrackableSlot(subscriber, account, TransactionSlot(*this, subscriber));
if (added) {
subscriber->onDisconnect.connect([this, account](SubscriberPtr connection) {
subscriber->onDisconnect([this, account](SubscriberPtr connection) {
unsubProposedInternal(account, connection);
});
}
Expand All @@ -120,7 +118,7 @@ TransactionFeed::sub(ripple::Book const& book, SubscriberSharedPtr const& subscr
if (added) {
LOG(logger_.info()) << subscriber->tag() << "Subscribed book " << book;
++subBookCount_.get();
subscriber->onDisconnect.connect([this, book](SubscriberPtr connection) { unsubInternal(book, connection); });
subscriber->onDisconnect([this, book](SubscriberPtr connection) { unsubInternal(book, connection); });
}
}

Expand Down Expand Up @@ -285,7 +283,7 @@ TransactionFeed::pub(
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
// rippled SENDS the same message twice
notified_.clear();
txProposedsignal_.emit(allVersionsMsgs);
txProposedSignal_.emit(allVersionsMsgs);
notified_.clear();
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection
Expand Down Expand Up @@ -323,7 +321,7 @@ TransactionFeed::unsubInternal(ripple::AccountID const& account, SubscriberPtr s
void
TransactionFeed::unsubProposedInternal(SubscriberPtr subscriber)
{
txProposedsignal_.disconnect(subscriber);
txProposedSignal_.disconnect(subscriber);
}

void
Expand Down
6 changes: 3 additions & 3 deletions src/feed/impl/TransactionFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ class TransactionFeed {

struct TransactionSlot {
std::reference_wrapper<TransactionFeed> feed;
std::weak_ptr<Subscriber> connectionWeakPtr;
std::weak_ptr<Subscriber> subscriptionContextWeakPtr;

TransactionSlot(TransactionFeed& feed, SubscriberSharedPtr const& connection)
: feed(feed), connectionWeakPtr(connection)
: feed(feed), subscriptionContextWeakPtr(connection)
{
}

Expand All @@ -76,7 +76,7 @@ class TransactionFeed {

// Signals for proposed tx subscribers
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountProposedSignal_;
TrackableSignal<Subscriber, AllVersionTransactionsType const&> txProposedsignal_;
TrackableSignal<Subscriber, AllVersionTransactionsType const&> txProposedSignal_;

std::unordered_set<SubscriberPtr>
notified_; // Used by slots to prevent double notifications if tx contains multiple subscribed accounts
Expand Down
3 changes: 2 additions & 1 deletion src/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ try {
}
util::LogService::init(config);
app::ClioApplication clio{config};
return clio.run();

return clio.run(run.useNgWebServer);
}
);
} catch (std::exception const& e) {
Expand Down
Loading

0 comments on commit c77154a

Please sign in to comment.