Skip to content

Commit

Permalink
Change the interface so source_location is used instead of manual string
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Jan 31, 2024
1 parent ea905cc commit 79ca6d5
Show file tree
Hide file tree
Showing 22 changed files with 133 additions and 155 deletions.
4 changes: 2 additions & 2 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot,
}
});
auto computeLambda = [this, &timer] {
checkCancellation([this]() { return "Before " + getDescriptor(); });
checkCancellation();
runtimeInfo().status_ = RuntimeInformation::Status::inProgress;
signalQueryUpdate();
ResultTable result = computeResult();

checkCancellation([this]() { return "After " + getDescriptor(); });
checkCancellation();
// Compute the datatypes that occur in each column of the result.
// Also assert, that if a column contains UNDEF values, then the
// `mightContainUndef` flag for that columns is set.
Expand Down
12 changes: 3 additions & 9 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,9 @@ class Operation {
// as lightweight as possible because of that. The `detailSupplier` allows to
// pass a message to add to any potential exception that might be thrown.
AD_ALWAYS_INLINE void checkCancellation(
const ad_utility::InvocableWithConvertibleReturnType<
std::string_view> auto& detailSupplier) const {
cancellationHandle_->throwIfCancelled(detailSupplier);
}

// Same as checkCancellation, but with the descriptor of this operation
// as string.
AD_ALWAYS_INLINE void checkCancellation() const {
cancellationHandle_->throwIfCancelled(&Operation::getDescriptor, this);
ad_utility::source_location location =
ad_utility::source_location::current()) const {
cancellationHandle_->throwIfCancelled(location);
}

std::chrono::milliseconds remainingTime() const;
Expand Down
11 changes: 6 additions & 5 deletions src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ vector<QueryPlanner::SubtreePlan> QueryPlanner::merge(
candidates[getPruningKey(plan, plan._qet->resultSortedOn())]
.emplace_back(std::move(plan));
}
cancellationHandle_->throwIfCancelled("Creating join candidates");
checkCancellation();
}
}

Expand All @@ -1126,7 +1126,7 @@ vector<QueryPlanner::SubtreePlan> QueryPlanner::merge(
(void)key; // silence unused warning
size_t minIndex = findCheapestExecutionTree(value);
prunedPlans.push_back(std::move(value[minIndex]));
cancellationHandle_->throwIfCancelled("Pruning candidates");
checkCancellation();
}
};

