Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Also store the actual query alongside its ID. #1256

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ Awaitable<void> Server::process(
} else if (auto cmd =
checkParameter("cmd", "dump-active-queries", accessTokenOk)) {
logCommand(cmd, "dump active queries");
response = createJsonResponse(queryRegistry_.getActiveQueries(), request);
nlohmann::json json;
for (auto& [key, value] : queryRegistry_.getActiveQueries()) {
json[nlohmann::json(key)] = std::move(value);
}
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
response = createJsonResponse(json, request);
}

// Ping with or without messsage.
Expand Down Expand Up @@ -488,13 +492,15 @@ class QueryAlreadyInUseError : public std::runtime_error {
// _____________________________________________

ad_utility::websocket::OwningQueryId Server::getQueryId(
const ad_utility::httpUtils::HttpRequest auto& request) {
const ad_utility::httpUtils::HttpRequest auto& request,
const std::string& query) {
using ad_utility::websocket::OwningQueryId;
std::string_view queryIdHeader = request.base()["Query-Id"];
if (queryIdHeader.empty()) {
return queryRegistry_.uniqueId();
return queryRegistry_.uniqueId(query);
}
auto queryId = queryRegistry_.uniqueIdFromString(std::string(queryIdHeader));
auto queryId =
queryRegistry_.uniqueIdFromString(std::string(queryIdHeader), query);
if (!queryId) {
throw QueryAlreadyInUseError{queryIdHeader};
}
Expand Down Expand Up @@ -637,7 +643,7 @@ boost::asio::awaitable<void> Server::processQuery(
auto queryHub = queryHub_.lock();
AD_CORRECTNESS_CHECK(queryHub);
auto messageSender = co_await ad_utility::websocket::MessageSender::create(
getQueryId(request), *queryHub);
getQueryId(request, query), *queryHub);
// Do the query planning. This creates a `QueryExecutionTree`, which will
// then be used to process the query.
//
Expand Down
4 changes: 3 additions & 1 deletion src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,13 @@ class Server {
/// `QueryAlreadyInUseError` exception is thrown.
///
/// \param request The HTTP request to extract the id from.
/// \param query A string representation of the query to register an id for.
///
/// \return An OwningQueryId object. It removes itself from the registry
/// on destruction.
ad_utility::websocket::OwningQueryId getQueryId(
const ad_utility::httpUtils::HttpRequest auto& request);
const ad_utility::httpUtils::HttpRequest auto& request,
const std::string& query);

/// Schedule a task to trigger the timeout after the `timeLimit`.
/// The returned callback can be used to prevent this task from executing
Expand Down
28 changes: 18 additions & 10 deletions src/util/http/websocket/QueryId.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ static_assert(!std::is_copy_assignable_v<OwningQueryId>);

/// A factory class to create unique query ids within each individual instance.
class QueryRegistry {
struct CancellationHandleWithQuery {
SharedCancellationHandle cancellationHandle_;
std::string query_;
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
};
using SynchronizedType = ad_utility::Synchronized<
ad_utility::HashMap<QueryId, SharedCancellationHandle>>;
ad_utility::HashMap<QueryId, CancellationHandleWithQuery>>;
// Technically no shared pointer is required because the registry lives
// for the entire lifetime of the application, but since the instances of
// `OwningQueryId` need to deregister themselves again they need some
Expand All @@ -87,11 +91,14 @@ class QueryRegistry {
/// \return A std::optional<OwningQueryId> object wrapping the passed string
/// if it was not present in the registry before. An empty
/// std::optional if the id already existed before.
std::optional<OwningQueryId> uniqueIdFromString(std::string id) {
std::optional<OwningQueryId> uniqueIdFromString(std::string id,
const std::string& query) {
auto queryId = QueryId::idFromString(std::move(id));
bool success =
registry_->wlock()
->emplace(queryId, std::make_shared<CancellationHandle<>>())
->emplace(queryId,
CancellationHandleWithQuery{
std::make_shared<CancellationHandle<>>(), query})
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
.second;
if (success) {
// Avoid undefined behavior when the registry is no longer alive at the
Expand All @@ -111,25 +118,26 @@ class QueryRegistry {
}

/// Generates a unique pseudo-random OwningQueryId object for this registry
OwningQueryId uniqueId() {
/// and associates it with the given query.
OwningQueryId uniqueId(const std::string& query) {
static thread_local std::mt19937 generator(std::random_device{}());
std::uniform_int_distribution<uint64_t> distrib{};
std::optional<OwningQueryId> result;
do {
result = uniqueIdFromString(std::to_string(distrib(generator)));
result = uniqueIdFromString(std::to_string(distrib(generator)), query);
} while (!result.has_value());
return std::move(result.value());
}

/// Member function that acquires a read lock and returns a vector
/// of all currently registered queries.
std::vector<QueryId> getActiveQueries() const {
ad_utility::HashMap<QueryId, std::string> getActiveQueries() const {
return registry_->withReadLock([](const auto& map) {
// TODO<C++23> Use `ranges::to` to transform map keys into vector
std::vector<QueryId> result;
// TODO<C++23> Use `ranges::to` to transform map keys into map
ad_utility::HashMap<QueryId, std::string> result;
result.reserve(map.size());
for (const auto& entry : map) {
result.emplace_back(entry.first);
result.emplace(entry.first, entry.second.query_);
}
return result;
});
Expand All @@ -140,7 +148,7 @@ class QueryRegistry {
SharedCancellationHandle getCancellationHandle(const QueryId& queryId) const {
return registry_->withReadLock([&queryId](const auto& map) {
auto it = map.find(queryId);
return it != map.end() ? it->second : nullptr;
return it != map.end() ? it->second.cancellationHandle_ : nullptr;
});
}
};
Expand Down
6 changes: 3 additions & 3 deletions test/MessageSenderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ using ::testing::VariantWith;

ASYNC_TEST(MessageSender, destructorCallsSignalEnd) {
QueryRegistry queryRegistry;
OwningQueryId queryId = queryRegistry.uniqueId();
OwningQueryId queryId = queryRegistry.uniqueId("my-query");
QueryHub queryHub{ioContext};

auto distributor = co_await queryHub.createOrAcquireDistributorForReceiving(
Expand All @@ -47,7 +47,7 @@ ASYNC_TEST(MessageSender, destructorCallsSignalEnd) {

ASYNC_TEST(MessageSender, callingOperatorBroadcastsPayload) {
QueryRegistry queryRegistry;
OwningQueryId queryId = queryRegistry.uniqueId();
OwningQueryId queryId = queryRegistry.uniqueId("my-query");
QueryHub queryHub{ioContext};

{
Expand Down Expand Up @@ -85,7 +85,7 @@ ASYNC_TEST(MessageSender, callingOperatorBroadcastsPayload) {

ASYNC_TEST(MessageSender, testGetQueryIdGetterWorks) {
QueryRegistry queryRegistry;
OwningQueryId queryId = queryRegistry.uniqueId();
OwningQueryId queryId = queryRegistry.uniqueId("my-query");
QueryId reference = queryId.toQueryId();
QueryHub queryHub{ioContext};

Expand Down
45 changes: 27 additions & 18 deletions test/QueryIdTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
using ad_utility::websocket::OwningQueryId;
using ad_utility::websocket::QueryId;
using ad_utility::websocket::QueryRegistry;
using ::testing::ContainerEq;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::UnorderedElementsAre;

TEST(QueryId, checkIdEqualityRelation) {
auto queryIdOne = QueryId::idFromString("some-id");
Expand Down Expand Up @@ -53,8 +53,8 @@ TEST(QueryId, veriyToJsonWorks) {

TEST(QueryRegistry, verifyUniqueIdProvidesUniqueIds) {
QueryRegistry registry{};
auto queryIdOne = registry.uniqueId();
auto queryIdTwo = registry.uniqueId();
auto queryIdOne = registry.uniqueId("my-query");
auto queryIdTwo = registry.uniqueId("my-query");

EXPECT_NE(queryIdOne.toQueryId(), queryIdTwo.toQueryId());
}
Expand All @@ -63,8 +63,10 @@ TEST(QueryRegistry, verifyUniqueIdProvidesUniqueIds) {

TEST(QueryRegistry, verifyUniqueIdFromStringEnforcesUniqueness) {
QueryRegistry registry{};
auto optionalQueryIdOne = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryIdTwo = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryIdOne =
registry.uniqueIdFromString("01123581321345589144", "my-query");
auto optionalQueryIdTwo =
registry.uniqueIdFromString("01123581321345589144", "my-query");

EXPECT_TRUE(optionalQueryIdOne.has_value());
EXPECT_FALSE(optionalQueryIdTwo.has_value());
Expand All @@ -75,11 +77,13 @@ TEST(QueryRegistry, verifyUniqueIdFromStringEnforcesUniqueness) {
TEST(QueryRegistry, verifyIdIsUnregisteredAfterUse) {
QueryRegistry registry{};
{
auto optionalQueryId = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryId =
registry.uniqueIdFromString("01123581321345589144", "my-query");
EXPECT_TRUE(optionalQueryId.has_value());
}
{
auto optionalQueryId = registry.uniqueIdFromString("01123581321345589144");
auto optionalQueryId =
registry.uniqueIdFromString("01123581321345589144", "my-query");
EXPECT_TRUE(optionalQueryId.has_value());
}
}
Expand All @@ -89,8 +93,10 @@ TEST(QueryRegistry, verifyIdIsUnregisteredAfterUse) {
TEST(QueryRegistry, demonstrateRegistryLocalUniqueness) {
QueryRegistry registryOne{};
QueryRegistry registryTwo{};
auto optQidOne = registryOne.uniqueIdFromString("01123581321345589144");
auto optQidTwo = registryTwo.uniqueIdFromString("01123581321345589144");
auto optQidOne =
registryOne.uniqueIdFromString("01123581321345589144", "my-query");
auto optQidTwo =
registryTwo.uniqueIdFromString("01123581321345589144", "my-query");
ASSERT_TRUE(optQidOne.has_value());
ASSERT_TRUE(optQidTwo.has_value());
// The QueryId object doesn't know anything about registries,
Expand All @@ -106,15 +112,15 @@ TEST(QueryRegistry, performCleanupFromDestroyedRegistry) {
std::unique_ptr<OwningQueryId> holder;
{
QueryRegistry registry{};
holder = std::make_unique<OwningQueryId>(registry.uniqueId());
holder = std::make_unique<OwningQueryId>(registry.uniqueId("my-query"));
}
}

// _____________________________________________________________________________

TEST(QueryRegistry, verifyCancellationHandleIsCreated) {
QueryRegistry registry{};
auto queryId = registry.uniqueId();
auto queryId = registry.uniqueId("my-query");

auto handle1 = registry.getCancellationHandle(queryId.toQueryId());
auto handle2 = registry.getCancellationHandle(queryId.toQueryId());
Expand All @@ -138,24 +144,27 @@ TEST(QueryRegistry, verifyCancellationHandleIsNullptrIfNotPresent) {
// _____________________________________________________________________________

TEST(QueryRegistry, verifyGetActiveQueriesReturnsAllActiveQueries) {
using MapType = ad_utility::HashMap<QueryId, std::string>;
QueryRegistry registry{};

EXPECT_THAT(registry.getActiveQueries(), IsEmpty());

{
auto queryId1 = registry.uniqueId();
auto queryId1 = registry.uniqueId("my-query");

EXPECT_THAT(registry.getActiveQueries(), ElementsAre(queryId1.toQueryId()));
EXPECT_THAT(registry.getActiveQueries(),
ContainerEq(MapType{{queryId1.toQueryId(), "my-query"}}));

{
auto queryId2 = registry.uniqueId();
auto queryId2 = registry.uniqueId("other-query");

EXPECT_THAT(
registry.getActiveQueries(),
UnorderedElementsAre(queryId1.toQueryId(), queryId2.toQueryId()));
EXPECT_THAT(registry.getActiveQueries(),
ContainerEq(MapType{{queryId1.toQueryId(), "my-query"},
{queryId2.toQueryId(), "other-query"}}));
}

EXPECT_THAT(registry.getActiveQueries(), ElementsAre(queryId1.toQueryId()));
EXPECT_THAT(registry.getActiveQueries(),
ContainerEq(MapType{{queryId1.toQueryId(), "my-query"}}));
}

EXPECT_THAT(registry.getActiveQueries(), IsEmpty());
Expand Down
6 changes: 3 additions & 3 deletions test/WebSocketSessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ ASYNC_TEST(WebSocketSession, verifySessionEndsWhenServerIsDoneSending) {
ASYNC_TEST(WebSocketSession, verifyCancelStringTriggersCancellation) {
auto c = co_await createTestContainer(ioContext);

auto queryId = c.registry_.uniqueIdFromString("some-id");
auto queryId = c.registry_.uniqueIdFromString("some-id", "my-query");
ASSERT_TRUE(queryId.has_value());
auto cancellationHandle =
c.registry_.getCancellationHandle(queryId->toQueryId());
Expand Down Expand Up @@ -285,7 +285,7 @@ ASYNC_TEST(WebSocketSession, verifyWrongExecutorConfigThrows) {
ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
auto c = co_await createTestContainer(ioContext);

auto queryId = c.registry_.uniqueIdFromString("some-id");
auto queryId = c.registry_.uniqueIdFromString("some-id", "my-query");
ASSERT_TRUE(queryId.has_value());
auto cancellationHandle =
c.registry_.getCancellationHandle(queryId->toQueryId());
Expand Down Expand Up @@ -353,7 +353,7 @@ ASYNC_TEST(WebSocketSession, verifyCancelOnCloseStringTriggersCancellation) {
ASYNC_TEST(WebSocketSession, verifyWithoutClientActionNoCancelDoesHappen) {
auto c = co_await createTestContainer(ioContext);

auto queryId = c.registry_.uniqueIdFromString("some-id");
auto queryId = c.registry_.uniqueIdFromString("some-id", "my-query");
ASSERT_TRUE(queryId.has_value());
auto cancellationHandle =
c.registry_.getCancellationHandle(queryId->toQueryId());
Expand Down
Loading