Skip to content

Commit

Permalink
Merge pull request #1579 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
johnmlee101 authored Oct 2, 2023
2 parents de25127 + 716059a commit 81c8f08
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 115 deletions.
126 changes: 36 additions & 90 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ bool BedrockServer::canStandDown() {
size_t blockingQueueSize = _blockingCommandQueue.size();
size_t syncNodeQueueSize = _syncNodeQueuedCommands.size();

// These two aren't all nicely packaged so we need to lock them ourselves.
size_t outstandingHTTPSCommandsSize = 0;
{
lock_guard<decltype(_httpsCommandMutex)> lock(_httpsCommandMutex);
outstandingHTTPSCommandsSize = _outstandingHTTPSCommands.size();
}
size_t futureCommitCommandsSize = 0;
{
lock_guard<decltype(_futureCommitCommandMutex)> lock(_futureCommitCommandMutex);
Expand All @@ -48,7 +42,6 @@ bool BedrockServer::canStandDown() {
<< "mainQueueSize: " << mainQueueSize << ", "
<< "blockingQueueSize: " << blockingQueueSize << ", "
<< "syncNodeQueueSize: " << syncNodeQueueSize << ", "
<< "outstandingHTTPSCommandsSize: " << outstandingHTTPSCommandsSize << ", "
<< "futureCommitCommandsSize: " << futureCommitCommandsSize << ", "
<< "standDownQueueSize: " << standDownQueueSize << ".");
return false;
Expand Down Expand Up @@ -243,9 +236,6 @@ void BedrockServer::sync()
// activity. Once any of them has activity (or the timeout ends), poll will return.
fd_map fdm;

// Prepare our commands for `poll` (for instance, in case they're making HTTP requests).
_prePollCommands(fdm);

// Pre-process any sockets the sync node is managing (i.e., communication with peer nodes).
_syncNode->prePoll(fdm);

Expand All @@ -270,7 +260,6 @@ void BedrockServer::sync()

// Process any activity in our plugins.
AutoTimerTime postPollTime(postPollTimer);
_postPollCommands(fdm, nextActivity);
_syncNode->postPoll(fdm, nextActivity);
_syncNodeQueuedCommands.postPoll(fdm);
}
Expand Down Expand Up @@ -538,18 +527,13 @@ void BedrockServer::sync()
SERROR("peekCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}

// If we just started a new HTTPS request, save it for later.
// If this command attempted an HTTP request, kill it.
if (command->httpsRequests.size()) {
waitForHTTPS(move(command));
// TODO:
// Move the HTTPS loop into the worker, so that the worker can poll on its own requests.
// This is the first step toward linear workers that run start->finish without being
// interrupted and bounced back and forth between a bunch of queues.
//
// The follow-up to that is to allow direct escalations from workers to leader, which can
// be done as a simple HTTPS request for the exact same command.

// Move on to the next command until this one finishes.
SWARN("Killing command " << command->request.methodLine << " that attempted HTTPS request in sync thread.");
command->response.clear();
command->response.methodLine = "500 Refused";
command->complete = true;
_reply(command);
core.rollback();
break;
}
Expand Down Expand Up @@ -828,6 +812,34 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo

int64_t lastConflictPage = 0;
while (true) {

// If there are outstanding HTTPS requests on this command (from a previous call to `peek`) we process them here.
while (!command->areHttpsRequestsComplete()) {
SINFO("Running command network activity loop.");
fd_map fdm;
command->prePoll(fdm);
const uint64_t now = STimeNow();
uint64_t nextActivity = 0;
S_poll(fdm, max(nextActivity, now) - now);

// Timeout is five minutes unless we're shutting down or standing down, in which case it's 5 seconds.
// Note that BedrockCommand::postPoll sets the timeout to the command's timeout if it's lower than this value anyway,
// so this only has an effect if it will be shorter than the command's timeout.
uint64_t maxWaitMs = 5 * 60 * 1'000;
auto _syncNodeCopy = atomic_load(&_syncNode);
if (_shutdownState.load() != RUNNING || (_syncNodeCopy && _syncNodeCopy->getState() == SQLiteNodeState::STANDINGDOWN)) {
maxWaitMs = 5'000;
}

auto start = STimeNow();
command->postPoll(fdm, nextActivity, maxWaitMs);
auto elapsedUS = STimeNow() - start;
if (elapsedUS > 100'000) {
// We warn here as this is a potential serious performance issue that seems to happen sometimes.
SWARN("Post poll on command '" << command->request.methodLine << "' took " << elapsedUS << "us.");
}
}

// Get a DB handle to work on. This will automatically be returned when dbScope goes out of scope.
if (!_dbPool) {
SERROR("Can't run a command with no DB pool");
Expand Down Expand Up @@ -922,24 +934,14 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
break;
}

// If the command isn't complete, we'll re-queue it.
if (command->repeek || !command->areHttpsRequestsComplete()) {
// Roll back the existing transaction, but only if we are inside a transaction.
if (calledPeek) {
core.rollback();
}

if (!command->areHttpsRequestsComplete()) {
// If it has outstanding HTTPS requests, we'll wait for them.
waitForHTTPS(move(command));
} else if (command->repeek) {
// Otherwise, it needs to be re-peeked, but had no outstanding requests, so it goes
// back in the main queue.
_commandQueue.push(move(command));
}

// Move on to the next command until this one finishes.
break;
// Jump back to the top of our main `while (true)` loop and run the network activity loop again.
continue;
}
} else {
// If we haven't sent a quorum command to the sync thread in a while, auto-promote one.
Expand Down Expand Up @@ -1923,53 +1925,6 @@ bool BedrockServer::_upgradeDB(SQLite& db) {
return !db.getUncommittedQuery().empty();
}

// Calls `prePoll(fdm)` on every command currently held in `_outstandingHTTPSCommands`, then on
// `_newCommandsWaiting`, so that both take part in the caller's upcoming `poll`.
// `_httpsCommandMutex` is held for the whole call.
void BedrockServer::_prePollCommands(fd_map& fdm) {
    lock_guard<mutex> httpsGuard(_httpsCommandMutex);

    // Walk the set of commands that are waiting on HTTPS requests and let each one prepare `fdm`.
    for (auto waiting = _outstandingHTTPSCommands.begin(); waiting != _outstandingHTTPSCommands.end(); ++waiting) {
        (*waiting)->prePoll(fdm);
    }

    // Including this queue in the poll set is what lets a newly queued HTTPS command interrupt the
    // sync thread's current `poll`.
    _newCommandsWaiting.prePoll(fdm);
}

// Runs `postPoll` on every command in `_outstandingHTTPSCommands` and moves each command whose HTTPS
// requests have all completed into `_commandQueue`. `_httpsCommandMutex` is held for the whole call.
//
// fdm:          the fd_map that was handed to `poll`.
// nextActivity: forwarded unchanged to each command's `postPoll`.
void BedrockServer::_postPollCommands(fd_map& fdm, uint64_t nextActivity) {
    lock_guard<mutex> httpsGuard(_httpsCommandMutex);

    // The values in this queue carry no information; it only exists to wake `poll`, so drain it.
    _newCommandsWaiting.postPoll(fdm);
    _newCommandsWaiting.clear();

    for (auto cursor = _outstandingHTTPSCommands.begin(); cursor != _outstandingHTTPSCommands.end();) {
        // Step the cursor forward before doing anything else: extracting `current` below invalidates
        // only the iterator to the extracted element, so `cursor` remains usable either way.
        const auto current = cursor++;

        // Allow up to five minutes of waiting normally, but only five seconds once we are shutting
        // down or the sync node is standing down.
        const auto syncNodeCopy = atomic_load(&_syncNode);
        const bool windingDown = _shutdownState.load() != RUNNING
                              || (syncNodeCopy && syncNodeCopy->getState() == SQLiteNodeState::STANDINGDOWN);
        uint64_t maxWaitMs = windingDown ? 5'000 : 5 * 60 * 1'000;
        (*current)->postPoll(fdm, nextActivity, maxWaitMs);

        // Commands with requests still in flight stay in the set for the next pass.
        if (!(*current)->areHttpsRequestsComplete()) {
            continue;
        }

        SINFO("All HTTPS requests complete, returning to main queue.");

        // Set elements are `const`, so the only way to move the unique_ptr out is via a node handle.
        auto finished = _outstandingHTTPSCommands.extract(current);
        _commandQueue.push(move(finished.value()));
    }
}

void BedrockServer::_beginShutdown(const string& reason, bool detach) {
if (_shutdownState.load() == RUNNING) {
_detach = detach;
Expand Down Expand Up @@ -2380,15 +2335,6 @@ void BedrockServer::handleSocket(Socket&& socket, bool fromControlPort, bool fro
}
}

// Takes ownership of `command` and stores it in `_outstandingHTTPSCommands`, then queues a value on
// `_newCommandsWaiting`. Both steps happen while `_httpsCommandMutex` is held.
void BedrockServer::waitForHTTPS(unique_ptr<BedrockCommand>&& command) {
    SAUTOPREFIX(command->request);
    lock_guard<decltype(_httpsCommandMutex)> httpsGuard(_httpsCommandMutex);
    _outstandingHTTPSCommands.insert(move(command));

    // Wake the sync thread out of `poll`; the pushed value itself is never inspected.
    _newCommandsWaiting.push(true);
}

// Returns a reference to `_replicationState` while `_nodeStateSnapshot` is UNKNOWN; otherwise returns
// a reference to `_nodeStateSnapshot` itself.
const atomic<SQLiteNodeState>& BedrockServer::getState() const {
    if (_nodeStateSnapshot == SQLiteNodeState::UNKNOWN) {
        return _replicationState;
    }
    return _nodeStateSnapshot;
}
Expand Down
18 changes: 0 additions & 18 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,6 @@ class BedrockServer : public SQLiteServer {
// becomes leader. It will return true if the DB has changed and needs to be committed.
bool _upgradeDB(SQLite& db);

// Iterate across all of our plugins and call `prePoll` and `postPoll` on any httpsManagers they've created.
// TODO: Can we kill `nextActivity`?
void _prePollCommands(fd_map& fdm);
void _postPollCommands(fd_map& fdm, uint64_t nextActivity);

// Resets the server state so when the sync node restarts it is as if the BedrockServer object was just created.
void _resetServer();

Expand Down Expand Up @@ -423,19 +418,6 @@ class BedrockServer : public SQLiteServer {

mutex _httpsCommandMutex;

// This contains all of the commands that `_outstandingHTTPSRequests` points at. This allows us to keep only a single
// copy of each command, even if it has multiple requests. Sorted with the above `compareCommandByTimeout`.
set<unique_ptr<BedrockCommand>> _outstandingHTTPSCommands;

// Takes a command that has an outstanding HTTPS request and saves it in _outstandingHTTPSCommands until its HTTPS
// requests are complete.
void waitForHTTPS(unique_ptr<BedrockCommand>&& command);

// This doesn't really do anything in itself, but when we need to add new sockets to a poll loop (like when we
// create an outgoing http request) we queue something here, so that the poll loop in `sync` gets interrupted. This
// allows it to start again and pick up the new socket we just created.
SSynchronizedQueue<bool> _newCommandsWaiting;

// When we're standing down, we temporarily dump newly received commands here (this lets all existing
// partially-completed commands, like commands with HTTPS requests, finish without risking getting caught in an
// endless loop of always having new unfinished commands).
Expand Down
13 changes: 7 additions & 6 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1810,7 +1810,10 @@ void SQLiteNode::_onDisconnect(SQLitePeer* peer) {
//
// It works for the sync thread as well, as there's handling in _changeState to rollback a commit when
// dropping out of leading or standing down (and there can't be commits in progress in other states).
SWARN("We were " << stateName(_state) << " but lost quorum. Going to SEARCHING.");
SWARN("[clustersync] We were " << stateName(_state) << " but lost quorum (Disconnected from " << peer->name << "). Going to SEARCHING.");
for (const auto* p : _peerList) {
SWARN("[clustersync] Peer " << p->name << " logged in? " << (p->loggedIn ? "TRUE" : "FALSE") << (p->permaFollower ? " (permaFollower)" : ""));
}
_changeState(SQLiteNodeState::SEARCHING);
}
}
Expand Down Expand Up @@ -2492,8 +2495,6 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// interrupt that chain in a way that will cause the remote end to think you've had an
// error, and start over. So, once a connection is established, we should just use that one
// for all communication until it breaks.
peer->reset();
_onDisconnect(peer);
STHROW("Peer " + peer->name + " seems already connected.");
}
} else {
Expand Down Expand Up @@ -2545,9 +2546,9 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
break;
case SQLitePeer::PeerPostPollStatus::OK:
{
auto lastSendTime = peer->lastSendTime();
if (lastSendTime && STimeNow() - lastSendTime > SQLiteNode::RECV_TIMEOUT - 5 * STIME_US_PER_S) {
SINFO("Close to timeout, sending PING to peer '" << peer->name << "'");
auto lastActivityTime = max(peer->lastSendTime(), peer->lastRecvTime());
if (lastActivityTime && STimeNow() - lastActivityTime > SQLiteNode::RECV_TIMEOUT - 5 * STIME_US_PER_S) {
SINFO("Close to timeout (" << (STimeNow() - lastActivityTime) << "us since last activity), sending PING to peer '" << peer->name << "'");
_sendPING(peer);
}
try {
Expand Down
3 changes: 2 additions & 1 deletion sqlitecluster/SQLitePeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA
switch (socket->state.load()) {
case STCPManager::Socket::CONNECTED: {
// socket->lastRecvTime is always set, it's initialized to STimeNow() at creation.
if (socket->lastRecvTime + SQLiteNode::RECV_TIMEOUT < STimeNow()) {
auto lastActivityTime = max(socket->lastSendTime, socket->lastRecvTime);
if (lastActivityTime + SQLiteNode::RECV_TIMEOUT < STimeNow()) {
SHMMM("Connection with peer '" << name << "' timed out.");
return PeerPostPollStatus::SOCKET_ERROR;
}
Expand Down

0 comments on commit 81c8f08

Please sign in to comment.