diff --git a/site/content/documentation/programming/federation.md b/site/content/documentation/programming/federation.md index 93508f262ce..0be4361e3b8 100644 --- a/site/content/documentation/programming/federation.md +++ b/site/content/documentation/programming/federation.md @@ -305,7 +305,8 @@ FedX provides various means for configuration. Configuration settings can be def |leftJoinWorkerThreads | The number of left join worker threads for parallelization, default _10_ | |boundJoinBlockSize | Block size for bound joins, default _25_ | |enforceMaxQueryTime | Max query time in seconds, 0 to disable, default _30_ | -|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. For today's endpoints it is more efficient to disable vectored evaluation of SERVICE | +|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. | +|enableOptionalAsBindJoin | Flag for evaluating an OPTIONAL expression using bind join, default _true_. | |includeInferredDefault | whether include inferred statements should be considered, default _true_ | |consumingIterationMax | the max number of results to be consumed by `ConsumingIteration`, default _1000_ | |debugQueryPlan | Print the optimized query execution plan to stdout, default _false_ | diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java index ccdc04552fb..c9535d71fdb 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java @@ -13,7 +13,6 @@ import java.util.Optional; import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; -import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory; import org.eclipse.rdf4j.federated.cache.SourceSelectionCache; import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory; import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache; @@ -48,6 +47,8 @@ public class FedXConfig { private boolean enableServiceAsBoundJoin = true; + private boolean enableOptionalAsBindJoin = true; + private boolean enableMonitoring = false; private boolean isLogQueryPlan = false; @@ -68,7 +69,6 @@ public class FedXConfig { private int consumingIterationMax = 1000; - private CollectionFactory cf = new DefaultCollectionFactory(); /* factory like setters */ /** @@ -244,6 +244,17 @@ public FedXConfig withEnableServiceAsBoundJoin(boolean flag) { return this; } + /** + * Whether OPTIONAL clauses are evaluated using bind join (i.e. with the VALUES clause). Default true + * + * @param flag + * @return the current config. + */ + public FedXConfig withEnableOptionalAsBindJoin(boolean flag) { + this.enableOptionalAsBindJoin = flag; + return this; + } + /** * The cache specification for the {@link SourceSelectionMemoryCache}. If not set explicitly, the * {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used. @@ -326,16 +337,26 @@ public int getBoundJoinBlockSize() { * Returns a flag indicating whether vectored evaluation using the VALUES clause shall be applied for SERVICE * expressions. * - * Default: false + * Default: true * - * Note: for todays endpoints it is more efficient to disable vectored evaluation of SERVICE. - * - * @return whether SERVICE expressions are evaluated using bound joins + * @return whether SERVICE expressions are evaluated using bind joins */ public boolean getEnableServiceAsBoundJoin() { return enableServiceAsBoundJoin; } + /** + * Returns a flag indicating whether bind join evaluation using the VALUES clause shall be applied for OPTIONAL + * expressions. + * + * Default: true + * + * @return whether OPTIONAL expressions are evaluated using bind joins + */ + public boolean isEnableOptionalAsBindJoin() { + return enableOptionalAsBindJoin; + } + /** * Get the maximum query time in seconds used for query evaluation. Applied if {@link QueryManager} is used to * create queries. @@ -485,9 +506,10 @@ public int getConsumingIterationMax() { * * @param cf * @return the current config + * @deprecated unusedO */ + @Deprecated(forRemoval = true) public FedXConfig withCollectionFactory(CollectionFactory cf) { - this.cf = cf; return this; } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java index 68b9ca6e4b0..24290ed89cd 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java @@ -10,14 +10,14 @@ *******************************************************************************/ package org.eclipse.rdf4j.federated.algebra; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; /** * Marker interface indicating that instances are applicable for bound join processing (see - * {@link ControlledWorkerBoundJoin} + * {@link ControlledWorkerBindJoin} * * @author Andreas Schwarte - * @see ControlledWorkerBoundJoin + * @see ControlledWorkerBindJoin */ public interface BoundJoinTupleExpr { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index a3e6237dac3..561a00bacbc 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -15,10 +15,8 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import java.util.stream.Collectors; -import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.common.iteration.SingletonIteration; @@ -39,6 +37,7 @@ import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath; import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator; import org.eclipse.rdf4j.federated.algebra.FilterExpr; +import org.eclipse.rdf4j.federated.algebra.FilterTuple; import org.eclipse.rdf4j.federated.algebra.FilterValueExpr; import org.eclipse.rdf4j.federated.algebra.HolderNode; import org.eclipse.rdf4j.federated.algebra.NJoin; @@ -53,12 +52,14 @@ import org.eclipse.rdf4j.federated.endpoint.Endpoint; import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor; +import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration; +import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.SynchronousBoundJoin; import org.eclipse.rdf4j.federated.evaluation.join.SynchronousJoin; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; @@ -68,6 +69,7 @@ import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask; import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion; import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase; +import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.FedXRuntimeException; import org.eclipse.rdf4j.federated.exception.IllegalQueryException; import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel; @@ -97,6 +99,7 @@ import org.eclipse.rdf4j.query.QueryEvaluationException; import org.eclipse.rdf4j.query.algebra.DescribeOperator; import org.eclipse.rdf4j.query.algebra.Join; +import org.eclipse.rdf4j.query.algebra.LeftJoin; import org.eclipse.rdf4j.query.algebra.QueryRoot; import org.eclipse.rdf4j.query.algebra.Service; import org.eclipse.rdf4j.query.algebra.StatementPattern; @@ -108,12 +111,10 @@ import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException; import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService; import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator; -import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics; import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext; import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.iterator.BadlyDesignedLeftJoinIterator; -import org.eclipse.rdf4j.query.algebra.evaluation.iterator.DescribeIteration; import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration; import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ConstantOptimizer; import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.DisjunctiveConstraintOptimizer; @@ -748,10 +749,7 @@ public CloseableIteration evaluate(BindingSet bindings) { if (problemVars.containsAll(bindings.getBindingNames())) { var leftIter = leftPrepared.evaluate(bindings); - ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(scheduler, FederationEvalStrategy.this, - leftIter, leftJoin, bindings, leftJoin.getQueryInfo()); - executor.execute(join); - return join; + return executeLeftJoin(scheduler, leftIter, leftJoin, bindings, leftJoin.getQueryInfo()); } else { Set problemVarsClone = new HashSet<>(problemVars); problemVarsClone.retainAll(bindings.getBindingNames()); @@ -815,8 +813,14 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext /** * Execute the join in a separate thread using some join executor. * - * Join executors are for instance: - {@link SynchronousJoin} - {@link SynchronousBoundJoin} - - * {@link ControlledWorkerJoin} - {@link ControlledWorkerBoundJoin} + * Join executors are for instance: + * + * * * For endpoint federation use controlled worker bound join, for local federation use controlled worker join. The * other operators are there for completeness. @@ -836,6 +840,21 @@ protected abstract CloseableIteration executeJoin( CloseableIteration leftIter, TupleExpr rightArg, Set joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException; + /** + * Execute the left join in a separate thread using some join executor. + * + * @param joinScheduler + * @param leftIter + * @param leftJoin + * @param bindings + * @return the result + * @throws QueryEvaluationException + */ + protected abstract CloseableIteration executeLeftJoin( + ControlledWorkerScheduler joinScheduler, + CloseableIteration leftIter, LeftJoin leftJoin, + BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException; + public abstract CloseableIteration evaluateExclusiveGroup( ExclusiveGroup group, BindingSet bindings) throws RepositoryException, MalformedQueryException, QueryEvaluationException; @@ -920,10 +939,63 @@ public abstract CloseableIteration evaluateBoundJoinStatementPattern public abstract CloseableIteration evaluateGroupedCheck( CheckStatementPattern stmt, final List bindings) throws QueryEvaluationException; + /** + * Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints. + * + * @param stmt + * @param bindings + * @return the result iteration + * @throws QueryEvaluationException + * @see {@link BindLeftJoinIteration} + */ + public CloseableIteration evaluateLeftBoundJoinStatementPattern( + StatementTupleExpr stmt, final List bindings) throws QueryEvaluationException { + // we can omit the bound join handling + if (bindings.size() == 1) { + return evaluate(stmt, bindings.get(0)); + } + + FilterValueExpr filterExpr = null; + if (stmt instanceof FilterTuple) { + filterExpr = ((FilterTuple) stmt).getFilterExpr(); + } + + AtomicBoolean isEvaluated = new AtomicBoolean(false); + String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings, + filterExpr, isEvaluated, stmt.getQueryInfo().getDataset()); + + CloseableIteration result = null; + try { + result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo()); + + // apply filter and/or convert to original bindings + if (filterExpr != null && !isEvaluated.get()) { + result = new BindLeftJoinIteration(result, bindings); // apply conversion + result = new FilteringIteration(filterExpr, result, this); // apply filter + if (!result.hasNext()) { + result.close(); + return new EmptyIteration<>(); + } + } else { + result = new BindLeftJoinIteration(result, bindings); + } + + return result; + } catch (Throwable t) { + if (result != null) { + result.close(); + } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw ExceptionUtil.toQueryEvaluationException(t); + } + } + /** * Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input. * - * See {@link ControlledWorkerBoundJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()} + * See {@link ControlledWorkerBindJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()} * * @param service * @param bindings diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java index dbe6cf8bfcc..9315a4b8feb 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java @@ -27,11 +27,13 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.structures.QueryInfo; import org.eclipse.rdf4j.federated.util.QueryAlgebraUtil; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.MalformedQueryException; import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.LeftJoin; import org.eclipse.rdf4j.query.algebra.StatementPattern; import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.repository.RepositoryException; @@ -119,6 +121,16 @@ public CloseableIteration executeJoin( return join; } + @Override + protected CloseableIteration executeLeftJoin(ControlledWorkerScheduler joinScheduler, + CloseableIteration leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this, + leftIter, leftJoin, bindings, queryInfo); + executor.execute(join); + return join; + } + @Override public CloseableIteration evaluateExclusiveGroup( ExclusiveGroup group, BindingSet bindings) diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java index 0be9600d7c1..69bdf63cb38 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java @@ -17,8 +17,10 @@ import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr; import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern; import org.eclipse.rdf4j.federated.algebra.ExclusiveGroup; +import org.eclipse.rdf4j.federated.algebra.FedXService; import org.eclipse.rdf4j.federated.algebra.FilterTuple; import org.eclipse.rdf4j.federated.algebra.FilterValueExpr; import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; @@ -29,7 +31,11 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; +import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.IllegalQueryException; import org.eclipse.rdf4j.federated.structures.QueryInfo; @@ -37,6 +43,7 @@ import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.MalformedQueryException; import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.LeftJoin; import org.eclipse.rdf4j.query.algebra.StatementPattern; import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.repository.RepositoryException; @@ -45,7 +52,7 @@ * Implementation of a federation evaluation strategy which provides some special optimizations for SPARQL (remote) * endpoints. The most important optimization is to used prepared SPARQL Queries that are already created using Strings. *

