Skip to content

Commit

Permalink
Store query inside query registry for nicer debugging purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Jan 31, 2024
1 parent ea905cc commit 0b0ea70
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 40 deletions.
16 changes: 11 additions & 5 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ Awaitable<void> Server::process(
} else if (auto cmd =
checkParameter("cmd", "dump-active-queries", accessTokenOk)) {
logCommand(cmd, "dump active queries");
response = createJsonResponse(queryRegistry_.getActiveQueries(), request);
nlohmann::json json;
for (auto& [key, value] : queryRegistry_.getActiveQueries()) {
json[nlohmann::json(std::move(key))] = std::move(value);
}
response = createJsonResponse(json, request);
}

// Ping with or without messsage.
Expand Down Expand Up @@ -488,13 +492,15 @@ class QueryAlreadyInUseError : public std::runtime_error {
// _____________________________________________

ad_utility::websocket::OwningQueryId Server::getQueryId(
const ad_utility::httpUtils::HttpRequest auto& request) {
const ad_utility::httpUtils::HttpRequest auto& request,
const std::string& query) {
using ad_utility::websocket::OwningQueryId;
std::string_view queryIdHeader = request.base()["Query-Id"];
if (queryIdHeader.empty()) {
return queryRegistry_.uniqueId();
return queryRegistry_.uniqueId(query);
}
auto queryId = queryRegistry_.uniqueIdFromString(std::string(queryIdHeader));
auto queryId =
queryRegistry_.uniqueIdFromString(std::string(queryIdHeader), query);
if (!queryId) {
throw QueryAlreadyInUseError{queryIdHeader};
}
Expand Down Expand Up @@ -637,7 +643,7 @@ boost::asio::awaitable<void> Server::processQuery(
auto queryHub = queryHub_.lock();
AD_CORRECTNESS_CHECK(queryHub);
auto messageSender = co_await ad_utility::websocket::MessageSender::create(
getQueryId(request), *queryHub);
getQueryId(request, query), *queryHub);
// Do the query planning. This creates a `QueryExecutionTree`, which will
// then be used to process the query.
//
Expand Down
4 changes: 3 additions & 1 deletion src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,13 @@ class Server {
/// `QueryAlreadyInUseError` exception is thrown.
///
/// \param request The HTTP request to extract the id from.
/// \param query A string representation of the query to register an id for.
///
/// \return An OwningQueryId object. It removes itself from the registry
/// on destruction.
ad_utility::websocket::OwningQueryId getQueryId(
const ad_utility::httpUtils::HttpRequest auto& request);
const ad_utility::httpUtils::HttpRequest auto& request,
const std::string& query);

/// Schedule a task to trigger the timeout after the `timeLimit`.
/// The returned callback can be used to prevent this task from executing
Expand Down
28 changes: 18 additions & 10 deletions src/util/http/websocket/QueryId.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ static_assert(!std::is_copy_assignable_v<OwningQueryId>);

/// A factory class to create unique query ids within each individual instance.
class QueryRegistry {
struct CancellationHandleWithQuery {
SharedCancellationHandle cancellationHandle_;
std::string query_;
};
using SynchronizedType = ad_utility::Synchronized<
ad_utility::HashMap<QueryId, SharedCancellationHandle>>;
ad_utility::HashMap<QueryId, CancellationHandleWithQuery>>;
// Technically no shared pointer is required because the registry lives
// for the entire lifetime of the application, but since the instances of
// `OwningQueryId` need to deregister themselves again they need some
Expand All @@ -87,11 +91,14 @@ class QueryRegistry {
/// \return A std::optional<OwningQueryId> object wrapping the passed string
/// if it was not present in the registry before. An empty
/// std::optional if the id already existed before.
std::optional<OwningQueryId> uniqueIdFromString(std::string id) {
std::optional<OwningQueryId> uniqueIdFromString(std::string id,
const std::string& query) {
auto queryId = QueryId::idFromString(std::move(id));
bool success =
registry_->wlock()
->emplace(queryId, std::make_shared<CancellationHandle<>>())
->emplace(queryId,
CancellationHandleWithQuery{
std::make_shared<CancellationHandle<>>(), query})
.second;
if (success) {
// Avoid undefined behavior when the registry is no longer alive at the
Expand All @@ -111,25 +118,26 @@ class QueryRegistry {
}

/// Generates a unique pseudo-random OwningQueryId object for this registry
OwningQueryId uniqueId() {
/// and associates it with the given query.
OwningQueryId uniqueId(const std::string& query) {
static thread_local std::mt19937 generator(std::random_device{}());
std::uniform_int_distribution<uint64_t> distrib{};
std::optional<OwningQueryId> result;
do {
result = uniqueIdFromString(std::to_string(distrib(generator)));
result = uniqueIdFromString(std::to_string(distrib(generator)), query);
} while (!result.has_value());
return std::move(result.value());
}

/// Member function that acquires a read lock and returns a vector
/// of all currently registered queries.
std::vector<QueryId> getActiveQueries() const {
ad_utility::HashMap<QueryId, std::string> getActiveQueries() const {
return registry_->withReadLock([](const auto& map) {
// TODO<C++23> Use `ranges::to` to transform map keys into vector
std::vector<QueryId> result;
// TODO<C++23> Use `ranges::to` to transform map keys into map
ad_utility::HashMap<QueryId, std::string> result;
result.reserve(map.size());
for (const auto& entry : map) {
result.emplace_back(entry.first);
result.emplace(entry.first, entry.second.query_);
}
return result;
});
Expand All @@ -140,7 +148,7 @@ class QueryRegistry {
SharedCancellationHandle getCancellationHandle(const QueryId& queryId) const {
return registry_->withReadLock([&queryId](const auto& map) {
auto it = map.find(queryId);
return it != map.end() ? it->second : nullptr;
return it != map.end() ? it->second.cancellationHandle_ : nullptr;
});
}
};
Expand Down
6 changes: 3 additions & 3 deletions test/MessageSenderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ using ::testing::VariantWith;

ASYNC_TEST(MessageSender, destructorCallsSignalEnd) {
QueryRegistry queryRegistry;
OwningQueryId queryId = queryRegistry.uniqueId();
OwningQueryId queryId = queryRegistry.uniqueId("my-query");
QueryHub queryHub{ioContext};

auto distributor = co_await queryHub.createOrAcquireDistributorForReceiving(
Expand All @@ -47,7 +47,7 @@ ASYNC_TEST(MessageSender, destructorCallsSignalEnd) {

ASYNC_TEST(MessageSender, callingOperatorBroadcastsPayload) {
QueryRegistry queryRegistry;
OwningQueryId queryId = queryRegistry.uniqueId();
OwningQueryId queryId = queryRegistry.uniqueId("my-query");
QueryHub queryHub{ioContext};

{
Expand Down Expand Up @@ -85,7 +85,7 @@ ASYNC_TEST(MessageSender, callingOperatorBroadcastsPayload) {

ASYNC_TEST(MessageSender, testGetQueryIdGetterWorks) {
QueryRegistry queryRegistry;
OwningQueryId queryId = queryRegistry.uniqueId();
OwningQueryId queryId = queryRegistry.uniqueId("my-query");
QueryId reference = queryId.toQueryId();
QueryHub queryHub{ioContext};

Expand Down
45 changes: 27 additions & 18 deletions test/QueryIdTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
using ad_utility::websocket::OwningQueryId;
using ad_utility::websocket::QueryId;
using ad_utility::websocket::QueryRegistry;
using ::testing::ContainerEq;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::UnorderedElementsAre;

TEST(QueryId, checkIdEqualityRelation) {
auto queryIdOne = QueryId::idFromString("some-id");
Expand Down Expand Up @@ -53,8 +53,8 @@ TEST(QueryId, veriyToJsonWorks) {

TEST(QueryRegistry, verifyUniqueIdProvidesUniqueIds) {
QueryRegistry registry{};
auto queryIdOne = registry.uniqueId();
auto queryIdTwo = registry.uniqueId();
auto queryIdOne = registry.uniqueId("my-query");
auto queryIdTwo = registry.uniqueId("my-query");

EXPECT_NE(queryIdOne.toQueryId(), queryIdTwo.toQueryId());
}
Expand All @@ -63,8 +63,10 @@ TEST(QueryRegistry, verifyUniqueIdProvidesUniqueIds) {

TEST(QueryRegistry, verifyUniqueIdFromStringEnforcesUniqueness) {
QueryRegistry registry{};
auto optionalQueryIdOne = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryIdTwo = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryIdOne =
registry.uniqueIdFromString("01123581321345589144", "my-query");
auto optionalQueryIdTwo =
registry.uniqueIdFromString("01123581321345589144", "my-query");

EXPECT_TRUE(optionalQueryIdOne.has_value());
EXPECT_FALSE(optionalQueryIdTwo.has_value());
Expand All @@ -75,11 +77,13 @@ TEST(QueryRegistry, verifyUniqueIdFromStringEnforcesUniqueness) {
TEST(QueryRegistry, verifyIdIsUnregisteredAfterUse) {
QueryRegistry registry{};
{
auto optionalQueryId = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryId =
registry.uniqueIdFromString("01123581321345589144", "my-query");
EXPECT_TRUE(optionalQueryId.has_value());
}
{
auto optionalQueryId = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryId =
registry.uniqueIdFromString("01123581321345589144", "my-query");
EXPECT_TRUE(optionalQueryId.has_value());
}
}
Expand All @@ -89,8 +93,10 @@ TEST(QueryRegistry, verifyIdIsUnregisteredAfterUse) {
TEST(QueryRegistry, demonstrateRegistryLocalUniqueness) {
QueryRegistry registryOne{};
QueryRegistry registryTwo{};
auto optQidOne = registryOne.uniqueIdFromString("01123581321345589144");
auto optQidTwo = registryTwo.uniqueIdFromString("01123581321345589144");
auto optQidOne =
registryOne.uniqueIdFromString("01123581321345589144", "my-query");
auto optQidTwo =
registryTwo.uniqueIdFromString("01123581321345589144", "my-query");
ASSERT_TRUE(optQidOne.has_value());
ASSERT_TRUE(optQidTwo.has_value());
// The QueryId object doesn't know anything about registries,
Expand All @@ -106,15 +112,15 @@ TEST(QueryRegistry, performCleanupFromDestroyedRegistry) {
std::unique_ptr<OwningQueryId> holder;
{
QueryRegistry registry{};
holder = std::make_unique<OwningQueryId>(registry.uniqueId());
holder = std::make_unique<OwningQueryId>(registry.uniqueId("my-query"));
}
}

// _____________________________________________________________________________

TEST(QueryRegistry, verifyCancellationHandleIsCreated) {
QueryRegistry registry{};
auto queryId = registry.uniqueId();
auto queryId = registry.uniqueId("my-query");

auto handle1 = registry.getCancellationHandle(queryId.toQueryId());
auto handle2 = registry.getCancellationHandle(queryId.toQueryId());
Expand All @@ -138,24 +144,27 @@ TEST(QueryRegistry, verifyCancellationHandleIsNullptrIfNotPresent) {
// _____________________________________________________________________________

TEST(QueryRegistry, verifyGetActiveQueriesReturnsAllActiveQueries) {
using MapType = ad_utility::HashMap<QueryId, std::string>;
QueryRegistry registry{};

EXPECT_THAT(registry.getActiveQueries(), IsEmpty());

{
auto queryId1 = registry.uniqueId();
auto queryId1 = registry.uniqueId("my-query");

EXPECT_THAT(registry.getActiveQueries(), ElementsAre(queryId1.toQueryId()));
EXPECT_THAT(registry.getActiveQueries(),
ContainerEq(MapType{{queryId1.toQueryId(), "my-query"}}));

{
auto queryId2 = registry.uniqueId();
auto queryId2 = registry.uniqueId("other-query");

EXPECT_THAT(
registry.getActiveQueries(),
UnorderedElementsAre(queryId1.toQueryId(), queryId2.toQueryId()));
EXPECT_THAT(registry.getActiveQueries(),
ContainerEq(MapType{{queryId1.toQueryId(), "my-query"},
{queryId2.toQueryId(), "other-query"}}));
}

EXPECT_THAT(registry.getActiveQueries(), ElementsAre(queryId1.toQueryId()));
EXPECT_THAT(registry.getActiveQueries(),
ContainerEq(MapType{{queryId1.toQueryId(), "my-query"}}));
}

EXPECT_THAT(registry.getActiveQueries(), IsEmpty());
Expand Down
6 changes: 3 additions & 3 deletions test/WebSocketSessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ ASYNC_TEST(WebSocketSession, verifySessionEndsWhenServerIsDoneSending) {
ASYNC_TEST(WebSocketSession, verifyCancelStringTriggersCancellation) {
auto c = co_await createTestContainer(ioContext);

auto queryId = c.registry_.uniqueIdFromString("some-id");
auto queryId = c.registry_.uniqueIdFromString("some-id", "my-query");
ASSERT_TRUE(queryId.has_value());
auto cancellationHandle =
c.registry_.getCancellationHandle(queryId->toQueryId());
Expand Down Expand Up @@ -285,7 +285,7 @@ ASYNC_TEST(WebSocketSession, verifyWrongExecutorConfigThrows) {
ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
auto c = co_await createTestContainer(ioContext);

auto queryId = c.registry_.uniqueIdFromString("some-id");
auto queryId = c.registry_.uniqueIdFromString("some-id", "my-query");
ASSERT_TRUE(queryId.has_value());
auto cancellationHandle =
c.registry_.getCancellationHandle(queryId->toQueryId());
Expand Down Expand Up @@ -353,7 +353,7 @@ ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
ASYNC_TEST(WebSocketSession, verifyWithoutClientActionNoCancelDoesHappen) {
auto c = co_await createTestContainer(ioContext);

auto queryId = c.registry_.uniqueIdFromString("some-id");
auto queryId = c.registry_.uniqueIdFromString("some-id", "my-query");
ASSERT_TRUE(queryId.has_value());
auto cancellationHandle =
c.registry_.getCancellationHandle(queryId->toQueryId());
Expand Down

0 comments on commit 0b0ea70

Please sign in to comment.