Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update expensify_prod branch #1566

Merged
merged 23 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
387ef4f
added new method to get valid peer
danieldoglas Aug 4, 2023
6376d7d
wip method to runOnValidFollower
danieldoglas Aug 4, 2023
3a419c7
adding logic to escalate to other followers
danieldoglas Aug 4, 2023
9d37cfa
Merge branch 'main' into dsilva_addingVersionMismatchHandling
danieldoglas Aug 4, 2023
30a7aff
adding peer call
danieldoglas Aug 4, 2023
125ba22
Merge branch 'main' into dsilva_addingVersionMismatchHandling
danieldoglas Sep 7, 2023
4c8cacb
adding tests
danieldoglas Sep 11, 2023
9c70500
making tester a class variable, including new table to test insert
danieldoglas Sep 11, 2023
943d7d8
deploying escalation tests
danieldoglas Sep 11, 2023
90d4a27
fixing escalation path and tests
danieldoglas Sep 11, 2023
e118417
fix assert
danieldoglas Sep 11, 2023
f2ce01a
renaming property from nodeName to nodesPath
danieldoglas Sep 11, 2023
37f09dd
changing tests from 6 to 5 servers
danieldoglas Sep 11, 2023
b77d761
addressing comments
danieldoglas Sep 11, 2023
efc44a0
removing broken code
danieldoglas Sep 11, 2023
5b02591
addressing comments, tests don't load test plugin locally
danieldoglas Sep 12, 2023
a39254b
tests are now passing
danieldoglas Sep 12, 2023
dc0732d
removing unecessary log
danieldoglas Sep 12, 2023
2801428
removing file
danieldoglas Sep 12, 2023
9bbfe68
fixing log
danieldoglas Sep 12, 2023
d7589c6
Merge branches 'dsilva_addingVersionMismatchHandling' and 'dsilva_add…
danieldoglas Sep 12, 2023
d613c06
fixing comment
danieldoglas Sep 17, 2023
828f739
Merge pull request #1560 from Expensify/dsilva_addingVersionMismatchH…
tylerkaraszewski Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,17 +772,21 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
// our state right before we commit.
SQLiteNodeState state = _replicationState.load();

