From e9675d5f419cfd360711b80f243d870b5a39b616 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 18 May 2024 22:49:39 +0200 Subject: [PATCH] GH-2998 improve memory overflow performance of the NativeStore --- .../sail/nativerdf/MemoryOverflowModel.java | 153 +++++++++++---- .../rdf4j/sail/nativerdf/NativeSailStore.java | 42 +++++ .../rdf4j/sail/nativerdf/NativeStore.java | 1 + .../rdf4j/sail/nativerdf/SailSourceModel.java | 176 +++++++++++++----- .../rdf4j/sail/nativerdf/TripleStore.java | 4 + .../rdf4j/sail/nativerdf/TxnStatusFile.java | 17 ++ .../nativerdf/btree/ConcurrentNodeCache.java | 4 +- 7 files changed, 304 insertions(+), 93 deletions(-) diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java index 001e25de584..e651d77b07c 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java @@ -16,6 +16,7 @@ import java.io.ObjectOutputStream; import java.nio.file.Files; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.Optional; import java.util.Set; @@ -48,11 +49,14 @@ abstract class MemoryOverflowModel extends AbstractModel { private static final Runtime RUNTIME = Runtime.getRuntime(); - private static final int LARGE_BLOCK = 10000; + private static final int LARGE_BLOCK = 1024 * 5; + + private static volatile boolean overflow; // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think - // we have space for one more block. The limit is currently set at 32 MB - private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024; + // we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps. + private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024 + : 32 * 1024 * 1024; final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class); @@ -71,7 +75,7 @@ abstract class MemoryOverflowModel extends AbstractModel { SimpleValueFactory vf = SimpleValueFactory.getInstance(); public MemoryOverflowModel() { - memory = new LinkedHashModel(LARGE_BLOCK); + memory = new LinkedHashModel(LARGE_BLOCK * 2); } public MemoryOverflowModel(Model model) { @@ -138,6 +142,33 @@ public boolean add(Statement st) { return getDelegate().add(st); } + @Override + public boolean addAll(Collection c) { + checkMemoryOverflow(); + if (disk != null || c.size() <= 1024) { + return getDelegate().addAll(c); + } else { + boolean ret = false; + HashSet buffer = new HashSet<>(); + for (Statement st : c) { + buffer.add(st); + if (buffer.size() >= 1024) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + innerCheckMemoryOverflow(); + } + } + if (!buffer.isEmpty()) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + } + + return ret; + + } + + } + @Override public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { return getDelegate().remove(subj, pred, obj, contexts); @@ -195,13 +226,23 @@ public synchronized void removeTermIteration(Iterator iter, Resource protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException; private Model getDelegate() { - LinkedHashModel memory = this.memory; + var memory = this.memory; if (memory != null) { return memory; } else { - synchronized (this) { + var disk = this.disk; + if (disk != null) { return disk; } + synchronized (this) { + if (this.memory != null) { + return this.memory; + } + if (this.disk != null) { + return this.disk; + } + throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state"); + } } } @@ -232,45 +273,76 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx } } - private synchronized void checkMemoryOverflow() { - if (disk == null) { - int size = size(); - if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { - // maximum heap size the JVM can allocate - long maxMemory = RUNTIME.maxMemory(); - - // total currently allocated JVM memory - long totalMemory = RUNTIME.totalMemory(); - - // amount of memory free in the currently allocated JVM memory - long freeMemory = RUNTIME.freeMemory(); - - // estimated memory used - long used = totalMemory - freeMemory; - - // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) - long freeToAllocateMemory = maxMemory - used; - - if (baseline > 0) { - long blockSize = used - baseline; - if (blockSize > maxBlockSize) { - maxBlockSize = blockSize; - } - - // Sync if either the estimated size of the next block is larger than remaining memory, or - // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) - if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || - freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { - logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize); - overflowToDisk(); - } + private void checkMemoryOverflow() { + if (disk == getDelegate()) { + return; + } + + if (overflow) { + innerCheckMemoryOverflow(); + } + int size = size() + 1; + if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { + innerCheckMemoryOverflow(); + } + + } + + private void innerCheckMemoryOverflow() { + if (disk == getDelegate()) { + return; + } + + // maximum heap size the JVM can allocate + long maxMemory = RUNTIME.maxMemory(); + + // total currently allocated JVM memory + long totalMemory = RUNTIME.totalMemory(); + + // amount of memory free in the currently allocated JVM memory + long freeMemory = RUNTIME.freeMemory(); + + // estimated memory used + long used = totalMemory - freeMemory; + + // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) + long freeToAllocateMemory = maxMemory - used; + + if (baseline > 0) { + long blockSize = used - baseline; + if (blockSize > maxBlockSize) { + maxBlockSize = blockSize; + } + if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) { + // stricter memory requirements to not overflow if other models are overflowing + logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize); + overflowToDisk(); + System.gc(); + } else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || + freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { + // Sync if either the estimated size of the next block is larger than remaining memory, or + // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) + + logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize); + overflowToDisk(); + System.gc(); + } else { + if (overflow) { + overflow = false; } - baseline = used; } } + baseline = used; } private synchronized void overflowToDisk() { + overflow = true; + + if (memory == null) { + assert disk != null; + return; + } + try { LinkedHashModel memory = this.memory; this.memory = null; @@ -279,8 +351,7 @@ private synchronized void overflowToDisk() { dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); store = createSailStore(dataDir); - disk = new SailSourceModel(store); - disk.addAll(memory); + this.disk = new SailSourceModel(store, memory); logger.debug("overflow synced to disk"); } catch (IOException | SailException e) { String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)"; diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java index da790e890ac..c074d074126 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java @@ -316,6 +316,10 @@ CloseableIteration createStatementIterator(Resource subj, I return tripleStore.cardinality(subjID, predID, objID, contextID); } + public void disableTxnStatus() { + this.tripleStore.disableTxnStatus(); + } + private final class NativeSailSource extends BackingSailSource { private final boolean explicit; @@ -444,6 +448,44 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai addStatement(subj, pred, obj, explicit, ctx); } + @Override + public void approveAll(Set approved, Set approvedContexts) { + sinkStoreAccessLock.lock(); + startTriplestoreTransaction(); + + try { + for (Statement statement : approved) { + Resource subj = statement.getSubject(); + IRI pred = statement.getPredicate(); + Value obj = statement.getObject(); + Resource context = statement.getContext(); + + int subjID = valueStore.storeValue(subj); + int predID = valueStore.storeValue(pred); + int objID = valueStore.storeValue(obj); + + int contextID = 0; + if (context != null) { + contextID = valueStore.storeValue(context); + } + + boolean wasNew = tripleStore.storeTriple(subjID, predID, objID, contextID, explicit); + if (wasNew && context != null) { + contextStore.increment(context); + } + + } + } catch (IOException e) { + throw new SailException(e); + } catch (RuntimeException e) { + logger.error("Encountered an unexpected problem while trying to add a statement", e); + throw e; + } finally { + sinkStoreAccessLock.unlock(); + } + + } + @Override public void deprecate(Statement statement) throws SailException { removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit, diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java index 8ebcd319912..c82bca9d4d9 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java @@ -81,6 +81,7 @@ private static final class OverFlowStoreCleaner implements Runnable { private OverFlowStoreCleaner(NativeSailStore nativeSailStore, File dataDir) { this.nativeSailStore = nativeSailStore; + nativeSailStore.disableTxnStatus(); this.dataDir = dataDir; } diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java index 1e73b9cdffc..55593456182 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java @@ -11,6 +11,8 @@ package org.eclipse.rdf4j.sail.nativerdf; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -47,52 +49,6 @@ class SailSourceModel extends AbstractModel { private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class); - private final class StatementIterator implements Iterator { - - final CloseableIteration stmts; - - Statement last; - - StatementIterator(CloseableIteration closeableIteration) { - this.stmts = closeableIteration; - } - - @Override - public boolean hasNext() { - try { - if (stmts.hasNext()) { - return true; - } - stmts.close(); - return false; - } catch (SailException e) { - throw new ModelException(e); - } - } - - @Override - public Statement next() { - try { - last = stmts.next(); - if (last == null) { - stmts.close(); - } - return last; - } catch (SailException e) { - throw new ModelException(e); - } - } - - @Override - public void remove() { - if (last == null) { - throw new IllegalStateException("next() not yet called"); - } - SailSourceModel.this.remove(last); - last = null; - } - } - final SailSource source; SailDataset dataset; @@ -111,6 +67,13 @@ public SailSourceModel(SailSource source) { this.source = source; } + public SailSourceModel(SailStore store, Model bulk) { + this(store); + sink().approveAll(bulk, bulk.contexts()); + size = bulk.size(); + sink.flush(); + } + @Override public void closeIterator(Iterator iter) { super.closeIterator(iter); @@ -265,6 +228,75 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... } } + @Override + public boolean add(Statement st) { + Resource subj = st.getSubject(); + IRI pred = st.getPredicate(); + Value obj = st.getObject(); + Resource ctx = st.getContext(); + try { + if (contains(subj, pred, obj, ctx)) { + logger.trace("already contains statement {} {} {} {}", subj, pred, obj, ctx); + return false; + } + if (size >= 0) { + size++; + } + + sink().approve(subj, pred, obj, ctx); + + return true; + } catch (SailException e) { + throw new ModelException(e); + } + } + + @Override + public boolean addAll(Collection statements) { + if (statements.isEmpty()) { + return false; + } + + if (statements.size() == 1) { + Statement st = statements.iterator().next(); + return add(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext()); + } + + boolean added = false; + + HashSet tempSet = new HashSet<>(); + HashSet contexts = new HashSet<>(); + SailSink sink = sink(); + + for (Statement statement : statements) { + if (tempSet.size() >= 1024) { + sink.approveAll(tempSet, contexts); + if (size >= 0) { + size += tempSet.size(); + } + tempSet.clear(); + contexts.clear(); + added = true; + } + if (!contains(statement.getSubject(), statement.getPredicate(), statement.getObject(), + statement.getContext())) { + contexts.add(statement.getContext()); + tempSet.add(statement); + } + } + + if (!tempSet.isEmpty()) { + sink.approveAll(tempSet, contexts); + if (size >= 0) { + size += tempSet.size(); + } + added = true; + } + + return added; + + } + @Override public synchronized boolean clear(Resource... contexts) { try { @@ -411,12 +443,8 @@ private boolean contains(SailDataset dataset, Resource subj, IRI pred, Value obj if (dataset == null) { return false; } - CloseableIteration stmts; - stmts = dataset.getStatements(subj, pred, obj, contexts); - try { + try (CloseableIteration stmts = dataset.getStatements(subj, pred, obj, contexts)) { return stmts.hasNext(); - } finally { - stmts.close(); } } @@ -466,4 +494,50 @@ Resource[] cast(Value[] contexts) { return result; } + private final class StatementIterator implements Iterator { + + final CloseableIteration stmts; + + Statement last; + + StatementIterator(CloseableIteration closeableIteration) { + this.stmts = closeableIteration; + } + + @Override + public boolean hasNext() { + try { + if (stmts.hasNext()) { + return true; + } + stmts.close(); + return false; + } catch (SailException e) { + throw new ModelException(e); + } + } + + @Override + public Statement next() { + try { + last = stmts.next(); + if (last == null) { + stmts.close(); + } + return last; + } catch (SailException e) { + throw new ModelException(e); + } + } + + @Override + public void remove() { + if (last == null) { + throw new IllegalStateException("next() not yet called"); + } + SailSourceModel.this.remove(last); + last = null; + } + } + } diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java index 96ec387ff2e..c4bc52cd318 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TripleStore.java @@ -506,6 +506,10 @@ public RecordIterator getTriples(int subj, int pred, int obj, int context, boole return btreeIter; } + public void disableTxnStatus() { + txnStatusFile.disable(); + } + /*-------------------------------------* * Inner class ExplicitStatementFilter * *-------------------------------------*/ diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java index 469cb4892e3..fe8013124ac 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java @@ -22,6 +22,12 @@ */ class TxnStatusFile { + boolean disabled = false; + + public void disable() { + this.disabled = true; + } + public enum TxnStatus { /** @@ -100,6 +106,9 @@ public void close() throws IOException { * @throws IOException If the transaction status could not be written to file. */ public void setTxnStatus(TxnStatus txnStatus) throws IOException { + if (disabled) { + return; + } if (txnStatus == TxnStatus.NONE) { nioFile.truncate(0); } else { @@ -115,6 +124,10 @@ public void setTxnStatus(TxnStatus txnStatus) throws IOException { * @throws IOException If the transaction status file could not be read. */ public TxnStatus getTxnStatus() throws IOException { + if (disabled) { + return TxnStatus.NONE; + } + byte[] bytes = nioFile.readBytes(0, 1); TxnStatus status; @@ -147,6 +160,10 @@ public TxnStatus getTxnStatus() throws IOException { } private TxnStatus getTxnStatusDeprecated() throws IOException { + if (disabled) { + return TxnStatus.NONE; + } + byte[] bytes = nioFile.readBytes(0, (int) nioFile.size()); String s = new String(bytes, US_ASCII); diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java index e758bbb888a..bb0f6693a5a 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/btree/ConcurrentNodeCache.java @@ -19,6 +19,8 @@ class ConcurrentNodeCache extends ConcurrentCache { + private final static int CONCURRENCY = Runtime.getRuntime().availableProcessors(); + private final Function reader; private static final Consumer writeNode = node -> { @@ -38,7 +40,7 @@ public ConcurrentNodeCache(Function reader) { } public void flush() { - cache.forEachValue(Long.MAX_VALUE, writeNode); + cache.forEachValue(CONCURRENCY, writeNode); } public void put(Node node) {