Skip to content

Commit

Permalink
Merge pull request #1560 from Expensify/dsilva_addingVersionMismatchH…
Browse files Browse the repository at this point in the history
…andling

Adding version mismatch handling
  • Loading branch information
tylerkaraszewski authored Sep 18, 2023
2 parents 28551f8 + d613c06 commit 828f739
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 45 deletions.
20 changes: 12 additions & 8 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,17 +772,21 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
// our state right before we commit.
SQLiteNodeState state = _replicationState.load();

// If we're following, we will automatically escalate any command that's not already complete (complete
// commands are likely already returned from leader with legacy escalation) and is marked as
// `escalateImmediately` (which lets them skip the queue, which is particularly useful if they're waiting
// for a previous commit to be delivered to this follower), OR if we're on a different version from leader.
if (state == SQLiteNodeState::FOLLOWING && (_version != _leaderVersion.load() || command->escalateImmediately) && !command->complete) {
// If we're following, we will automatically escalate any command that's:
// 1. Not already complete (complete commands are likely already returned from leader with legacy escalation)
// and is marked as `escalateImmediately` (which lets them skip the queue, which is particularly useful if they're waiting
// for a previous commit to be delivered to this follower);
// 2. Any commands if the current version of the code is not the same one as leader is executing.
if (state == SQLiteNodeState::FOLLOWING && !command->complete && (command->escalateImmediately || _version != _leaderVersion.load())) {
auto _clusterMessengerCopy = _clusterMessenger;
if (_clusterMessengerCopy && _clusterMessengerCopy->runOnLeader(*command)) {
string escalatedTo = "";
if (command->escalateImmediately && _clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) {
// command->complete is now true for this command. It will get handled a few lines below.
SINFO("Immediately escalated " << command->request.methodLine << " to leader.");
} else if (_version != _leaderVersion.load() && _clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, false)) {
SINFO("Escalated " << command->request.methodLine << " to follower peer.");
} else {
SINFO("Couldn't immediately escalate command " << command->request.methodLine << " to leader, queuing normally.");
SINFO("Couldn't escalate command " << command->request.methodLine << " to " << (command->escalateImmediately ? "leader" : "follower peer") << ", queuing it again.");
_commandQueue.push(move(command));
return;
}
Expand Down Expand Up @@ -966,7 +970,7 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
} else if (state == SQLiteNodeState::STANDINGDOWN) {
SINFO("Need to process command " << command->request.methodLine << " but STANDINGDOWN, moving to _standDownQueue.");
_standDownQueue.push(move(command));
} else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnLeader(*command)) {
} else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) {
SINFO("Escalated " << command->request.methodLine << " to leader and complete, responding.");
_reply(command);
} else {
Expand Down
21 changes: 11 additions & 10 deletions sqlitecluster/SQLiteClusterMessenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,26 @@ unique_ptr<SHTTPSManager::Socket> SQLiteClusterMessenger::_getSocketForAddress(s
return s;
}

bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {
bool SQLiteClusterMessenger::runOnPeer(BedrockCommand& command, bool runOnLeader) {
auto start = chrono::steady_clock::now();
bool sent = false;
string peerType = runOnLeader ? "leader" : "follower peer";
size_t sleepsDueToFailures = 0;
string leaderAddress;
string peerAddress;

unique_ptr<SHTTPSManager::Socket> s;
while (chrono::steady_clock::now() < (start + 5s) && !sent) {
leaderAddress = _node->leaderCommandAddress();
if (leaderAddress.empty()) {
peerAddress = runOnLeader ? _node->leaderCommandAddress() : _node->getEligibleFollowerForForwardingAddress();
if (peerAddress.empty()) {
// If there's no leader, it's possible we're supposed to be the leader. In this case, we can exit early.
auto myState = _node->getState();
if (myState == SQLiteNodeState::LEADING || myState == SQLiteNodeState::STANDINGUP) {
if (runOnLeader && (myState == SQLiteNodeState::LEADING || myState == SQLiteNodeState::STANDINGUP)) {
SINFO("[HTTPESC] I'm the leader now! Exiting early.");
return false;
}

// Otherwise, just wait until there is a leader.
SINFO("[HTTPESC] No leader address.");
SINFO("[HTTPESC] No " + peerType + " address.");
sleepsDueToFailures++;
usleep(500'000);
continue;
Expand All @@ -268,7 +269,7 @@ bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {
// Start our escalation timing
command.escalationTimeUS = STimeNow();

s = _getSocketForAddress(leaderAddress);
s = _getSocketForAddress(peerAddress);
if (!s) {
command.escalationTimeUS = STimeNow() - command.escalationTimeUS;
return false;
Expand All @@ -282,8 +283,8 @@ bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {
}

// If we fell out of the loop simply because we did not get a leader address in time, we can return false and retry later.
if (leaderAddress.empty()) {
SINFO("[HTTPESC] Could not get leader address in 5s, will retry later.");
if (peerAddress.empty()) {
SINFO("[HTTPESC] Could not get " + peerType + " address in 5s, will retry later.");
return false;
}

Expand All @@ -301,7 +302,7 @@ bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {

// Since everything went fine with this command, we can save its socket, unless it's being closed.
if (!commandWillCloseSocket(command)) {
_socketPool.returnSocket(move(s), leaderAddress);
_socketPool.returnSocket(move(s), peerAddress);
}

return true;
Expand Down
8 changes: 4 additions & 4 deletions sqlitecluster/SQLiteClusterMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ class SQLiteClusterMessenger {

SQLiteClusterMessenger(const shared_ptr<const SQLiteNode> node);

// Attempts to make a TCP connection to the leader, and run the given command there, setting the appropriate
// response from leader in the command, and marking it as complete if possible.
// Attempts to make a TCP connection to a peer, that could be the leader or not, and run the given command there,
// setting the appropriate response from the peer in the command, and marking it as complete if possible.
// Returns command->complete at the end of the function, this is true if the command was successfully completed on
// leader, or if a fatal error occurred. This will be false if the command can be re-tried later (for instance, if
// the peer, or if a fatal error occurred. This will be false if the command can be re-tried later (for instance, if
// no connection to leader could be made).
bool runOnLeader(BedrockCommand& command);
bool runOnPeer(BedrockCommand& command, bool runOnLeader);

// Attempts to run command on every peer. This is done in threads, so the
// order in which the peers run the command is not deterministic. Returns a
Expand Down
19 changes: 19 additions & 0 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,25 @@ list<STable> SQLiteNode::getPeerInfo() const {
return peerData;
}

string SQLiteNode::getEligibleFollowerForForwardingAddress() const {
vector<string> validPeers;
const string leaderVersion = getLeaderVersion();
if (leaderVersion.empty()) {
return "";
}
for (SQLitePeer* peer : _peerList) {
// We want only nodes that are using the same version as leader and are currently followers
if (peer->version.load() != leaderVersion || peer->state.load() != SQLiteNodeState::FOLLOWING) {
continue;
}
validPeers.push_back(peer->name);
}
if (validPeers.empty()) {
return "";
}
return getPeerByName(validPeers[rand() % validPeers.size()])->commandAddress.load();
}

// --------------------------------------------------------------------------
// State Machine
// --------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class SQLiteNode : public STCPManager {
// Can block.
list<STable> getPeerInfo() const;

// Gets a random follower peer that is in the same version as leader.
string getEligibleFollowerForForwardingAddress() const;

// Returns our current priority.
// Does not block.
int getPriority() const;
Expand Down
12 changes: 11 additions & 1 deletion test/clustertest/testplugin/TestPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ unique_ptr<BedrockCommand> BedrockPlugin_TestPlugin::getCommand(SQLiteCommand&&
"prepeekcommand",
"postprocesscommand",
"prepeekpostprocesscommand",
"preparehandler"
"preparehandler",
"testquery"
};
for (auto& cmdName : supportedCommands) {
if (SStartsWith(baseCommand.request.methodLine, cmdName)) {
Expand Down Expand Up @@ -342,6 +343,13 @@ bool TestPluginCommand::peek(SQLite& db) {
string statString = "Peeking testescalate (" + serverState + ")\n";
fileAppend(request["tempFile"], statString);
return false;
} else if (request.methodLine == "testquery") {
response["nodeRequestWasExecuted"] = plugin().server.args["-nodeName"];
if (SStartsWith(request["Query"], "SELECT")) {
db.read(request["Query"]);
return true;
}
return false;
}

return false;
Expand Down Expand Up @@ -497,6 +505,8 @@ void TestPluginCommand::process(SQLite& db) {
jsonContent["cole"] = "hello";
db.write("INSERT INTO test (id, value) VALUES (999999888, 'this is a test');");
return;
} else if (request.methodLine == "testquery") {
db.write(request["Query"]);
}
}

Expand Down
69 changes: 47 additions & 22 deletions test/clustertest/tests/VersionMismatchTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,60 @@

struct VersionMismatchTest : tpunit::TestFixture {
VersionMismatchTest()
: tpunit::TestFixture("VersionMismatch", TEST(VersionMismatchTest::test)) { }
: tpunit::TestFixture("VersionMismatch",
BEFORE_CLASS(VersionMismatchTest::setup),
TEST(VersionMismatchTest::testReadEscalation),
TEST(VersionMismatchTest::testWriteEscalation),
AFTER_CLASS(VersionMismatchTest::setup)) { }

void test()
{
// Create a cluster.
BedrockClusterTester tester;
BedrockClusterTester* tester = nullptr;

void setup() {
tester = new BedrockClusterTester(ClusterSize::FIVE_NODE_CLUSTER, {"CREATE TABLE test (id INTEGER NOT NULL PRIMARY KEY, value TEXT NOT NULL)"});
// Restart one of the followers on a new version.
tester.getTester(2).stopServer();
tester.getTester(2).updateArgs({{"-versionOverride", "ABCDE"}});
tester.getTester(2).startServer();
tester->getTester(2).stopServer();
tester->getTester(2).updateArgs({{"-versionOverride", "ABCDE"}});
tester->getTester(2).startServer();

// Restart one of the followers on a new version.
tester->getTester(4).stopServer();
tester->getTester(4).updateArgs({{"-versionOverride", "ABCDE"}});
tester->getTester(4).startServer();
}
void destroy() {
delete tester;
}
void testReadEscalation()
{
// Send a query to all three and make sure the version-mismatched one escalates.
// Can do them all in parallel so might as well.
list<thread> threads;
for (size_t i = 0; i < 3; i++) {
threads.emplace_back([this, i, &tester](){
SData command("Query");
command["Query"] = "SELECT 1;";
auto result = tester.getTester(i).executeWaitMultipleData({command})[0];
for (size_t i = 0; i < 5; i++) {
SData command("testquery");
command["Query"] = "SELECT 1;";
auto result = tester->getTester(i).executeWaitMultipleData({command})[0];

// For read commands sent directly to leader, or to a follower on the same version as leader, there
// should be no upstream times. However, on a follower on a different version to leader, it should
// escalates even read commands.
ASSERT_EQUAL(result.isSet("upstreamPeekTime"), i == 2);
});
// For read commands sent directly to leader, or to a follower on the same version as leader, there
// we don't care about how they are executed. However, on a follower on a different version to leader,
// it should escalates even read commands to follower peers.
ASSERT_TRUE(result["nodeRequestWasExecuted"].length() > 0);
if (i == 2 || i == 4) {
// Confirm it didn't execute in leader
ASSERT_NOT_EQUAL(result["nodeRequestWasExecuted"], "cluster_node_0");

// Confirm it didn't execute in the server with version mismatch
ASSERT_NOT_EQUAL(result["nodeRequestWasExecuted"], "cluster_node_" + to_string(i));
}
}
for (auto& t : threads) {
t.join();
}
void testWriteEscalation()
{
for (int64_t i = 0; i < 5; i++) {
SData command("testquery");
command["Query"] = "INSERT INTO test VALUES(" + SQ(i) + ", " + SQ("val") + ");";
auto result = tester->getTester(i).executeWaitMultipleData({command})[0];

// For read commands sent directly to leader, or to a follower on the same version as leader, the one
// that will final execute the request should always be the leader
ASSERT_EQUAL(result["nodeRequestWasExecuted"], "cluster_node_0");
}
}
} __VersionMismatchTest;

0 comments on commit 828f739

Please sign in to comment.