Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to Jctools NBHML and FieldUpdater #110

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions jvector-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@
</parent>
<artifactId>jvector-base</artifactId>
<name>Base</name>

<dependencies>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>4.0.1</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@

import io.github.jbellis.jvector.util.Accountable;
import io.github.jbellis.jvector.util.RamUsageEstimator;
import org.jctools.maps.NonBlockingHashMap;
import org.jctools.maps.NonBlockingHashMapLong;

import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

Expand All @@ -39,10 +43,12 @@
* and `nextNeighbor` operations.
*/
public final class OnHeapGraphIndex<T> implements GraphIndex<T>, Accountable {
private static final AtomicLongFieldUpdater<OnHeapGraphIndex> entryPointUpdater = AtomicLongFieldUpdater.newUpdater(OnHeapGraphIndex.class, "entryPointVal");

// the current graph entry node on the top level. -1 if not set
private final AtomicReference<Integer> entryPoint;
private volatile long entryPointVal;

private final ConcurrentHashMap<Integer, ConcurrentNeighborSet> nodes;
private final NonBlockingHashMapLong<ConcurrentNeighborSet> nodes;

// max neighbors/edges per node
final int nsize0;
Expand All @@ -51,11 +57,10 @@ public final class OnHeapGraphIndex<T> implements GraphIndex<T>, Accountable {
OnHeapGraphIndex(
int M, BiFunction<Integer, Integer, ConcurrentNeighborSet> neighborFactory) {
this.neighborFactory = neighborFactory;
this.entryPoint =
new AtomicReference<>(-1); // Entry node should be negative until a node is added
this.entryPointVal = -1; // Entry node should be negative until a node is added
this.nsize0 = 2 * M;

this.nodes = new ConcurrentHashMap<>();
this.nodes = new NonBlockingHashMapLong<>(1024);
}

/**
Expand Down Expand Up @@ -91,7 +96,7 @@ public void addNode(int node) {

/** must be called after addNode once neighbors are linked in all levels. */
void markComplete(int node) {
entryPoint.accumulateAndGet(
entryPointUpdater.accumulateAndGet(this,
tjake marked this conversation as resolved.
Show resolved Hide resolved
node,
(oldEntry, newEntry) -> {
if (oldEntry >= 0) {
Expand All @@ -103,7 +108,7 @@ void markComplete(int node) {
}

public void updateEntryNode(int node) {
entryPoint.set(node);
entryPointUpdater.set(this, node);
}

@Override
Expand All @@ -112,19 +117,19 @@ public int maxDegree() {
}

int entry() {
return entryPoint.get();
return (int) entryPointUpdater.get(this);
}

@Override
public NodesIterator getNodes() {
// We avoid the temptation to optimize this by using ArrayNodesIterator.
// This is because, while the graph will contain sequential ordinals once the graph is complete,
// we should not assume that that is the only time it will be called.
var keysInts = nodes.keySet().stream().mapToInt(Integer::intValue).iterator();
var keysInts = Arrays.stream(nodes.keySetLong()).iterator();
return new NodesIterator(nodes.size()) {
@Override
public int nextInt() {
return keysInts.nextInt();
return keysInts.next().intValue();
}

@Override
Expand Down Expand Up @@ -210,7 +215,7 @@ private static long concurrentHashMapRamUsed(int externalSize) {

@Override
public String toString() {
return String.format("OnHeapGraphIndex(size=%d, entryPoint=%d)", size(), entryPoint.get());
return String.format("OnHeapGraphIndex(size=%d, entryPoint=%d)", size(), entryPointUpdater.get(this));
}

@Override
Expand All @@ -232,7 +237,7 @@ void validateEntryNode() {
if (size() == 0) {
return;
}
var en = entryPoint.get();
var en = entryPointUpdater.get(this);
if (!(en >= 0 && nodes.containsKey(en))) {
throw new IllegalStateException("Entry node was incompletely added! " + en);
}
Expand All @@ -255,12 +260,12 @@ public int size() {

@Override
public int entryNode() {
return OnHeapGraphIndex.this.entryPoint.get();
return (int) entryPointUpdater.get(OnHeapGraphIndex.this);
}

@Override
public String toString() {
return "OnHeapGraphIndexView(size=" + size() + ", entryPoint=" + entryPoint.get();
return "OnHeapGraphIndexView(size=" + size() + ", entryPoint=" + entryPointUpdater.get(OnHeapGraphIndex.this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public static void main(String[] args) throws IOException {
var mGrid = List.of(8, 12, 16, 24, 32, 48, 64);
var efConstructionGrid = List.of(60, 80, 100, 120, 160, 200, 400, 600, 800);
var efSearchGrid = List.of(1, 2);
var diskGrid = List.of(false, true);
var diskGrid = List.of( true);

// this dataset contains more than 10k query vectors, so we limit it with .subList
var adaSet = loadWikipediaData();
Expand Down