Skip to content

Commit

Permalink
Make the timeout watchdog more consistent (ad-freiburg#1239)
Browse files Browse the repository at this point in the history
The public function `CancellationHandle::isCancelled()` now also pleases the cancellation watchdog and suppresses the warning about missed timeout checks. This will lead to less such warnings during fully materialized `IndesScan`s.
Also avoid the duplicate printing of this warning message if two concurrently running operations of the same query detect the missed cancellation check at exactly the same time.
  • Loading branch information
RobinTF authored Jan 24, 2024
1 parent a53a3b0 commit 3dd1570
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ IdTable CompressedRelationReader::scan(
// block in parallel
#pragma omp task
{
if (!cancellationHandle->isCancelled()) {
if (!cancellationHandle->isCancelled("CompressedRelation scan")) {
decompressLambda();
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/util/CancellationHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ void CancellationHandle<Mode>::startWatchDogInternal() requires WatchDogEnabled
cancellationState_.compare_exchange_strong(state, WAITING_FOR_CHECK,
std::memory_order_relaxed);
} else if (state == WAITING_FOR_CHECK) {
if (cancellationState_.compare_exchange_strong(
state, CHECK_WINDOW_MISSED, std::memory_order_relaxed)) {
startTimeoutWindow_ = steady_clock::now();
}
// This variable needs to be set before compare exchange,
// otherwise another thread might read an old value before
// the new value is set. This might lead to redundant stores,
// which is acceptable here.
startTimeoutWindow_ = steady_clock::now();
cancellationState_.compare_exchange_strong(state, CHECK_WINDOW_MISSED,
std::memory_order_relaxed);
}
} while (!watchDogState_.conditionVariable_.wait_for(
lock, DESIRED_CANCELLATION_CHECK_INTERVAL,
Expand Down
63 changes: 45 additions & 18 deletions src/util/CancellationHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,44 @@ class CancellationHandle {
/// Make sure internal state is set back to
/// `CancellationState::NOT_CANCELLED`, in order to prevent logging warnings
/// in the console that would otherwise be triggered by the watchdog.
/// NOTE: The parameter state is expected to be one of `CHECK_WINDOW_MISSED`
/// or `WAITING_FOR_CHECK`, otherwise it will violate the correctness check.
template <typename... ArgTypes>
void pleaseWatchDog(CancellationState state,
const InvocableWithConvertibleReturnType<
std::string_view, ArgTypes...> auto& detailSupplier,
ArgTypes&&... argTypes) requires WatchDogEnabled {
using DurationType =
std::remove_const_t<decltype(DESIRED_CANCELLATION_CHECK_INTERVAL)>;
AD_CORRECTNESS_CHECK(!detail::isCancelled(state) &&
state != CancellationState::NOT_CANCELLED);

if (state == CancellationState::CHECK_WINDOW_MISSED) {
LOG(WARN) << "Cancellation check missed deadline of "
<< ParseableDuration{DESIRED_CANCELLATION_CHECK_INTERVAL}
<< " by "
<< ParseableDuration{duration_cast<DurationType>(
steady_clock::now() - startTimeoutWindow_.load())}
<< ". Stage: "
<< std::invoke(detailSupplier, AD_FWD(argTypes)...)
<< std::endl;
}

cancellationState_.compare_exchange_strong(
state, CancellationState::NOT_CANCELLED, std::memory_order_relaxed);
bool windowMissed = state == CancellationState::CHECK_WINDOW_MISSED;
// Because we know `state` will be one of `CHECK_WINDOW_MISSED` or
// `WAITING_FOR_CHECK` at this point, we can skip the initial check.
do {
if (cancellationState_.compare_exchange_weak(
state, CancellationState::NOT_CANCELLED,
std::memory_order_relaxed)) {
if (windowMissed) {
LOG(WARN) << "The time since the last timeout check is at least "
<< ParseableDuration{duration_cast<DurationType>(
steady_clock::now() -
startTimeoutWindow_.load()) +
DESIRED_CANCELLATION_CHECK_INTERVAL}
<< ", should be at most "
<< ParseableDuration{DESIRED_CANCELLATION_CHECK_INTERVAL}
<< ". Stage: "
<< std::invoke(detailSupplier, AD_FWD(argTypes)...)
<< std::endl;
}
break;
}
// If state is NOT_CANCELLED this means another thread already reported
// the missed deadline, so we don't report a second time or a cancellation
// kicked in and there is no need to continue the loop.
} while (!detail::isCancelled(state) &&
state != CancellationState::NOT_CANCELLED);
}

/// Internal function that starts the watch dog. It will set this
Expand Down Expand Up @@ -196,12 +213,18 @@ class CancellationHandle {
/// Return true if this cancellation handle has been cancelled, false
/// otherwise. Note: Make sure to not use this value to set any other atomic
/// value with relaxed memory ordering, as this may lead to out-of-thin-air
/// values. It also does not interact with the watchdog at all, prefer
/// `throwIfCancelled` if possible.
AD_ALWAYS_INLINE bool isCancelled() const {
/// values. If the watchdog is enabled, this will please it and print
/// a warning with the passed detail string.
AD_ALWAYS_INLINE bool isCancelled(std::string_view details) {
if constexpr (CancellationEnabled) {
return detail::isCancelled(
cancellationState_.load(std::memory_order_relaxed));
auto state = cancellationState_.load(std::memory_order_relaxed);
bool isCancelled = detail::isCancelled(state);
if constexpr (WatchDogEnabled) {
if (!isCancelled && state != CancellationState::NOT_CANCELLED) {
pleaseWatchDog(state, std::identity{}, details);
}
}
return isCancelled;
} else {
return false;
}
Expand Down Expand Up @@ -234,6 +257,10 @@ class CancellationHandle {
FRIEND_TEST(CancellationHandle,
verifyCheckAfterDeadlineMissDoesReportProperly);
FRIEND_TEST(CancellationHandle, verifyWatchDogDoesNotChangeStateAfterCancel);
FRIEND_TEST(CancellationHandle, verifyPleaseWatchDogReportsOnlyWhenNecessary);
FRIEND_TEST(CancellationHandle, verifyIsCancelledDoesPleaseWatchDog);
FRIEND_TEST(CancellationHandle,
verifyPleaseWatchDogDoesNotAcceptInvalidState);
};

using SharedCancellationHandle = std::shared_ptr<CancellationHandle<>>;
Expand Down
101 changes: 95 additions & 6 deletions test/CancellationHandleTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TEST(CancellationHandle, verifyConstructorDoesNotAcceptNoReason) {
TYPED_TEST(CancellationHandleFixture, verifyNotCancelledByDefault) {
auto& handle = this->handle_;

EXPECT_FALSE(handle.isCancelled());
EXPECT_FALSE(handle.isCancelled(""));
EXPECT_NO_THROW(handle.throwIfCancelled(""));
EXPECT_NO_THROW(handle.throwIfCancelled([]() { return ""; }));
}
Expand All @@ -70,7 +70,7 @@ TYPED_TEST(CancellationHandleFixture, verifyTimeoutCancellationWorks) {
handle.cancel(TIMEOUT);

auto timeoutMessageMatcher = AllOf(HasSubstr(detail), HasSubstr("timeout"));
EXPECT_TRUE(handle.isCancelled());
EXPECT_TRUE(handle.isCancelled(""));
AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(handle.throwIfCancelled(detail),
timeoutMessageMatcher,
CancellationException);
Expand All @@ -88,7 +88,7 @@ TYPED_TEST(CancellationHandleFixture, verifyManualCancellationWorks) {

auto cancellationMessageMatcher =
AllOf(HasSubstr(detail), HasSubstr("manual cancellation"));
EXPECT_TRUE(handle.isCancelled());
EXPECT_TRUE(handle.isCancelled(""));
AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(handle.throwIfCancelled(detail),
cancellationMessageMatcher,
CancellationException);
Expand Down Expand Up @@ -116,7 +116,7 @@ TYPED_TEST(CancellationHandleFixture,
}
},
CancellationException);
EXPECT_TRUE(handle.isCancelled());
EXPECT_TRUE(handle.isCancelled(""));
}

// _____________________________________________________________________________
Expand Down Expand Up @@ -264,6 +264,8 @@ TEST(CancellationHandle, verifyCheckDoesNotOverrideCancelledState) {
// _____________________________________________________________________________

TEST(CancellationHandle, verifyCheckAfterDeadlineMissDoesReportProperly) {
// If the log level is not high enough this test will fail
static_assert(LOGLEVEL >= WARN);
auto& choice = ad_utility::LogstreamChoice::get();
CancellationHandle<ENABLED> handle;

Expand All @@ -284,15 +286,102 @@ TEST(CancellationHandle, verifyCheckAfterDeadlineMissDoesReportProperly) {
HasSubstr(ParseableDuration{DESIRED_CANCELLATION_CHECK_INTERVAL}
.toString()),
// Check for small miss window
ContainsRegex("by [0-9]ms")));
ContainsRegex("least 5[0-9]ms")));
// This test assumes this interval to be 50ms to build the regex
static_assert(DESIRED_CANCELLATION_CHECK_INTERVAL == 50ms);
}

// _____________________________________________________________________________

TEST(CancellationHandle, verifyPleaseWatchDogReportsOnlyWhenNecessary) {
// If the log level is not high enough this test will fail
static_assert(LOGLEVEL >= WARN);
auto& choice = ad_utility::LogstreamChoice::get();
CancellationHandle<ENABLED> handle;

auto& originalOStream = choice.getStream();
absl::Cleanup cleanup{[&]() { choice.setStream(&originalOStream); }};

std::ostringstream testStream;
choice.setStream(&testStream);

handle.startTimeoutWindow_ = std::chrono::steady_clock::now();
handle.cancellationState_ = CHECK_WINDOW_MISSED;

// The first call should trigger a log
handle.pleaseWatchDog(CHECK_WINDOW_MISSED, std::identity{}, "my-detail");

EXPECT_EQ(handle.cancellationState_, NOT_CANCELLED);
EXPECT_THAT(std::move(testStream).str(), HasSubstr("my-detail"));

testStream.str("");

// The second call should not trigger a log because the state has already
// been reset
handle.pleaseWatchDog(CHECK_WINDOW_MISSED, std::identity{}, "other-detail");

EXPECT_EQ(handle.cancellationState_, NOT_CANCELLED);
EXPECT_THAT(std::move(testStream).str(), Not(HasSubstr("other-detail")));

handle.cancellationState_ = CHECK_WINDOW_MISSED;
testStream.str("");

// WAITING_FOR_CHECK should not trigger a log
handle.pleaseWatchDog(WAITING_FOR_CHECK, std::identity{}, "my-detail");

EXPECT_EQ(handle.cancellationState_, NOT_CANCELLED);
EXPECT_THAT(std::move(testStream).str(), Not(HasSubstr("my-detail")));
}

// _____________________________________________________________________________

TEST(CancellationHandle, verifyPleaseWatchDogDoesNotAcceptInvalidState) {
CancellationHandle<ENABLED> handle;
EXPECT_THROW(handle.pleaseWatchDog(NOT_CANCELLED, std::identity{}, ""),
ad_utility::Exception);
EXPECT_THROW(handle.pleaseWatchDog(MANUAL, std::identity{}, ""),
ad_utility::Exception);
EXPECT_THROW(handle.pleaseWatchDog(TIMEOUT, std::identity{}, ""),
ad_utility::Exception);
}

// _____________________________________________________________________________

TEST(CancellationHandle, verifyIsCancelledDoesPleaseWatchDog) {
// If the log level is not high enough this test will fail
static_assert(LOGLEVEL >= WARN);
auto& choice = ad_utility::LogstreamChoice::get();
CancellationHandle<ENABLED> handle;

auto& originalOStream = choice.getStream();
absl::Cleanup cleanup{[&]() { choice.setStream(&originalOStream); }};

std::ostringstream testStream;
choice.setStream(&testStream);

handle.startTimeoutWindow_ = std::chrono::steady_clock::now();
handle.cancellationState_ = CHECK_WINDOW_MISSED;

handle.isCancelled("my-detail");

EXPECT_EQ(handle.cancellationState_, NOT_CANCELLED);
EXPECT_THAT(std::move(testStream).str(), HasSubstr("my-detail"));

handle.cancellationState_ = WAITING_FOR_CHECK;
testStream.str("");

handle.isCancelled("my-detail");

EXPECT_EQ(handle.cancellationState_, NOT_CANCELLED);
EXPECT_THAT(std::move(testStream).str(), Not(HasSubstr("my-detail")));
}

// _____________________________________________________________________________

TEST(CancellationHandle, expectDisabledHandleIsAlwaysFalse) {
CancellationHandle<DISABLED> handle;

EXPECT_FALSE(handle.isCancelled());
EXPECT_FALSE(handle.isCancelled(""));
EXPECT_NO_THROW(handle.throwIfCancelled("Abc"));
}

Expand Down
20 changes: 10 additions & 10 deletions test/WebSocketSessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ ASYNC_TEST(WebSocketSession, verifyCancelStringTriggersCancellation) {
net::use_awaitable);
ASSERT_TRUE(webSocket.is_open());

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

// Wrong keyword should be ignored
co_await webSocket.async_write(toBuffer("other"), net::use_awaitable);
Expand All @@ -202,15 +202,15 @@ ASYNC_TEST(WebSocketSession, verifyCancelStringTriggersCancellation) {
net::steady_timer timer{c.strand_, clientTimeout};
co_await timer.async_wait(net::use_awaitable);

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

co_await webSocket.async_write(toBuffer("cancel"), net::use_awaitable);

// Give server some time to process cancellation request
timer.expires_after(clientTimeout);
co_await timer.async_wait(net::use_awaitable);

EXPECT_TRUE(cancellationHandle->isCancelled());
EXPECT_TRUE(cancellationHandle->isCancelled(""));
AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(
cancellationHandle->throwIfCancelled(""),
HasSubstr("manual cancellation"), ad_utility::CancellationException);
Expand Down Expand Up @@ -298,17 +298,17 @@ ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
net::use_awaitable);
ASSERT_TRUE(webSocket.is_open());

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

// Wrong keyword should be ignored
co_await webSocket.async_write(toBuffer("other"), net::use_awaitable);

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

co_await webSocket.async_write(toBuffer("cancel_on_close"),
net::use_awaitable);

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

// Wrong keyword should be ignored
co_await webSocket.async_write(toBuffer("other2"), net::use_awaitable);
Expand All @@ -317,7 +317,7 @@ ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
net::steady_timer timer{c.strand_, clientTimeout};
co_await timer.async_wait(net::use_awaitable);

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

co_await webSocket.async_close(boost::beast::websocket::close_code::normal,
net::use_awaitable);
Expand All @@ -326,7 +326,7 @@ ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
timer.expires_after(clientTimeout);
co_await timer.async_wait(net::use_awaitable);

EXPECT_TRUE(cancellationHandle->isCancelled());
EXPECT_TRUE(cancellationHandle->isCancelled(""));
AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(
cancellationHandle->throwIfCancelled(""),
HasSubstr("manual cancellation"), ad_utility::CancellationException);
Expand Down Expand Up @@ -365,7 +365,7 @@ ASYNC_TEST(WebSocketSession, verifyWithoutClientActionNoCancelDoesHappen) {
net::use_awaitable);
ASSERT_TRUE(webSocket.is_open());

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));

// Wrong keyword should be ignored
co_await webSocket.async_write(toBuffer("other"), net::use_awaitable);
Expand All @@ -374,7 +374,7 @@ ASYNC_TEST(WebSocketSession, verifyWithoutClientActionNoCancelDoesHappen) {
co_await net::co_spawn(c.strand_, c.serverLogic() && controllerActions(),
net::use_awaitable);

EXPECT_FALSE(cancellationHandle->isCancelled());
EXPECT_FALSE(cancellationHandle->isCancelled(""));
}

// _____________________________________________________________________________
Expand Down

0 comments on commit 3dd1570

Please sign in to comment.