From 7542620b19f8272521de2f8ffcdc3c3f2268846c Mon Sep 17 00:00:00 2001 From: Alex Upshaw Date: Sat, 4 Jun 2022 01:15:01 +0000 Subject: [PATCH] Fix race conditions in thin_replica_server_test. This commit is intended to fix a couple race conditions previously observed in the thin_replica_server_test unit test suite in cases with names of the form SubscribeTo*EventGroup*WithGapTwoClients. The root cause of these race conditions was determined to be that these cases make calls to the test suite's addMoreEventGroups helper function in such a way that the addition of these event groups races with the TestStateMachine helper object's logic for detecting that all event group updates of interest to the test case have been streamed in order to end the subscription. This commit makes a couple changes to prevent the TestStateMachine from incorrectly detecting the end of a subscription that may be running concurrently with addMoreEventGroups calls. First, addMoreEventGroups's existing behavior included alerting the TestStateMachine that there are no more event groups to add with a call to TestStateMachine::toggle_more_event_groups_to_add(false). This commit adds a new optional bool parameter even_more_event_groups_to_add_after_these to addMoreEventGroups in order to allow test cases to avoid incorrectly notifying the TestStateMachine that there are no more event groups to add in cases where the test case will be making further calls to addMoreEventGroups after the current one. Second, this commit shifts the responsibility for calling TestStateMachine::set_expected_last_event_group_to_send from addMoreEventGroups to the actual test cases that call it so that the expected last event group to be sent can be correctly set strictly before the Thin Replica subscription begins running in order to avoid races between the final expected event group ID getting set correctly and the TestStateMachine checking updates the ThinReplicaImpl writes against this value. Additionally, this commit reactivates the affected test cases, as they were previously deactivated to avoid blocking other changes to improve Concord-BFT's and its CI's stability. --- .../test/thin_replica_server_test.cpp | 193 ++++++++++++------ 1 file changed, 133 insertions(+), 60 deletions(-) diff --git a/thin-replica-server/test/thin_replica_server_test.cpp b/thin-replica-server/test/thin_replica_server_test.cpp index 19ce3f08a5..a534c598fb 100644 --- a/thin-replica-server/test/thin_replica_server_test.cpp +++ b/thin-replica-server/test/thin_replica_server_test.cpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2021 VMware, Inc. All Rights Reserved. +// Copyright (c) 2021-2022 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). // You may not use this product except in compliance with the Apache 2.0 @@ -551,7 +551,8 @@ class TestStateMachine { if (not is_event_group_sm && live_buffer_) { if ((current_block_to_send_ == last_block_to_send_ - 1) && not more_blocks_to_add_) { on_finished_dropping_blocks(); - } else if (live_buffer_->Full() || live_buffer_->oldestBlockId() > (storage_.getLastBlockId() + 1)) { + } else if (live_buffer_->Full() || + (!(live_buffer_->Empty()) && (live_buffer_->oldestBlockId() > (storage_.getLastBlockId() + 1)))) { // There is a gap that is supposed to be filled with blocks from the // storage on_sync_with_kvb_finished(); @@ -574,7 +575,9 @@ class TestStateMachine { if ((current_event_group_to_send_ + storage_.total_egs_for_other_clients == (last_event_group_to_send_ - 1)) && not more_event_groups_to_add_) { on_finished_dropping_event_groups(); - } else if (live_buffer_->Full() || live_buffer_->oldestEventGroupId() > (storage_.getLastEventGroupId() + 1)) { + } else if (live_buffer_->Full() || + (!(live_buffer_->EmptyEventGroupQueue()) && + live_buffer_->oldestEventGroupId() > (storage_.getLastEventGroupId() + 1))) { // There is a gap that is supposed to be filled with event groups from the // storage on_sync_with_event_groups_finished(); @@ -669,6 +672,11 @@ class TestSubBufferList : public concord::thin_replica::SubBufferList { } }; +// Note that, if a test case uses addMoreEventGroups, it should call set_expected_last_event_group_to_send on the +// case's TestStateMachine instance with a parameter of ("end" + 1), where "end" is the highest value that the test +// case calls addMoreEventGroups with. This is necessary to enable the TestStateMachine to detect where the test case +// should stop the subscription. Furthermore, this set_expected_last_event_group_to_send call must be completed before +// the test casee begins the thin replica subscription to avoid race conditions with it. template void addMoreEventGroups(const EventGroupId start, const EventGroupId end, @@ -676,17 +684,17 @@ void addMoreEventGroups(const EventGroupId start, FakeStorage& storage, TestStateMachine& state_machine, TestSubBufferList& buffer, - const std::string& trid = kClientId1) { + const std::string& trid = kClientId1, + bool even_more_event_groups_to_add_after_these = false) { auto more_live_updates = generateEventGroupMap(start, end, type, trid); storage.addEventGroups(more_live_updates); storage.updateEventGroupStorageMaps(more_live_updates); - state_machine.toggle_more_event_groups_to_add(false); + state_machine.toggle_more_event_groups_to_add(even_more_event_groups_to_add_after_these); for (const auto& [key, val] : more_live_updates) { EventGroupId eg_id(concordUtils::fromBigEndianBuffer(key.data())); concord::thin_replica::SubEventGroupUpdate update{eg_id, val}; buffer.updateEventGroupSubBuffers(update); } - state_machine.set_expected_last_event_group_to_send(end + 1); } TEST(thin_replica_server_test, SubscribeToFirstBlockNotInStorage) { @@ -907,6 +915,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesAlreadySynced) EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -958,6 +967,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesAlreadySyncedT EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 8); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -988,7 +998,8 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesAlreadySyncedT storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more event group for kClientId1 addMoreEventGroups(kLastEventGroupId + 7, kLastEventGroupId + 7, @@ -996,7 +1007,8 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesAlreadySyncedT storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -1012,6 +1024,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesAlreadySynced) EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1062,6 +1075,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesAlreadySyncedTw EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 8); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1092,14 +1106,17 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesAlreadySyncedTw storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more public event group addMoreEventGroups(kLastEventGroupId + 7, kLastEventGroupId + 7, EventGroupType::PublicEventGroupsOnly, storage, state_machine, - buffer); + buffer, + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -1116,6 +1133,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesAlrea EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1167,6 +1185,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesAlrea EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 8); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1197,7 +1216,8 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesAlrea storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more event group for kClientId1 addMoreEventGroups(kLastEventGroupId + 7, kLastEventGroupId + 7, @@ -1205,7 +1225,8 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesAlrea storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -1251,6 +1272,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesWithGap) { EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 9); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1296,9 +1318,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesWithGap) { EXPECT_EQ(state_machine.numEventGroupsReceived(), total_event_groups); } -// Disabled due to instability in order to unblock master for other fixes to improve its stability. -// TODO (Alex): Develop a propper fix and re-enable this test. -TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdatesWithGapTwoClients) { +TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesWithGapTwoClients) { // Initialize storage and live update queue FakeStorage storage(generateEventGroupMap(1, kLastEventGroupId, EventGroupType::PrivateEventGroupsOnly, kClientId1)); auto client2_egs = generateEventGroupMap( @@ -1309,6 +1329,7 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdatesWithG EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 14); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1345,7 +1366,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdatesWithG storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more event groups for kClientId1 addMoreEventGroups(kLastEventGroupId + 9, kLastEventGroupId + 10, @@ -1353,7 +1375,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdatesWithG storage, state_machine, buffer, - kClientId1); + kClientId1, + true); // add 2 more event groups for kClientId2 addMoreEventGroups(kLastEventGroupId + 11, kLastEventGroupId + 12, @@ -1361,7 +1384,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdatesWithG storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more event group for kClientId1 addMoreEventGroups(kLastEventGroupId + 13, kLastEventGroupId + 13, @@ -1369,7 +1393,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdatesWithG storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -1385,6 +1410,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesWithGap) { EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 9); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1429,9 +1455,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesWithGap) { EXPECT_EQ(state_machine.numEventGroupsReceived(), total_event_groups); } -// Disabled due to instability in order to unblock master for other fixes to improve its stability. -// TODO (Alex): Develop a propper fix and re-enable this test. -TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdatesWithGapTwoClients) { +TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesWithGapTwoClients) { // Initialize storage and live update queue with public event groups and private event groups for kClientId2 // Initialize storage and live update queue FakeStorage storage(generateEventGroupMap(1, kLastEventGroupId, EventGroupType::PublicEventGroupsOnly)); @@ -1443,6 +1467,7 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdatesWithGa EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 14); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1478,14 +1503,17 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdatesWithGa storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more public event groups addMoreEventGroups(kLastEventGroupId + 9, kLastEventGroupId + 10, EventGroupType::PublicEventGroupsOnly, storage, state_machine, - buffer); + buffer, + kClientId1, + true); // add 2 more event groups for kClientId2 addMoreEventGroups(kLastEventGroupId + 11, kLastEventGroupId + 12, @@ -1493,14 +1521,17 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdatesWithGa storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more public event groups addMoreEventGroups(kLastEventGroupId + 13, kLastEventGroupId + 13, EventGroupType::PublicEventGroupsOnly, storage, state_machine, - buffer); + buffer, + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -1516,6 +1547,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesWithG EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 9); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1561,9 +1593,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesWithG EXPECT_EQ(state_machine.numEventGroupsReceived(), total_event_groups); } -// Disabled due to instability in order to unblock master for other fixes to improve its stability. -// TODO (Alex): Develop a propper fix and re-enable this test. -TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpdatesWithGapTwoClients) { +TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesWithGapTwoClients) { // Initialize storage and live update queue FakeStorage storage( generateEventGroupMap(1, kLastEventGroupId, EventGroupType::PublicAndPrivateEventGroups, kClientId1)); @@ -1575,6 +1605,7 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 14); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1611,7 +1642,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more public and private event groups addMoreEventGroups(kLastEventGroupId + 9, kLastEventGroupId + 10, @@ -1619,7 +1651,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId1); + kClientId1, + true); // add 2 more private event groups addMoreEventGroups(kLastEventGroupId + 11, kLastEventGroupId + 12, @@ -1627,7 +1660,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more public and/or private event group addMoreEventGroups(kLastEventGroupId + 13, kLastEventGroupId + 13, @@ -1635,7 +1669,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); @@ -1680,6 +1715,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdatesWithGapFromThe auto storage_egs = storage.getEventGroups(); EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId); TestStateMachine state_machine{storage, storage_egs, 3, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 6); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1731,6 +1767,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdatesWithGapFromTheM auto storage_egs = storage.getEventGroups(); EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId); TestStateMachine state_machine{storage, storage_egs, 3, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 6); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1781,6 +1818,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdatesWithG auto storage_egs = storage.getEventGroups(); EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId); TestStateMachine state_machine{storage, storage_egs, 3, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 6); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1862,6 +1900,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesAlreadySy EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1912,6 +1951,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesAlreadySy EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 8); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -1942,7 +1982,8 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesAlreadySy storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more event group for kClientId1 addMoreEventGroups(kLastEventGroupId + 7, kLastEventGroupId + 7, @@ -1950,7 +1991,8 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesAlreadySy storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -1966,6 +2008,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesAlreadySyn EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2017,6 +2060,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesAlreadySyn EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 8); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2047,7 +2091,8 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesAlreadySyn storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more public event group addMoreEventGroups(kLastEventGroupId + 7, @@ -2055,7 +2100,9 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesAlreadySyn EventGroupType::PublicEventGroupsOnly, storage, state_machine, - buffer); + buffer, + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); @@ -2072,6 +2119,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashes EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2123,6 +2171,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashes EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 8); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2153,7 +2202,8 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashes storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more event group for kClientId1 addMoreEventGroups(kLastEventGroupId + 7, kLastEventGroupId + 7, @@ -2161,7 +2211,8 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashes storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -2206,6 +2257,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesWithGap) EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 9); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2251,9 +2303,7 @@ TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesWithGap) EXPECT_EQ(state_machine.numEventGroupsReceived(), total_event_groups); } -// Disabled due to instability in order to unblock master for other fixes to improve its stability. -// TODO (Alex): Develop a propper fix and re-enable this test. -TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdateHashesWithGapTwoClients) { +TEST(thin_replica_server_test, SubscribeToPrivateEventGroupUpdateHashesWithGapTwoClients) { // Initialize storage and live update queue FakeStorage storage(generateEventGroupMap(1, kLastEventGroupId, EventGroupType::PrivateEventGroupsOnly, kClientId1)); auto client2_egs = generateEventGroupMap( @@ -2264,6 +2314,7 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdateHashes EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 14); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2300,7 +2351,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdateHashes storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more event groups for kClientId1 addMoreEventGroups(kLastEventGroupId + 9, kLastEventGroupId + 10, @@ -2308,7 +2360,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdateHashes storage, state_machine, buffer, - kClientId1); + kClientId1, + true); // add 2 more event groups for kClientId2 addMoreEventGroups(kLastEventGroupId + 11, kLastEventGroupId + 12, @@ -2316,7 +2369,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdateHashes storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more event group for kClientId1 addMoreEventGroups(kLastEventGroupId + 13, kLastEventGroupId + 13, @@ -2324,7 +2378,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPrivateEventGroupUpdateHashes storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -2340,6 +2395,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesWithGap) { EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 9); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2382,9 +2438,7 @@ TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesWithGap) { EXPECT_EQ(state_machine.numEventGroupsReceived(), total_event_groups); } -// Disabled due to instability in order to unblock master for other fixes to improve its stability. -// TODO (Alex): Develop a propper fix and re-enable this test. -TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdateHashesWithGapTwoClients) { +TEST(thin_replica_server_test, SubscribeToPublicEventGroupUpdateHashesWithGapTwoClients) { // Initialize storage and live update queue FakeStorage storage(generateEventGroupMap(1, kLastEventGroupId, EventGroupType::PublicEventGroupsOnly)); auto client2_egs = generateEventGroupMap( @@ -2395,6 +2449,7 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdateHashesW EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 14); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2430,14 +2485,17 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdateHashesW storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more public event groups addMoreEventGroups(kLastEventGroupId + 9, kLastEventGroupId + 10, EventGroupType::PublicEventGroupsOnly, storage, state_machine, - buffer); + buffer, + kClientId1, + true); // add 2 more event groups for kClientId2 addMoreEventGroups(kLastEventGroupId + 11, kLastEventGroupId + 12, @@ -2445,14 +2503,17 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicEventGroupUpdateHashesW storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more public event groups addMoreEventGroups(kLastEventGroupId + 13, kLastEventGroupId + 13, EventGroupType::PublicEventGroupsOnly, storage, state_machine, - buffer); + buffer, + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -2468,6 +2529,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashes EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 9); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2513,9 +2575,7 @@ TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashes EXPECT_EQ(state_machine.numEventGroupsReceived(), total_event_groups); } -// Disabled due to instability in order to unblock master for other fixes to improve its stability. -// TODO (Alex): Develop a propper fix and re-enable this test. -TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpdateHashesWithGapTwoClients) { +TEST(thin_replica_server_test, SubscribeToPublicAndPrivateEventGroupUpdateHashesWithGapTwoClients) { // Initialize storage and live update queue FakeStorage storage( generateEventGroupMap(1, kLastEventGroupId, EventGroupType::PublicAndPrivateEventGroups, kClientId1)); @@ -2527,6 +2587,7 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd EXPECT_EQ(storage.getLastEventGroupId(), kLastEventGroupId + 5); TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PublicAndPrivateEventGroups; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 14); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2562,7 +2623,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 2 more public and private event groups addMoreEventGroups(kLastEventGroupId + 9, kLastEventGroupId + 10, @@ -2570,7 +2632,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId1); + kClientId1, + true); // add 2 more private event groups addMoreEventGroups(kLastEventGroupId + 11, kLastEventGroupId + 12, @@ -2578,7 +2641,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId2); + kClientId2, + true); // add 1 more public and/or private event group addMoreEventGroups(kLastEventGroupId + 13, kLastEventGroupId + 13, @@ -2586,7 +2650,8 @@ TEST(thin_replica_server_test, DISABLED_SubscribeToPublicAndPrivateEventGroupUpd storage, state_machine, buffer, - kClientId1); + kClientId1, + false); if (status != std::future_status::ready) { out_stream.wait(); } @@ -2899,6 +2964,7 @@ TEST(thin_replica_server_test, SubscribeWithPrunedEventGroupIdPrivateNewEgsAdded storage.prune(10, kClientId1); auto live_update_event_groups = generateEventGroupMap(0, 0, EventGroupType::PrivateEventGroupsOnly); TestStateMachine state_machine{storage, live_update_event_groups, 11, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -2938,6 +3004,7 @@ TEST(thin_replica_server_test, SubscribeWithPrunedEventGroupIdPrivateNewEgsAdded storage.prune(10, kClientId1); auto live_update_event_groups = generateEventGroupMap(0, 0, EventGroupType::PrivateEventGroupsOnly); TestStateMachine state_machine{storage, live_update_event_groups, 11, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -3047,6 +3114,7 @@ TEST(thin_replica_server_test, SubscribeWithPrunedEventGroupIdPublicNewEgsAdded) storage.prune(10, kPublicEgIdKey); auto live_update_event_groups = generateEventGroupMap(0, 0, EventGroupType::PublicEventGroupsOnly); TestStateMachine state_machine{storage, live_update_event_groups, 11, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -3085,6 +3153,7 @@ TEST(thin_replica_server_test, SubscribeWithPrunedEventGroupIdPublicNewEgsAddedC storage.prune(10, kPublicEgIdKey); auto live_update_event_groups = generateEventGroupMap(0, 0, EventGroupType::PublicEventGroupsOnly); TestStateMachine state_machine{storage, live_update_event_groups, 11, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -3196,6 +3265,7 @@ TEST(thin_replica_server_test, SubscribeWithPrunedEventGroupIdPublicAndPrivateNe storage.prune(5, kClientId1); auto live_update_event_groups = generateEventGroupMap(0, 0, EventGroupType::PublicAndPrivateEventGroups); TestStateMachine state_machine{storage, live_update_event_groups, 11, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -3236,6 +3306,7 @@ TEST(thin_replica_server_test, SubscribeWithPrunedEventGroupIdPublicAndPrivateNe storage.prune(5, kClientId1); auto live_update_event_groups = generateEventGroupMap(0, 0, EventGroupType::PublicAndPrivateEventGroups); TestStateMachine state_machine{storage, live_update_event_groups, 11, true}; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -3415,6 +3486,7 @@ TEST(thin_replica_server_test, SubscribeToUpdatesLegacyTransition) { generateEventGroupMap(kLastEventGroupId + 1, kLastEventGroupId + 5, EventGroupType::PrivateEventGroupsOnly); TestStateMachine state_machine{storage, live_update_event_groups, 1, true}; state_machine.current_eg_type = EventGroupType::PublicEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 7); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine}; @@ -3473,6 +3545,7 @@ TEST(thin_replica_server_test, SubscribeToUpdatesLegacyRequestEventGroups) { // Live updates can contain event groups only (storage_egs = live_updates) TestStateMachine state_machine{storage, storage_egs, 1, true}; state_machine.current_eg_type = EventGroupType::PrivateEventGroupsOnly; + state_machine.set_expected_last_event_group_to_send(kLastEventGroupId + 12); TestSubBufferList buffer{state_machine}; TestServerWriter stream{state_machine};