Expand Down Expand Up @@ -1330,7 +1330,7 @@ QueryPlanner::runDynamicProgrammingOnConnectedComponent(
<< std::endl;
dpTab.emplace_back(vector<SubtreePlan>());
for (size_t i = 1; i * 2 <= k; ++i) {
cancellationHandle_->throwIfCancelled("QueryPlanner producing plans");
checkCancellation();
auto newPlans = merge(dpTab[i - 1], dpTab[k - i - 1], tg);
dpTab[k - 1].insert(dpTab[k - 1].end(), newPlans.begin(), newPlans.end());
applyFiltersIfPossible(dpTab.back(), filters, false);
Expand Down Expand Up @@ -1943,6 +1943,7 @@ std::vector<size_t> QueryPlanner::QueryGraph::dfsForAllNodes() {
}

// _______________________________________________________________
void QueryPlanner::checkCancellation() const {
cancellationHandle_->throwIfCancelled("Query planning");
void QueryPlanner::checkCancellation(
ad_utility::source_location location) const {
cancellationHandle_->throwIfCancelled(location);
}
3 changes: 2 additions & 1 deletion src/engine/QueryPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,5 +460,6 @@ class QueryPlanner {

/// Helper function to check if the assigned `cancellationHandle_` has
/// been cancelled yet.
void checkCancellation() const;
void checkCancellation(ad_utility::source_location location =
ad_utility::source_location::current()) const;
};
4 changes: 2 additions & 2 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -807,11 +807,11 @@ net::awaitable<Server::PlannedQuery> Server::parseAndPlan(
[&query, &qec, enablePatternTrick = enablePatternTrick_,
handle = std::move(handle), timeLimit]() mutable {
auto pq = SparqlParser::parseQuery(query);
handle->throwIfCancelled("After parsing");
handle->throwIfCancelled();
QueryPlanner qp(&qec, handle);
qp.setEnablePatternTrick(enablePatternTrick);
auto qet = qp.createExecutionTree(pq);
handle->throwIfCancelled("After query planning");
handle->throwIfCancelled();
PlannedQuery plannedQuery{std::move(pq), std::move(qet)};

plannedQuery.queryExecutionTree_.getRootOperation()
Expand Down
18 changes: 9 additions & 9 deletions src/engine/sparqlExpressions/AggregateExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,14 @@ class AggregateExpression : public SparqlExpression {
// The operands *without* applying the `valueGetter`.
auto operands =
makeGenerator(std::forward<Operand>(operand), inputSize, context);

auto impl = [&valueGetter, context, &finalOperation,
&callFunction](auto&& inputs) {
auto checkCancellation = [context]() {
context->cancellationHandle_->throwIfCancelled(
"AggregateExpression evaluate on child operand");
};
auto checkCancellation =
[context](ad_utility::source_location location =
ad_utility::source_location::current()) {
context->cancellationHandle_->throwIfCancelled(location);
};

auto impl = [&valueGetter, context, &finalOperation, &callFunction,
&checkCancellation](auto&& inputs) {
auto it = inputs.begin();
AD_CORRECTNESS_CHECK(it != inputs.end());

Expand All @@ -144,8 +145,7 @@ class AggregateExpression : public SparqlExpression {
if (distinct) {
auto uniqueValues =
getUniqueElements(context, inputSize, std::move(operands));
context->cancellationHandle_->throwIfCancelled(
"AggregateExpression after filtering unique values");
checkCancellation();
return impl(std::move(uniqueValues));
} else {
return impl(std::move(operands));
Expand Down
18 changes: 5 additions & 13 deletions src/engine/sparqlExpressions/ConditionalExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ class CoalesceExpression : public VariadicExpression {
ad_utility::chunkedForLoop<CHUNK_SIZE>(
0, ctx->size(),
[&unboundIndices](size_t i) { unboundIndices.push_back(i); },
[ctx]() {
ctx->cancellationHandle_->throwIfCancelled("CoalesceExpression");
});
[ctx]() { ctx->cancellationHandle_->throwIfCancelled(); });
VectorWithMemoryLimit<IdOrString> result{ctx->_allocator};
std::fill_n(std::back_inserter(result), ctx->size(),
IdOrString{Id::makeUndefined()});

ctx->cancellationHandle_->throwIfCancelled("CoalesceExpression");
ctx->cancellationHandle_->throwIfCancelled();

auto isUnbound = [](const IdOrString& x) {
return (std::holds_alternative<Id>(x) &&
Expand All @@ -83,10 +81,7 @@ class CoalesceExpression : public VariadicExpression {
DISABLE_UNINITIALIZED_WARNINGS
result[unboundIndices[idx]] = constantResult;
},
[ctx]() {
ctx->cancellationHandle_->throwIfCancelled(
"CoalesceExpression constant expression result");
});
[ctx]() { ctx->cancellationHandle_->throwIfCancelled(); });
};
ENABLE_UNINITIALIZED_WARNINGS

Expand Down Expand Up @@ -125,10 +120,7 @@ class CoalesceExpression : public VariadicExpression {
}
++generatorIterator;
},
[ctx]() {
ctx->cancellationHandle_->throwIfCancelled(
"CoalesceExpression vector expression result");
});
[ctx]() { ctx->cancellationHandle_->throwIfCancelled(); });
};
auto visitExpressionResult =
[
Expand All @@ -150,7 +142,7 @@ class CoalesceExpression : public VariadicExpression {
std::visit(visitExpressionResult, child->evaluate(ctx));
unboundIndices = std::move(nextUnboundIndices);
nextUnboundIndices.clear();
ctx->cancellationHandle_->throwIfCancelled("CoalesceExpression");
ctx->cancellationHandle_->throwIfCancelled();
// Early stopping if no more unbound result remain.
if (unboundIndices.empty()) {
break;
Expand Down
5 changes: 2 additions & 3 deletions src/engine/sparqlExpressions/GroupConcatExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ class GroupConcatExpression : public SparqlExpression {
}
result.append(s.value());
}
context->cancellationHandle_->throwIfCancelled(
"GroupConcatExpression");
context->cancellationHandle_->throwIfCancelled();
}
};
auto generator =
detail::makeGenerator(AD_FWD(el), context->size(), context);
if (distinct_) {
context->cancellationHandle_->throwIfCancelled("GroupConcatExpression");
context->cancellationHandle_->throwIfCancelled();
groupConcatImpl(detail::getUniqueElements(context, context->size(),
std::move(generator)));
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/sparqlExpressions/LiteralExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class LiteralExpression : public SparqlExpression {
auto ptrForCache = std::make_unique<IdOrString>(result);
ptrForCache.reset(std::atomic_exchange_explicit(
&cachedResult_, ptrForCache.release(), std::memory_order_relaxed));
context->cancellationHandle_->throwIfCancelled("LiteralExpression");
context->cancellationHandle_->throwIfCancelled();
return result;
};
if constexpr (std::is_same_v<TripleComponent::Literal, T>) {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/sparqlExpressions/RandomExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class RandomExpression : public SparqlExpression {
result.push_back(Id::makeFromInt(randInt() >> Id::numDatatypeBits));
},
[context]() {
context->cancellationHandle_->throwIfCancelled("RandomExpression");
context->cancellationHandle_->throwIfCancelled();
});
return result;
}
Expand Down
5 changes: 3 additions & 2 deletions src/engine/sparqlExpressions/RegexExpression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,9 @@ auto RegexExpression::getEstimatesForFilterExpression(

// ____________________________________________________________________________
void RegexExpression::checkCancellation(
const sparqlExpression::EvaluationContext* context) {
context->cancellationHandle_->throwIfCancelled("RegexExpression");
const sparqlExpression::EvaluationContext* context,
ad_utility::source_location location) {
context->cancellationHandle_->throwIfCancelled(location);
}

} // namespace sparqlExpression
4 changes: 3 additions & 1 deletion src/engine/sparqlExpressions/RegexExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class RegexExpression : public SparqlExpression {
/// Helper function to check if the `CancellationHandle` of the passed
/// `EvaluationContext` has been cancelled.
static void checkCancellation(
const sparqlExpression::EvaluationContext* context);
const sparqlExpression::EvaluationContext* context,
ad_utility::source_location location =
ad_utility::source_location::current());
};
namespace detail {
// Check if `regex` is a prefix regex which means that it starts with `^` and
Expand Down
4 changes: 2 additions & 2 deletions src/engine/sparqlExpressions/RelationalExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ requires AreComparable<S1, S2> ExpressionResult evaluateRelationalExpression(
context);
}
}
context->cancellationHandle_->throwIfCancelled("RelationalExpression");
context->cancellationHandle_->throwIfCancelled();
return std::nullopt;
};
std::optional<ExpressionResult> resultFromBinarySearch;
Expand Down Expand Up @@ -196,7 +196,7 @@ requires AreComparable<S1, S2> ExpressionResult evaluateRelationalExpression(
ad_utility::visitWithVariantsAndParameters(impl, base(*itA), base(*itB));
++itA;
++itB;
context->cancellationHandle_->throwIfCancelled("RelationalExpression");
context->cancellationHandle_->throwIfCancelled();
}

if constexpr (resultIsConstant) {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/sparqlExpressions/SparqlExpressionGenerators.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ inline auto valueGetterGenerator = []<typename ValueGetter, SingleExpressionResu
Input&& input, ValueGetter&& valueGetter) {
auto transformation = [ context, valueGetter ]<typename I>(I && i)
requires std::invocable<ValueGetter, I&&, EvaluationContext*> {
context->cancellationHandle_->throwIfCancelled("Value getter generator");
context->cancellationHandle_->throwIfCancelled();
return valueGetter(AD_FWD(i), context);
};
return makeGenerator(std::forward<Input>(input), numElements, context, transformation);
Expand Down
4 changes: 2 additions & 2 deletions src/engine/sparqlExpressions/StringExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,11 @@ class ConcatExpression : public detail::VariadicExpression {
str.has_value()) {
resultAsVec[i].append(str.value());
}
ctx->cancellationHandle_->throwIfCancelled("ConcatExpression");
ctx->cancellationHandle_->throwIfCancelled();
++i;
}
}
ctx->cancellationHandle_->throwIfCancelled("ConcatExpression");
ctx->cancellationHandle_->throwIfCancelled();
};
std::ranges::for_each(
childrenVec(), [&ctx, &visitSingleExpressionResult](const auto& child) {
Expand Down
20 changes: 10 additions & 10 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CompressedRelationReader::asyncParallelBlockGenerator(
std::mutex blockIteratorMutex;
auto readAndDecompressBlock =
[&]() -> std::optional<std::pair<size_t, DecompressedBlock>> {
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
std::unique_lock lock{blockIteratorMutex};
if (blockIterator == endBlock) {
return std::nullopt;
Expand Down Expand Up @@ -77,7 +77,7 @@ CompressedRelationReader::asyncParallelBlockGenerator(
queueSize, numThreads, readAndDecompressBlock);
for (IdTable& block : queue) {
popTimer.stop();
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
++details.numBlocksRead_;
details.numElementsRead_ += block.numRows();
co_yield block;
Expand Down Expand Up @@ -110,7 +110,7 @@ CompressedRelationReader::IdTableGenerator CompressedRelationReader::lazyScan(
auto getIncompleteBlock = [&](auto it) {
auto result = readPossiblyIncompleteBlock(metadata, col1Id, *it,
std::ref(details), columnIndices);
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
return result;
};

Expand Down Expand Up @@ -327,13 +327,13 @@ IdTable CompressedRelationReader::scan(
firstBlockResult = readIncompleteBlock(*beginBlock);
totalResultSize += firstBlockResult.value().size();
++beginBlock;
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
}
if (beginBlock < endBlock) {
lastBlockResult = readIncompleteBlock(*(endBlock - 1));
totalResultSize += lastBlockResult.value().size();
endBlock--;
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
}

// Determine the total size of the result.
Expand All @@ -343,7 +343,7 @@ IdTable CompressedRelationReader::scan(
return count + block.numRows_;
});
result.resize(totalResultSize);
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();

size_t rowIndexOfNextBlockStart = 0;
// Lambda that appends a possibly incomplete block (the first or last block)
Expand All @@ -365,7 +365,7 @@ IdTable CompressedRelationReader::scan(
};

addIncompleteBlockIfExists(firstBlockResult);
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();

// Insert the complete blocks from the middle in parallel
if (beginBlock < endBlock) {
Expand Down Expand Up @@ -394,7 +394,7 @@ IdTable CompressedRelationReader::scan(
// block in parallel
#pragma omp task
{
if (!cancellationHandle->isCancelled("CompressedRelation scan")) {
if (!cancellationHandle->isCancelled()) {
decompressLambda();
}
}
Expand All @@ -403,11 +403,11 @@ IdTable CompressedRelationReader::scan(
rowIndexOfNextBlockStart += block.numRows_;
} // end of parallel region
}
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
// Add the last block.
addIncompleteBlockIfExists(lastBlockResult);
AD_CORRECTNESS_CHECK(rowIndexOfNextBlockStart == result.size());
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
return result;
}

Expand Down
9 changes: 0 additions & 9 deletions src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,6 @@ class CompressedRelationReader {
auto beginBlock, auto endBlock, ColumnIndices columnIndices,
ad_utility::SharedCancellationHandle cancellationHandle) const;

// A helper function to abstract away the timeout check:
static void checkCancellation(
const ad_utility::SharedCancellationHandle& cancellationHandle) {
// Not really expensive but since this should be called
// very often, try to avoid any extra checks.
AD_EXPENSIVE_CHECK(cancellationHandle);
cancellationHandle->throwIfCancelled("IndexScan"sv);
}

// Return a vector that consists of the concatenation of `baseColumns` and
// `additionalColumns`
static std::vector<ColumnIndex> prepareColumnIndices(
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ IdTable IndexImpl::scan(
: std::nullopt;
if (!col0Id.has_value() || (col1String.has_value() && !col1Id.has_value())) {
size_t numColumns = col1String.has_value() ? 1 : 2;
cancellationHandle->throwIfCancelled("IndexImpl scan");
cancellationHandle->throwIfCancelled();
return IdTable{numColumns, allocator_};
}
return scan(col0Id.value(), col1Id, permutation, additionalColumns,
Expand Down
2 changes: 1 addition & 1 deletion src/index/Permutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ IdTable Permutation::scan(

if (!meta_.col0IdExists(col0Id)) {
size_t numColumns = col1Id.has_value() ? 1 : 2;
cancellationHandle->throwIfCancelled("Permutation scan");
cancellationHandle->throwIfCancelled();
return IdTable{numColumns, reader().allocator()};
}
const auto& metaData = meta_.getMetaData(col0Id);
Expand Down
Loading

0 comments on commit 79ca6d5

Please sign in to comment.