Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Meta PR for various improvements to JOIN #1662

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/engine/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class Filter : public Operation {
return _subtree->getMultiplicity(col);
}

// Forward the request to the child: whatever `_subtree` reports for
// `variables` is returned unchanged.
std::vector<Operation*> getIndexScansForSortVariables(
    std::span<const Variable> variables) override {
  auto scansOfChild = _subtree->getIndexScansForSortVariables(variables);
  return scansOfChild;
}

private:
VariableToColumnMap computeVariableToColumnMap() const override {
return _subtree->getVariableColumns();
Expand Down
60 changes: 59 additions & 1 deletion src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ IdTable IndexScan::materializedIndexScan() const {
// _____________________________________________________________________________
ProtoResult IndexScan::computeResult(bool requestLaziness) {
LOG(DEBUG) << "IndexScan result computation...\n";
if (explicitLazyResult_.has_value()) {
AD_CORRECTNESS_CHECK(requestLaziness);
absl::Cleanup cleanup{[this]() { explicitLazyResult_.reset(); }};
return {std::move(explicitLazyResult_.value()), resultSortedOn()};
}
if (requestLaziness) {
return {chunkedIndexScan(), resultSortedOn()};
}
Expand Down Expand Up @@ -339,7 +344,14 @@ IndexScan::getBlockMetadata() const {
// _____________________________________________________________________________
std::optional<std::vector<CompressedBlockMetadata>>
IndexScan::getBlockMetadataOptionallyPrefiltered() const {
auto optBlockSpan = getBlockMetadata();
auto optBlockSpan =
[&]() -> std::optional<std::span<const CompressedBlockMetadata>> {
if (prefilteredBlocks_.has_value()) {
return prefilteredBlocks_.value();
} else {
return getBlockMetadata();
}
}();
std::optional<std::vector<CompressedBlockMetadata>> optBlocks = std::nullopt;
if (optBlockSpan.has_value()) {
const auto& blockSpan = optBlockSpan.value();
Expand Down Expand Up @@ -437,6 +449,34 @@ IndexScan::lazyScanForJoinOfTwoScans(const IndexScan& s1, const IndexScan& s2) {
return result;
}

// _____________________________________________________________________________
void IndexScan::setBlocksForJoinOfIndexScans(Operation* left,
                                             Operation* right) {
  // Both operations have to be `IndexScan`s. The reference form of
  // `dynamic_cast` throws `std::bad_cast` if one of them is not.
  auto& scanOnLeft = dynamic_cast<IndexScan&>(*left);
  auto& scanOnRight = dynamic_cast<IndexScan&>(*right);

  // Fetch the metadata of `scan`. If the scan already carries prefiltered
  // blocks, those replace the block list inside the metadata, so that
  // repeated calls refine the previous prefiltering result.
  auto metadataWithCurrentBlocks = [](IndexScan& scan) {
    auto metadata = scan.getMetadataForScan();
    if (metadata.has_value() && scan.prefilteredBlocks_.has_value()) {
      metadata.value().blockMetadata_ = scan.prefilteredBlocks_.value();
    }
    return metadata;
  };

  auto metadataLeft = metadataWithCurrentBlocks(scanOnLeft);
  auto metadataRight = metadataWithCurrentBlocks(scanOnRight);
  // Without metadata for both sides there is nothing to prefilter.
  if (!(metadataLeft.has_value() && metadataRight.has_value())) {
    return;
  }

  // Store the blocks computed by `getBlocksForJoin` as the new prefiltered
  // blocks of the respective scan.
  auto [blocksLeft, blocksRight] = CompressedRelationReader::getBlocksForJoin(
      metadataLeft.value(), metadataRight.value());
  scanOnLeft.prefilteredBlocks_ = std::move(blocksLeft);
  scanOnRight.prefilteredBlocks_ = std::move(blocksRight);
}

// _____________________________________________________________________________
Permutation::IdTableGenerator IndexScan::lazyScanForJoinOfColumnWithScan(
std::span<const Id> joinColumn) const {
Expand Down Expand Up @@ -652,3 +692,21 @@ std::pair<Result::Generator, Result::Generator> IndexScan::prefilterTables(
return {createPrefilteredJoinSide(state),
createPrefilteredIndexScanSide(state)};
}

// _____________________________________________________________________________
// Return `{this}` if the result of this scan is sorted on the columns of
// `variables` in exactly the given order, i.e. for each `i` the `i`-th sort
// column is the column that `variables[i]` maps to. Return an empty vector if
// there are fewer sort columns than variables, if a variable is not visible
// in this scan, or if any column does not match.
std::vector<Operation*> IndexScan::getIndexScansForSortVariables(
    std::span<const Variable> variables) {
  // Query the sort columns once and reuse them for all checks below.
  const auto& sortedColumns = resultSortedOn();
  if (sortedColumns.size() < variables.size()) {
    return {};
  }
  const auto& varColMap = getExternallyVisibleVariableColumns();
  for (size_t i = 0; i < variables.size(); ++i) {
    auto it = varColMap.find(variables[i]);
    if (it == varColMap.end() ||
        it->second.columnIndex_ != sortedColumns.at(i)) {
      return {};
    }
  }
  return {this};
}
19 changes: 19 additions & 0 deletions src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class IndexScan final : public Operation {
std::vector<ColumnIndex> additionalColumns_;
std::vector<Variable> additionalVariables_;

std::optional<std::vector<CompressedBlockMetadata>> prefilteredBlocks_;
std::optional<Result::Generator> explicitLazyResult_ = std::nullopt;

public:
IndexScan(QueryExecutionContext* qec, Permutation::Enum permutation,
const SparqlTriple& triple, Graphs graphsToFilter = std::nullopt,
Expand Down Expand Up @@ -108,6 +111,12 @@ class IndexScan final : public Operation {
std::pair<Result::Generator, Result::Generator> prefilterTables(
Result::Generator input, ColumnIndex joinColumn);

static void setBlocksForJoinOfIndexScans(Operation* left, Operation* right);

// Store `lazyResult` in `explicitLazyResult_`, replacing any generator that
// was stored there before. The generator is taken by value and moved into the
// member.
void setLazyResultManually(Result::Generator lazyResult) {
explicitLazyResult_ = std::move(lazyResult);
}

private:
// Implementation detail that allows to consume a generator from two other
// cooperating generators. Needs to be forward declared as it is used by
Expand Down Expand Up @@ -234,4 +243,14 @@ class IndexScan final : public Operation {
Permutation::IdTableGenerator getLazyScan(
std::vector<CompressedBlockMetadata> blocks) const;
std::optional<Permutation::MetadataAndBlocks> getMetadataForScan() const;

// Store `prefilteredBlocks` in the member `prefilteredBlocks_`, replacing
// any blocks stored there before. Additionally calls `disableCaching()` on
// this operation.
// NOTE(review): presumably a scan restricted to a subset of its blocks must
// not be stored under its regular cache key; confirm and extend this comment.
void setPrefilteredBlocks(
    std::vector<CompressedBlockMetadata> prefilteredBlocks) {
  disableCaching();
  prefilteredBlocks_ = std::move(prefilteredBlocks);
}

// Return a vector that contains only `this` if the result of this scan is
// sorted on the columns of `variables` in the given order, else return an
// empty vector.
std::vector<Operation*> getIndexScansForSortVariables(
std::span<const Variable> variables) override;
};
52 changes: 8 additions & 44 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ ProtoResult Join::computeResult(bool requestLaziness) {
auto rightResIfCached = getCachedOrSmallResult(*_right);
checkCancellation();

std::span joinVarSpan{&_joinVar, 1};
auto leftIndexScans = _left->getIndexScansForSortVariables(joinVarSpan);
auto rightIndexScans = _right->getIndexScansForSortVariables(joinVarSpan);
for (auto* left : leftIndexScans) {
for (auto* right : rightIndexScans) {
IndexScan::setBlocksForJoinOfIndexScans(left, right);
}
}
auto leftIndexScan =
std::dynamic_pointer_cast<IndexScan>(_left->getRootOperation());
if (leftIndexScan &&
Expand All @@ -189,9 +197,6 @@ ProtoResult Join::computeResult(bool requestLaziness) {
AD_CORRECTNESS_CHECK(rightResIfCached->isFullyMaterialized());
return computeResultForIndexScanAndIdTable<true>(
requestLaziness, std::move(rightResIfCached), leftIndexScan);

} else if (!leftResIfCached) {
return computeResultForTwoIndexScans(requestLaziness);
}
}

Expand Down Expand Up @@ -647,47 +652,6 @@ void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB,
}
}

// ______________________________________________________________________________________________________
// Compute the join for the case that the root operations of both `_left` and
// `_right` are `IndexScan`s. The two scans are read through the generators
// returned by `IndexScan::lazyScanForJoinOfTwoScans` and are combined via
// `zipperJoinForBlocksWithoutUndef`. The argument `requestLaziness` is
// forwarded to `createResult` together with the lambda that does the work.
ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const {
return createResult(
requestLaziness,
[this](std::function<void(IdTable&, LocalVocab&)> yieldTable) {
// Both children must be `IndexScan`s, otherwise the casts yield
// `nullptr` and the correctness check below fails.
auto leftScan =
std::dynamic_pointer_cast<IndexScan>(_left->getRootOperation());
auto rightScan =
std::dynamic_pointer_cast<IndexScan>(_right->getRootOperation());
AD_CORRECTNESS_CHECK(leftScan && rightScan);
// The join column already is the first column in both inputs, so we
// don't have to permute the inputs and results for the
// `AddCombinedRowToIdTable` class to work correctly.
AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0);
auto rowAdder = makeRowAdder(std::move(yieldTable));

// Only the setup of the two lazy scans is timed; the measured time is
// reported as the detail "time-for-filtering-blocks".
ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
auto [leftBlocksInternal, rightBlocksInternal] =
IndexScan::lazyScanForJoinOfTwoScans(*leftScan, *rightScan);
runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs());

auto leftBlocks = convertGenerator(std::move(leftBlocksInternal));
auto rightBlocks = convertGenerator(std::move(rightBlocksInternal));

// Perform the actual join; matching rows are handed to `rowAdder`.
ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks,
std::less{}, rowAdder);

// Hand the details collected by the two generators to the respective
// scans.
leftScan->updateRuntimeInfoForLazyScan(leftBlocks.details());
rightScan->updateRuntimeInfoForLazyScan(rightBlocks.details());

// Sanity checks: the number of blocks read on one side must not exceed
// the number of elements read on the other side.
// NOTE(review): presumably because each block that is read has to
// contain at least one match with the other side; confirm.
AD_CORRECTNESS_CHECK(leftBlocks.details().numBlocksRead_ <=
rightBlocks.details().numElementsRead_);
AD_CORRECTNESS_CHECK(rightBlocks.details().numBlocksRead_ <=
leftBlocks.details().numElementsRead_);
// Take the local vocab out of `rowAdder` first, because the `return`
// statement below calls `resultTable()` on the moved `rowAdder`.
auto localVocab = std::move(rowAdder.localVocab());
return Result::IdTableVocabPair{std::move(rowAdder).resultTable(),
std::move(localVocab)};
});
}

// ______________________________________________________________________________________________________
template <bool idTableIsRightInput>
ProtoResult Join::computeResultForIndexScanAndIdTable(
Expand Down
14 changes: 9 additions & 5 deletions src/engine/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ class Join : public Operation {

VariableToColumnMap computeVariableToColumnMap() const override;

// A special implementation that is called when both children are
// `IndexScan`s. Uses the lazy scans to only retrieve the subset of the
// `IndexScan`s that is actually needed without fully materializing them.
ProtoResult computeResultForTwoIndexScans(bool requestLaziness) const;

// A special implementation that is called when exactly one of the children is
// an `IndexScan` and the other one is a fully materialized result. The
// argument `idTableIsRightInput` determines whether the `IndexScan` is the
Expand Down Expand Up @@ -214,4 +209,13 @@ class Join : public Operation {
// Helper function to create the commonly used instance of this class.
ad_utility::AddCombinedRowToIdTable makeRowAdder(
std::function<void(IdTable&, LocalVocab&)> callback) const;

public:
// Collect the index scans reported by both children for `variables`. The
// scans of `_left` come first, followed by the scans of `_right`.
std::vector<Operation*> getIndexScansForSortVariables(
    std::span<const Variable> variables) override {
  auto scans = _left->getIndexScansForSortVariables(variables);
  for (Operation* scanOfRightChild :
       _right->getIndexScansForSortVariables(variables)) {
    scans.push_back(scanOfRightChild);
  }
  return scans;
}
};
19 changes: 12 additions & 7 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ CacheValue Operation::runComputationAndPrepareForCache(
const ad_utility::Timer& timer, ComputationMode computationMode,
const QueryCacheKey& cacheKey, bool pinned) {
auto& cache = _executionContext->getQueryTreeCache();
bool suitableForCaching = isSuitableForCaching();
auto result = runComputation(timer, computationMode);
if (!result.isFullyMaterialized() &&
if (suitableForCaching && !result.isFullyMaterialized() &&
!unlikelyToFitInCache(cache.getMaxSizeSingleEntry())) {
AD_CONTRACT_CHECK(!pinned);
result.cacheDuringConsumption(
Expand Down Expand Up @@ -276,12 +277,16 @@ std::shared_ptr<const Result> Operation::getResult(

bool onlyReadFromCache = computationMode == ComputationMode::ONLY_IF_CACHED;

auto result =
pinResult ? cache.computeOncePinned(cacheKey, cacheSetup,
onlyReadFromCache, suitedForCache)
: cache.computeOnce(cacheKey, cacheSetup, onlyReadFromCache,
suitedForCache);

auto result = [&]() -> QueryResultCache::ResultAndCacheStatus {
auto compute = [&](auto&&... args) {
if (!isSuitableForCaching()) {
return cache.computeWithoutCache(AD_FWD(args)...);
}
return pinResult ? cache.computeOncePinned(AD_FWD(args)...)
: cache.computeOnce(AD_FWD(args)...);
};
return compute(cacheKey, cacheSetup, onlyReadFromCache, suitedForCache);
}();
if (result._resultPointer == nullptr) {
AD_CORRECTNESS_CHECK(onlyReadFromCache);
return nullptr;
Expand Down
15 changes: 14 additions & 1 deletion src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

// forward declaration needed to break dependencies
class QueryExecutionTree;
class IndexScan;

enum class ComputationMode {
FULLY_MATERIALIZED,
Expand Down Expand Up @@ -90,6 +91,8 @@ class Operation {
// limit/offset is applied post computation.
bool externalLimitApplied_ = false;

bool isSuitableForCaching_ = true;

public:
// Holds a `PrefilterExpression` with its corresponding `Variable`.
using PrefilterVariablePair = sparqlExpression::PrefilterExprVariablePair;
Expand Down Expand Up @@ -162,7 +165,7 @@ class Operation {
// Get a unique, not ambiguous string representation for a subtree.
// This should act like an ID for each subtree.
// Calls `getCacheKeyImpl` and adds the information about the `LIMIT` clause.
virtual string getCacheKey() const final {
virtual std::string getCacheKey() const final {
auto result = getCacheKeyImpl();
if (_limit._limit.has_value()) {
absl::StrAppend(&result, " LIMIT ", _limit._limit.value());
Expand All @@ -173,6 +176,11 @@ class Operation {
return result;
}

// If this function returns `false`, then the result of this `Operation` will
// never be stored in the cache. The implementation in this class returns the
// value of the member `isSuitableForCaching_`.
virtual bool isSuitableForCaching() const { return isSuitableForCaching_; }
// Mark this `Operation` as not suitable for caching by setting
// `isSuitableForCaching_` to `false`. There is no way to re-enable caching
// afterwards through this interface.
virtual void disableCaching() final { isSuitableForCaching_ = false; }

private:
// The individual implementation of `getCacheKey` (see above) that has to be
// customized by every child class.
Expand Down Expand Up @@ -426,6 +434,11 @@ class Operation {
RuntimeInformation::Status status =
RuntimeInformation::Status::optimizedOut);

// Default implementation that reports no index scans for `variables`.
// Subclasses override this function to return the scans they can provide.
virtual std::vector<Operation*> getIndexScansForSortVariables(
    [[maybe_unused]] std::span<const Variable> variables) {
  return std::vector<Operation*>{};
}

private:
// Create the runtime information in case the evaluation of this operation has
// failed.
Expand Down
22 changes: 21 additions & 1 deletion src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <string>
#include <vector>

#include "engine/Filter.h"
#include "engine/IndexScan.h"
#include "engine/Sort.h"
#include "parser/RdfEscaping.h"

Expand Down Expand Up @@ -164,8 +166,26 @@ std::shared_ptr<QueryExecutionTree> QueryExecutionTree::createSortedTree(
}

QueryExecutionContext* qec = qet->getRootOperation()->getExecutionContext();
/*
bool isResortedFilter = false;
if (auto filter = dynamic_cast<const Filter*>(qet->rootOperation_.get())) {
if (dynamic_cast<const
IndexScan*>(filter->getSubtree()->rootOperation_.get())) { isResortedFilter =
true;
}
}
*/
auto sort = std::make_shared<Sort>(qec, std::move(qet), sortColumns);
return std::make_shared<QueryExecutionTree>(qec, std::move(sort));
auto result = std::make_shared<QueryExecutionTree>(qec, std::move(sort));
/*
// TODO<joka921> This currently is a hack for the pubchem queries, let's
// discuss the options to do this.
if (isResortedFilter) {
result->getSizeEstimate();
result->sizeEstimate_.value() *= 20;
}
*/
return result;
}

// _____________________________________________________________________________
Expand Down
12 changes: 12 additions & 0 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ class QueryExecutionTree {
setPrefilterGetUpdatedQueryExecutionTree(
std::vector<Operation::PrefilterVariablePair> prefilterPairs) const;

// Ask the root operation for its index scans for `variables` and return
// them. If at least one scan is returned, caching is disabled on the root
// operation and the `cachedResult_` and `sizeEstimate_` stored in this tree
// are reset.
virtual std::vector<Operation*> getIndexScansForSortVariables(
    std::span<const Variable> variables) final {
  auto scans = rootOperation_->getIndexScansForSortVariables(variables);
  if (!scans.empty()) {
    rootOperation_->disableCaching();
    cachedResult_.reset();
    sizeEstimate_.reset();
  }
  return scans;
}

size_t getDistinctEstimate(size_t col) const {
return static_cast<size_t>(rootOperation_->getSizeEstimate() /
rootOperation_->getMultiplicity(col));
Expand Down
Loading