Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print the source_location on missed timeout checks. #1254

Merged
merged 11 commits into from
Feb 6, 2024
4 changes: 2 additions & 2 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot,
}
});
auto computeLambda = [this, &timer] {
checkCancellation([this]() { return "Before " + getDescriptor(); });
checkCancellation();
runtimeInfo().status_ = RuntimeInformation::Status::inProgress;
signalQueryUpdate();
ResultTable result = computeResult();

checkCancellation([this]() { return "After " + getDescriptor(); });
checkCancellation();
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
// Compute the datatypes that occur in each column of the result.
// Also assert, that if a column contains UNDEF values, then the
// `mightContainUndef` flag for that columns is set.
Expand Down
15 changes: 4 additions & 11 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,11 @@ class Operation {
// Check if the cancellation flag has been set and throw an exception if
// that's the case. This will be called at strategic places on code that
// potentially can take a (too) long time. This function is designed to be
// as lightweight as possible because of that. The `detailSupplier` allows to
// pass a message to add to any potential exception that might be thrown.
// as lightweight as possible because of that.
AD_ALWAYS_INLINE void checkCancellation(
const ad_utility::InvocableWithConvertibleReturnType<
std::string_view> auto& detailSupplier) const {
cancellationHandle_->throwIfCancelled(detailSupplier);
}

// Same as checkCancellation, but with the descriptor of this operation
// as string.
AD_ALWAYS_INLINE void checkCancellation() const {
cancellationHandle_->throwIfCancelled(&Operation::getDescriptor, this);
ad_utility::source_location location =
ad_utility::source_location::current()) const {
cancellationHandle_->throwIfCancelled(location);
}

std::chrono::milliseconds remainingTime() const;
Expand Down
11 changes: 6 additions & 5 deletions src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ vector<QueryPlanner::SubtreePlan> QueryPlanner::merge(
candidates[getPruningKey(plan, plan._qet->resultSortedOn())]
.emplace_back(std::move(plan));
}
cancellationHandle_->throwIfCancelled("Creating join candidates");
checkCancellation();
}
}

Expand All @@ -1126,7 +1126,7 @@ vector<QueryPlanner::SubtreePlan> QueryPlanner::merge(
(void)key; // silence unused warning
size_t minIndex = findCheapestExecutionTree(value);
prunedPlans.push_back(std::move(value[minIndex]));
cancellationHandle_->throwIfCancelled("Pruning candidates");
checkCancellation();
}
};

