Skip to content

Commit

Permalink
refactor: split quantized data vectors to two tapes (#196)
Browse files Browse the repository at this point in the history
Split the quantized data vectors into two tapes: one that has been immutable
since `maintain`, and another that is appendable for newly inserted vectors.

Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi authored Feb 21, 2025
1 parent ffbd865 commit d16d469
Show file tree
Hide file tree
Showing 15 changed files with 883 additions and 1,002 deletions.
101 changes: 94 additions & 7 deletions crates/algorithm/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::RelationWrite;
use crate::operator::{Accessor2, Operator, Vector};
use crate::tape::*;
use crate::tape::TapeWriter;
use crate::tuples::*;
use crate::types::*;
use crate::{Branch, DerefMut, Page, PageGuard, RelationWrite};
use simd::fast_scan::{any_pack, padding_pack};
use vector::VectorOwned;

pub fn build<O: Operator>(
Expand Down Expand Up @@ -47,10 +48,13 @@ pub fn build<O: Operator>(
let mut level = Vec::new();
for j in 0..structures[i].len() {
if i == 0 {
let tape = TapeWriter::<_, _, H0Tuple>::create(|| index.extend(false));
let frozen_tape = TapeWriter::<_, _, FrozenTuple>::create(|| index.extend(false));
let appendable_tape =
TapeWriter::<_, _, AppendableTuple>::create(|| index.extend(false));
let mut jump = TapeWriter::<_, _, JumpTuple>::create(|| index.extend(false));
jump.push(JumpTuple {
first: tape.first(),
frozen_first: frozen_tape.first(),
appendable_first: appendable_tape.first(),
});
level.push(jump.first());
} else {
Expand All @@ -73,17 +77,46 @@ pub fn build<O: Operator>(
} else {
O::Vector::code(h1_mean)
};
tape.push(H1Branch {
tape.push(Branch {
mean: pointer_of_means[i - 1][child as usize],
dis_u_2: code.dis_u_2,
factor_ppc: code.factor_ppc,
factor_ip: code.factor_ip,
factor_err: code.factor_err,
signs: code.signs,
first: pointer_of_firsts[i - 1][child as usize],
extra: pointer_of_firsts[i - 1][child as usize],
});
}
let tape = tape.into_inner();
let (mut tape, branches) = tape.into_inner();
if !branches.is_empty() {
let mut remain =
padding_pack(branches.iter().map(|x| rabitq::pack_to_u4(&x.signs)));
loop {
let freespace = tape.freespace();
if H1Tuple::estimate_size_0(remain.len()) <= freespace as usize {
tape.tape_put(H1Tuple::_0 {
mean: any_pack(branches.iter().map(|x| x.mean)),
dis_u_2: any_pack(branches.iter().map(|x| x.dis_u_2)),
factor_ppc: any_pack(branches.iter().map(|x| x.factor_ppc)),
factor_ip: any_pack(branches.iter().map(|x| x.factor_ip)),
factor_err: any_pack(branches.iter().map(|x| x.factor_err)),
first: any_pack(branches.iter().map(|x| x.extra)),
len: branches.len() as _,
elements: remain,
});
break;
}
if let Some(w) = H1Tuple::fit_1(freespace) {
let (left, right) = remain.split_at(std::cmp::min(w, remain.len()));
tape.tape_put(H1Tuple::_1 {
elements: left.to_vec(),
});
remain = right.to_vec();
} else {
tape.tape_move();
}
}
}
level.push(tape.first());
}
}
Expand All @@ -100,3 +133,57 @@ pub fn build<O: Operator>(
freepage_first: freepage.first(),
});
}

/// Writer that buffers `Branch<u32>` values and emits them to an `H1Tuple`
/// tape in groups of 32 (see `push`).
///
/// Branches that have not yet filled a complete group stay in `branches`
/// and are handed back to the caller by `into_inner`.
pub struct H1TapeWriter<G, E> {
// Underlying tape that receives the serialized `H1Tuple` values.
tape: TapeWriter<G, E, H1Tuple>,
// Branches pushed so far that have not been written to `tape` yet.
branches: Vec<Branch<u32>>,
}

impl<G, E> H1TapeWriter<G, E>
where
    G: PageGuard + DerefMut,
    G::Target: Page,
    E: Fn() -> G,
{
    /// Builds a writer with an empty branch buffer.
    ///
    /// `extend` is forwarded unchanged to `TapeWriter::create`.
    fn create(extend: E) -> Self {
        let tape = TapeWriter::create(extend);
        let branches = Vec::new();
        Self { tape, branches }
    }

    /// Buffers `branch`; as soon as exactly 32 branches are buffered, drains
    /// them and writes them to the tape.
    ///
    /// The packed sign codes of the group are written as zero or more
    /// `H1Tuple::_1` pieces followed by one final `H1Tuple::_0`, which also
    /// carries the per-branch fields of the whole group.
    ///
    /// # Panics
    ///
    /// The `unwrap` on `pop` cannot fail here: it runs only right after the
    /// buffer length was checked to be 32.
    fn push(&mut self, branch: Branch<u32>) {
        const GROUP: usize = 32;

        self.branches.push(branch);
        if self.branches.len() != GROUP {
            return;
        }

        // Draining by `pop` empties the buffer from the back, so `group[0]`
        // is the most recently pushed branch (reverse of push order).
        let group = std::array::from_fn::<_, GROUP, _>(|_| self.branches.pop().unwrap());
        let mut pending = padding_pack(group.iter().map(|b| rabitq::pack_to_u4(&b.signs)));

        loop {
            let free = self.tape.freespace();

            // Everything that is still pending fits: emit the closing `_0`
            // tuple together with the per-branch fields and stop.
            if H1Tuple::estimate_size_0(pending.len()) <= free as usize {
                self.tape.tape_put(H1Tuple::_0 {
                    mean: group.each_ref().map(|b| b.mean),
                    dis_u_2: group.each_ref().map(|b| b.dis_u_2),
                    factor_ppc: group.each_ref().map(|b| b.factor_ppc),
                    factor_ip: group.each_ref().map(|b| b.factor_ip),
                    factor_err: group.each_ref().map(|b| b.factor_err),
                    first: group.each_ref().map(|b| b.extra),
                    len: group.len() as _,
                    elements: pending,
                });
                return;
            }

            match H1Tuple::fit_1(free) {
                // A `_1` tuple with up to `width` elements fits: write the
                // head of `pending` and keep the tail for the next round.
                Some(width) => {
                    let tail = pending.split_off(std::cmp::min(width, pending.len()));
                    self.tape.tape_put(H1Tuple::_1 { elements: pending });
                    pending = tail;
                }
                // Not even a `_1` tuple fits: call `tape_move` (presumably
                // switches to a fresh page -- confirm in `tape.rs`) and retry.
                None => {
                    self.tape.tape_move();
                }
            }
        }
    }

    /// Consumes the writer, returning the underlying tape together with the
    /// branches that were buffered but never written (fewer than a full
    /// group of 32); the caller is responsible for writing those.
    fn into_inner(self) -> (TapeWriter<G, E, H1Tuple>, Vec<Branch<u32>>) {
        let Self { tape, branches } = self;
        (tape, branches)
    }
}
183 changes: 86 additions & 97 deletions crates/algorithm/src/bulkdelete.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::operator::Operator;
use crate::pipe::Pipe;
use crate::operator::{FunctionalAccessor, Operator};
use crate::tuples::*;
use crate::{Page, RelationWrite};
use crate::{Page, RelationWrite, tape};
use std::num::NonZeroU64;

pub fn bulkdelete<O: Operator>(
Expand All @@ -10,7 +9,8 @@ pub fn bulkdelete<O: Operator>(
callback: impl Fn(NonZeroU64) -> bool,
) {
let meta_guard = index.read(0);
let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::<MetaTuple>);
let meta_bytes = meta_guard.get(1).expect("data corruption");
let meta_tuple = MetaTuple::deserialize_ref(meta_bytes);
let height_of_root = meta_tuple.height_of_root();
let root_first = meta_tuple.root_first();
let vectors_first = meta_tuple.vectors_first();
Expand All @@ -21,25 +21,19 @@ pub fn bulkdelete<O: Operator>(
let step = |state: State| {
let mut results = Vec::new();
for first in state {
let mut current = first;
while current != u32::MAX {
let h1_guard = index.read(current);
for i in 1..=h1_guard.len() {
let h1_tuple = h1_guard
.get(i)
.expect("data corruption")
.pipe(read_tuple::<H1Tuple>);
match h1_tuple {
H1TupleReader::_0(h1_tuple) => {
for first in h1_tuple.first().iter().copied() {
results.push(first);
}
}
H1TupleReader::_1(_) => (),
tape::read_h1_tape(
index.clone(),
first,
|| {
fn push<T>(_: &mut (), _: &[T]) {}
fn finish<T>(_: (), _: (&T, &T, &T, &T)) -> [(); 32] {
[(); 32]
}
}
current = h1_guard.get_opaque().next;
}
FunctionalAccessor::new((), push, finish)
},
|(), _, first| results.push(first),
|_| check(),
);
}
results
};
Expand All @@ -48,97 +42,94 @@ pub fn bulkdelete<O: Operator>(
}
for first in state {
let jump_guard = index.read(first);
let jump_tuple = jump_guard
.get(1)
.expect("data corruption")
.pipe(read_tuple::<JumpTuple>);
let first = jump_tuple.first();
let mut current = first;
while current != u32::MAX {
check();
let read = index.read(current);
let flag = 'flag: {
for i in 1..=read.len() {
let h0_tuple = read
.get(i)
.expect("data corruption")
.pipe(read_tuple::<H0Tuple>);
match h0_tuple {
H0TupleReader::_0(h0_tuple) => {
let p = h0_tuple.payload();
if let Some(payload) = p {
if callback(payload) {
let jump_bytes = jump_guard.get(1).expect("data corruption");
let jump_tuple = JumpTuple::deserialize_ref(jump_bytes);
{
let mut current = jump_tuple.frozen_first();
while current != u32::MAX {
check();
let read = index.read(current);
let flag = 'flag: {
for i in 1..=read.len() {
let bytes = read.get(i).expect("data corruption");
let tuple = FrozenTuple::deserialize_ref(bytes);
if let FrozenTupleReader::_0(tuple) = tuple {
for p in tuple.payload().iter() {
if Some(true) == p.map(&callback) {
break 'flag true;
}
}
}
H0TupleReader::_1(h0_tuple) => {
let p = h0_tuple.payload();
for j in 0..32 {
if let Some(payload) = p[j] {
if callback(payload) {
break 'flag true;
}
}
false
};
if flag {
drop(read);
let mut write = index.write(current, false);
for i in 1..=write.len() {
let bytes = write.get_mut(i).expect("data corruption");
let tuple = FrozenTuple::deserialize_mut(bytes);
if let FrozenTupleWriter::_0(mut tuple) = tuple {
for p in tuple.payload().iter_mut() {
if Some(true) == p.map(&callback) {
*p = None;
}
}
}
H0TupleReader::_2(_) => (),
}
current = write.get_opaque().next;
} else {
current = read.get_opaque().next;
}
false
};
if flag {
drop(read);
let mut write = index.write(current, false);
for i in 1..=write.len() {
let h0_tuple = write
.get_mut(i)
.expect("data corruption")
.pipe(write_tuple::<H0Tuple>);
match h0_tuple {
H0TupleWriter::_0(mut h0_tuple) => {
let p = h0_tuple.payload();
if let Some(payload) = *p {
if callback(payload) {
*p = None;
}
}
}
}
{
let mut current = jump_tuple.appendable_first();
while current != u32::MAX {
check();
let read = index.read(current);
let flag = 'flag: {
for i in 1..=read.len() {
let bytes = read.get(i).expect("data corruption");
let tuple = AppendableTuple::deserialize_ref(bytes);
let p = tuple.payload();
if Some(true) == p.map(&callback) {
break 'flag true;
}
H0TupleWriter::_1(mut h0_tuple) => {
let p = h0_tuple.payload();
for j in 0..32 {
if let Some(payload) = p[j] {
if callback(payload) {
p[j] = None;
}
}
}
}
false
};
if flag {
drop(read);
let mut write = index.write(current, false);
for i in 1..=write.len() {
let bytes = write.get_mut(i).expect("data corruption");
let mut tuple = AppendableTuple::deserialize_mut(bytes);
let p = tuple.payload();
if Some(true) == p.map(&callback) {
*p = None;
}
H0TupleWriter::_2(_) => (),
}
current = write.get_opaque().next;
} else {
current = read.get_opaque().next;
}
current = write.get_opaque().next;
} else {
current = read.get_opaque().next;
}
}
}
}
{
let first = vectors_first;
let mut current = first;
let mut current = vectors_first;
while current != u32::MAX {
check();
let read = index.read(current);
let flag = 'flag: {
for i in 1..=read.len() {
if let Some(vector_bytes) = read.get(i) {
let vector_tuple = vector_bytes.pipe(read_tuple::<VectorTuple<O::Vector>>);
let p = vector_tuple.payload();
if let Some(payload) = p {
if callback(payload) {
break 'flag true;
}
if let Some(bytes) = read.get(i) {
let tuple = VectorTuple::<O::Vector>::deserialize_ref(bytes);
let p = tuple.payload();
if Some(true) == p.map(&callback) {
break 'flag true;
}
}
}
Expand All @@ -148,13 +139,11 @@ pub fn bulkdelete<O: Operator>(
drop(read);
let mut write = index.write(current, true);
for i in 1..=write.len() {
if let Some(vector_bytes) = write.get(i) {
let vector_tuple = vector_bytes.pipe(read_tuple::<VectorTuple<O::Vector>>);
let p = vector_tuple.payload();
if let Some(payload) = p {
if callback(payload) {
write.free(i);
}
if let Some(bytes) = write.get(i) {
let tuple = VectorTuple::<O::Vector>::deserialize_ref(bytes);
let p = tuple.payload();
if Some(true) == p.map(&callback) {
write.free(i);
}
};
}
Expand Down
Loading

0 comments on commit d16d469

Please sign in to comment.