Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-5189 cache Value objects retrieved from parent sail #5190

Merged
merged 1 commit into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ private String getFullQueryString() {
} else {
extraVariablesString = "";
}
} else
} else {
extraVariablesString = "";
}

if (scope_validationReport == ConstraintComponent.Scope.propertyShape
&& propertyShapeWithValue_validationReport) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ protected void init() {

@Override
public void localClose() {
if (iterator != null)
if (iterator != null) {
iterator.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.memory.MemoryStoreConnection;
Expand All @@ -36,6 +35,7 @@ public class FilterByPredicate implements PlanNode {
private final Set<IRI> filterOnPredicates;
final PlanNode parent;
private final On on;
private final ConnectionsGroup connectionsGroup;
private boolean printed = false;
private ValidationExecutionLogger validationExecutionLogger;
private final Resource[] dataGraph;
Expand All @@ -53,6 +53,7 @@ public FilterByPredicate(SailConnection connection, Set<IRI> filterOnPredicates,
assert this.connection != null;
this.filterOnPredicates = filterOnPredicates;
this.on = on;
this.connectionsGroup = connectionsGroup;
}

@Override
Expand All @@ -77,18 +78,11 @@ void calculateNext() {
return;
}

filterOnPredicates = FilterByPredicate.this.filterOnPredicates.stream()
.map(predicate -> {
try (var stream = connection
.getStatements(null, predicate, null, true, dataGraph)
.stream()) {
return stream.map(Statement::getPredicate)
.findAny()
.orElse(null);
}
}
)
.filter(Objects::nonNull)
filterOnPredicates = FilterByPredicate.this.filterOnPredicates
.stream()
.map(iri -> connectionsGroup.getSailSpecificValue(iri,
ConnectionsGroup.StatementPosition.predicate, connection
))
.collect(Collectors.toList());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.memory.MemoryStoreConnection;
Expand All @@ -42,10 +42,12 @@ public class FilterByPredicateObject implements PlanNode {
private final FilterOn filterOn;
private final PlanNode parent;
private final boolean returnMatching;
private final ConnectionsGroup connectionsGroup;
private StackTraceElement[] stackTrace;
private boolean printed = false;
private ValidationExecutionLogger validationExecutionLogger;
private final Resource[] dataGraph;
boolean typeFilterWithInference;

private final Cache<Resource, Boolean> cache;

Expand All @@ -68,6 +70,11 @@ public FilterByPredicateObject(SailConnection connection, Resource[] dataGraph,
cache = CacheBuilder.newBuilder().maximumSize(10000).build();
}

this.connectionsGroup = connectionsGroup;
if (includeInferred && connectionsGroup.getRdfsSubClassOfReasoner() != null
&& RDF.TYPE.equals(filterOnPredicate)) {
typeFilterWithInference = true;
}
// this.stackTrace = Thread.currentThread().getStackTrace();
}

Expand Down Expand Up @@ -148,31 +155,32 @@ void calculateNext() {

private void internResources() {
if (filterOnObject == null) {

try (var stream = connection
.getStatements(null, FilterByPredicateObject.this.filterOnPredicate, null, includeInferred,
dataGraph)
.stream()) {
filterOnPredicate = stream.map(Statement::getPredicate).findAny().orElse(null);
}

filterOnPredicate = connectionsGroup.getSailSpecificValue(
FilterByPredicateObject.this.filterOnPredicate,
ConnectionsGroup.StatementPosition.predicate, connection
);
if (filterOnPredicate == null) {
filterOnObject = new Resource[0];
} else {
filterOnObject = FilterByPredicateObject.this.filterOnObject.stream()
.map(object -> {
try (var stream = connection
.getStatements(null, filterOnPredicate, object, includeInferred, dataGraph)
.stream()) {
return stream.map(Statement::getObject)
.map(o -> ((Resource) o))
.findAny()
.orElse(null);
}
}
)
.filter(Objects::nonNull)
.toArray(Resource[]::new);
if (typeFilterWithInference) {
filterOnObject = FilterByPredicateObject.this.filterOnObject.stream()
.flatMap(type -> connectionsGroup.getRdfsSubClassOfReasoner()
.backwardsChain(type)
.stream())
.distinct()
.map(object -> connectionsGroup.getSailSpecificValue(object,
ConnectionsGroup.StatementPosition.object, connection
))
.filter(Objects::nonNull)
.toArray(Resource[]::new);
} else {
filterOnObject = FilterByPredicateObject.this.filterOnObject.stream()
.map(object -> connectionsGroup.getSailSpecificValue(object,
ConnectionsGroup.StatementPosition.object, connection
))
.filter(Objects::nonNull)
.toArray(Resource[]::new);
}
}

}
Expand Down Expand Up @@ -237,8 +245,8 @@ private Boolean matchesCached(Resource subject, IRI filterOnPredicate, Resource[

private boolean matchesUnCached(Resource subject, IRI filterOnPredicate, Resource[] filterOnObject) {
for (Resource object : filterOnObject) {
if (connection.hasStatement(subject, filterOnPredicate, object, includeInferred,
dataGraph)) {
if (connection.hasStatement(subject, filterOnPredicate, object,
includeInferred && !typeFilterWithInference, dataGraph)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ void calculateNext() {
@Override
public void localClose() {
try {
if (leftIterator != null)
if (leftIterator != null) {
leftIterator.close();
}
} finally {
if (rightIterator != null)
if (rightIterator != null) {
rightIterator.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ private void calculateNext() {

@Override
public void localClose() {
if (parentIterator != null)
if (parentIterator != null) {
parentIterator.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.shacl.ShaclSailConnection;
import org.eclipse.rdf4j.sail.shacl.Stats;
import org.eclipse.rdf4j.sail.shacl.ast.planNodes.BufferedSplitter;
Expand All @@ -27,6 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
* @apiNote since 3.0. This feature is for internal use only: its existence, signature or behavior may change without
* warning from one release to the next.
Expand All @@ -52,6 +62,11 @@ public class ConnectionsGroup implements AutoCloseable {
// used to cache Select plan nodes so that we don't query a store for the same data during the same validation step.
private final Map<PlanNode, BufferedSplitter> nodeCache = new ConcurrentHashMap<>();

// Bounded Guava cache read by getSailSpecificValue(...): maps a Value (the key) to the Value that the loader in that
// method returned for it. Holds at most 10000 entries; the concurrency level is twice the number of available
// processors.
// NOTE(review): despite the constant-style name this is an instance field (not static), so each ConnectionsGroup
// has its own cache.
private final Cache<Value, Value> INTERNED_VALUE_CACHE = CacheBuilder.newBuilder()
.concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2)
.maximumSize(10000)
.build();

public ConnectionsGroup(SailConnection baseConnection,
SailConnection previousStateConnection, Sail addedStatements, Sail removedStatements,
Stats stats, RdfsSubClassOfReasonerProvider rdfsSubClassOfReasonerProvider,
Expand Down Expand Up @@ -95,6 +110,67 @@ public SailConnection getRemovedStatements() {
return removedStatements;
}

/**
 * The slot of a statement in which a value is looked up.
 */
public enum StatementPosition {
	/** The value is looked up as the subject of a statement. */
	subject,
	/** The value is looked up as the predicate of a statement. */
	predicate,
	/** The value is looked up as the object of a statement. */
	object;
}

/**
 * Converts a general value object, like RDF.TYPE, to the specific Value object that the underlying sail uses for
 * that node. This is a performance optimization: the result is cached so that the store is not queried for the
 * same value more than once while the entry stays in the cache.
 * <p>
 * The lookup asks the connection for any explicit (non-inferred) statement that has the value in the given
 * position and returns the corresponding term of that statement. If no such statement exists the original value is
 * returned (and cached).
 * <p>
 * NOTE(review): the cache is keyed on the value alone. The statement position and the connection are not part of
 * the key, so the first lookup for a value decides what later calls get back, whichever position or connection
 * they pass. Confirm that callers only ever use this with a single underlying sail.
 *
 * @param value             the value object to be converted, must not be null
 * @param statementPosition the position within a statement (subject, predicate, or object) in which to look the
 *                          value up
 * @param connection        the SailConnection used to retrieve the specific Value object
 * @param <T>               the type of the value
 * @return the specific Value object used by the underlying sail, or the original value if no specific Value is
 *         found
 * @throws SailException if an error occurs while retrieving the specific Value object
 */
public <T extends Value> T getSailSpecificValue(T value, StatementPosition statementPosition,
		SailConnection connection) {
	try {

		Value sailValue = INTERNED_VALUE_CACHE.get(value, () -> {

			switch (statementPosition) {
			case subject:
				try (var statements = connection.getStatements(((Resource) value), null, null, false).stream()) {
					return statements.<Value>map(Statement::getSubject).findAny().orElse(value);
				}
			case predicate:
				try (var statements = connection.getStatements(null, ((IRI) value), null, false).stream()) {
					return statements.<Value>map(Statement::getPredicate).findAny().orElse(value);
				}
			case object:
				try (var statements = connection.getStatements(null, null, value, false).stream()) {
					return statements.<Value>map(Statement::getObject).findAny().orElse(value);
				}
			}

			throw new IllegalStateException("Unknown statement position: " + statementPosition);

		});

		// The cache stores plain Value objects, so this cast cannot be checked by the compiler.
		@SuppressWarnings("unchecked")
		T result = (T) sailValue;
		return result;

	} catch (ExecutionException e) {
		throw new SailException(e);
	} catch (com.google.common.util.concurrent.UncheckedExecutionException e) {
		// Guava wraps unchecked exceptions thrown by the loader (including SailException from getStatements) in
		// UncheckedExecutionException. Unwrap so that callers see the documented SailException.
		Throwable cause = e.getCause();
		if (cause instanceof SailException) {
			throw (SailException) cause;
		}
		throw new SailException(cause != null ? cause : e);
	}
}

@Override
public void close() {
if (addedStatements != null) {
Expand Down Expand Up @@ -143,9 +219,15 @@ public PlanNode getCachedNodeFor(PlanNode planNode) {

}

/**
* Returns the RdfsSubClassOfReasoner if it is enabled. If it is not enabled, this method will return null.
*
* @return RdfsSubClassOfReasoner or null
*/
public RdfsSubClassOfReasoner getRdfsSubClassOfReasoner() {
if (rdfsSubClassOfReasonerProvider == null)
if (rdfsSubClassOfReasonerProvider == null) {
return null;
}
return rdfsSubClassOfReasonerProvider.getRdfsSubClassOfReasoner();
}

Expand Down
Loading