- * Joins are executed using {@link ControlledWorkerBoundJoin}. + * Joins are executed using {@link ControlledWorkerBindJoin}. *

*

* This implementation uses the SPARQL 1.1 VALUES operator for the bound-join evaluation @@ -173,13 +180,61 @@ public CloseableIteration executeJoin( TupleExpr rightArg, Set joinVars, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException { - ControlledWorkerBoundJoin join = new ControlledWorkerBoundJoin(joinScheduler, this, leftIter, rightArg, - bindings, queryInfo); + // determine if we can execute the expr as bind join + boolean executeAsBindJoin = false; + if (rightArg instanceof BoundJoinTupleExpr) { + if (rightArg instanceof FedXService) { + executeAsBindJoin = queryInfo.getFederationContext().getConfig().getEnableServiceAsBoundJoin(); + } else { + executeAsBindJoin = true; + } + } + + JoinExecutorBase join; + if (executeAsBindJoin) { + join = new ControlledWorkerBindJoin(joinScheduler, this, leftIter, rightArg, + bindings, queryInfo); + } else { + join = new ControlledWorkerJoin(joinScheduler, this, leftIter, rightArg, bindings, + queryInfo); + } + join.setJoinVars(joinVars); executor.execute(join); return join; } + @Override + protected CloseableIteration executeLeftJoin(ControlledWorkerScheduler joinScheduler, + CloseableIteration leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + + var rightArg = leftJoin.getRightArg(); + var fedxConfig = queryInfo.getFederationContext().getConfig(); + + // determine if we can execute the expr as bind join + boolean executeAsBindJoin = false; + if (fedxConfig.isEnableOptionalAsBindJoin() && rightArg instanceof BoundJoinTupleExpr) { + if (rightArg instanceof FedXService) { + executeAsBindJoin = false; + } else { + executeAsBindJoin = true; + } + } + + JoinExecutorBase join; + if (executeAsBindJoin) { + join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg, + bindings, queryInfo); + } else { + join = new ControlledWorkerLeftJoin(joinScheduler, this, + leftIter, leftJoin, bindings, queryInfo); + } + + executor.execute(join); + return join; + } + @Override public CloseableIteration evaluateExclusiveGroup( ExclusiveGroup group, BindingSet bindings) throws RepositoryException, diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index f677ca46ea4..1060e86a2de 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.rdf4j.common.iteration.CloseableIteration; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; @@ -34,7 +34,7 @@ * @author Andreas Schwarte * @see ControlledWorkerUnion * @see ControlledWorkerJoin - * @see ControlledWorkerBoundJoin + * @see ControlledWorkerBindJoin */ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAware { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java new file mode 100644 index 00000000000..5b0b61f544a --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java @@ -0,0 +1,95 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.iterator; + +import java.util.List; +import java.util.ListIterator; + +import org.eclipse.collections.impl.set.mutable.primitive.IntHashSet; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.iteration.LookAheadIteration; +import org.eclipse.rdf4j.model.Literal; +import org.eclipse.rdf4j.query.Binding; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet; + +/** + * A {@link LookAheadIteration} for processing bind left join results (i.e., result of joining OPTIONAL clauses) + * + * Algorithm: + * + *

    + *
  • execute left bind join using regular bound join query
  • + *
  • process result iteration similar to {@link BoundJoinVALUESConversionIteration}
  • + *
  • remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all + * non-seen bindings directly from the input
  • + * + * + * @author Andreas Schwarte + */ +public class BindLeftJoinIteration extends LookAheadIteration { + + protected final CloseableIteration iter; + protected final List bindings; + + protected IntHashSet seenBindingIndexes = new IntHashSet(); + protected final ListIterator bindingsIterator; + + public BindLeftJoinIteration(CloseableIteration iter, + List bindings) { + this.iter = iter; + this.bindings = bindings; + this.bindingsIterator = bindings.listIterator(); + } + + @Override + protected BindingSet getNextElement() { + + if (iter.hasNext()) { + var bIn = iter.next(); + int bIndex = ((Literal) bIn.getValue(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)).intValue(); + seenBindingIndexes.add(bIndex); + return convert(bIn, bIndex); + } + + while (bindingsIterator.hasNext()) { + if (seenBindingIndexes.contains(bindingsIterator.nextIndex())) { + // the binding was already processed as part of the optional + bindingsIterator.next(); + continue; + } + return bindingsIterator.next(); + } + + return null; + } + + @Override + protected void handleClose() { + iter.close(); + } + + protected BindingSet convert(BindingSet bIn, int bIndex) throws QueryEvaluationException { + QueryBindingSet res = new QueryBindingSet(); + for (Binding b : bIn) { + if (b.getName().equals(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)) { + continue; + } + res.addBinding(b); + } + for (Binding bs : bindings.get(bIndex)) { + res.setBinding(bs); + } + return res; + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BoundJoinVALUESConversionIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BoundJoinVALUESConversionIteration.java index c734fe6fea9..9bc71b87dbd 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BoundJoinVALUESConversionIteration.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BoundJoinVALUESConversionIteration.java @@ -10,13 +10,13 @@ *******************************************************************************/ package org.eclipse.rdf4j.federated.evaluation.iterator; -import java.util.Iterator; import java.util.List; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.ConvertingIteration; import org.eclipse.rdf4j.federated.evaluation.SparqlFederationEvalStrategy; import org.eclipse.rdf4j.federated.util.QueryStringUtil; +import org.eclipse.rdf4j.model.Literal; import org.eclipse.rdf4j.query.Binding; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.QueryEvaluationException; @@ -53,10 +53,8 @@ public BoundJoinVALUESConversionIteration(CloseableIteration iter, @Override protected BindingSet convert(BindingSet bIn) throws QueryEvaluationException { QueryBindingSet res = new QueryBindingSet(); - int bIndex = Integer.parseInt(bIn.getBinding(INDEX_BINDING_NAME).getValue().stringValue()); - Iterator bIter = bIn.iterator(); - while (bIter.hasNext()) { - Binding b = bIter.next(); + int bIndex = ((Literal) bIn.getValue(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)).intValue(); + for (Binding b : bIn) { if (b.getName().equals(INDEX_BINDING_NAME)) { continue; } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java new file mode 100644 index 00000000000..17010b1e3c5 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java @@ -0,0 +1,114 @@ +/******************************************************************************* + * Copyright (c) 2019 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern; +import org.eclipse.rdf4j.federated.algebra.FedXService; +import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; +import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; + +/** + * Execution of a regular join as bind join. + * + * @author Andreas Schwarte + * @see ControlledWorkerBindJoinBase + */ +public class ControlledWorkerBindJoin extends ControlledWorkerBindJoinBase { + + public ControlledWorkerBindJoin(ControlledWorkerScheduler scheduler, FederationEvalStrategy strategy, + CloseableIteration leftIter, + TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo); + } + + @Override + protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) { + final TaskCreator taskCreator; + if (expr instanceof StatementTupleExpr) { + StatementTupleExpr stmt = (StatementTupleExpr) expr; + if (stmt.hasFreeVarsFor(bs)) { + taskCreator = new BoundJoinTaskCreator(strategy, stmt); + } else { + expr = new CheckStatementPattern(stmt, queryInfo); + taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr); + } + } else if (expr instanceof FedXService) { + taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr); + } else { + throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName() + + ". Please report this problem."); + } + return taskCreator; + } + + protected class BoundJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final StatementTupleExpr _expr; + + public BoundJoinTaskCreator( + FederationEvalStrategy strategy, StatementTupleExpr expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelBoundJoinTask(control, _strategy, _expr, bindings); + } + } + + protected class CheckJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final CheckStatementPattern _expr; + + public CheckJoinTaskCreator( + FederationEvalStrategy strategy, CheckStatementPattern expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelCheckJoinTask(control, _strategy, _expr, bindings); + } + } + + protected class FedXServiceJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final FedXService _expr; + + public FedXServiceJoinTaskCreator( + FederationEvalStrategy strategy, FedXService expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelServiceJoinTask(control, _strategy, _expr, bindings); + } + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java new file mode 100644 index 00000000000..d00477a2adf --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java @@ -0,0 +1,163 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; +import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for executing joins as bind joins (i.e., the bindings of a block are injected in the SPARQL query as + * VALUES clause). + * + * The number of concurrent threads is controlled by a {@link ControlledWorkerScheduler} which works according to the + * FIFO principle and uses worker threads. + * + * This join cursor blocks until all scheduled tasks are finished, however the result iteration can be accessed from + * different threads to allow for pipelining. + * + * @author Andreas Schwarte + * + */ +public abstract class ControlledWorkerBindJoinBase extends JoinExecutorBase { + + private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBindJoinBase.class); + + protected final ControlledWorkerScheduler scheduler; + + protected final Phaser phaser = new Phaser(1); + + public ControlledWorkerBindJoinBase(ControlledWorkerScheduler scheduler, + FederationEvalStrategy strategy, + CloseableIteration leftIter, + TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + super(strategy, leftIter, rightArg, bindings, queryInfo); + this.scheduler = scheduler; + } + + @Override + protected void handleBindings() throws Exception { + if (!(rightArg instanceof BoundJoinTupleExpr)) { + String msg = "Right argument is not an applicable expression for bind joins. Was: " + + rightArg.getClass().getCanonicalName(); + log.debug(msg); + throw new QueryEvaluationException(msg); + } + + int nBindingsCfg = this.queryInfo.getFederationContext().getConfig().getBoundJoinBlockSize(); + int totalBindings = 0; // the total number of bindings + TupleExpr expr = rightArg; + + TaskCreator taskCreator = null; + Phaser currentPhaser = phaser; + + int nBindings; + List bindings; + while (!isClosed() && leftIter.hasNext()) { + + // create a new phaser if there are more than 10000 parties + // note: a phaser supports only up to 65535 registered parties + if (currentPhaser.getRegisteredParties() >= 10000) { + currentPhaser = new Phaser(currentPhaser); + } + + // determine the bind join block size + nBindings = getNextBindJoinSize(nBindingsCfg, totalBindings); + + bindings = new ArrayList<>(nBindings); + + int count = 0; + while (!isClosed() && count < nBindings && leftIter.hasNext()) { + var bs = leftIter.next(); + if (taskCreator == null) { + taskCreator = determineTaskCreator(expr, bs); + } + bindings.add(bs); + count++; + } + + totalBindings += count; + + currentPhaser.register(); + scheduler.schedule(taskCreator.getTask(new PhaserHandlingParallelExecutor(this, currentPhaser), bindings)); + } + + leftIter.close(); + + scheduler.informFinish(this); + + if (log.isDebugEnabled()) { + log.debug("JoinStats: left iter of " + getDisplayId() + " had " + totalBindings + " results."); + } + + phaser.awaitAdvanceInterruptibly(phaser.arrive(), queryInfo.getMaxRemainingTimeMS(), TimeUnit.MILLISECONDS); + } + + @Override + public void handleClose() throws QueryEvaluationException { + try { + super.handleClose(); + } finally { + // signal the phaser to close (if currently being blocked) + phaser.forceTermination(); + } + } + + /** + * Return the {@link TaskCreator} for executing the bind join + * + * @param expr + * @param bs + * @return + */ + protected abstract TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs); + + /** + * Return the size of the next bind join block. + * + * @param configuredBindJoinSize the configured bind join size + * @param totalBindings the current process bindings from the intermediate result set + * @return + */ + protected int getNextBindJoinSize(int configuredBindJoinSize, int totalBindings) { + + /* + * XXX idea: + * + * make nBindings dependent on the number of intermediate results of the left argument. + * + * If many intermediate results, increase the number of bindings. This will result in less remote SPARQL + * requests. + * + */ + + return configuredBindJoinSize; + } + + protected interface TaskCreator { + ParallelTask getTask(ParallelExecutor control, List bindings); + } +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java new file mode 100644 index 00000000000..a30ed66cf7a --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; +import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; + +/** + * Bind join implementation for left joins (i.e., OPTIOAL clauses) + * + * @author Andreas Schwarte + */ +public class ControlledWorkerBindLeftJoin extends ControlledWorkerBindJoinBase { + + public ControlledWorkerBindLeftJoin(ControlledWorkerScheduler scheduler, + FederationEvalStrategy strategy, CloseableIteration leftIter, TupleExpr rightArg, + BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException { + super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo); + } + + @Override + protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) { + final TaskCreator taskCreator; + if (expr instanceof StatementTupleExpr) { + StatementTupleExpr stmt = (StatementTupleExpr) expr; + taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt); + + } else { + throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName() + + ". Please report this problem."); + } + return taskCreator; + } + + static protected class LeftBoundJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final StatementTupleExpr _expr; + + public LeftBoundJoinTaskCreator( + FederationEvalStrategy strategy, StatementTupleExpr expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelBindLeftJoinTask(control, _strategy, _expr, bindings); + } + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java index db939175e6f..062be55ab75 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java @@ -44,7 +44,9 @@ * * @author Andreas Schwarte * + * @deprecated replaced with {@link ControlledWorkerBindJoin}l */ +@Deprecated(forRemoval = true) public class ControlledWorkerBoundJoin extends ControlledWorkerJoin { private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBoundJoin.class); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java new file mode 100644 index 00000000000..bfabcb87c40 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * A {@link ParallelTaskBase} for executing bind left joins. + * + * @author Andreas Schwarte + * @see FederationEvalStrategy#evaluateLeftBoundJoinStatementPattern(StatementTupleExpr, List) + */ +public class ParallelBindLeftJoinTask extends ParallelTaskBase { + + protected final FederationEvalStrategy strategy; + protected final StatementTupleExpr rightArg; + protected final List bindings; + protected final ParallelExecutor joinControl; + + public ParallelBindLeftJoinTask(ParallelExecutor joinControl, FederationEvalStrategy strategy, + StatementTupleExpr expr, List bindings) { + this.strategy = strategy; + this.rightArg = expr; + this.bindings = bindings; + this.joinControl = joinControl; + } + + @Override + public ParallelExecutor getControl() { + return joinControl; + } + + @Override + protected CloseableIteration performTaskInternal() throws Exception { + return strategy.evaluateLeftBoundJoinStatementPattern(rightArg, bindings); + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfig.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfig.java index 4a7f4dab854..23923a99e42 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfig.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfig.java @@ -135,6 +135,11 @@ public class FedXRepositoryConfig extends AbstractRepositoryImplConfig { */ public static final IRI CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN = vf.createIRI(NAMESPACE, "enableServiceAsBoundJoin"); + /** + * IRI of the property populating {@link FedXConfig#isEnableOptionalAsBindJoin()} + */ + public static final IRI CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN = vf.createIRI(NAMESPACE, "enableOptionalAsBindJoin"); + /** * IRI of the property populating {@link FedXConfig#isEnableMonitoring()} */ @@ -331,6 +336,9 @@ private void parseFedXConfigInternal(Model m, Resource confNode) throws Reposito Models.objectLiteral(m.getStatements(confNode, CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN, null)) .ifPresent(value -> config.withEnableServiceAsBoundJoin(value.booleanValue())); + Models.objectLiteral(m.getStatements(confNode, CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN, null)) + .ifPresent(value -> config.withEnableOptionalAsBindJoin(value.booleanValue())); + Models.objectLiteral(m.getStatements(confNode, CONFIG_ENABLE_MONITORING, null)) .ifPresent(value -> config.withEnableMonitoring(value.booleanValue())); @@ -384,6 +392,9 @@ protected void exportFedXConfig(Model model, Resource implNode) { model.add(confNode, CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN, vf.createLiteral(config.getEnableServiceAsBoundJoin())); + model.add(confNode, CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN, + vf.createLiteral(config.isEnableOptionalAsBindJoin())); + model.add(confNode, CONFIG_ENABLE_MONITORING, vf.createLiteral(config.isEnableMonitoring())); model.add(confNode, CONFIG_LOG_QUERY_PLAN, vf.createLiteral(config.isLogQueryPlan())); diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java new file mode 100644 index 00000000000..19d3d0386e7 --- /dev/null +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java @@ -0,0 +1,249 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated; + +import java.util.Arrays; +import java.util.Set; + +import org.eclipse.rdf4j.common.iteration.Iterations; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.model.vocabulary.FOAF; +import org.eclipse.rdf4j.model.vocabulary.OWL; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQuery; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.repository.Repository; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class BindLeftJoinTests extends SPARQLBaseTest { + + @Override + protected void initFedXConfig() { + + fedxRule.withConfiguration(config -> { + config.withEnableMonitoring(true); + }); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void test_leftBindJoin_basic(boolean bindLeftJoinOptimizationEnabled) throws Exception { + + prepareTest( + Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl", + "/tests/basic/data_emptyStore.ttl")); + + Repository repo1 = getRepository(1); + Repository repo2 = getRepository(2); + Repository repo3 = getRepository(3); + + Repository fedxRepo = fedxRule.getRepository(); + + fedxRule.setConfig(config -> { + config.withBoundJoinBlockSize(10); + config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled); + }); + + // add some persons + try (RepositoryConnection conn = repo1.getConnection()) { + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + conn.add(p, OWL.SAMEAS, otherP); + } + } + + // add names for person 1, 4, 7, ... + try (RepositoryConnection conn = repo2.getConnection()) { + + for (int i = 1; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.NAME, Values.literal("Person " + i)); + } + } + + // add names for person 2, 5, 8, ... + try (RepositoryConnection conn = repo3.getConnection()) { + + for (int i = 2; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.NAME, Values.literal("Person " + i)); + } + } + + try { + // run query which joins results from multiple repos + // for a subset of persons there exist names + try (RepositoryConnection conn = fedxRepo.getConnection()) { + String query = "PREFIX foaf: " + + "SELECT * WHERE { " + + " ?person owl:sameAs ?otherPerson . " + + " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3 + + "}"; + + TupleQuery tupleQuery = conn.prepareTupleQuery(query); + try (TupleQueryResult tqr = tupleQuery.evaluate()) { + var bindings = Iterations.asList(tqr); + + Assertions.assertEquals(30, bindings.size()); + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + + // find the bindingset for the person in the unordered result + BindingSet bs = bindings.stream() + .filter(b -> b.getValue("person").equals(p)) + .findFirst() + .orElseThrow(); + + Assertions.assertEquals(otherP, bs.getValue("otherPerson")); + if (i % 3 == 1 || i % 3 == 2) { + // names from repo 2 or 3 + Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue()); + } else { + // no name for others + Assertions.assertFalse(bs.hasBinding("name")); + } + } + } + + } + + if (bindLeftJoinOptimizationEnabled) { + assertNumberOfRequests("endpoint1", 3); + assertNumberOfRequests("endpoint2", 5); + assertNumberOfRequests("endpoint3", 5); + } else { + assertNumberOfRequests("endpoint1", 3); + assertNumberOfRequests("endpoint2", 32); + assertNumberOfRequests("endpoint3", 32); + } + + } finally { + fedxRepo.shutDown(); + } + + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinOptimizationEnabled) + throws Exception { + + prepareTest( + Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl", + "/tests/basic/data_emptyStore.ttl")); + + // test scenario: + // 3 repositories, 30 persons, bind join size 10, names distributed in repo 2 + // and repo 3 + Repository repo1 = getRepository(1); + Repository repo2 = getRepository(2); + Repository repo3 = getRepository(3); + + Repository fedxRepo = fedxRule.getRepository(); + + fedxRule.setConfig(config -> { + config.withBoundJoinBlockSize(10); + config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled); + }); + + // add some persons + try (RepositoryConnection conn = repo1.getConnection()) { + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + conn.add(p, OWL.SAMEAS, otherP); + } + } + + // add "male" for person 1, 4, 7, ... + try (RepositoryConnection conn = repo2.getConnection()) { + + for (int i = 1; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.GENDER, Values.literal("male")); + } + } + + // add "female" for person 2, 5, 8, ... + // add "male" for person 30 + try (RepositoryConnection conn = repo3.getConnection()) { + + for (int i = 2; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.GENDER, Values.literal("female")); + } + + conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male")); + } + + fedxRule.enableDebug(); + + try { + // run query which joins results from multiple repos + // for a subset of persons there exist names + try (RepositoryConnection conn = fedxRepo.getConnection()) { + String query = "PREFIX foaf: " + + "SELECT * WHERE { " + + " ?person owl:sameAs ?otherPerson . " + + " OPTIONAL { " + + " ?otherPerson foaf:gender \"male\" . " // # @repo2 and @repo3 + + " } " + + "}"; + + TupleQuery tupleQuery = conn.prepareTupleQuery(query); + try (TupleQueryResult tqr = tupleQuery.evaluate()) { + var bindings = Iterations.asList(tqr); + + Assertions.assertEquals(30, bindings.size()); + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + + // find the bindingset for the person in the unordered result + BindingSet bs = bindings.stream() + .filter(b -> b.getValue("person").equals(p)) + .findFirst() + .orElseThrow(); + + Assertions.assertEquals(otherP, bs.getValue("otherPerson")); + Assertions.assertEquals(Set.of("person", "otherPerson"), bs.getBindingNames()); + } + } + + } + + if (bindLeftJoinOptimizationEnabled) { + assertNumberOfRequests("endpoint1", 3); + assertNumberOfRequests("endpoint2", 5); + assertNumberOfRequests("endpoint3", 5); + } else { + assertNumberOfRequests("endpoint1", 3); + // Note: with the current implementation we cannot + // make exact assertions for endpoint 2 and 3 + // this is because due to the check statement + // not all requests are required + } + + } finally { + fedxRepo.shutDown(); + } + } + +} diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/SPARQLServerBaseTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/SPARQLServerBaseTest.java index df40772402c..719d2d4bb05 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/SPARQLServerBaseTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/SPARQLServerBaseTest.java @@ -18,6 +18,7 @@ import java.util.List; import org.eclipse.rdf4j.federated.endpoint.Endpoint; +import org.eclipse.rdf4j.federated.monitoring.MonitoringService; import org.eclipse.rdf4j.federated.repository.RepositorySettings; import org.eclipse.rdf4j.federated.server.NativeStoreServer; import org.eclipse.rdf4j.federated.server.SPARQLEmbeddedServer; @@ -28,6 +29,7 @@ import org.eclipse.rdf4j.rio.RDFParseException; import org.eclipse.rdf4j.rio.Rio; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -237,4 +239,39 @@ protected RepositorySettings repoSettings(int endpoint) { return server.getRepository(endpoint); } + /** + * Helper method to check the number of requests sent to respective endpoint + * + * @param memberName the memberName, typically "endpointN", where N >= 1 + * @param expectedRequests + */ + protected void assertNumberOfRequests(String memberName, int expectedRequests) { + if (!isSPARQLServer()) { + return; // ignore for non SPARQL server environment where requests are not counted + } + var fedxContext = federationContext(); + if (!fedxContext.getConfig().isEnableMonitoring()) { + Assertions.fail("monitoring is not enabled in the current federation."); + } + MonitoringService monitoringService = (MonitoringService) fedxContext.getMonitoringService(); + + // obtain the monitoring information + // Note: this method has some simplifications for the name + var monitoringInformation = monitoringService.getAllMonitoringInformation() + .stream() + .filter(m -> { + var endpoint = m.getE(); + return endpoint.getId().equals(memberName) + || endpoint.getId().equals("http://" + memberName) + || endpoint.getName().equals(memberName) + || endpoint.getName().equals("http://" + memberName); + }) + .findFirst() + .orElse(null); + + Assertions.assertEquals(expectedRequests, + (monitoringInformation != null ? monitoringInformation.getNumberOfRequests() : 0)); + + } + } diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfigTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfigTest.java index 03e46e1699d..55f7410fa46 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfigTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfigTest.java @@ -12,10 +12,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.rdf4j.model.util.Models.subject; -import static org.junit.Assert.assertThat; import java.io.InputStream; -import java.util.Optional; import org.eclipse.rdf4j.federated.FedXConfig; import org.eclipse.rdf4j.federated.util.Vocabulary.FEDX; @@ -161,7 +159,7 @@ public void testExport() throws Exception { .orElse(null); assertThat(configNode).isNotNull(); - assertThat(export.filter(configNode, null, null)).hasSize(14); + assertThat(export.filter(configNode, null, null)).hasSize(15); assertThat( Models.objectLiteral( @@ -189,6 +187,11 @@ public void testExport() throws Exception { export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN, null))) .hasValueSatisfying(v -> assertThat(v.booleanValue()).isFalse()); + assertThat( + Models.objectLiteral( + export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN, + null))) + .hasValueSatisfying(v -> assertThat(v.booleanValue()).isFalse()); assertThat( Models.objectLiteral( export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_MONITORING, null))) @@ -242,9 +245,9 @@ public void testExportWithEmptyConfig() throws Exception { .orElse(null); assertThat(configNode).isNotNull(); - // Note: 14 instead of 12 since CONFIG_SOURCE_SELECTION_CACHE_SPEC & CONFIG_PREFIX_DECLARATIONS are null + // Note: 13 instead of 15 since CONFIG_SOURCE_SELECTION_CACHE_SPEC & CONFIG_PREFIX_DECLARATIONS are null // and thus should not be populated - assertThat(export.filter(configNode, null, null)).hasSize(12); + assertThat(export.filter(configNode, null, null)).hasSize(13); assertThat( Models.objectLiteral( @@ -272,6 +275,11 @@ public void testExportWithEmptyConfig() throws Exception { export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN, null))) .hasValueSatisfying(v -> assertThat(v.booleanValue()).isTrue()); + assertThat( + Models.objectLiteral( + export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN, + null))) + .hasValueSatisfying(v -> assertThat(v.booleanValue()).isTrue()); assertThat( Models.objectLiteral( export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_MONITORING, null))) diff --git a/tools/federation/src/test/resources/tests/rdf4jserver/config-withFedXConfig.ttl b/tools/federation/src/test/resources/tests/rdf4jserver/config-withFedXConfig.ttl index 730ffa1ce2c..6b48d4522b8 100644 --- a/tools/federation/src/test/resources/tests/rdf4jserver/config-withFedXConfig.ttl +++ b/tools/federation/src/test/resources/tests/rdf4jserver/config-withFedXConfig.ttl @@ -16,6 +16,7 @@ fedx:boundJoinBlockSize 104 ; fedx:enforceMaxQueryTime 105 ; fedx:enableServiceAsBoundJoin false ; + fedx:enableOptionalAsBindJoin false ; fedx:enableMonitoring true ; fedx:logQueryPlan true ; fedx:logQueries true ;