Expand Down Expand Up @@ -1330,7 +1330,7 @@ QueryPlanner::runDynamicProgrammingOnConnectedComponent(
<< std::endl;
dpTab.emplace_back(vector<SubtreePlan>());
for (size_t i = 1; i * 2 <= k; ++i) {
cancellationHandle_->throwIfCancelled("QueryPlanner producing plans");
checkCancellation();
auto newPlans = merge(dpTab[i - 1], dpTab[k - i - 1], tg);
dpTab[k - 1].insert(dpTab[k - 1].end(), newPlans.begin(), newPlans.end());
applyFiltersIfPossible(dpTab.back(), filters, false);
Expand Down Expand Up @@ -1943,6 +1943,7 @@ std::vector<size_t> QueryPlanner::QueryGraph::dfsForAllNodes() {
}

// _______________________________________________________________
void QueryPlanner::checkCancellation() const {
cancellationHandle_->throwIfCancelled("Query planning");
void QueryPlanner::checkCancellation(
ad_utility::source_location location) const {
cancellationHandle_->throwIfCancelled(location);
}
5 changes: 3 additions & 2 deletions src/engine/QueryPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ class QueryPlanner {
[[nodiscard]] bool isInTestMode() const { return _qec == nullptr; }

/// Helper function to check if the assigned `cancellationHandle_` has
/// been cancelled yet.
void checkCancellation() const;
/// been cancelled yet and throw an exception if this is the case.
void checkCancellation(ad_utility::source_location location =
ad_utility::source_location::current()) const;
};
4 changes: 2 additions & 2 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -807,11 +807,11 @@ net::awaitable<Server::PlannedQuery> Server::parseAndPlan(
[&query, &qec, enablePatternTrick = enablePatternTrick_,
handle = std::move(handle), timeLimit]() mutable {
auto pq = SparqlParser::parseQuery(query);
handle->throwIfCancelled("After parsing");
handle->throwIfCancelled();
QueryPlanner qp(&qec, handle);
qp.setEnablePatternTrick(enablePatternTrick);
auto qet = qp.createExecutionTree(pq);
handle->throwIfCancelled("After query planning");
handle->throwIfCancelled();
PlannedQuery plannedQuery{std::move(pq), std::move(qet)};

plannedQuery.queryExecutionTree_.getRootOperation()
Expand Down
18 changes: 9 additions & 9 deletions src/engine/sparqlExpressions/AggregateExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,14 @@ class AggregateExpression : public SparqlExpression {
// The operands *without* applying the `valueGetter`.
auto operands =
makeGenerator(std::forward<Operand>(operand), inputSize, context);

auto impl = [&valueGetter, context, &finalOperation,
&callFunction](auto&& inputs) {
auto checkCancellation = [context]() {
context->cancellationHandle_->throwIfCancelled(
"AggregateExpression evaluate on child operand");
};
auto checkCancellation =
[context](ad_utility::source_location location =
ad_utility::source_location::current()) {
context->cancellationHandle_->throwIfCancelled(location);
};

auto impl = [&valueGetter, context, &finalOperation, &callFunction,
&checkCancellation](auto&& inputs) {
auto it = inputs.begin();
AD_CORRECTNESS_CHECK(it != inputs.end());

Expand All @@ -144,8 +145,7 @@ class AggregateExpression : public SparqlExpression {
if (distinct) {
auto uniqueValues =
getUniqueElements(context, inputSize, std::move(operands));
context->cancellationHandle_->throwIfCancelled(
"AggregateExpression after filtering unique values");
checkCancellation();
return impl(std::move(uniqueValues));
} else {
return impl(std::move(operands));
Expand Down
18 changes: 5 additions & 13 deletions src/engine/sparqlExpressions/ConditionalExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ class CoalesceExpression : public VariadicExpression {
ad_utility::chunkedForLoop<CHUNK_SIZE>(
0, ctx->size(),
[&unboundIndices](size_t i) { unboundIndices.push_back(i); },
[ctx]() {
ctx->cancellationHandle_->throwIfCancelled("CoalesceExpression");
});
[ctx]() { ctx->cancellationHandle_->throwIfCancelled(); });
VectorWithMemoryLimit<IdOrString> result{ctx->_allocator};
std::fill_n(std::back_inserter(result), ctx->size(),
IdOrString{Id::makeUndefined()});

ctx->cancellationHandle_->throwIfCancelled("CoalesceExpression");
ctx->cancellationHandle_->throwIfCancelled();

auto isUnbound = [](const IdOrString& x) {
return (std::holds_alternative<Id>(x) &&
Expand All @@ -83,10 +81,7 @@ class CoalesceExpression : public VariadicExpression {
DISABLE_UNINITIALIZED_WARNINGS
result[unboundIndices[idx]] = constantResult;
},
[ctx]() {
ctx->cancellationHandle_->throwIfCancelled(
"CoalesceExpression constant expression result");
});
[ctx]() { ctx->cancellationHandle_->throwIfCancelled(); });
};
ENABLE_UNINITIALIZED_WARNINGS

Expand Down Expand Up @@ -125,10 +120,7 @@ class CoalesceExpression : public VariadicExpression {
}
++generatorIterator;
},
[ctx]() {
ctx->cancellationHandle_->throwIfCancelled(
"CoalesceExpression vector expression result");
});
[ctx]() { ctx->cancellationHandle_->throwIfCancelled(); });
};
auto visitExpressionResult =
[
Expand All @@ -150,7 +142,7 @@ class CoalesceExpression : public VariadicExpression {
std::visit(visitExpressionResult, child->evaluate(ctx));
unboundIndices = std::move(nextUnboundIndices);
nextUnboundIndices.clear();
ctx->cancellationHandle_->throwIfCancelled("CoalesceExpression");
ctx->cancellationHandle_->throwIfCancelled();
// Early stopping if no more unbound result remain.
if (unboundIndices.empty()) {
break;
Expand Down
5 changes: 2 additions & 3 deletions src/engine/sparqlExpressions/GroupConcatExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@
}
result.append(s.value());
}
context->cancellationHandle_->throwIfCancelled(
"GroupConcatExpression");
context->cancellationHandle_->throwIfCancelled();
}
};
auto generator =
detail::makeGenerator(AD_FWD(el), context->size(), context);
if (distinct_) {
context->cancellationHandle_->throwIfCancelled("GroupConcatExpression");
context->cancellationHandle_->throwIfCancelled();

Check warning on line 49 in src/engine/sparqlExpressions/GroupConcatExpression.h

View check run for this annotation

Codecov / codecov/patch

src/engine/sparqlExpressions/GroupConcatExpression.h#L49

Added line #L49 was not covered by tests
groupConcatImpl(detail::getUniqueElements(context, context->size(),
std::move(generator)));
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/sparqlExpressions/LiteralExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class LiteralExpression : public SparqlExpression {
auto ptrForCache = std::make_unique<IdOrString>(result);
ptrForCache.reset(std::atomic_exchange_explicit(
&cachedResult_, ptrForCache.release(), std::memory_order_relaxed));
context->cancellationHandle_->throwIfCancelled("LiteralExpression");
context->cancellationHandle_->throwIfCancelled();
return result;
};
if constexpr (std::is_same_v<TripleComponent::Literal, T>) {
Expand Down
4 changes: 1 addition & 3 deletions src/engine/sparqlExpressions/RandomExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ class RandomExpression : public SparqlExpression {
[&result, &randInt](size_t) {
result.push_back(Id::makeFromInt(randInt() >> Id::numDatatypeBits));
},
[context]() {
context->cancellationHandle_->throwIfCancelled("RandomExpression");
});
[context]() { context->cancellationHandle_->throwIfCancelled(); });
return result;
}

Expand Down
5 changes: 3 additions & 2 deletions src/engine/sparqlExpressions/RegexExpression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,9 @@ auto RegexExpression::getEstimatesForFilterExpression(

// ____________________________________________________________________________
void RegexExpression::checkCancellation(
const sparqlExpression::EvaluationContext* context) {
context->cancellationHandle_->throwIfCancelled("RegexExpression");
const sparqlExpression::EvaluationContext* context,
ad_utility::source_location location) {
context->cancellationHandle_->throwIfCancelled(location);
}

} // namespace sparqlExpression
7 changes: 5 additions & 2 deletions src/engine/sparqlExpressions/RegexExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ class RegexExpression : public SparqlExpression {
sparqlExpression::EvaluationContext* context) const;

/// Helper function to check if the `CancellationHandle` of the passed
/// `EvaluationContext` has been cancelled.
/// `EvaluationContext` has been cancelled and throw an exception if this is
/// the case.
static void checkCancellation(
const sparqlExpression::EvaluationContext* context);
const sparqlExpression::EvaluationContext* context,
ad_utility::source_location location =
ad_utility::source_location::current());
};
namespace detail {
// Check if `regex` is a prefix regex which means that it starts with `^` and
Expand Down
4 changes: 2 additions & 2 deletions src/engine/sparqlExpressions/RelationalExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ requires AreComparable<S1, S2> ExpressionResult evaluateRelationalExpression(
context);
}
}
context->cancellationHandle_->throwIfCancelled("RelationalExpression");
context->cancellationHandle_->throwIfCancelled();
return std::nullopt;
};
std::optional<ExpressionResult> resultFromBinarySearch;
Expand Down Expand Up @@ -196,7 +196,7 @@ requires AreComparable<S1, S2> ExpressionResult evaluateRelationalExpression(
ad_utility::visitWithVariantsAndParameters(impl, base(*itA), base(*itB));
++itA;
++itB;
context->cancellationHandle_->throwIfCancelled("RelationalExpression");
context->cancellationHandle_->throwIfCancelled();
}

