Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Deletion #29

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions benches/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ fn hnsw_db(c: &mut Criterion) {

runtime.block_on(async {
for query in queries.iter() {
let neighbors = initial_db
let (neighbors, buffers) = initial_db
.search_to_insert(vector_store, graph_store, query)
.await;
assert!(!initial_db.is_match(vector_store, &neighbors).await);
// Insert the new vector into the store.
let inserted = vector_store.insert(query).await;
initial_db
.insert_from_search_results(vector_store, graph_store, rng, inserted, neighbors)
.insert_from_search_results(
vector_store,
graph_store,
rng,
inserted,
neighbors,
buffers,
)
.await;
}
});
Expand All @@ -45,7 +52,7 @@ fn hnsw_db(c: &mut Criterion) {
runtime.block_on(async move {
let raw_query = database_size;
let query = vector_store.prepare_query(raw_query);
let neighbors = initial_db
let (neighbors, buffers) = initial_db
.search_to_insert(vector_store, graph_store, &query)
.await;
let inserted = vector_store.insert(&query).await;
Expand All @@ -56,6 +63,7 @@ fn hnsw_db(c: &mut Criterion) {
rng,
inserted,
neighbors,
buffers,
)
.await;
});
Expand Down
1 change: 1 addition & 0 deletions migrations/20240909105321_init.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS hawk_graph_links (
source_ref text NOT NULL,
layer integer NOT NULL,
links jsonb NOT NULL,
buf jsonb NOT NULL,
CONSTRAINT hawk_graph_pkey PRIMARY KEY (source_ref, layer)
);

Expand Down
39 changes: 34 additions & 5 deletions src/examples/lazy_memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub struct LazyMemoryStore {

#[derive(Clone, Debug, PartialEq, Eq)]
struct Point {
/// Index of the point in the store
point_id: PointId,
/// Whatever encoding of a vector.
data: u64,
/// Distinguish between queries that are pending, and those that were ultimately accepted into the vector store.
Expand All @@ -34,20 +36,29 @@ impl LazyMemoryStore {
}

impl LazyMemoryStore {
pub fn get_point_position(&self, point_id: PointId) -> Option<usize> {
self.points
.iter()
.position(|point| point.point_id == point_id)
}

pub fn prepare_query(&mut self, raw_query: u64) -> <Self as VectorStore>::QueryRef {
let point_id = PointId(self.points.len());
self.points.push(Point {
point_id,
data: raw_query,
is_persistent: false,
});

let point_id = self.points.len() - 1;
PointId(point_id)
point_id
}

fn actually_evaluate_distance(&self, pair: &<Self as VectorStore>::DistanceRef) -> u32 {
let position_0 = self.get_point_position(pair.0).expect("Point not found");
let position_1 = self.get_point_position(pair.1).expect("Point not found");
// Hamming distance
let vector_0 = self.points[pair.0 .0].data;
let vector_1 = self.points[pair.1 .0].data;
let vector_0 = self.points[position_0].data;
let vector_1 = self.points[position_1].data;
(vector_0 ^ vector_1).count_ones()
}
}
Expand All @@ -57,12 +68,26 @@ impl VectorStore for LazyMemoryStore {
type VectorRef = PointId; // Vector ID, inserted.
type DistanceRef = (PointId, PointId); // Lazy distance representation.

async fn num_entries(&self) -> usize {
self.points.len()
}

async fn insert(&mut self, query: &Self::QueryRef) -> Self::VectorRef {
let position = self
.get_point_position(*query)
.expect("query does not exist");
// The query is now accepted in the store. It keeps the same ID.
self.points[query.0].is_persistent = true;
self.points[position].is_persistent = true;
*query
}

async fn delete(&mut self, vector: &Self::VectorRef) {
let position = self
.get_point_position(*vector)
.expect("vector does not exist");
self.points.remove(position);
}

async fn eval_distance(
&self,
query: &Self::QueryRef,
Expand All @@ -83,6 +108,10 @@ impl VectorStore for LazyMemoryStore {
) -> bool {
self.actually_evaluate_distance(distance1) < self.actually_evaluate_distance(distance2)
}

fn get_range(&self, range: std::ops::Range<usize>) -> Vec<PointId> {
self.points[range].iter().map(|p| p.point_id).collect()
}
}

#[cfg(test)]
Expand Down
14 changes: 13 additions & 1 deletion src/graph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};

use crate::hnsw_db::FurthestQueueV;
use crate::VectorStore;
use std::fmt::Debug;
use std::{fmt::Debug, ops::Range};

pub mod graph_mem;
mod graph_pg;
Expand All @@ -19,6 +19,18 @@ pub trait GraphStore<V: VectorStore> {
-> FurthestQueueV<V>;

async fn set_links(&mut self, base: V::VectorRef, links: FurthestQueueV<V>, lc: usize);

async fn get_buffer(
&self,
base: &<V as VectorStore>::VectorRef,
lc: usize,
) -> FurthestQueueV<V>;

async fn set_buffer(&mut self, base: V::VectorRef, buffer: FurthestQueueV<V>, lc: usize);

async fn quick_delete(&mut self, point: V::VectorRef);

async fn delete_cleanup(&mut self, range: Range<usize>, vector_store: &V);
}

#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
134 changes: 129 additions & 5 deletions src/graph_store/graph_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@ use crate::{
hnsw_db::{FurthestQueue, FurthestQueueV},
VectorStore,
};
use std::collections::HashMap;
use std::{
collections::{HashMap, HashSet},
ops::Range,
};

#[derive(Default, Clone, PartialEq, Eq, Debug)]
pub struct GraphMem<V: VectorStore> {
entry_point: Option<EntryPoint<V::VectorRef>>,
layers: Vec<Layer<V>>,
tombstones: HashSet<V::VectorRef>,
}

impl<V: VectorStore> GraphMem<V> {
pub fn new() -> Self {
GraphMem {
entry_point: None,
layers: vec![],
tombstones: HashSet::new(),
}
}

Expand All @@ -26,6 +31,7 @@ impl<V: VectorStore> GraphMem<V> {
GraphMem {
entry_point,
layers,
tombstones: HashSet::new(),
}
}

Expand Down Expand Up @@ -57,12 +63,26 @@ impl<V: VectorStore> GraphMem<V> {
.into_iter()
.map(|(v, q)| (vector_map(v), q.map::<V, F1, F2>(vector_map, distance_map)))
.collect();
Layer::<V> { links }
let buffer: HashMap<_, _> = v
.buffer
.into_iter()
.map(|(v, q)| (vector_map(v), q.map::<V, F1, F2>(vector_map, distance_map)))
.collect();

Layer::<V> { links, buffer }
})
.collect();

let tombstones = graph
.tombstones
.into_iter()
.map(|v| vector_map(v))
.collect();

GraphMem::<V> {
entry_point: new_entry,
layers,
tombstones,
}
}
}
Expand Down Expand Up @@ -104,23 +124,101 @@ impl<V: VectorStore> GraphStore<V> for GraphMem<V> {
let layer = &mut self.layers[lc];
layer.set_links(base, links);
}

async fn get_buffer(
&self,
base: &<V as VectorStore>::VectorRef,
lc: usize,
) -> FurthestQueueV<V> {
let layer = &self.layers[lc];
if let Some(buffer) = layer.get_buffer(base) {
buffer.clone()
} else {
FurthestQueue::new()
}
}

async fn set_buffer(&mut self, base: V::VectorRef, buffer: FurthestQueueV<V>, lc: usize) {
let layer = &mut self.layers[lc];
layer.set_buffer(base, buffer);
}

async fn quick_delete(&mut self, point: <V as VectorStore>::VectorRef) {
let entry_point = self.get_entry_point().await;
if point == entry_point.expect("No entry point").vector_ref {
let mut layer_count = 0;
for (lc, layer) in self.layers.iter().enumerate().rev() {
if !layer.links.is_empty() {
layer_count = lc;
break;
}
}
let vector_ref = self.layers[layer_count]
.links
.keys()
.next()
.unwrap()
.clone();
self.entry_point = Some(EntryPoint {
vector_ref,
layer_count,
});
}

self.tombstones.insert(point.clone());

for layer in self.layers.iter_mut() {
layer.links.remove(&point);
}
}

async fn delete_cleanup(&mut self, range: Range<usize>, vector_store: &V) {
for vector in vector_store.get_range(range).iter() {
for lc in 0..self.layers.len() {
let mut neighbors = self.get_links(&vector, lc).await;
let queue = neighbors.queue.clone();
for neighbor in queue.iter() {
if self.tombstones.get(&neighbor.0).is_some() {
neighbors.remove(neighbor.0.clone()).await;
let mut buffer = self.get_buffer(&vector, lc).await;
let nearest_point = buffer.get_nearest().cloned();
if nearest_point.is_some() {
let (point, distance) = nearest_point.unwrap();
neighbors
.insert(&vector_store.clone(), point.clone(), distance.clone())
.await;
buffer.remove(point.clone()).await;
}
}
}

self.layers[lc].set_links(vector.clone(), neighbors);
}
}
}
}

#[derive(PartialEq, Eq, Default, Clone, Debug)]
pub struct Layer<V: VectorStore> {
/// Map a base vector to its neighbors, including the distance base-neighbor.
links: HashMap<V::VectorRef, FurthestQueueV<V>>,
/// A buffer of pruned neighbors
buffer: HashMap<V::VectorRef, FurthestQueueV<V>>,
}

impl<V: VectorStore> Layer<V> {
fn new() -> Self {
Layer {
links: HashMap::new(),
buffer: HashMap::new(),
}
}

pub fn from_links(links: HashMap<V::VectorRef, FurthestQueueV<V>>) -> Self {
Layer { links }
Layer {
links,
buffer: HashMap::new(),
}
}

fn get_links(&self, from: &V::VectorRef) -> Option<&FurthestQueueV<V>> {
Expand All @@ -134,6 +232,14 @@ impl<V: VectorStore> Layer<V> {
pub fn get_links_map(&self) -> &HashMap<V::VectorRef, FurthestQueueV<V>> {
&self.links
}

fn get_buffer(&self, from: &V::VectorRef) -> Option<&FurthestQueueV<V>> {
self.buffer.get(from)
}

fn set_buffer(&mut self, from: V::VectorRef, buffer: FurthestQueueV<V>) {
self.buffer.insert(from, buffer);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -208,6 +314,22 @@ mod tests {
) -> bool {
*distance1 < *distance2
}

async fn num_entries(&self) -> usize {
self.points.len()
}

fn get_range(&self, range: std::ops::Range<usize>) -> Vec<Self::VectorRef> {
self.points
.iter()
.take(range.len())
.map(|(id, _)| TestPointId(*id))
.collect()
}

async fn delete(&mut self, vector: &Self::VectorRef) {
self.points.remove(&vector.0);
}
}

#[tokio::test]
Expand All @@ -219,7 +341,7 @@ mod tests {

for raw_query in 0..10 {
let query = vector_store.prepare_query(raw_query);
let neighbors = searcher
let (neighbors, buffer) = searcher
.search_to_insert(&mut vector_store, &mut graph_store, &query)
.await;
let inserted = vector_store.insert(&query).await;
Expand All @@ -230,6 +352,7 @@ mod tests {
&mut rng,
inserted,
neighbors,
buffer,
)
.await;
}
Expand All @@ -255,7 +378,7 @@ mod tests {

for raw_query in 0..10 {
let query = vector_store.prepare_query(raw_query);
let neighbors = searcher
let (neighbors, buffer) = searcher
.search_to_insert(&mut vector_store, &mut graph_store, &query)
.await;
let inserted = vector_store.insert(&query).await;
Expand All @@ -266,6 +389,7 @@ mod tests {
&mut rng,
inserted,
neighbors,
buffer,
)
.await;

Expand Down
Loading