Skip to content

Commit

Permalink
Merge pull request #848 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
rafecolton authored Aug 18, 2020
2 parents 26ece23 + 853ad1d commit 0cf6578
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 57 deletions.
3 changes: 2 additions & 1 deletion BedrockCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ atomic<size_t> BedrockCommand::_commandCount(0);

const string BedrockCommand::defaultPluginName("NO_PLUGIN");

BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin) :
BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin, bool escalateImmediately_) :
SQLiteCommand(move(baseCommand)),
priority(PRIORITY_NORMAL),
peekCount(0),
processCount(0),
repeek(false),
crashIdentifyingValues(*this),
escalateImmediately(escalateImmediately_),
_plugin(plugin),
_inProgressTiming(INVALID, 0, 0),
_timeout(_getTimeout(request))
Expand Down
6 changes: 5 additions & 1 deletion BedrockCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BedrockCommand : public SQLiteCommand {
static const uint64_t DEFAULT_PROCESS_TIMEOUT = 30'000; // 30 seconds.

// Constructor to initialize via a request object (by move).
BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin);
BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin, bool escalateImmediately_ = false);

// Destructor.
virtual ~BedrockCommand();
Expand Down Expand Up @@ -146,6 +146,10 @@ class BedrockCommand : public SQLiteCommand {
// Return the number of commands in existence.
static size_t getCommandCount() { return _commandCount.load(); }

// True if this command should be escalated immediately. This can be true for any command that does all of its work
// in `process` instead of peek, as it will always be escalated to leader
const bool escalateImmediately;

protected:
// The plugin that owns this command.
BedrockPlugin* _plugin;
Expand Down
51 changes: 30 additions & 21 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -873,27 +873,6 @@ void BedrockServer::worker(SQLitePool& dbPool,
usleep(10000);
}

// If this command is dependent on a commitCount newer than what we have (maybe it's a follow-up to a
// command that was escalated to leader), we'll set it aside for later processing. When the sync node
// finishes its update loop, it will re-queue any of these commands that are no longer blocked on our
// updated commit count.
uint64_t commitCount = db.getCommitCount();
uint64_t commandCommitCount = command->request.calcU64("commitCount");
if (commandCommitCount > commitCount) {
SAUTOLOCK(server._futureCommitCommandMutex);
auto newQueueSize = server._futureCommitCommands.size() + 1;
SINFO("Command (" << command->request.methodLine << ") depends on future commit (" << commandCommitCount
<< "), Currently at: " << commitCount << ", storing for later. Queue size: " << newQueueSize);
server._futureCommitCommandTimeouts.insert(make_pair(command->timeout(), commandCommitCount));
server._futureCommitCommands.insert(make_pair(commandCommitCount, move(command)));

// Don't count this as `in progress`, it's just sitting there.
if (newQueueSize > 100) {
SHMMM("server._futureCommitCommands.size() == " << newQueueSize);
}
continue;
}

// OK, so this is the state right now, which isn't necessarily anything in particular, because the sync
// node can change it at any time, and we're not synchronizing on it. We're going to go ahead and assume
// it's something reasonable, because in most cases, that's pretty safe. If we think we're anything but
Expand All @@ -902,6 +881,15 @@ void BedrockServer::worker(SQLitePool& dbPool,
// our state right before we commit.
SQLiteNode::State state = replicationState.load();

// If we're following, any incomplete commands can be immediately escalated to leader. This saves the work
// of a `peek` operation, but more importantly, it skips any delays that might be introduced by waiting in
// the `_futureCommitCommands` queue.
if (state == SQLiteNode::FOLLOWING && command->escalateImmediately && !command->complete) {
SINFO("Immediately escalating " << command->request.methodLine << " to leader. Sync thread has " << syncNodeQueuedCommands.size() << " queued commands.");
syncNodeQueuedCommands.push(move(command));
continue;
}

// If we find that we've gotten a command with an initiatingPeerID, but we're not in a leading or
// standing down state, we'll have no way of returning this command to the caller, so we discard it. The
// original caller will need to re-send the request. This can happen if we're leading, and receive a
Expand All @@ -917,6 +905,27 @@ void BedrockServer::worker(SQLitePool& dbPool,
continue;
}

// If this command is dependent on a commitCount newer than what we have (maybe it's a follow-up to a
// command that was escalated to leader), we'll set it aside for later processing. When the sync node
// finishes its update loop, it will re-queue any of these commands that are no longer blocked on our
// updated commit count.
uint64_t commitCount = db.getCommitCount();
uint64_t commandCommitCount = command->request.calcU64("commitCount");
if (commandCommitCount > commitCount) {
SAUTOLOCK(server._futureCommitCommandMutex);
auto newQueueSize = server._futureCommitCommands.size() + 1;
SINFO("Command (" << command->request.methodLine << ") depends on future commit (" << commandCommitCount
<< "), Currently at: " << commitCount << ", storing for later. Queue size: " << newQueueSize);
server._futureCommitCommandTimeouts.insert(make_pair(command->timeout(), commandCommitCount));
server._futureCommitCommands.insert(make_pair(commandCommitCount, move(command)));

// Don't count this as `in progress`, it's just sitting there.
if (newQueueSize > 100) {
SHMMM("server._futureCommitCommands.size() == " << newQueueSize);
}
continue;
}

// If this command is already complete, then we should be a follower, and the sync node got a response back
// from a command that had been escalated to leader, and queued it for a worker to respond to. We'll send
// that response now.
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ TESTOBJ = $(TESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.o)
TESTDEP = $(TESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.d)

CLUSTERTESTCPP = $(shell find test -name '*.cpp' -not -path 'test/tests*' -not -path "test/main.cpp")
CLUSTERTESTCPP += test/tests/jobs/JobTestHelper.cpp
CLUSTERTESTOBJ = $(CLUSTERTESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.o)
CLUSTERTESTDEP = $(CLUSTERTESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.d)

Expand Down
9 changes: 8 additions & 1 deletion plugins/Jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ const set<string, STableComp> BedrockPlugin_Jobs::supportedRequestVerbs = {
"RequeueJobs",
};

bool BedrockJobsCommand::canEscalateImmediately(SQLiteCommand& baseCommand) {
// This is a set of commands that we will escalate to leader without waiting. It's not intended to be complete but
// to solve the biggest issues we have with slow escalation times (i.e., this is usually a problem for `FinishJob`).
static const set<string> commands = {"CreateJob", "CreateJobs", "FinishJob"};
return commands.count(baseCommand.request.methodLine);
}

// Disable noop mode for the lifetime of this object.
class scopedDisableNoopMode {
public:
Expand All @@ -45,7 +52,7 @@ class scopedDisableNoopMode {
};

BedrockJobsCommand::BedrockJobsCommand(SQLiteCommand&& baseCommand, BedrockPlugin_Jobs* plugin) :
BedrockCommand(move(baseCommand), plugin)
BedrockCommand(move(baseCommand), plugin, canEscalateImmediately(baseCommand))
{
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/Jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ class BedrockJobsCommand : public BedrockCommand {
void _validatePriority(const int64_t priority);

bool mockRequest;

// Returns true if this command can skip straight to leader for process.
bool canEscalateImmediately(SQLiteCommand& baseCommand);
};
Loading

0 comments on commit 0cf6578

Please sign in to comment.