Skip to content

Commit

Permalink
Port http server
Browse files Browse the repository at this point in the history
  • Loading branch information
timemarkovqtum committed Feb 27, 2024
1 parent 3f765ba commit 307de61
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 34 deletions.
19 changes: 15 additions & 4 deletions src/httprpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HTTPRPCTimer : public RPCTimerBase
{
public:
HTTPRPCTimer(struct event_base* eventBase, std::function<void()>& func, int64_t millis) :
ev(eventBase, false, func)
ev(eventBase, false, nullptr, func)
{
struct timeval tv;
tv.tv_sec = millis/1000;
Expand Down Expand Up @@ -85,8 +85,14 @@ static void JSONErrorReply(HTTPRequest* req, const UniValue& objError, const Uni

std::string strReply = JSONRPCReply(NullUniValue, objError, id);

req->WriteHeader("Content-Type", "application/json");
req->WriteReply(nStatus, strReply);
if (req->isChunkMode()) {
// in chunk mode, we assume that the handler had already set the response content-type
req->Chunk(strReply);
req->ChunkEnd();
} else {
req->WriteHeader("Content-Type", "application/json");
req->WriteReply(nStatus, strReply);
}
}

//This function checks username and password against -rpcauth
Expand Down Expand Up @@ -159,7 +165,7 @@ static bool HTTPReq_JSONRPC(const std::any& context, HTTPRequest* req)
return false;
}

JSONRPCRequest jreq;
JSONRPCRequestLong jreq(req);
jreq.context = context;
jreq.peerAddr = req->GetPeer().ToStringAddrPort();
if (!RPCAuthorized(authHeader.second, jreq.authUser)) {
Expand Down Expand Up @@ -201,6 +207,11 @@ static bool HTTPReq_JSONRPC(const std::any& context, HTTPRequest* req)
}
UniValue result = tableRPC.execute(jreq);

if (jreq.isLongPolling) {
jreq.PollReply(result);
return true;
}

// Send reply
strReply = JSONRPCReply(result, NullUniValue, jreq.id);

Expand Down
121 changes: 116 additions & 5 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <netbase.h>
#include <node/interface_ui.h>
#include <rpc/protocol.h> // For HTTP status codes
#include <rpc/server.h> // For HTTP status codes
#include <shutdown.h>
#include <sync.h>
#include <util/check.h>
Expand Down Expand Up @@ -563,14 +564,17 @@ static void httpevent_callback_fn(evutil_socket_t, short, void* data)
delete self;
}

HTTPEvent::HTTPEvent(struct event_base* base, bool _deleteWhenTriggered, const std::function<void()>& _handler):
deleteWhenTriggered(_deleteWhenTriggered), handler(_handler)
HTTPEvent::HTTPEvent(struct event_base* base, bool _deleteWhenTriggered, struct evbuffer *_databuf, const std::function<void()>& _handler):
deleteWhenTriggered(_deleteWhenTriggered), handler(_handler), databuf(_databuf)
{
ev = event_new(base, -1, 0, httpevent_callback_fn, this);
assert(ev);
}
HTTPEvent::~HTTPEvent()
{
if (databuf != NULL) {
evbuffer_free(databuf);
}
event_free(ev);
}
void HTTPEvent::trigger(struct timeval* tv)
Expand All @@ -580,20 +584,80 @@ void HTTPEvent::trigger(struct timeval* tv)
else
evtimer_add(ev, tv); // trigger after timeval passed
}
HTTPRequest::HTTPRequest(struct evhttp_request* _req, bool _replySent) : req(_req), replySent(_replySent)
HTTPRequest::HTTPRequest(struct evhttp_request* _req, bool _replySent) : req(_req), replySent(_replySent), startedChunkTransfer(false), connClosed(false)
{
}

