diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 07753d6ef..1140c1d36 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -932,155 +932,157 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo if (lastConflictPage) { conflictLockStartTime = STimeNow(); } - PageLockGuard pageLock(lastConflictPage); - if (lastConflictPage) { - SINFO("Waited " << (STimeNow() - conflictLockStartTime) << "us for lock on db page " << lastConflictPage << "."); - } - delete timer; - - // If the command has any httpsRequests from a previous `peek`, we won't peek it again unless the - // command has specifically asked for that. - // If peek succeeds, then it's finished, and all we need to do is respond to the command at the bottom. - bool calledPeek = false; - BedrockCore::RESULT peekResult = BedrockCore::RESULT::INVALID; - if (command->repeek || !command->httpsRequests.size()) { - peekResult = core.peekCommand(command, isBlocking); - calledPeek = true; - } - - if (!calledPeek || peekResult == BedrockCore::RESULT::SHOULD_PROCESS) { - // We've just unsuccessfully peeked a command, which means we're in a state where we might want to - // write it. We'll flag that here, to keep the node from falling out of LEADING/STANDINGDOWN - // until we're finished with this command. - if (command->httpsRequests.size()) { - if (command->repeek || !command->areHttpsRequestsComplete()) { - // Roll back the existing transaction, but only if we are inside an transaction - if (calledPeek) { - core.rollback(); - } - - // Jump back to the top of our main `while (true)` loop and run the network activity loop again. - continue; - } - } else { - // If we haven't sent a quorum command to the sync thread in a while, auto-promote one. - uint64_t now = STimeNow(); - if (now > (_lastQuorumCommandTime + (_quorumCheckpointSeconds * 1'000'000))) { - SINFO("Forcing QUORUM for command '" << command->request.methodLine << "'."); - _lastQuorumCommandTime = now; - command->writeConsistency = SQLiteNode::QUORUM; - canWriteParallel = false; - } + { + PageLockGuard pageLock(lastConflictPage); + if (lastConflictPage) { + SINFO("Waited " << (STimeNow() - conflictLockStartTime) << "us for lock on db page " << lastConflictPage << "."); + } + delete timer; + + // If the command has any httpsRequests from a previous `peek`, we won't peek it again unless the + // command has specifically asked for that. + // If peek succeeds, then it's finished, and all we need to do is respond to the command at the bottom. + bool calledPeek = false; + BedrockCore::RESULT peekResult = BedrockCore::RESULT::INVALID; + if (command->repeek || !command->httpsRequests.size()) { + peekResult = core.peekCommand(command, isBlocking); + calledPeek = true; } - // Peek wasn't enough to handle this command. See if we think it should be writable in parallel. - if (!canWriteParallel) { - // Roll back the transaction, it'll get re-run in the sync thread. - core.rollback(); - auto _clusterMessengerCopy = _clusterMessenger; - if (state == SQLiteNodeState::LEADING) { - // Limit the command timeout to 20s to avoid blocking the sync thread long enough to cause the cluster to give up and elect a new leader (causing a fork), which happens - // after 30s. - command->setTimeout(20'000); - SINFO("Sending non-parallel command " << command->request.methodLine - << " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); - _syncNodeQueuedCommands.push(move(command)); - } else if (state == SQLiteNodeState::STANDINGDOWN) { - SINFO("Need to process command " << command->request.methodLine << " but STANDINGDOWN, moving to _standDownQueue."); - _standDownQueue.push(move(command)); - } else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) { - SINFO("Escalated " << command->request.methodLine << " to leader and complete, responding."); - _reply(command); + if (!calledPeek || peekResult == BedrockCore::RESULT::SHOULD_PROCESS) { + // We've just unsuccessfully peeked a command, which means we're in a state where we might want to + // write it. We'll flag that here, to keep the node from falling out of LEADING/STANDINGDOWN + // until we're finished with this command. + if (command->httpsRequests.size()) { + if (command->repeek || !command->areHttpsRequestsComplete()) { + // Roll back the existing transaction, but only if we are inside an transaction + if (calledPeek) { + core.rollback(); + } + + // Jump back to the top of our main `while (true)` loop and run the network activity loop again. + continue; + } } else { - // TODO: Something less naive that considers how these failures happen rather than a simple - // endless loop of requeue and retry. - SINFO("Couldn't escalate command " << command->request.methodLine << " to leader. We are in state: " << SQLiteNode::stateName(state)); - _commandQueue.push(move(command)); + // If we haven't sent a quorum command to the sync thread in a while, auto-promote one. + uint64_t now = STimeNow(); + if (now > (_lastQuorumCommandTime + (_quorumCheckpointSeconds * 1'000'000))) { + SINFO("Forcing QUORUM for command '" << command->request.methodLine << "'."); + _lastQuorumCommandTime = now; + command->writeConsistency = SQLiteNode::QUORUM; + canWriteParallel = false; + } } - // Done with this command, look for the next one. - break; - } - - // In this case, there's nothing blocking us from processing this in a worker, so let's try it. - BedrockCore::RESULT result = core.processCommand(command, isBlocking); - if (result == BedrockCore::RESULT::NEEDS_COMMIT) { - // If processCommand returned true, then we need to do a commit. Otherwise, the command is - // done, and we just need to respond. Before we commit, we need to grab the sync thread - // lock. Because the sync thread grabs an exclusive lock on this wrapping any transactions - // that it performs, we'll get this lock while the sync thread isn't in the process of - // handling a transaction, thus guaranteeing that we can't commit and cause a conflict on - // the sync thread. We can still get conflicts here, as the sync thread might have - // performed a transaction after we called `processCommand` and before we call `commit`, - // or we could conflict with another worker thread, but the sync thread will never see a - // conflict as long as we don't commit while it's performing a transaction. This is scoped - // to the minimum time required. - bool commitSuccess = false; - uint64_t transactionID = 0; - string transactionHash; - { - // There used to be a mutex protecting this state change, with the idea that if we - // prevented state changes, we couldn't fall out of leading in the middle of processing a - // command. However, for "normal" graceful state changes, these changes are prevented by - // checking canStandDown(), and we can't fall out of STANDINGDOWN until there are no - // commands left. In the case of non-graceful state changes, i.e., we are spontaneously - // disconnected from the cluster, all this really does is prevent the sync thread from - // telling us about that until after we've already committed this transaction, which - // doesn't really help. In those cases, it's possible that we fork the DB here, but that's - // possible with or without a mutex for this, so we've removed it for the sake of - // simplicity. - if (_replicationState.load() != SQLiteNodeState::LEADING && - _replicationState.load() != SQLiteNodeState::STANDINGDOWN) { - SALERT("Node State changed from LEADING to " - << SQLiteNode::stateName(_replicationState.load()) - << " during worker commit. Rolling back transaction!"); - core.rollback(); + // Peek wasn't enough to handle this command. See if we think it should be writable in parallel. + if (!canWriteParallel) { + // Roll back the transaction, it'll get re-run in the sync thread. + core.rollback(); + auto _clusterMessengerCopy = _clusterMessenger; + if (state == SQLiteNodeState::LEADING) { + // Limit the command timeout to 20s to avoid blocking the sync thread long enough to cause the cluster to give up and elect a new leader (causing a fork), which happens + // after 30s. + command->setTimeout(20'000); + SINFO("Sending non-parallel command " << command->request.methodLine + << " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); + _syncNodeQueuedCommands.push(move(command)); + } else if (state == SQLiteNodeState::STANDINGDOWN) { + SINFO("Need to process command " << command->request.methodLine << " but STANDINGDOWN, moving to _standDownQueue."); + _standDownQueue.push(move(command)); + } else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) { + SINFO("Escalated " << command->request.methodLine << " to leader and complete, responding."); + _reply(command); } else { - BedrockCore::AutoTimer timer(command, isBlocking ? BedrockCommand::BLOCKING_COMMIT_WORKER : BedrockCommand::COMMIT_WORKER); - void (*onPrepareHandler)(SQLite& db, int64_t tableID) = nullptr; - bool enableOnPrepareNotifications = command->shouldEnableOnPrepareNotification(db, &onPrepareHandler); - commitSuccess = core.commit(SQLiteNode::stateName(_replicationState), transactionID, - transactionHash, enableOnPrepareNotifications, onPrepareHandler); + // TODO: Something less naive that considers how these failures happen rather than a simple + // endless loop of requeue and retry. + SINFO("Couldn't escalate command " << command->request.methodLine << " to leader. We are in state: " << SQLiteNode::stateName(state)); + _commandQueue.push(move(command)); } + + // Done with this command, look for the next one. + break; } - if (commitSuccess) { - // Tell the sync node that there's been a commit so that it can jump out of it's "poll" - // loop and send it to followers. NOTE: we don't check for null here, that should be - // impossible inside a worker thread. - _syncNode->notifyCommit(); - SINFO("Committed leader transaction #" << transactionID << "(" << transactionHash << "). Command: '" << command->request.methodLine << "', blocking: " - << (isBlocking ? "true" : "false")); - _conflictManager.recordTables(command->request.methodLine, db.getTablesUsed()); - // So we must still be leading, and at this point our commit has succeeded, let's - // mark it as complete. We add the currentCommit count here as well. - command->response["commitCount"] = to_string(db.getCommitCount()); - command->complete = true; - } else { - SINFO("Conflict or state change committing " << command->request.methodLine << " on worker thread."); - if (_enableConflictPageLocks) { - lastConflictPage = db.getLastConflictPage(); + + // In this case, there's nothing blocking us from processing this in a worker, so let's try it. + BedrockCore::RESULT result = core.processCommand(command, isBlocking); + if (result == BedrockCore::RESULT::NEEDS_COMMIT) { + // If processCommand returned true, then we need to do a commit. Otherwise, the command is + // done, and we just need to respond. Before we commit, we need to grab the sync thread + // lock. Because the sync thread grabs an exclusive lock on this wrapping any transactions + // that it performs, we'll get this lock while the sync thread isn't in the process of + // handling a transaction, thus guaranteeing that we can't commit and cause a conflict on + // the sync thread. We can still get conflicts here, as the sync thread might have + // performed a transaction after we called `processCommand` and before we call `commit`, + // or we could conflict with another worker thread, but the sync thread will never see a + // conflict as long as we don't commit while it's performing a transaction. This is scoped + // to the minimum time required. + bool commitSuccess = false; + uint64_t transactionID = 0; + string transactionHash; + { + // There used to be a mutex protecting this state change, with the idea that if we + // prevented state changes, we couldn't fall out of leading in the middle of processing a + // command. However, for "normal" graceful state changes, these changes are prevented by + // checking canStandDown(), and we can't fall out of STANDINGDOWN until there are no + // commands left. In the case of non-graceful state changes, i.e., we are spontaneously + // disconnected from the cluster, all this really does is prevent the sync thread from + // telling us about that until after we've already committed this transaction, which + // doesn't really help. In those cases, it's possible that we fork the DB here, but that's + // possible with or without a mutex for this, so we've removed it for the sake of + // simplicity. + if (_replicationState.load() != SQLiteNodeState::LEADING && + _replicationState.load() != SQLiteNodeState::STANDINGDOWN) { + SALERT("Node State changed from LEADING to " + << SQLiteNode::stateName(_replicationState.load()) + << " during worker commit. Rolling back transaction!"); + core.rollback(); + } else { + BedrockCore::AutoTimer timer(command, isBlocking ? BedrockCommand::BLOCKING_COMMIT_WORKER : BedrockCommand::COMMIT_WORKER); + void (*onPrepareHandler)(SQLite& db, int64_t tableID) = nullptr; + bool enableOnPrepareNotifications = command->shouldEnableOnPrepareNotification(db, &onPrepareHandler); + commitSuccess = core.commit(SQLiteNode::stateName(_replicationState), transactionID, + transactionHash, enableOnPrepareNotifications, onPrepareHandler); + } + } + if (commitSuccess) { + // Tell the sync node that there's been a commit so that it can jump out of it's "poll" + // loop and send it to followers. NOTE: we don't check for null here, that should be + // impossible inside a worker thread. + _syncNode->notifyCommit(); + SINFO("Committed leader transaction #" << transactionID << "(" << transactionHash << "). Command: '" << command->request.methodLine << "', blocking: " + << (isBlocking ? "true" : "false")); + _conflictManager.recordTables(command->request.methodLine, db.getTablesUsed()); + // So we must still be leading, and at this point our commit has succeeded, let's + // mark it as complete. We add the currentCommit count here as well. + command->response["commitCount"] = to_string(db.getCommitCount()); + command->complete = true; + } else { + SINFO("Conflict or state change committing " << command->request.methodLine << " on worker thread."); + if (_enableConflictPageLocks) { + lastConflictPage = db.getLastConflictPage(); + } + } + } else if (result == BedrockCore::RESULT::NO_COMMIT_REQUIRED) { + // Nothing to do in this case, `command->complete` will be set and we'll finish as we fall out + // of this block. + } else if (result == BedrockCore::RESULT::SERVER_NOT_LEADING) { + // We won't write regardless. + core.rollback(); + + // If there are no HTTPS requests, we can just re-queue this command, otherwise, we will + // potentially run the same HTTPS requests twice. + if (command->httpsRequests.size()) { + SALERT("Server stopped leading while running command with HTTPS requests!"); + command->response.methodLine = "500 Leader stopped leading"; + _reply(command); + break; + } else { + // Allow for an extra retry and start from the top. + SINFO("State changed before 'processCommand' but no HTTPS requests so retrying."); } - } - } else if (result == BedrockCore::RESULT::NO_COMMIT_REQUIRED) { - // Nothing to do in this case, `command->complete` will be set and we'll finish as we fall out - // of this block. - } else if (result == BedrockCore::RESULT::SERVER_NOT_LEADING) { - // We won't write regardless. - core.rollback(); - - // If there are no HTTPS requests, we can just re-queue this command, otherwise, we will - // potentially run the same HTTPS requests twice. - if (command->httpsRequests.size()) { - SALERT("Server stopped leading while running command with HTTPS requests!"); - command->response.methodLine = "500 Leader stopped leading"; - _reply(command); - break; } else { - // Allow for an extra retry and start from the top. - SINFO("State changed before 'processCommand' but no HTTPS requests so retrying."); + SERROR("processCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result); } - } else { - SERROR("processCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result); } } // If the command was completed above, then we'll go ahead and respond. Otherwise there must have been