// If we're following, we will automatically escalate any command that's not already complete (complete
// commands are likely already returned from leader with legacy escalation) and is marked as
// `escalateImmediately` (which lets them skip the queue, which is particularly useful if they're waiting
// for a previous commit to be delivered to this follower), OR if we're on a different version from leader.
if (state == SQLiteNodeState::FOLLOWING && (_version != _leaderVersion.load() || command->escalateImmediately) && !command->complete) {
// If we're following, we will automatically escalate any command that's:
// 1. Not already complete (complete commands are likely already returned from leader with legacy escalation)
// and is marked as `escalateImmediately` (which lets them skip the queue, which is particularly useful if they're waiting
// for a previous commit to be delivered to this follower);
// 2. Any commands if the current version of the code is not the same one as leader is executing.
if (state == SQLiteNodeState::FOLLOWING && !command->complete && (command->escalateImmediately || _version != _leaderVersion.load())) {
auto _clusterMessengerCopy = _clusterMessenger;
if (_clusterMessengerCopy && _clusterMessengerCopy->runOnLeader(*command)) {
string escalatedTo = "";
if (command->escalateImmediately && _clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) {
// command->complete is now true for this command. It will get handled a few lines below.
SINFO("Immediately escalated " << command->request.methodLine << " to leader.");
} else if (_version != _leaderVersion.load() && _clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, false)) {
SINFO("Escalated " << command->request.methodLine << " to follower peer.");
} else {
SINFO("Couldn't immediately escalate command " << command->request.methodLine << " to leader, queuing normally.");
SINFO("Couldn't escalate command " << command->request.methodLine << " to " << (command->escalateImmediately ? "leader" : "follower peer") << ", queuing it again.");
_commandQueue.push(move(command));
return;
}
Expand Down Expand Up @@ -966,7 +970,7 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
} else if (state == SQLiteNodeState::STANDINGDOWN) {
SINFO("Need to process command " << command->request.methodLine << " but STANDINGDOWN, moving to _standDownQueue.");
_standDownQueue.push(move(command));
} else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnLeader(*command)) {
} else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) {
SINFO("Escalated " << command->request.methodLine << " to leader and complete, responding.");
_reply(command);
} else {
Expand Down
21 changes: 11 additions & 10 deletions sqlitecluster/SQLiteClusterMessenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,26 @@ unique_ptr<SHTTPSManager::Socket> SQLiteClusterMessenger::_getSocketForAddress(s
return s;
}

bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {
bool SQLiteClusterMessenger::runOnPeer(BedrockCommand& command, bool runOnLeader) {
auto start = chrono::steady_clock::now();
bool sent = false;
string peerType = runOnLeader ? "leader" : "follower peer";
size_t sleepsDueToFailures = 0;
string leaderAddress;
string peerAddress;

unique_ptr<SHTTPSManager::Socket> s;
while (chrono::steady_clock::now() < (start + 5s) && !sent) {
leaderAddress = _node->leaderCommandAddress();
if (leaderAddress.empty()) {
peerAddress = runOnLeader ? _node->leaderCommandAddress() : _node->getEligibleFollowerForForwardingAddress();
if (peerAddress.empty()) {
// If there's no leader, it's possible we're supposed to be the leader. In this case, we can exit early.
auto myState = _node->getState();
if (myState == SQLiteNodeState::LEADING || myState == SQLiteNodeState::STANDINGUP) {
if (runOnLeader && (myState == SQLiteNodeState::LEADING || myState == SQLiteNodeState::STANDINGUP)) {
SINFO("[HTTPESC] I'm the leader now! Exiting early.");
return false;
}

// Otherwise, just wait until there is a leader.
SINFO("[HTTPESC] No leader address.");
SINFO("[HTTPESC] No " + peerType + " address.");
sleepsDueToFailures++;
usleep(500'000);
continue;
Expand All @@ -268,7 +269,7 @@ bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {
// Start our escalation timing
command.escalationTimeUS = STimeNow();

s = _getSocketForAddress(leaderAddress);
s = _getSocketForAddress(peerAddress);
if (!s) {
command.escalationTimeUS = STimeNow() - command.escalationTimeUS;
return false;
Expand All @@ -282,8 +283,8 @@ bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {
}

// If we fell out of the loop simply because we did not get a leader address in time, we can return false and retry later.
if (leaderAddress.empty()) {
SINFO("[HTTPESC] Could not get leader address in 5s, will retry later.");
if (peerAddress.empty()) {
SINFO("[HTTPESC] Could not get " + peerType + " address in 5s, will retry later.");
return false;
}

Expand All @@ -301,7 +302,7 @@ bool SQLiteClusterMessenger::runOnLeader(BedrockCommand& command) {

// Since everything went fine with this command, we can save its socket, unless it's being closed.
if (!commandWillCloseSocket(command)) {
_socketPool.returnSocket(move(s), leaderAddress);
_socketPool.returnSocket(move(s), peerAddress);
}

return true;
Expand Down
8 changes: 4 additions & 4 deletions sqlitecluster/SQLiteClusterMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ class SQLiteClusterMessenger {

SQLiteClusterMessenger(const shared_ptr<const SQLiteNode> node);

// Attempts to make a TCP connection to the leader, and run the given command there, setting the appropriate
// response from leader in the command, and marking it as complete if possible.
// Attempts to make a TCP connection to a peer, that could be the leader or not, and run the given command there,
// setting the appropriate response from the peer in the command, and marking it as complete if possible.
// Returns command->complete at the end of the function, this is true if the command was successfully completed on
// leader, or if a fatal error occurred. This will be false if the command can be re-tried later (for instance, if
// the peer, or if a fatal error occurred. This will be false if the command can be re-tried later (for instance, if
// no connection to leader could be made).
bool runOnLeader(BedrockCommand& command);
bool runOnPeer(BedrockCommand& command, bool runOnLeader);

// Attempts to run command on every peer. This is done in threads, so the
// order in which the peers run the command is not deterministic. Returns a
Expand Down
19 changes: 19 additions & 0 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,25 @@ list<STable> SQLiteNode::getPeerInfo() const {
return peerData;
}

string SQLiteNode::getEligibleFollowerForForwardingAddress() const {
vector<string> validPeers;
const string leaderVersion = getLeaderVersion();
if (leaderVersion.empty()) {
return "";
}
for (SQLitePeer* peer : _peerList) {
// We want only nodes that are using the same version as leader and are currently followers
if (peer->version.load() != leaderVersion || peer->state.load() != SQLiteNodeState::FOLLOWING) {
continue;
}
validPeers.push_back(peer->name);
}
if (validPeers.empty()) {
return "";
}
return getPeerByName(validPeers[rand() % validPeers.size()])->commandAddress.load();
}

// --------------------------------------------------------------------------
// State Machine
// --------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class SQLiteNode : public STCPManager {
// Can block.
list<STable> getPeerInfo() const;

// Gets a random follower peer that is in the same version as leader.
string getEligibleFollowerForForwardingAddress() const;

// Returns our current priority.
// Does not block.
int getPriority() const;
Expand Down
12 changes: 11 additions & 1 deletion test/clustertest/testplugin/TestPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ unique_ptr<BedrockCommand> BedrockPlugin_TestPlugin::getCommand(SQLiteCommand&&
"prepeekcommand",
"postprocesscommand",
"prepeekpostprocesscommand",
"preparehandler"
"preparehandler",
"testquery"
};
for (auto& cmdName : supportedCommands) {
if (SStartsWith(baseCommand.request.methodLine, cmdName)) {
Expand Down Expand Up @@ -342,6 +343,13 @@ bool TestPluginCommand::peek(SQLite& db) {
string statString = "Peeking testescalate (" + serverState + ")\n";
fileAppend(request["tempFile"], statString);
return false;
} else if (request.methodLine == "testquery") {
response["nodeRequestWasExecuted"] = plugin().server.args["-nodeName"];
if (SStartsWith(request["Query"], "SELECT")) {
db.read(request["Query"]);
return true;
}
return false;
}

return false;
Expand Down Expand Up @@ -497,6 +505,8 @@ void TestPluginCommand::process(SQLite& db) {
jsonContent["cole"] = "hello";
db.write("INSERT INTO test (id, value) VALUES (999999888, 'this is a test');");
return;
} else if (request.methodLine == "testquery") {
db.write(request["Query"]);
}
}

Expand Down
69 changes: 47 additions & 22 deletions test/clustertest/tests/VersionMismatchTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,60 @@

struct VersionMismatchTest : tpunit::TestFixture {
VersionMismatchTest()
: tpunit::TestFixture("VersionMismatch", TEST(VersionMismatchTest::test)) { }
: tpunit::TestFixture("VersionMismatch",
BEFORE_CLASS(VersionMismatchTest::setup),
TEST(VersionMismatchTest::testReadEscalation),
TEST(VersionMismatchTest::testWriteEscalation),
AFTER_CLASS(VersionMismatchTest::setup)) { }

void test()
{
// Create a cluster.
BedrockClusterTester tester;
BedrockClusterTester* tester = nullptr;

void setup() {
tester = new BedrockClusterTester(ClusterSize::FIVE_NODE_CLUSTER, {"CREATE TABLE test (id INTEGER NOT NULL PRIMARY KEY, value TEXT NOT NULL)"});
// Restart one of the followers on a new version.
tester.getTester(2).stopServer();
tester.getTester(2).updateArgs({{"-versionOverride", "ABCDE"}});
tester.getTester(2).startServer();
tester->getTester(2).stopServer();
tester->getTester(2).updateArgs({{"-versionOverride", "ABCDE"}});
tester->getTester(2).startServer();

// Restart one of the followers on a new version.
tester->getTester(4).stopServer();
tester->getTester(4).updateArgs({{"-versionOverride", "ABCDE"}});
tester->getTester(4).startServer();
}
void destroy() {
delete tester;
}
void testReadEscalation()
{
// Send a query to all three and make sure the version-mismatched one escalates.
// Can do them all in parallel so might as well.
list<thread> threads;
for (size_t i = 0; i < 3; i++) {
threads.emplace_back([this, i, &tester](){
SData command("Query");
command["Query"] = "SELECT 1;";
auto result = tester.getTester(i).executeWaitMultipleData({command})[0];
for (size_t i = 0; i < 5; i++) {
SData command("testquery");
command["Query"] = "SELECT 1;";
auto result = tester->getTester(i).executeWaitMultipleData({command})[0];

// For read commands sent directly to leader, or to a follower on the same version as leader, there
// should be no upstream times. However, on a follower on a different version to leader, it should
// escalates even read commands.
ASSERT_EQUAL(result.isSet("upstreamPeekTime"), i == 2);
});
// For read commands sent directly to leader, or to a follower on the same version as leader, there
// we don't care about how they are executed. However, on a follower on a different version to leader,
// it should escalates even read commands to follower peers.
ASSERT_TRUE(result["nodeRequestWasExecuted"].length() > 0);
if (i == 2 || i == 4) {
// Confirm it didn't execute in leader
ASSERT_NOT_EQUAL(result["nodeRequestWasExecuted"], "cluster_node_0");

// Confirm it didn't execute in the server with version mismatch
ASSERT_NOT_EQUAL(result["nodeRequestWasExecuted"], "cluster_node_" + to_string(i));
}
}
for (auto& t : threads) {
t.join();
}
void testWriteEscalation()
{
for (int64_t i = 0; i < 5; i++) {
SData command("testquery");
command["Query"] = "INSERT INTO test VALUES(" + SQ(i) + ", " + SQ("val") + ");";
auto result = tester->getTester(i).executeWaitMultipleData({command})[0];

// For read commands sent directly to leader, or to a follower on the same version as leader, the one
// that will final execute the request should always be the leader
ASSERT_EQUAL(result["nodeRequestWasExecuted"], "cluster_node_0");
}
}
} __VersionMismatchTest;