Skip to content

Commit

Permalink
Merge pull request #2712 from upshaw-alex/fix-client-pool-timer-test
Browse files Browse the repository at this point in the history
Rework client-pool-timer-test to avoid races
  • Loading branch information
upshaw-alex authored Jul 28, 2022
2 parents ac2769d + d4a064e commit 435b40a
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Concord
//
// Copyright (c) 2020 VMware, Inc. All Rights Reserved.
// Copyright (c) 2020-2022 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the "License").
// You may not use this product except in compliance with the Apache 2.0
Expand All @@ -23,11 +23,22 @@

namespace concord_client_pool {

// Note oddity: ClientT has to implement `operator<<`
/*
* Timer type used by Concord's Client Pool implementation for managing batching timeouts.
*
* Note oddity: ClientT has to implement `operator<<` (for logging purposes)
*/
template <typename ClientT>
class Timer {
public:
using Clock = std::chrono::high_resolution_clock;

/*
* Constructor for a Timer<ClientT> to (once started) asynchronously call on_timeout after timeout milliseconds. A
* value of 0 milliseconds for timeout is interpreted as meaning there is no timeout, and in that case, starting the
* constructed Timer will never cause it to call on_timeout. Any behavior of the constructed Timer should be
* considered undefined if timeout is a negative number of milliseconds.
*/
Timer(std::chrono::milliseconds timeout, std::function<void(ClientT&&)> on_timeout)
: timeout_{timeout},
on_timeout_{on_timeout},
Expand All @@ -38,8 +49,19 @@ class Timer {
}
}

/*
* Destructor for Timer<ClientT>. This Timer will not make any calls to its on_timeout function after its destructor
* has returned, however, if there is an ongoing timeout when this destructor begins, this Timer may call the
* on_timeout function concurrently with this destructor.
*/
~Timer() { stopTimerThread(); }

/*
* Stop and end any asynchronous thread this Timer has for managing timeouts and callbacks. This puts this Timer
* object into an inactive "done" state (this transition cannot be reversed). Once this funciton has returned, this
* Timer will not make any calls to its on_timeout function, however, if there is an ongoing timeout when
* stopTimerThread begins, this Timer may call its on_timeout function concurrently with stopTimerThread.
*/
void stopTimerThread() {
if (timeout_.count() == 0) {
return;
Expand All @@ -48,6 +70,16 @@ class Timer {
timer_thread_future_.wait();
}

/*
* Start the timer for a timeout, and make an asynchronous call to the on_timeout function (given at this Timer's
* construction) with client as a parameter after the timeout duration (also given at this Timer's construction).
* Note that, though this Timer's implementation should make a reasonable effort to make this callback after about
* the given timeout duration, no hard guarantees about the precise timing of this callback are provided. No callback
* will be made if this Timer was constructed with a timeout of 0 milliseconds or if stopTimerThread has previously
* been called for this Timer. Behavior is undefined if start is called at any time when there is an ongoing timeout
* from a previous call to start that has not either completed its callback or been successfully cancelled via
* cancel() or stopTimerThread().
*/
void start(const ClientT& client) {
if (timeout_.count() == 0 || not timer_thread_future_.valid() || io_context_.stopped()) {
LOG_WARN(logger_, "Timer cannot start for client " << client_);
Expand All @@ -66,6 +98,16 @@ class Timer {
LOG_DEBUG(logger_, "Timer set for client " << client_);
}

/*
* Attempt to cancel any ongoing timeout. If there is such an ongoing timeout and it is cancelled successfully, the
* call to this Timer's on_timeout function scheduled by the most recent call to start will not occur. If there is
* such an ongoing timeout and it could not be cancelled successfully, that call will still occur. Returns the
* approximate duration, in milliseconds, between the most recent time start begin timing a timeout and the time this
* call to cancel returns (this duration is guaranteed to be non-negative, but no other particular guarantees of its
* accuracy or precision are given). Instead returns a duration of 0 milliseconds if this Timer was constructed with
* a timeout duration of 0 milliseconds. Behavior is undefined if no calls to start have been made for this Timer
* prior to this call to cancel.
*/
std::chrono::milliseconds cancel() {
if (timeout_.count() == 0) {
return timeout_;
Expand Down
3 changes: 1 addition & 2 deletions client/client_pool/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ target_link_libraries(client-pool-timer-test PUBLIC
concord_client_pool
)

# Disabled due to instability
# add_test(client-pool-timer-test client-pool-timer-test)
add_test(client-pool-timer-test client-pool-timer-test)
166 changes: 135 additions & 31 deletions client/client_pool/test/client_pool_timer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,154 @@

using concord_client_pool::Timer;

using std::chrono::milliseconds;
using std::make_shared;
using std::make_unique;
using std::shared_ptr;

using namespace std::chrono_literals;

using TestClient = std::shared_ptr<void>;
using TestClient = shared_ptr<void>;

const milliseconds kTestTimeout = 5ms;

TEST(client_pool_timer, work_items) {
TEST(client_pool_timer, startCausesCallback) {
uint16_t num_times_called = 0;
std::chrono::milliseconds timeout = 10ms;
auto timer = Timer<TestClient>(timeout, [&num_times_called](TestClient&& c) -> void { num_times_called++; });
auto timer = Timer<TestClient>(kTestTimeout, [&num_times_called](TestClient&& c) -> void { num_times_called++; });

// Wait for timeout
TestClient client1;
timer.start(client1);
std::this_thread::sleep_for(timeout * 2);
ASSERT_EQ(num_times_called, 1);

// Cancel timeout
// Note while we expect the timer to make a call back after about kTestTimeout, it makes no hard timing guarantees,
// so we may have to wait longer sometimes.
while (num_times_called < 1) { // NOLINT(bugprone-infinite-loop): timer should update num_times_called
std::this_thread::sleep_for(kTestTimeout * 2);
}
EXPECT_EQ(num_times_called, 1) << "Timer::start caused an unexpected number of callbacks to timeout funciton.";

std::this_thread::sleep_for(kTestTimeout * 20);
EXPECT_EQ(num_times_called, 1) << "Timer::start appears to have caused extraneous callback(s) after the initial one.";

TestClient client2;
timer.start(client2);
timer.cancel();
ASSERT_EQ(num_times_called, 1);
while (num_times_called < 2) { // NOLINT(bugprone-infinite-loop): timer should update num_times_called
std::this_thread::sleep_for(kTestTimeout * 2);
}
EXPECT_EQ(num_times_called, 2) << "Timer::start caused an unexpected number of callbacks to timeout function.";

// Wait for timeout
TestClient client3;
timer.start(client3);
std::this_thread::sleep_for(timeout * 2);
ASSERT_EQ(num_times_called, 2);

// Wait for timeout
TestClient client4;
timer.start(client4);
std::this_thread::sleep_for(timeout * 2);
ASSERT_EQ(num_times_called, 3);

// Stop timer thread
TestClient client5;
timer.start(client5);
timer.stopTimerThread();
ASSERT_EQ(num_times_called, 3);

// Starting new timer won't work because the thread is stopped
TestClient client6;
timer.start(client6);
std::this_thread::sleep_for(timeout * 2);
ASSERT_EQ(num_times_called, 3);
while (num_times_called < 3) { // NOLINT(bugprone-infinite-loop): timer should update num_times_called
std::this_thread::sleep_for(kTestTimeout * 2);
}
EXPECT_EQ(num_times_called, 3) << "Timer::start caused an unexpected number of callbacks to timeout funciton.";
}

TEST(client_pool_timer, cancelReturnsNonNegativeDuration) {
uint16_t num_times_called = 0;
auto timer = Timer<TestClient>(kTestTimeout, [&num_times_called](TestClient&& c) -> void { num_times_called++; });

TestClient client;
timer.start(client);

// Note we do not test whether the started timeout was cancelled successfully, since Timer::cancel is not guaranteed
// to succeed in cancelling an ongoing timeout.
milliseconds time_to_cancellation = timer.cancel();
EXPECT_GE(time_to_cancellation.count(), 0) << "Timer::cancel returned a negative duration.";

time_to_cancellation = timer.cancel();
EXPECT_GE(time_to_cancellation.count(), 0)
<< "Timer::cancel returned a negative duration when called a second time without starting a new timeout.";
}

TEST(client_pool_timer, stopTimerThreadStopsCallbacks) {
uint16_t timer_1_num_times_called = 0;
auto timer1 = Timer<TestClient>(kTestTimeout,
[&timer_1_num_times_called](TestClient&& c) -> void { timer_1_num_times_called++; });

timer1.stopTimerThread();

TestClient client1;
timer1.start(client1);
std::this_thread::sleep_for(kTestTimeout * 20);
EXPECT_EQ(timer_1_num_times_called, 0) << "Timer::start caused a callback after Timer::stopTimerThread completed.";

uint16_t timer_2_num_times_called = 0;
auto timer2 = Timer<TestClient>(kTestTimeout,
[&timer_2_num_times_called](TestClient&& c) -> void { timer_2_num_times_called++; });

TestClient client2;
timer2.start(client2);
timer2.stopTimerThread();
uint16_t calls_shortly_after_stopping = timer_2_num_times_called;

TestClient client3;
timer2.start(client3);
std::this_thread::sleep_for(kTestTimeout * 20);
EXPECT_EQ(timer_2_num_times_called, calls_shortly_after_stopping)
<< "Timer::start caused a callback after Timer::stopTimerThread completed, when stopTimerThread had been called "
"while there was potentially an ongoing timeout.";
}

TEST(client_pool_timer, startCausesCallbacksWithTheCorrectClient) {
auto most_recent_client_called = make_shared<uint16_t>(0);
uint16_t num_times_called = 0;
auto timer = Timer<shared_ptr<uint16_t>>(
kTestTimeout, [&num_times_called, &most_recent_client_called](shared_ptr<uint16_t>&& c) -> void {
most_recent_client_called = c;
num_times_called++;
});

auto client1 = make_shared<uint16_t>(1);
timer.start(client1);

// Note while we expect the timer to make a call back after about kTestTimeout, it makes no hard timing guarantees,
// so we may have to wait longer sometimes.
while (num_times_called < 1) { // NOLINT(bugprone-infinite-loop): timer should update num_times_called
std::this_thread::sleep_for(kTestTimeout * 2);
}
EXPECT_EQ(*most_recent_client_called, *client1)
<< "Timer::start did not correctly provide its ClientT parameter to the Timer's on_timeout function on callback.";

auto client2 = make_shared<uint16_t>(2);
timer.start(client2);
while (num_times_called < 2) { // NOLINT(bugprone-infinite-loop): timer should update num_times_called
std::this_thread::sleep_for(kTestTimeout * 2);
}
EXPECT_EQ(*most_recent_client_called, *client2)
<< "Timer::start did not correctly provide its ClientT parameter to the Timer's on_timeout function on callback.";
}

TEST(client_pool_timer, noCallbacksCompleteAfterDestructor) {
uint16_t num_times_called = 0;
auto timer =
make_unique<Timer<TestClient>>(kTestTimeout, [&num_times_called](TestClient&& c) -> void { num_times_called++; });

TestClient client1;
timer->start(client1);
timer.reset();
uint16_t calls_shortly_after_destructor = num_times_called;
std::this_thread::sleep_for(kTestTimeout * 20);
EXPECT_EQ(num_times_called, calls_shortly_after_destructor)
<< "A Timer object appears to have made a callback after its destructor completed.";
}

TEST(client_pool_timer, timerWith0TimeoutMakesNoCallbacks) {
uint16_t num_times_called = 0;
auto timer = Timer<TestClient>(0ms, [&num_times_called](TestClient&& c) -> void { num_times_called++; });

TestClient client1;
timer.start(client1);

std::this_thread::sleep_for(kTestTimeout * 20);
EXPECT_EQ(num_times_called, 0)
<< "Timer::start appears to have caused a callback for a Timer with a timeout interval of 0.";

EXPECT_EQ(timer.cancel(), 0ms)
<< "Timer::cancel returned an unexpected value for a Timer constructed with a timeout interval of 0.";

EXPECT_NO_THROW(timer.stopTimerThread())
<< "Timer::stopTimerThread unexpectedly threw an exception for a Timer constructed with a timeout interval of 0.";
}

int main(int argc, char** argv) {
Expand Down

0 comments on commit 435b40a

Please sign in to comment.