if constexpr (resultIsConstant) {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/sparqlExpressions/SparqlExpressionGenerators.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ inline auto valueGetterGenerator = []<typename ValueGetter, SingleExpressionResu
Input&& input, ValueGetter&& valueGetter) {
auto transformation = [ context, valueGetter ]<typename I>(I && i)
requires std::invocable<ValueGetter, I&&, EvaluationContext*> {
context->cancellationHandle_->throwIfCancelled("Value getter generator");
context->cancellationHandle_->throwIfCancelled();
return valueGetter(AD_FWD(i), context);
};
return makeGenerator(std::forward<Input>(input), numElements, context, transformation);
Expand Down
4 changes: 2 additions & 2 deletions src/engine/sparqlExpressions/StringExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,11 @@ class ConcatExpression : public detail::VariadicExpression {
str.has_value()) {
resultAsVec[i].append(str.value());
}
ctx->cancellationHandle_->throwIfCancelled("ConcatExpression");
ctx->cancellationHandle_->throwIfCancelled();
++i;
}
}
ctx->cancellationHandle_->throwIfCancelled("ConcatExpression");
ctx->cancellationHandle_->throwIfCancelled();
};
std::ranges::for_each(
childrenVec(), [&ctx, &visitSingleExpressionResult](const auto& child) {
Expand Down
20 changes: 10 additions & 10 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CompressedRelationReader::asyncParallelBlockGenerator(
std::mutex blockIteratorMutex;
auto readAndDecompressBlock =
[&]() -> std::optional<std::pair<size_t, DecompressedBlock>> {
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
std::unique_lock lock{blockIteratorMutex};
if (blockIterator == endBlock) {
return std::nullopt;
Expand Down Expand Up @@ -77,7 +77,7 @@ CompressedRelationReader::asyncParallelBlockGenerator(
queueSize, numThreads, readAndDecompressBlock);
for (IdTable& block : queue) {
popTimer.stop();
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
++details.numBlocksRead_;
details.numElementsRead_ += block.numRows();
co_yield block;
Expand Down Expand Up @@ -110,7 +110,7 @@ CompressedRelationReader::IdTableGenerator CompressedRelationReader::lazyScan(
auto getIncompleteBlock = [&](auto it) {
auto result = readPossiblyIncompleteBlock(metadata, col1Id, *it,
std::ref(details), columnIndices);
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
return result;
};

Expand Down Expand Up @@ -327,13 +327,13 @@ IdTable CompressedRelationReader::scan(
firstBlockResult = readIncompleteBlock(*beginBlock);
totalResultSize += firstBlockResult.value().size();
++beginBlock;
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
}
if (beginBlock < endBlock) {
lastBlockResult = readIncompleteBlock(*(endBlock - 1));
totalResultSize += lastBlockResult.value().size();
endBlock--;
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
}

// Determine the total size of the result.
Expand All @@ -343,7 +343,7 @@ IdTable CompressedRelationReader::scan(
return count + block.numRows_;
});
result.resize(totalResultSize);
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();

size_t rowIndexOfNextBlockStart = 0;
// Lambda that appends a possibly incomplete block (the first or last block)
Expand All @@ -365,7 +365,7 @@ IdTable CompressedRelationReader::scan(
};

addIncompleteBlockIfExists(firstBlockResult);
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();

// Insert the complete blocks from the middle in parallel
if (beginBlock < endBlock) {
Expand Down Expand Up @@ -394,7 +394,7 @@ IdTable CompressedRelationReader::scan(
// block in parallel
#pragma omp task
{
if (!cancellationHandle->isCancelled("CompressedRelation scan")) {
if (!cancellationHandle->isCancelled()) {
decompressLambda();
}
}
Expand All @@ -403,11 +403,11 @@ IdTable CompressedRelationReader::scan(
rowIndexOfNextBlockStart += block.numRows_;
} // end of parallel region
}
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
// Add the last block.
addIncompleteBlockIfExists(lastBlockResult);
AD_CORRECTNESS_CHECK(rowIndexOfNextBlockStart == result.size());
checkCancellation(cancellationHandle);
cancellationHandle->throwIfCancelled();
return result;
}

Expand Down
9 changes: 0 additions & 9 deletions src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,6 @@ class CompressedRelationReader {
auto beginBlock, auto endBlock, ColumnIndices columnIndices,
ad_utility::SharedCancellationHandle cancellationHandle) const;

// A helper function to abstract away the timeout check:
static void checkCancellation(
const ad_utility::SharedCancellationHandle& cancellationHandle) {
// Not really expensive but since this should be called
// very often, try to avoid any extra checks.
AD_EXPENSIVE_CHECK(cancellationHandle);
cancellationHandle->throwIfCancelled("IndexScan"sv);
}

// Return a vector that consists of the concatenation of `baseColumns` and
// `additionalColumns`
static std::vector<ColumnIndex> prepareColumnIndices(
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ IdTable IndexImpl::scan(
: std::nullopt;
if (!col0Id.has_value() || (col1String.has_value() && !col1Id.has_value())) {
size_t numColumns = col1String.has_value() ? 1 : 2;
cancellationHandle->throwIfCancelled("IndexImpl scan");
cancellationHandle->throwIfCancelled();
return IdTable{numColumns, allocator_};
}
return scan(col0Id.value(), col1Id, permutation, additionalColumns,
Expand Down
Loading
Loading