merge main into develop #5191

Merged 18 commits from main into develop on Nov 10, 2024.

Commits:
6bd8fff  GH-4920 SPARQLConnection.size() method should not fetch every stateme… (JervenBolleman, May 5, 2024)
9650d55  GH-4920 When sending the remote size query make sure we don't send null, (JervenBolleman, May 11, 2024)
a01fab9  GH-4920 Make the logic that distinguishes between counting from the re… (JervenBolleman, May 31, 2024)
475d09e  GH-5148 Introduce "soft fail" for corrupt ValueStore (ebner, Oct 10, 2024)
029df52  GH-5148 Fixed typo (ebner, Oct 10, 2024)
34807e1  GH-5148 Introduce "soft fail" for corrupt ValueStore (#5150) (hmottestad, Oct 23, 2024)
cad4af9  GH-5148 fixes based on review (hmottestad, Oct 23, 2024)
196cf9d  GH-5148 add tests and extend corruption handling to more parts of the… (hmottestad, Oct 23, 2024)
0c58aac  GH-5148 corrupt data can be written as NQuads (hmottestad, Oct 23, 2024)
92f4fe4  GH-5148 add support for more files in the test cases and add a fix fo… (hmottestad, Oct 24, 2024)
b6215bb  GH-5148 improved soft fail on corruption for values.id and values.has… (hmottestad, Oct 24, 2024)
0076003  GH-5148 improved handling of corrupt spoc/posc/... indexes (hmottestad, Oct 24, 2024)
2e075d5  GH-5148 cleanup naming and docs (hmottestad, Oct 24, 2024)
590e658  GH-5148 better detection of non-empty b-tree (hmottestad, Oct 25, 2024)
517353e  GH-4920 SPARQLConnection.size() now uses count query (#4972) (hmottestad, Nov 10, 2024)
8fbb4ee  GH-5148 improved error message (hmottestad, Nov 10, 2024)
c47fe2b  improve javadocs and make some tests more robust (hmottestad, Nov 10, 2024)
136be9f  GH-5148 Introduce "soft fail" for corrupt ValueStore (#5157) (hmottestad, Nov 10, 2024)
.gitignore (1 addition, 0 deletions)
@@ -52,3 +52,4 @@ org.eclipse.dash.licenses-1.0.2.jar
e2e/node_modules
e2e/playwright-report
e2e/test-results
.aider*
SPARQLConnection.java
@@ -17,7 +17,9 @@
import java.io.InputStream;
import java.io.Reader;
import java.net.URL;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.http.client.HttpClient;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
@@ -39,6 +41,8 @@
import org.eclipse.rdf4j.model.impl.DynamicModelFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.util.Literals;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.model.vocabulary.SESAME;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.BooleanQuery;
import org.eclipse.rdf4j.query.GraphQuery;
@@ -79,6 +83,8 @@
*/
public class SPARQLConnection extends AbstractRepositoryConnection implements HttpClientDependent {

private static final String COUNT_EVERYTHING = "SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }";

private static final String EVERYTHING = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }";

private static final String EVERYTHING_WITH_GRAPH = "SELECT * WHERE { ?s ?p ?o . OPTIONAL { GRAPH ?ctx { ?s ?p ?o } } }";
@@ -281,16 +287,61 @@ public boolean isEmpty() throws RepositoryException {

    @Override
    public long size(Resource... contexts) throws RepositoryException {
        String query = sizeAsTupleQuery(contexts);
        TupleQuery tq = prepareTupleQuery(SPARQL, query);
        try (TupleQueryResult res = tq.evaluate()) {
            if (res.hasNext()) {
                Value value = res.next().getBinding("count").getValue();
                if (value instanceof Literal) {
                    return ((Literal) value).longValue();
                } else {
                    return 0;
                }
            }
        } catch (QueryEvaluationException e) {
            throw new RepositoryException(e);
        }
        return 0;
    }

    String sizeAsTupleQuery(Resource... contexts) {
        // In case the contexts are null we want the default graph of the
        // remote store, i.e. ask without GRAPH/FROM.
        if (contexts != null && isQuadMode() && contexts.length > 0) {
            // Optimization for the case that we can use a GRAPH instead of a FROM.
            if (contexts.length == 1 && isExposableGraphIri(contexts[0])) {
                return "SELECT (COUNT(*) AS ?count) WHERE { GRAPH <" + contexts[0].stringValue()
                        + "> { ?s ?p ?o}}";
            } else {
                // If we had a default graph setting that is Sesame/RDF4J-specific,
                // we must drop it before sending the query over the wire. Otherwise,
                // gather up the given contexts and send them as FROM clauses to
                // make up the matching dataset.
                String graphs = Arrays.stream(contexts)
                        .filter(SPARQLConnection::isExposableGraphIri)
                        .map(Resource::stringValue)
                        .map(s -> "FROM <" + s + ">")
                        .collect(Collectors.joining(" "));
                return "SELECT (COUNT(*) AS ?count) " + graphs + "WHERE { ?s ?p ?o}";
            }
        } else {
            return COUNT_EVERYTHING;
        }
    }

    /**
     * For the SPARQL protocol a context must be an IRI. However, we can't send out the RDF4J-internal default graph
     * IRIs.
     *
     * @param resource to test if it can be the IRI for a named graph
     * @return true if the input can be a foreign named graph.
     */
    private static boolean isExposableGraphIri(Resource resource) {
        // We use the instanceof test to avoid any issue with a null pointer.
        return resource instanceof IRI && !RDF4J.NIL.equals(resource) && !SESAME.NIL.equals(resource);
    }

@Override
public RepositoryResult<Statement> getStatements(Resource subj, IRI pred, Value obj, boolean includeInferred,
Resource... contexts) throws RepositoryException {
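Below is a minimal usage sketch of the new behaviour (not part of the diff; the endpoint URL is a placeholder). After this change, size() issues a single server-side COUNT query instead of streaming every statement to the client and counting locally:

import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.sparql.SPARQLRepository;

public class SizeExample {
    public static void main(String[] args) {
        SPARQLRepository repo = new SPARQLRepository("http://example.org/sparql"); // placeholder endpoint
        repo.init();
        try (RepositoryConnection conn = repo.getConnection()) {
            // Sends: SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }
            System.out.println(conn.size());
            // Sends: SELECT (COUNT(*) AS ?count) WHERE { GRAPH <urn:g1> { ?s ?p ?o}}
            System.out.println(conn.size(Values.iri("urn:g1")));
        } finally {
            repo.shutDown();
        }
    }
}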
SPARQLConnectionTest.java
@@ -12,25 +12,41 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.rdf4j.model.util.Values.iri;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.ref.WeakReference;

import org.eclipse.rdf4j.http.client.SPARQLProtocolSession;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.FOAF;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.eclipse.rdf4j.query.impl.SimpleBinding;
import org.eclipse.rdf4j.query.impl.TupleQueryResultBuilder;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParserFactory;
import org.eclipse.rdf4j.rio.ParserConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;

public class SPARQLConnectionTest {

@@ -100,6 +116,36 @@ public void testAddSingleContextHandling() throws Exception {
assertThat(sparqlUpdate).containsPattern(expectedAddPattern).containsPattern(expectedRemovePattern);
}

@Test
public void testSizeQuery() throws Exception {

String sizeAsTupleQuery = subject.sizeAsTupleQuery();
ParsedQuery query = new SPARQLParserFactory().getParser().parseQuery(sizeAsTupleQuery, "http://example.org/");
assertNotNull(query);

sizeAsTupleQuery = subject.sizeAsTupleQuery(vf.createIRI("urn:g1"));
query = new SPARQLParserFactory().getParser().parseQuery(sizeAsTupleQuery, "http://example.org/");
assertNotNull(query);

sizeAsTupleQuery = subject.sizeAsTupleQuery(vf.createIRI("urn:g1"), vf.createIRI("urn:g2"));
query = new SPARQLParserFactory().getParser().parseQuery(sizeAsTupleQuery, "http://example.org/");
assertNotNull(query);

sizeAsTupleQuery = subject.sizeAsTupleQuery(vf.createIRI("urn:g1"), vf.createBNode());
query = new SPARQLParserFactory().getParser().parseQuery(sizeAsTupleQuery, "http://example.org/");
assertNotNull(query);

sizeAsTupleQuery = subject.sizeAsTupleQuery(RDF4J.NIL);
query = new SPARQLParserFactory().getParser().parseQuery(sizeAsTupleQuery, "http://example.org/");
assertNotNull(query);
assertFalse(sizeAsTupleQuery.contains("nil"));

sizeAsTupleQuery = subject.sizeAsTupleQuery(null);
query = new SPARQLParserFactory().getParser().parseQuery(sizeAsTupleQuery, "http://example.org/");

assertNotNull(query);
}

@Test
public void testAddMultipleContextHandling() throws Exception {
ArgumentCaptor<String> sparqlUpdateCaptor = ArgumentCaptor.forClass(String.class);
NativeStatementIterator.java
@@ -10,6 +10,8 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;

import static org.eclipse.rdf4j.sail.nativerdf.NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;

import java.io.IOException;

import org.eclipse.rdf4j.common.io.ByteArrayUtil;
@@ -20,13 +22,20 @@
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.nativerdf.btree.RecordIterator;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRI;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRIOrBNode;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptUnknownValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A statement iterator that wraps a RecordIterator containing statement records and translates these records to
* {@link Statement} objects.
*/
class NativeStatementIterator extends LookAheadIteration<Statement> {

private static final Logger logger = LoggerFactory.getLogger(NativeStatementIterator.class);

/*-----------*
* Variables *
*-----------*/
@@ -54,25 +63,42 @@ public NativeStatementIterator(RecordIterator btreeIter, ValueStore valueStore)
    @Override
    public Statement getNextElement() throws SailException {
        try {
            byte[] nextValue;
            try {
                nextValue = btreeIter.next();
            } catch (AssertionError | Exception e) {
                logger.error("Error while reading next value from btree iterator for {}", btreeIter.toString(), e);
                throw e;
            }

            if (nextValue == null) {
                return null;
            }

            int subjID = ByteArrayUtil.getInt(nextValue, TripleStore.SUBJ_IDX);
            Resource subj = valueStore.getResource(subjID);

            int predID = ByteArrayUtil.getInt(nextValue, TripleStore.PRED_IDX);
            IRI pred = valueStore.getIRI(predID);

            int objID = ByteArrayUtil.getInt(nextValue, TripleStore.OBJ_IDX);
            Value obj = valueStore.getValue(objID);

            Resource context = null;
            int contextID = ByteArrayUtil.getInt(nextValue, TripleStore.CONTEXT_IDX);
            if (contextID != 0) {
                context = valueStore.getResource(contextID);
            }

            if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
                // Substitute placeholder values for IDs that could not be resolved,
                // instead of failing the whole iteration.
                if (subj == null) {
                    subj = new CorruptIRIOrBNode(valueStore.getRevision(), subjID, null);
                }
                if (pred == null) {
                    pred = new CorruptIRI(valueStore.getRevision(), predID, null, null);
                }
                if (obj == null) {
                    obj = new CorruptUnknownValue(valueStore.getRevision(), objID, null);
                }
            }

            return valueStore.createStatement(subj, pred, obj, context);
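With soft fail enabled, corrupt entries surface to client code as the placeholder types imported above rather than as exceptions. A hedged sketch (this helper is illustrative, not part of the PR) of how a caller might screen them out:

import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRI;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRIOrBNode;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptUnknownValue;

final class CorruptValues {

    private CorruptValues() {
    }

    // True if the value is one of the placeholders that NativeStatementIterator
    // substitutes for IDs it could not resolve.
    static boolean isCorrupt(Value v) {
        return v instanceof CorruptIRI
                || v instanceof CorruptIRIOrBNode
                || v instanceof CorruptUnknownValue;
    }

    // True if any component of the statement is such a placeholder.
    static boolean hasCorruptValue(Statement st) {
        return isCorrupt(st.getSubject()) || isCorrupt(st.getPredicate()) || isCorrupt(st.getObject());
    }
}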
NativeStore.java
@@ -24,6 +24,7 @@
import org.apache.commons.io.FileUtils;
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.mapdb.MapDb3CollectionFactory;
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.LockManager;
import org.eclipse.rdf4j.common.io.MavenUtil;
@@ -62,6 +63,17 @@ public class NativeStore extends AbstractNotifyingSail implements FederatedServi

private static final String VERSION = MavenUtil.loadVersion("org.eclipse.rdf4j", "rdf4j-sail-nativerdf", "devel");

    /**
     * Do not throw an exception when corrupt data is detected. Instead, try to return as much data as possible.
     *
     * This flag can be set through the system property
     * org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes.
     */
    @InternalUseOnly
    public static boolean SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = "true"
            .equalsIgnoreCase(
                    System.getProperty("org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes"));

private static final Cleaner REMOVE_STORES_USED_FOR_MEMORY_OVERFLOW = Cleaner.create();

/**
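The flag is read in a static field initializer, so the system property must be set before the NativeStore class is loaded; because the field is public, static, and non-final, it can also be flipped directly. A brief sketch showing both options (they are alternatives, not required together):

import org.eclipse.rdf4j.sail.nativerdf.NativeStore;

public class EnableSoftFail {
    public static void main(String[] args) {
        // Option 1: equivalent to passing
        // -Dorg.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes=true
        // on the JVM command line; must run before NativeStore is first referenced,
        // since the flag is read when the class initializes.
        System.setProperty("org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes", "true");

        // Option 2: set the field directly after class loading.
        NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = true;
    }
}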
TripleStore.java
@@ -287,9 +287,71 @@ private Set<String> parseIndexSpecList(String indexSpecStr) throws SailException
}

    private void initIndexes(Set<String> indexSpecs) throws IOException {

        HashSet<String> invalidIndexes = new HashSet<>();

        for (String fieldSeq : indexSpecs) {
            logger.trace("Initializing index '{}'...", fieldSeq);
            try {
                indexes.add(new TripleIndex(fieldSeq, false));
            } catch (Exception e) {
                if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
                    invalidIndexes.add(fieldSeq);
                    logger.warn("Ignoring index '{}' because it failed to initialize", fieldSeq, e);
                } else {
                    logger.error(
                            "Failed to initialize index '{}', consider setting org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true.",
                            fieldSeq, e);
                    throw e;
                }
            }
        }

        if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
            indexSpecs.removeAll(invalidIndexes);
        }

        List<TripleIndex> emptyIndexes = new ArrayList<>();
        List<TripleIndex> nonEmptyIndexes = new ArrayList<>();

        checkIfIndexesAreEmptyOrNot(nonEmptyIndexes, emptyIndexes);

        if (!emptyIndexes.isEmpty() && !nonEmptyIndexes.isEmpty()) {
            if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
                indexes.removeAll(emptyIndexes);
            } else {
                for (TripleIndex index : emptyIndexes) {
                    throw new IOException("Index '" + new String(index.getFieldSeq())
                            + "' is unexpectedly empty while other indexes are not. Consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true. Index file: "
                            + index.getBTree().getFile().getAbsolutePath());
                }
            }
        }
    }

    private void checkIfIndexesAreEmptyOrNot(List<TripleIndex> nonEmptyIndexes, List<TripleIndex> emptyIndexes)
            throws IOException {
        for (TripleIndex index : indexes) {
            try (RecordIterator recordIterator = index.getBTree().iterateAll()) {
                try {
                    byte[] next = recordIterator.next();
                    if (next != null) {
                        next = recordIterator.next();
                        if (next != null) {
                            nonEmptyIndexes.add(index);
                        } else {
                            emptyIndexes.add(index);
                        }
                    } else {
                        emptyIndexes.add(index);
                    }
                } catch (Throwable ignored) {
                    emptyIndexes.add(index);
                }
            }
        }
    }

@@ -355,7 +417,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
        for (String fieldSeq : addedIndexSpecs) {
            logger.debug("Initializing new index '{}'...", fieldSeq);

            TripleIndex addedIndex = new TripleIndex(fieldSeq, true);
            BTree addedBTree = null;
            RecordIterator sourceIter = null;
            try {
@@ -1122,7 +1184,17 @@ private class TripleIndex {

        private final BTree btree;

        public TripleIndex(String fieldSeq, boolean deleteExistingIndexFile) throws IOException {
            if (deleteExistingIndexFile) {
                File indexFile = new File(dir, getFilenamePrefix(fieldSeq) + ".dat");
                if (indexFile.exists()) {
                    indexFile.delete();
                }
                File allocFile = new File(dir, getFilenamePrefix(fieldSeq) + ".alloc");
                if (allocFile.exists()) {
                    allocFile.delete();
                }
            }
            tripleComparator = new TripleComparator(fieldSeq);
            btree = new BTree(dir, getFilenamePrefix(fieldSeq), 2048, RECORD_LENGTH, tripleComparator, forceSync);
        }
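A sketch of how the new deleteExistingIndexFile path is reached from the public API (hedged: the data directory is a placeholder and "spoc,posc,ospc" is just an example spec). Adding an index to the configuration of an existing store makes reindex() build it via TripleIndex(fieldSeq, true), deleting any stale .dat/.alloc files first:

import java.io.File;

import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;

public class ForceReindex {
    public static void main(String[] args) {
        // Tolerate corrupt entries while rebuilding (see the NativeStore flag above).
        NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = true;

        // "ospc" is not part of the existing spec, so TripleStore.reindex()
        // populates it from a surviving index, first removing any leftover
        // ospc.dat / ospc.alloc files.
        NativeStore store = new NativeStore(new File("/var/data/rdf4j"), "spoc,posc,ospc");
        SailRepository repo = new SailRepository(store);
        repo.init();
        repo.shutDown();
    }
}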