HTTPRequest::~HTTPRequest()
{
if (!replySent) {
if (!replySent && !startedChunkTransfer) {
// Keep track of whether reply was sent to avoid request leaks
LogPrintf("%s: Unhandled request\n", __func__);
WriteReply(HTTP_INTERNAL_SERVER_ERROR, "Unhandled request");
}
// evhttpd cleans up the request, as long as a reply was sent.
}

void HTTPRequest::waitClientClose() {
LogPrint(BCLog::HTTPPOLL, "wait for connection close\n");

// wait at most 5 seconds for client to close
for (int i = 0; i < 10 && IsRPCRunning() && !isConnClosed(); i++) {
std::unique_lock<std::mutex> lock(cs);
closeCv.wait_for(lock, std::chrono::milliseconds(500));
}

if (isConnClosed()) {
LogPrint(BCLog::HTTPPOLL, "wait for connection close, ok\n");
} else if (!IsRPCRunning()) {
LogPrint(BCLog::HTTPPOLL, "wait for connection close, RPC stopped\n");
} else {
LogPrint(BCLog::HTTPPOLL, "wait for connection close, timeout after 5 seconds\n");
}
}

void HTTPRequest::startDetectClientClose() {
LogPrint(BCLog::HTTPPOLL, "start detect http connection close\n");
// will need to call evhttp_send_reply_end to clean this up
auto conn = evhttp_request_get_connection(req);

// evhttp_connection_set_closecb does not reliably detect client connection close unless we write to it.
//
// This problem is supposedly resolved in 2.1.8. See: https://github.com/libevent/libevent/issues/78
//
// But we should just write to the socket to test liveness. This is useful for long-poll RPC calls to see
// if they should terminate the request early.
//
// More weirdness: if process received SIGTERM, the http event loop (in HTTPThread) returns prematurely with 1.
// In which case evhttp_send_reply_end doesn't seem to get called, and evhttp_connection_set_closecb is
// not called. BUT when the event base is freed, this callback IS called, and HTTPRequest is already freed.
//
// So, waitClientClose and startDetectClientClose should just not do anything if RPC is shutting down.
evhttp_connection_set_closecb(conn, [](struct evhttp_connection *conn, void *data) {
LogPrint(BCLog::HTTPPOLL, "http connection close detected\n");

if (IsRPCRunning()) {
auto req = (HTTPRequest*) data;
req->setConnClosed();
}
}, (void *) this);
}

void HTTPRequest::setConnClosed() {
std::lock_guard<std::mutex> lock(cs);
connClosed = true;
closeCv.notify_all();
}

bool HTTPRequest::isConnClosed() {
std::lock_guard<std::mutex> lock(cs);
return connClosed;
}

bool HTTPRequest::isChunkMode() {
return startedChunkTransfer;
}

std::pair<bool, std::string> HTTPRequest::GetHeader(const std::string& hdr) const
{
const struct evkeyvalq* headers = evhttp_request_get_input_headers(req);
Expand Down Expand Up @@ -625,13 +689,60 @@ std::string HTTPRequest::ReadBody()
return rv;
}

bool HTTPRequest::ReplySent() {
return replySent;
}

void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value)
{
struct evkeyvalq* headers = evhttp_request_get_output_headers(req);
assert(headers);
evhttp_add_header(headers, hdr.c_str(), value.c_str());
}

void HTTPRequest::ChunkEnd() {
assert(startedChunkTransfer && !replySent);

HTTPEvent* ev = new HTTPEvent(eventBase, true, NULL,
std::bind(evhttp_send_reply_end, req));

ev->trigger(0);

// If HTTPRequest is destroyed before connection is closed, evhttp seems to get messed up.
// We wait here for connection close before returning back to the handler, where HTTPRequest will be reclaimed.
waitClientClose();

replySent = true;
// `WriteReply` sets req to 0 to prevent req from being freed. But this is not enough in the case of long-polling.
// Something is still freed to early.
// req = 0;
}

