diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 2dc7d796e..fdf4e68cf 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -32,12 +32,6 @@ bool BedrockServer::canStandDown() { size_t blockingQueueSize = _blockingCommandQueue.size(); size_t syncNodeQueueSize = _syncNodeQueuedCommands.size(); - // These two aren't all nicely packaged so we need to lock them ourselves. - size_t outstandingHTTPSCommandsSize = 0; - { - lock_guard lock(_httpsCommandMutex); - outstandingHTTPSCommandsSize = _outstandingHTTPSCommands.size(); - } size_t futureCommitCommandsSize = 0; { lock_guard lock(_futureCommitCommandMutex); @@ -48,7 +42,6 @@ bool BedrockServer::canStandDown() { << "mainQueueSize: " << mainQueueSize << ", " << "blockingQueueSize: " << blockingQueueSize << ", " << "syncNodeQueueSize: " << syncNodeQueueSize << ", " - << "outstandingHTTPSCommandsSize: " << outstandingHTTPSCommandsSize << ", " << "futureCommitCommandsSize: " << futureCommitCommandsSize << ", " << "standDownQueueSize: " << standDownQueueSize << "."); return false; @@ -243,9 +236,6 @@ void BedrockServer::sync() // activity. Once any of them has activity (or the timeout ends), poll will return. fd_map fdm; - // Prepare our commands for `poll` (for instance, in case they're making HTTP requests). - _prePollCommands(fdm); - // Pre-process any sockets the sync node is managing (i.e., communication with peer nodes). _syncNode->prePoll(fdm); @@ -270,7 +260,6 @@ void BedrockServer::sync() // Process any activity in our plugins. AutoTimerTime postPollTime(postPollTimer); - _postPollCommands(fdm, nextActivity); _syncNode->postPoll(fdm, nextActivity); _syncNodeQueuedCommands.postPoll(fdm); } @@ -538,18 +527,13 @@ void BedrockServer::sync() SERROR("peekCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result); } - // If we just started a new HTTPS request, save it for later. + // If this command attempted an HTTP request, kill it. if (command->httpsRequests.size()) { - waitForHTTPS(move(command)); - // TODO: - // Move the HTTPS loop into the worker, so that the worker can poll on its own requests. - // This is the first step toward linear workers that run start->finish without being - // interrupted and bounced back and forth between a bunch of queues. - // - // The follow-up to that is to allow direct escalations from workers to leader, which can - // be done as a simple HTTPS request for the exact same command. - - // Move on to the next command until this one finishes. + SWARN("Killing command " << command->request.methodLine << " that attempted HTTPS request in sync thread."); + command->response.clear(); + command->response.methodLine = "500 Refused"; + command->complete = true; + _reply(command); core.rollback(); break; } @@ -828,6 +812,34 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo int64_t lastConflictPage = 0; while (true) { + + // If there are outstanding HTTPS requests on this command (from a previous call to `peek`) we process them here. + while (!command->areHttpsRequestsComplete()) { + SINFO("Running command network activity loop."); + fd_map fdm; + command->prePoll(fdm); + const uint64_t now = STimeNow(); + uint64_t nextActivity = 0; + S_poll(fdm, max(nextActivity, now) - now); + + // Timeout is five minutes unless we're shutting down or standing down, in which case it's 5 seconds. + // Note that BedrockCommad::postPoll sets the timeout to the command's timeout if it's lower than this value anyway, + // So this only has an effect if it will be shorter than the command's timeout. + uint64_t maxWaitMs = 5 * 60 * 1'000; + auto _syncNodeCopy = atomic_load(&_syncNode); + if (_shutdownState.load() != RUNNING || (_syncNodeCopy && _syncNodeCopy->getState() == SQLiteNodeState::STANDINGDOWN)) { + maxWaitMs = 5'000; + } + + auto start = STimeNow(); + command->postPoll(fdm, nextActivity, maxWaitMs); + auto elapsedUS = STimeNow() - start; + if (elapsedUS > 100'000) { + // We warn here as this is a potential serious performance issue that seems to happen sometimes. + SWARN("Post poll on command '" << command->request.methodLine << "' took " << elapsedUS << "us."); + } + } + // Get a DB handle to work on. This will automatically be returned when dbScope goes out of scope. if (!_dbPool) { SERROR("Can't run a command with no DB pool"); @@ -922,24 +934,14 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo break; } - // If the command isn't complete, we'll re-queue it. if (command->repeek || !command->areHttpsRequestsComplete()) { // Roll back the existing transaction, but only if we are inside an transaction if (calledPeek) { core.rollback(); } - if (!command->areHttpsRequestsComplete()) { - // If it has outstanding HTTPS requests, we'll wait for them. - waitForHTTPS(move(command)); - } else if (command->repeek) { - // Otherwise, it needs to be re-peeked, but had no outstanding requests, so it goes - // back in the main queue. - _commandQueue.push(move(command)); - } - - // Move on to the next command until this one finishes. - break; + // Jump back to the top of our main `while (true)` loop and run the network activity loop again. + continue; } } else { // If we haven't sent a quorum command to the sync thread in a while, auto-promote one. @@ -1923,53 +1925,6 @@ bool BedrockServer::_upgradeDB(SQLite& db) { return !db.getUncommittedQuery().empty(); } -void BedrockServer::_prePollCommands(fd_map& fdm) { - lock_guard lock(_httpsCommandMutex); - for (auto& command : _outstandingHTTPSCommands) { - command->prePoll(fdm); - } - - // Make sure that waiting for an HTTPS command interrupts the current `poll` in the sync thread. - _newCommandsWaiting.prePoll(fdm); -} - -void BedrockServer::_postPollCommands(fd_map& fdm, uint64_t nextActivity) { - lock_guard lock(_httpsCommandMutex); - - // Just clear this, it doesn't matter what the contents are. - _newCommandsWaiting.postPoll(fdm); - _newCommandsWaiting.clear(); - - // Because we modify this list as we walk across it, we use an iterator to our current position. - auto it = _outstandingHTTPSCommands.begin(); - while (it != _outstandingHTTPSCommands.end()) { - auto& command = *it; - - // By default, we can poll up to 5 min. - uint64_t maxWaitMs = 5 * 60 * 1'000; - auto _syncNodeCopy = atomic_load(&_syncNode); - if (_shutdownState.load() != RUNNING || (_syncNodeCopy && _syncNodeCopy->getState() == SQLiteNodeState::STANDINGDOWN)) { - // But if we're trying to shut down, we give up after 5 seconds. - maxWaitMs = 5'000; - } - command->postPoll(fdm, nextActivity, maxWaitMs); - - // If it finished all it's requests, put it back in the main queue. - if (command->areHttpsRequestsComplete()) { - SINFO("All HTTPS requests complete, returning to main queue."); - - // Because sets contain only `const` data, they can't be moved-from without these weird `extract` - // semantics. This invalidates our iterator, so we save the one we want before we break it. - auto nextIt = next(it); - _commandQueue.push(move(_outstandingHTTPSCommands.extract(it).value())); - it = nextIt; - } else { - // otherwise just move on to the next command. - it++; - } - } -} - void BedrockServer::_beginShutdown(const string& reason, bool detach) { if (_shutdownState.load() == RUNNING) { _detach = detach; @@ -2380,15 +2335,6 @@ void BedrockServer::handleSocket(Socket&& socket, bool fromControlPort, bool fro } } -void BedrockServer::waitForHTTPS(unique_ptr&& command) { - SAUTOPREFIX(command->request); - lock_guard lock(_httpsCommandMutex); - _outstandingHTTPSCommands.insert(move(command)); - - // Interrupt `poll` in the sync thread. - _newCommandsWaiting.push(true); -} - const atomic& BedrockServer::getState() const { return _nodeStateSnapshot == SQLiteNodeState::UNKNOWN ? _replicationState : _nodeStateSnapshot; } diff --git a/BedrockServer.h b/BedrockServer.h index 566341a1a..d08720a41 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -313,11 +313,6 @@ class BedrockServer : public SQLiteServer { // becomes leader. It will return true if the DB has changed and needs to be committed. bool _upgradeDB(SQLite& db); - // Iterate across all of our plugins and call `prePoll` and `postPoll` on any httpsManagers they've created. - // TODO: Can we kill `nextActivity`? - void _prePollCommands(fd_map& fdm); - void _postPollCommands(fd_map& fdm, uint64_t nextActivity); - // Resets the server state so when the sync node restarts it is as if the BedrockServer object was just created. void _resetServer(); @@ -423,19 +418,6 @@ class BedrockServer : public SQLiteServer { mutex _httpsCommandMutex; - // This contains all of the command that _outstandingHTTPSRequests` points at. This allows us to keep only a single - // copy of each command, even if it has multiple requests. Sorted with the above `compareCommandByTimeout`. - set> _outstandingHTTPSCommands; - - // Takes a command that has an outstanding HTTPS request and saves it in _outstandingHTTPSCommands until its HTTPS - // requests are complete. - void waitForHTTPS(unique_ptr&& command); - - // This doesn't really do anything in itself, but when we need to add new sockets to a poll loop (like when we - // create an outgoing http request) we queue something here, so that the poll loop in `sync` gets interrupted. This - // allows it to start again and pick up the new socket we just created. - SSynchronizedQueue _newCommandsWaiting; - // When we're standing down, we temporarily dump newly received commands here (this lets all existing // partially-completed commands, like commands with HTTPS requests) finish without risking getting caught in an // endless loop of always having new unfinished commands. diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index cdf3fb126..85bac7b23 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1810,7 +1810,10 @@ void SQLiteNode::_onDisconnect(SQLitePeer* peer) { // // It works for the sync thread as well, as there's handling in _changeState to rollback a commit when // dropping out of leading or standing down (and there can't be commits in progress in other states). - SWARN("We were " << stateName(_state) << " but lost quorum. Going to SEARCHING."); + SWARN("[clustersync] We were " << stateName(_state) << " but lost quorum (Disconnected from " << peer->name << "). Going to SEARCHING."); + for (const auto* p : _peerList) { + SWARN("[clustersync] Peer " << p->name << " logged in? " << (p->loggedIn ? "TRUE" : "FALSE") << (p->permaFollower ? " (permaFollower)" : "")); + } _changeState(SQLiteNodeState::SEARCHING); } } @@ -2492,8 +2495,6 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { // interrupt that chain in a way that will cause the remote end to think you've had an // error, and start over. So, once a connection is established, we should just use that one // for all communication until it breaks. - peer->reset(); - _onDisconnect(peer); STHROW("Peer " + peer->name + " seems already connected."); } } else { @@ -2545,9 +2546,9 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { break; case SQLitePeer::PeerPostPollStatus::OK: { - auto lastSendTime = peer->lastSendTime(); - if (lastSendTime && STimeNow() - lastSendTime > SQLiteNode::RECV_TIMEOUT - 5 * STIME_US_PER_S) { - SINFO("Close to timeout, sending PING to peer '" << peer->name << "'"); + auto lastActivityTime = max(peer->lastSendTime(), peer->lastRecvTime()); + if (lastActivityTime && STimeNow() - lastActivityTime > SQLiteNode::RECV_TIMEOUT - 5 * STIME_US_PER_S) { + SINFO("Close to timeout (" << (STimeNow() - lastActivityTime) << "us since last activity), sending PING to peer '" << peer->name << "'"); _sendPING(peer); } try { diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 94f4eb5bd..624725b74 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -72,7 +72,8 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA switch (socket->state.load()) { case STCPManager::Socket::CONNECTED: { // socket->lastRecvTime is always set, it's initialized to STimeNow() at creation. - if (socket->lastRecvTime + SQLiteNode::RECV_TIMEOUT < STimeNow()) { + auto lastActivityTime = max(socket->lastSendTime, socket->lastRecvTime); + if (lastActivityTime + SQLiteNode::RECV_TIMEOUT < STimeNow()) { SHMMM("Connection with peer '" << name << "' timed out."); return PeerPostPollStatus::SOCKET_ERROR; }