Skip to content

Commit

Permalink
Merge pull request #804 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
quinthar authored May 25, 2020
2 parents a8d5c58 + eb7dc18 commit 59cf382
Showing 1 changed file with 113 additions and 109 deletions.
222 changes: 113 additions & 109 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,134 +533,138 @@ void BedrockServer::sync(const SData& args,
continue;
}

// Reset this to blank. This releases the existing command and allows it to get cleaned up.
command = unique_ptr<BedrockCommand>(nullptr);
// We're going to run through all of the commands in our queue.
while (true) {

// Get the next sync node command to work on.
command = syncNodeQueuedCommands.pop();
// Reset this to blank. This releases the existing command and allows it to get cleaned up.
command = unique_ptr<BedrockCommand>(nullptr);

// We got a command to work on! Set our log prefix to the request ID.
SAUTOPREFIX(command->request);
SINFO("Sync thread dequeued command " << command->request.methodLine << ". Sync thread has "
<< syncNodeQueuedCommands.size() << " queued commands.");
// Get the next sync node command to work on.
command = syncNodeQueuedCommands.pop();

if (command->timeout() < STimeNow()) {
SINFO("Command '" << command->request.methodLine << "' timed out in sync thread queue, sending back to main queue.");
server._commandQueue.push(move(command));
continue;
}
// We got a command to work on! Set our log prefix to the request ID.
SAUTOPREFIX(command->request);
SINFO("Sync thread dequeued command " << command->request.methodLine << ". Sync thread has "
<< syncNodeQueuedCommands.size() << " queued commands.");

// Set the function that will be called if this thread's signal handler catches an unrecoverable error,
// like a segfault. Note that it's possible we're in the middle of sending a message to peers when we call
// this, which would probably make this message malformed. This is the best we can do.
SSetSignalHandlerDieFunc([&](){
server._syncNode->broadcast(_generateCrashMessage(command));
});
if (command->timeout() < STimeNow()) {
SINFO("Command '" << command->request.methodLine << "' timed out in sync thread queue, sending back to main queue.");
server._commandQueue.push(move(command));
break;
}

// And now we'll decide how to handle it.
if (nodeState == SQLiteNode::LEADING || nodeState == SQLiteNode::STANDINGDOWN) {
// Set the function that will be called if this thread's signal handler catches an unrecoverable error,
// like a segfault. Note that it's possible we're in the middle of sending a message to peers when we call
// this, which would probably make this message malformed. This is the best we can do.
SSetSignalHandlerDieFunc([&](){
server._syncNode->broadcast(_generateCrashMessage(command));
});

// And now we'll decide how to handle it.
if (nodeState == SQLiteNode::LEADING || nodeState == SQLiteNode::STANDINGDOWN) {

// We need to grab this before peekCommand (or wherever our transaction is started), to verify that
// no worker thread can commit in the middle of our transaction. We need our entire transaction to
// happen with no other commits to ensure that we can't get a conflict.
uint64_t beforeLock = STimeNow();

// This needs to be done before we acquire _syncThreadCommitMutex or we can deadlock.
db.waitForCheckpoint();
server._syncThreadCommitMutex.lock();

// It appears that this might be taking significantly longer with multi-write enabled, so we're adding
// explicit logging for it to check.
SINFO("[performance] Waited " << (STimeNow() - beforeLock)/1000 << "ms for _syncThreadCommitMutex.");

// We peek commands here in the sync thread to be able to run peek and process as part of the same
// transaction. This guarantees that any checks made in peek are still valid in process, as the DB can't
// have changed in the meantime.
// IMPORTANT: This check is omitted for commands with an HTTPS request object, because we don't want to
// risk duplicating that request. If your command creates an HTTPS request, it needs to explicitly
// re-verify that any checks made in peek are still valid in process.
if (!command->httpsRequests.size()) {
BedrockCore::RESULT result = core.peekCommand(command);
if (result == BedrockCore::RESULT::COMPLETE) {

// Finished with this.
server._syncThreadCommitMutex.unlock();

// This command completed in peek, so respond to it appropriately, either directly or by sending it
// back to the sync thread.
SASSERT(command->complete);
if (command->initiatingPeerID) {
server._finishPeerCommand(command);
} else {
server._reply(command);
}
break;
} else if (result == BedrockCore::RESULT::SHOULD_PROCESS) {
// This is sort of the "default" case after checking if this command was complete above. If so,
// we'll fall through to calling processCommand below.
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-queuing abandoned command (from peek) in sync thread");
server._commandQueue.push(move(command));
break;
} else {
SERROR("peekCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}

// We need to grab this before peekCommand (or wherever our transaction is started), to verify that
// no worker thread can commit in the middle of our transaction. We need our entire transaction to
// happen with no other commits to ensure that we can't get a conflict.
uint64_t beforeLock = STimeNow();
// If we just started a new HTTPS request, save it for later.
if (command->httpsRequests.size()) {
server.waitForHTTPS(move(command));

// This needs to be done before we acquire _syncThreadCommitMutex or we can deadlock.
db.waitForCheckpoint();
server._syncThreadCommitMutex.lock();
// Move on to the next command until this one finishes.
core.rollback();
server._syncThreadCommitMutex.unlock();
break;
}
}

// It appears that this might be taking significantly longer with multi-write enabled, so we're adding
// explicit logging for it to check.
SINFO("[performance] Waited " << (STimeNow() - beforeLock)/1000 << "ms for _syncThreadCommitMutex.");

// We peek commands here in the sync thread to be able to run peek and process as part of the same
// transaction. This guarantees that any checks made in peek are still valid in process, as the DB can't
// have changed in the meantime.
// IMPORTANT: This check is omitted for commands with an HTTPS request object, because we don't want to
// risk duplicating that request. If your command creates an HTTPS request, it needs to explicitly
// re-verify that any checks made in peek are still valid in process.
if (!command->httpsRequests.size()) {
BedrockCore::RESULT result = core.peekCommand(command);
if (result == BedrockCore::RESULT::COMPLETE) {

// Finished with this.
BedrockCore::RESULT result = core.processCommand(command);
if (result == BedrockCore::RESULT::NEEDS_COMMIT) {
// The processor says we need to commit this, so let's start that process.
committingCommand = true;
SINFO("[performance] Sync thread beginning committing command " << command->request.methodLine);
// START TIMING.
command->startTiming(BedrockCommand::COMMIT_SYNC);
server._syncNode->startCommit(command->writeConsistency);

// And we'll start the next main loop.
// NOTE: This will cause us to read from the network again. This, in theory, is fine, but we saw
// performance problems in the past trying to do something similar on every commit. This may be
// alleviated now that we're only doing this on *sync* commits instead of all commits, which should
// be a much smaller fraction of all our traffic. We set nextActivity here so that there's no
// timeout before we'll give up on poll() if there's nothing to read.
nextActivity = STimeNow();

// Don't unlock _syncThreadCommitMutex here, we'll hold the lock till the commit completes.
break;
} else if (result == BedrockCore::RESULT::NO_COMMIT_REQUIRED) {
// Otherwise, the command doesn't need a commit (maybe it was an error, or it didn't have any work
// to do). We'll just respond.
server._syncThreadCommitMutex.unlock();

// This command completed in peek, so respond to it appropriately, either directly or by sending it
// back to the sync thread.
SASSERT(command->complete);
if (command->initiatingPeerID) {
server._finishPeerCommand(command);
} else {
server._reply(command);
}
continue;
} else if (result == BedrockCore::RESULT::SHOULD_PROCESS) {
// This is sort of the "default" case after checking if this command was complete above. If so,
// we'll fall through to calling processCommand below.
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-queuing abandoned command (from peek) in sync thread");
SINFO("[checkpoint] Re-queuing abandoned command (from process) in sync thread");
server._commandQueue.push(move(command));
continue;
break;
} else {
SERROR("peekCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}

// If we just started a new HTTPS request, save it for later.
if (command->httpsRequests.size()) {
server.waitForHTTPS(move(command));

// Move on to the next command until this one finishes.
core.rollback();
server._syncThreadCommitMutex.unlock();
continue;
SERROR("processCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}
}

BedrockCore::RESULT result = core.processCommand(command);
if (result == BedrockCore::RESULT::NEEDS_COMMIT) {
// The processor says we need to commit this, so let's start that process.
committingCommand = true;
SINFO("[performance] Sync thread beginning committing command " << command->request.methodLine);
// START TIMING.
command->startTiming(BedrockCommand::COMMIT_SYNC);
server._syncNode->startCommit(command->writeConsistency);

// And we'll start the next main loop.
// NOTE: This will cause us to read from the network again. This, in theory, is fine, but we saw
// performance problems in the past trying to do something similar on every commit. This may be
// alleviated now that we're only doing this on *sync* commits instead of all commits, which should
// be a much smaller fraction of all our traffic. We set nextActivity here so that there's no
// timeout before we'll give up on poll() if there's nothing to read.
nextActivity = STimeNow();

// Don't unlock _syncThreadCommitMutex here, we'll hold the lock till the commit completes.
continue;
} else if (result == BedrockCore::RESULT::NO_COMMIT_REQUIRED) {
// Otherwise, the command doesn't need a commit (maybe it was an error, or it didn't have any work
// to do). We'll just respond.
server._syncThreadCommitMutex.unlock();
if (command->initiatingPeerID) {
server._finishPeerCommand(command);
} else {
server._reply(command);
}
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-queuing abandoned command (from process) in sync thread");
server._commandQueue.push(move(command));
continue;
} else {
SERROR("processCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}
} else if (nodeState == SQLiteNode::FOLLOWING) {
// If we're following, we just escalate directly to leader without peeking. We can only get an incomplete
// command on the follower sync thread if a follower worker thread peeked it unsuccessfully, so we don't
// bother peeking it again.
auto it = command->request.nameValueMap.find("Connection");
bool forget = it != command->request.nameValueMap.end() && SIEquals(it->second, "forget");
server._syncNode->escalateCommand(move(command), forget);
if (forget) {
// Command is no longer in progress.
// When we're leading, we'll try to handle one command and then stop.
break;
} else if (nodeState == SQLiteNode::FOLLOWING) {
// If we're following, we just escalate directly to leader without peeking. We can only get an incomplete
// command on the follower sync thread if a follower worker thread peeked it unsuccessfully, so we don't
// bother peeking it again.
auto it = command->request.nameValueMap.find("Connection");
bool forget = it != command->request.nameValueMap.end() && SIEquals(it->second, "forget");
server._syncNode->escalateCommand(move(command), forget);
}
}
} catch (const out_of_range& e) {
Expand Down

0 comments on commit 59cf382

Please sign in to comment.