void HTTPRequest::Chunk(const std::string& chunk) {
assert(!replySent);

int status = 200;

if (!startedChunkTransfer) {
HTTPEvent* ev = new HTTPEvent(eventBase, true, NULL,
std::bind(evhttp_send_reply_start, req, status,
(const char*) NULL));
ev->trigger(0);

startDetectClientClose();
startedChunkTransfer = true;
}


if (chunk.size() > 0) {
auto databuf = evbuffer_new(); // HTTPEvent will free this buffer
evbuffer_add(databuf, chunk.data(), chunk.size());
HTTPEvent* ev = new HTTPEvent(eventBase, true, databuf,
std::bind(evhttp_send_reply_chunk, req, databuf));
ev->trigger(0);
}
}

/** Closure sent to main thread to request a reply to be sent to
* a HTTP request.
* Replies must be sent in the main loop in the main http thread,
Expand All @@ -648,7 +759,7 @@ void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size());
auto req_copy = req;
HTTPEvent* ev = new HTTPEvent(eventBase, true, [req_copy, nStatus]{
HTTPEvent* ev = new HTTPEvent(eventBase, true, nullptr, [req_copy, nStatus]{
evhttp_send_reply(req_copy, nStatus, nullptr, nullptr);
// Re-enable reading from the socket. This is the second part of the libevent
// workaround above.
Expand Down
32 changes: 31 additions & 1 deletion src/httpserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <functional>
#include <optional>
#include <string>
#include <mutex>
#include <condition_variable>

static const int DEFAULT_HTTP_THREADS=4;
static const int DEFAULT_HTTP_WORKQUEUE=16;
Expand Down Expand Up @@ -58,6 +60,14 @@ class HTTPRequest
private:
struct evhttp_request* req;
bool replySent;
bool startedChunkTransfer;
bool connClosed;

std::mutex cs;
std::condition_variable closeCv;

void startDetectClientClose();
void waitClientClose();

public:
explicit HTTPRequest(struct evhttp_request* req, bool replySent = false);
Expand All @@ -71,6 +81,10 @@ class HTTPRequest
PUT
};

void setConnClosed();
bool isConnClosed();
bool isChunkMode();

/** Get requested URI.
*/
std::string GetURI() const;
Expand Down Expand Up @@ -124,6 +138,21 @@ class HTTPRequest
* main thread, do not call any other HTTPRequest methods after calling this.
*/
void WriteReply(int nStatus, const std::string& strReply = "");

/**
* Start chunk transfer. Assume to be 200.
*/
void Chunk(const std::string& chunk);

/**
* End chunk transfer.
*/
void ChunkEnd();

/**
* Is reply sent?
*/
bool ReplySent();
};

/** Get the query parameter value from request uri for a specified key, or std::nullopt if the key
Expand Down Expand Up @@ -158,7 +187,7 @@ class HTTPEvent
* deleteWhenTriggered deletes this event object after the event is triggered (and the handler called)
* handler is the handler to call when the event is triggered.
*/
HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const std::function<void()>& handler);
HTTPEvent(struct event_base* base, bool deleteWhenTriggered, struct evbuffer *_databuf, const std::function<void()>& handler);
~HTTPEvent();

/** Trigger the event. If tv is 0, trigger it immediately. Otherwise trigger it after
Expand All @@ -169,6 +198,7 @@ class HTTPEvent
bool deleteWhenTriggered;
std::function<void()> handler;
private:
struct evbuffer *databuf;
struct event* ev;
};

Expand Down
2 changes: 1 addition & 1 deletion src/rest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ static bool rest_chaininfo(const std::any& context, HTTPRequest* req, const std:

switch (rf) {
case RESTResponseFormat::JSON: {
JSONRPCRequest jsonRequest;
JSONRPCRequestLong jsonRequest(req);
jsonRequest.context = context;
jsonRequest.params = UniValue(UniValue::VARR);
UniValue chainInfoObject = getblockchaininfo().HandleRequest(jsonRequest);
Expand Down
Loading

0 comments on commit 307de61

Please sign in to comment.