diff --git a/crates/algorithm/src/build.rs b/crates/algorithm/src/build.rs index d8c9b62..0a8221f 100644 --- a/crates/algorithm/src/build.rs +++ b/crates/algorithm/src/build.rs @@ -1,8 +1,9 @@ -use crate::RelationWrite; use crate::operator::{Accessor2, Operator, Vector}; -use crate::tape::*; +use crate::tape::TapeWriter; use crate::tuples::*; use crate::types::*; +use crate::{Branch, DerefMut, Page, PageGuard, RelationWrite}; +use simd::fast_scan::{any_pack, padding_pack}; use vector::VectorOwned; pub fn build( @@ -47,10 +48,13 @@ pub fn build( let mut level = Vec::new(); for j in 0..structures[i].len() { if i == 0 { - let tape = TapeWriter::<_, _, H0Tuple>::create(|| index.extend(false)); + let frozen_tape = TapeWriter::<_, _, FrozenTuple>::create(|| index.extend(false)); + let appendable_tape = + TapeWriter::<_, _, AppendableTuple>::create(|| index.extend(false)); let mut jump = TapeWriter::<_, _, JumpTuple>::create(|| index.extend(false)); jump.push(JumpTuple { - first: tape.first(), + frozen_first: frozen_tape.first(), + appendable_first: appendable_tape.first(), }); level.push(jump.first()); } else { @@ -73,17 +77,46 @@ pub fn build( } else { O::Vector::code(h1_mean) }; - tape.push(H1Branch { + tape.push(Branch { mean: pointer_of_means[i - 1][child as usize], dis_u_2: code.dis_u_2, factor_ppc: code.factor_ppc, factor_ip: code.factor_ip, factor_err: code.factor_err, signs: code.signs, - first: pointer_of_firsts[i - 1][child as usize], + extra: pointer_of_firsts[i - 1][child as usize], }); } - let tape = tape.into_inner(); + let (mut tape, branches) = tape.into_inner(); + if !branches.is_empty() { + let mut remain = + padding_pack(branches.iter().map(|x| rabitq::pack_to_u4(&x.signs))); + loop { + let freespace = tape.freespace(); + if H1Tuple::estimate_size_0(remain.len()) <= freespace as usize { + tape.tape_put(H1Tuple::_0 { + mean: any_pack(branches.iter().map(|x| x.mean)), + dis_u_2: any_pack(branches.iter().map(|x| x.dis_u_2)), + factor_ppc: any_pack(branches.iter().map(|x| x.factor_ppc)), + factor_ip: any_pack(branches.iter().map(|x| x.factor_ip)), + factor_err: any_pack(branches.iter().map(|x| x.factor_err)), + first: any_pack(branches.iter().map(|x| x.extra)), + len: branches.len() as _, + elements: remain, + }); + break; + } + if let Some(w) = H1Tuple::fit_1(freespace) { + let (left, right) = remain.split_at(std::cmp::min(w, remain.len())); + tape.tape_put(H1Tuple::_1 { + elements: left.to_vec(), + }); + remain = right.to_vec(); + } else { + tape.tape_move(); + } + } + } level.push(tape.first()); } } @@ -100,3 +133,57 @@ pub fn build( freepage_first: freepage.first(), }); } + +pub struct H1TapeWriter { + tape: TapeWriter, + branches: Vec>, +} + +impl H1TapeWriter +where + G: PageGuard + DerefMut, + G::Target: Page, + E: Fn() -> G, +{ + fn create(extend: E) -> Self { + Self { + tape: TapeWriter::create(extend), + branches: Vec::new(), + } + } + fn push(&mut self, branch: Branch) { + self.branches.push(branch); + if self.branches.len() == 32 { + let chunk = std::array::from_fn::<_, 32, _>(|_| self.branches.pop().unwrap()); + let mut remain = padding_pack(chunk.iter().map(|x| rabitq::pack_to_u4(&x.signs))); + loop { + let freespace = self.tape.freespace(); + if H1Tuple::estimate_size_0(remain.len()) <= freespace as usize { + self.tape.tape_put(H1Tuple::_0 { + mean: chunk.each_ref().map(|x| x.mean), + dis_u_2: chunk.each_ref().map(|x| x.dis_u_2), + factor_ppc: chunk.each_ref().map(|x| x.factor_ppc), + factor_ip: chunk.each_ref().map(|x| x.factor_ip), + factor_err: chunk.each_ref().map(|x| x.factor_err), + first: chunk.each_ref().map(|x| x.extra), + len: chunk.len() as _, + elements: remain, + }); + break; + } + if let Some(w) = H1Tuple::fit_1(freespace) { + let (left, right) = remain.split_at(std::cmp::min(w, remain.len())); + self.tape.tape_put(H1Tuple::_1 { + elements: left.to_vec(), + }); + remain = right.to_vec(); + } else { + self.tape.tape_move(); + } + } + } + } + fn into_inner(self) -> (TapeWriter, Vec>) { + (self.tape, self.branches) + } +} diff --git a/crates/algorithm/src/bulkdelete.rs b/crates/algorithm/src/bulkdelete.rs index 524ec90..506d2f0 100644 --- a/crates/algorithm/src/bulkdelete.rs +++ b/crates/algorithm/src/bulkdelete.rs @@ -1,7 +1,6 @@ -use crate::operator::Operator; -use crate::pipe::Pipe; +use crate::operator::{FunctionalAccessor, Operator}; use crate::tuples::*; -use crate::{Page, RelationWrite}; +use crate::{Page, RelationWrite, tape}; use std::num::NonZeroU64; pub fn bulkdelete( @@ -10,7 +9,8 @@ pub fn bulkdelete( callback: impl Fn(NonZeroU64) -> bool, ) { let meta_guard = index.read(0); - let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let meta_bytes = meta_guard.get(1).expect("data corruption"); + let meta_tuple = MetaTuple::deserialize_ref(meta_bytes); let height_of_root = meta_tuple.height_of_root(); let root_first = meta_tuple.root_first(); let vectors_first = meta_tuple.vectors_first(); @@ -21,25 +21,19 @@ pub fn bulkdelete( let step = |state: State| { let mut results = Vec::new(); for first in state { - let mut current = first; - while current != u32::MAX { - let h1_guard = index.read(current); - for i in 1..=h1_guard.len() { - let h1_tuple = h1_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h1_tuple { - H1TupleReader::_0(h1_tuple) => { - for first in h1_tuple.first().iter().copied() { - results.push(first); - } - } - H1TupleReader::_1(_) => (), + tape::read_h1_tape( + index.clone(), + first, + || { + fn push(_: &mut (), _: &[T]) {} + fn finish(_: (), _: (&T, &T, &T, &T)) -> [(); 32] { + [(); 32] } - } - current = h1_guard.get_opaque().next; - } + FunctionalAccessor::new((), push, finish) + }, + |(), _, first| results.push(first), + |_| check(), + ); } results }; @@ -48,97 +42,94 @@ pub fn bulkdelete( } for first in state { let jump_guard = index.read(first); - let jump_tuple = jump_guard - .get(1) - .expect("data corruption") - .pipe(read_tuple::); - let first = jump_tuple.first(); - let mut current = first; - while current != u32::MAX { - check(); - let read = index.read(current); - let flag = 'flag: { - for i in 1..=read.len() { - let h0_tuple = read - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h0_tuple { - H0TupleReader::_0(h0_tuple) => { - let p = h0_tuple.payload(); - if let Some(payload) = p { - if callback(payload) { + let jump_bytes = jump_guard.get(1).expect("data corruption"); + let jump_tuple = JumpTuple::deserialize_ref(jump_bytes); + { + let mut current = jump_tuple.frozen_first(); + while current != u32::MAX { + check(); + let read = index.read(current); + let flag = 'flag: { + for i in 1..=read.len() { + let bytes = read.get(i).expect("data corruption"); + let tuple = FrozenTuple::deserialize_ref(bytes); + if let FrozenTupleReader::_0(tuple) = tuple { + for p in tuple.payload().iter() { + if Some(true) == p.map(&callback) { break 'flag true; } } } - H0TupleReader::_1(h0_tuple) => { - let p = h0_tuple.payload(); - for j in 0..32 { - if let Some(payload) = p[j] { - if callback(payload) { - break 'flag true; - } + } + false + }; + if flag { + drop(read); + let mut write = index.write(current, false); + for i in 1..=write.len() { + let bytes = write.get_mut(i).expect("data corruption"); + let tuple = FrozenTuple::deserialize_mut(bytes); + if let FrozenTupleWriter::_0(mut tuple) = tuple { + for p in tuple.payload().iter_mut() { + if Some(true) == p.map(&callback) { + *p = None; } } } - H0TupleReader::_2(_) => (), } + current = write.get_opaque().next; + } else { + current = read.get_opaque().next; } - false - }; - if flag { - drop(read); - let mut write = index.write(current, false); - for i in 1..=write.len() { - let h0_tuple = write - .get_mut(i) - .expect("data corruption") - .pipe(write_tuple::); - match h0_tuple { - H0TupleWriter::_0(mut h0_tuple) => { - let p = h0_tuple.payload(); - if let Some(payload) = *p { - if callback(payload) { - *p = None; - } - } + } + } + { + let mut current = jump_tuple.appendable_first(); + while current != u32::MAX { + check(); + let read = index.read(current); + let flag = 'flag: { + for i in 1..=read.len() { + let bytes = read.get(i).expect("data corruption"); + let tuple = AppendableTuple::deserialize_ref(bytes); + let p = tuple.payload(); + if Some(true) == p.map(&callback) { + break 'flag true; } - H0TupleWriter::_1(mut h0_tuple) => { - let p = h0_tuple.payload(); - for j in 0..32 { - if let Some(payload) = p[j] { - if callback(payload) { - p[j] = None; - } - } - } + } + false + }; + if flag { + drop(read); + let mut write = index.write(current, false); + for i in 1..=write.len() { + let bytes = write.get_mut(i).expect("data corruption"); + let mut tuple = AppendableTuple::deserialize_mut(bytes); + let p = tuple.payload(); + if Some(true) == p.map(&callback) { + *p = None; } - H0TupleWriter::_2(_) => (), } + current = write.get_opaque().next; + } else { + current = read.get_opaque().next; } - current = write.get_opaque().next; - } else { - current = read.get_opaque().next; } } } } { - let first = vectors_first; - let mut current = first; + let mut current = vectors_first; while current != u32::MAX { check(); let read = index.read(current); let flag = 'flag: { for i in 1..=read.len() { - if let Some(vector_bytes) = read.get(i) { - let vector_tuple = vector_bytes.pipe(read_tuple::>); - let p = vector_tuple.payload(); - if let Some(payload) = p { - if callback(payload) { - break 'flag true; - } + if let Some(bytes) = read.get(i) { + let tuple = VectorTuple::::deserialize_ref(bytes); + let p = tuple.payload(); + if Some(true) == p.map(&callback) { + break 'flag true; } } } @@ -148,13 +139,11 @@ pub fn bulkdelete( drop(read); let mut write = index.write(current, true); for i in 1..=write.len() { - if let Some(vector_bytes) = write.get(i) { - let vector_tuple = vector_bytes.pipe(read_tuple::>); - let p = vector_tuple.payload(); - if let Some(payload) = p { - if callback(payload) { - write.free(i); - } + if let Some(bytes) = write.get(i) { + let tuple = VectorTuple::::deserialize_ref(bytes); + let p = tuple.payload(); + if Some(true) == p.map(&callback) { + write.free(i); } }; } diff --git a/crates/algorithm/src/cache.rs b/crates/algorithm/src/cache.rs index 0405708..d3d0e5d 100644 --- a/crates/algorithm/src/cache.rs +++ b/crates/algorithm/src/cache.rs @@ -1,16 +1,12 @@ -use crate::pipe::Pipe; -use crate::tuples::{H1Tuple, H1TupleReader, MetaTuple, read_tuple}; -use crate::{Page, RelationRead}; +use crate::operator::FunctionalAccessor; +use crate::tuples::{MetaTuple, WithReader}; +use crate::{Page, RelationRead, tape}; pub fn cache(index: impl RelationRead) -> Vec { - let mut trace = Vec::::new(); - let mut read = |id| { - let result = index.read(id); - trace.push(id); - result - }; - let meta_guard = read(0); - let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let mut trace = vec![0_u32]; + let meta_guard = index.read(0); + let meta_bytes = meta_guard.get(1).expect("data corruption"); + let meta_tuple = MetaTuple::deserialize_ref(meta_bytes); let height_of_root = meta_tuple.height_of_root(); let root_first = meta_tuple.root_first(); drop(meta_guard); @@ -19,25 +15,23 @@ pub fn cache(index: impl RelationRead) -> Vec { let mut step = |state: State| { let mut results = Vec::new(); for first in state { - let mut current = first; - while current != u32::MAX { - let h1_guard = read(current); - for i in 1..=h1_guard.len() { - let h1_tuple = h1_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h1_tuple { - H1TupleReader::_0(h1_tuple) => { - for first in h1_tuple.first().iter().copied() { - results.push(first); - } - } - H1TupleReader::_1(_) => (), + tape::read_h1_tape( + index.clone(), + first, + || { + fn push(_: &mut (), _: &[T]) {} + fn finish(_: (), _: (&T, &T, &T, &T)) -> [(); 32] { + [(); 32] } - } - current = h1_guard.get_opaque().next; - } + FunctionalAccessor::new((), push, finish) + }, + |(), _, first| { + results.push(first); + }, + |id| { + trace.push(id); + }, + ); } results }; @@ -45,7 +39,7 @@ pub fn cache(index: impl RelationRead) -> Vec { state = step(state); } for first in state { - let _ = read(first); + trace.push(first); } trace } diff --git a/crates/algorithm/src/freepages.rs b/crates/algorithm/src/freepages.rs index 6dc505c..95b63ae 100644 --- a/crates/algorithm/src/freepages.rs +++ b/crates/algorithm/src/freepages.rs @@ -1,4 +1,3 @@ -use crate::pipe::Pipe; use crate::tuples::*; use crate::*; use std::cmp::Reverse; @@ -7,9 +6,7 @@ pub fn mark(index: impl RelationWrite, freepage_first: u32, pages: &[u32]) { let mut pages = pages.to_vec(); pages.sort_by_key(|x| Reverse(*x)); pages.dedup(); - let first = freepage_first; - assert!(first != u32::MAX); - let (mut current, mut offset) = (first, 0_u32); + let (mut current, mut offset) = (freepage_first, 0_u32); while pages.is_empty() { let locals = { let mut local = Vec::new(); @@ -20,12 +17,10 @@ pub fn mark(index: impl RelationWrite, freepage_first: u32, pages: &[u32]) { }; let mut freespace_guard = index.write(current, false); if freespace_guard.len() == 0 { - freespace_guard.alloc(&serialize(&FreepageTuple {})); + freespace_guard.alloc(&FreepageTuple::serialize(&FreepageTuple {})); } - let mut freespace_tuple = freespace_guard - .get_mut(1) - .expect("data corruption") - .pipe(write_tuple::); + let freespace_bytes = freespace_guard.get_mut(1).expect("data corruption"); + let mut freespace_tuple = FreepageTuple::deserialize_mut(freespace_bytes); for local in locals { freespace_tuple.mark(local as _); } @@ -38,18 +33,14 @@ pub fn mark(index: impl RelationWrite, freepage_first: u32, pages: &[u32]) { } pub fn fetch(index: impl RelationWrite, freepage_first: u32) -> Option { - let first = freepage_first; - assert!(first != u32::MAX); - let (mut current, mut offset) = (first, 0_u32); + let (mut current, mut offset) = (freepage_first, 0_u32); loop { let mut freespace_guard = index.write(current, false); if freespace_guard.len() == 0 { return None; } - let mut freespace_tuple = freespace_guard - .get_mut(1) - .expect("data corruption") - .pipe(write_tuple::); + let freespace_bytes = freespace_guard.get_mut(1).expect("data corruption"); + let mut freespace_tuple = FreepageTuple::deserialize_mut(freespace_bytes); if let Some(local) = freespace_tuple.fetch() { return Some(local as u32 + offset); } diff --git a/crates/algorithm/src/insert.rs b/crates/algorithm/src/insert.rs index 9b71bae..d876aed 100644 --- a/crates/algorithm/src/insert.rs +++ b/crates/algorithm/src/insert.rs @@ -1,11 +1,9 @@ use crate::linked_vec::LinkedVec; use crate::operator::*; -use crate::pipe::Pipe; use crate::select_heap::SelectHeap; -use crate::tape::{access_1, append}; use crate::tuples::*; use crate::vectors::{self}; -use crate::{Page, RelationWrite}; +use crate::{Page, RelationWrite, tape}; use always_equal::AlwaysEqual; use distance::Distance; use std::cmp::Reverse; @@ -15,7 +13,8 @@ use vector::{VectorBorrowed, VectorOwned}; pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vector: O::Vector) { let meta_guard = index.read(0); - let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let meta_bytes = meta_guard.get(1).expect("data corruption"); + let meta_tuple = MetaTuple::deserialize_ref(meta_bytes); let dims = meta_tuple.dims(); let is_residual = meta_tuple.is_residual(); let rerank_in_heap = meta_tuple.rerank_in_heap(); @@ -42,7 +41,7 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto let mut state: State = { let mean = root_mean; if is_residual { - let residual_u = vectors::access_1::( + let residual_u = vectors::read_for_h1_tuple::( index.clone(), mean, LAccess::new( @@ -64,7 +63,7 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto } else { default_lut_block.as_ref().unwrap() }; - access_1( + tape::read_h1_tape( index.clone(), first, || { @@ -76,6 +75,7 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto |lowerbound, mean, first| { results.push((Reverse(lowerbound), AlwaysEqual(mean), AlwaysEqual(first))); }, + |_| (), ); } let mut heap = SelectHeap::from_vec(results.into_vec()); @@ -84,7 +84,7 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); if is_residual { - let (dis_u, residual_u) = vectors::access_1::( + let (dis_u, residual_u) = vectors::read_for_h1_tuple::( index.clone(), mean, LAccess::new( @@ -101,7 +101,7 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto AlwaysEqual(Some(residual_u)), )); } else { - let dis_u = vectors::access_1::( + let dis_u = vectors::read_for_h1_tuple::( index.clone(), mean, LAccess::new( @@ -126,7 +126,7 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto } else { O::Vector::code(vector.as_borrowed()) }; - let bytes = serialize(&H0Tuple::_0 { + let bytes = AppendableTuple::serialize(&AppendableTuple { mean, dis_u_2: code.dis_u_2, factor_ppc: code.factor_ppc, @@ -137,12 +137,8 @@ pub fn insert(index: impl RelationWrite, payload: NonZeroU64, vecto }); let jump_guard = index.read(first); - let jump_tuple = jump_guard - .get(1) - .expect("data corruption") - .pipe(read_tuple::); + let jump_bytes = jump_guard.get(1).expect("data corruption"); + let jump_tuple = JumpTuple::deserialize_ref(jump_bytes); - let first = jump_tuple.first(); - - append(index.clone(), first, &bytes, false); + tape::append(index.clone(), jump_tuple.appendable_first(), &bytes, false); } diff --git a/crates/algorithm/src/lib.rs b/crates/algorithm/src/lib.rs index 372774f..3bba11d 100644 --- a/crates/algorithm/src/lib.rs +++ b/crates/algorithm/src/lib.rs @@ -9,7 +9,6 @@ mod freepages; mod insert; mod linked_vec; mod maintain; -mod pipe; mod prewarm; mod rerank; mod search; @@ -30,6 +29,7 @@ pub use prewarm::prewarm; pub use rerank::{rerank_heap, rerank_index}; pub use search::search; +use crate::tuples::IndexPointer; use std::ops::{Deref, DerefMut}; use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; @@ -79,3 +79,13 @@ pub enum RerankMethod { Index, Heap, } + +pub(crate) struct Branch { + pub mean: IndexPointer, + pub dis_u_2: f32, + pub factor_ppc: f32, + pub factor_ip: f32, + pub factor_err: f32, + pub signs: Vec, + pub extra: T, +} diff --git a/crates/algorithm/src/maintain.rs b/crates/algorithm/src/maintain.rs index fec057b..2749a99 100644 --- a/crates/algorithm/src/maintain.rs +++ b/crates/algorithm/src/maintain.rs @@ -1,45 +1,39 @@ -use crate::operator::Operator; -use crate::pipe::Pipe; -use crate::tape::*; +use crate::operator::{FunctionalAccessor, Operator}; +use crate::tape::{self, TapeWriter}; use crate::tuples::*; -use crate::{Page, RelationWrite, freepages}; -use simd::fast_scan::unpack; +use crate::{Branch, DerefMut, Page, PageGuard, RelationWrite, freepages}; +use simd::fast_scan::{padding_pack, unpack}; +use std::num::NonZeroU64; pub fn maintain(index: impl RelationWrite, check: impl Fn()) { let meta_guard = index.read(0); - let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let meta_bytes = meta_guard.get(1).expect("data corruption"); + let meta_tuple = MetaTuple::deserialize_ref(meta_bytes); let dims = meta_tuple.dims(); let height_of_root = meta_tuple.height_of_root(); let root_first = meta_tuple.root_first(); let freepage_first = meta_tuple.freepage_first(); drop(meta_guard); - let firsts = { + let state = { type State = Vec; let mut state: State = vec![root_first]; let step = |state: State| { let mut results = Vec::new(); for first in state { - let mut current = first; - while current != u32::MAX { - check(); - let h1_guard = index.read(current); - for i in 1..=h1_guard.len() { - let h1_tuple = h1_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h1_tuple { - H1TupleReader::_0(h1_tuple) => { - for first in h1_tuple.first().iter().copied() { - results.push(first); - } - } - H1TupleReader::_1(_) => (), + tape::read_h1_tape( + index.clone(), + first, + || { + fn push(_: &mut (), _: &[T]) {} + fn finish(_: (), _: (&T, &T, &T, &T)) -> [(); 32] { + [(); 32] } - } - current = h1_guard.get_opaque().next; - } + FunctionalAccessor::new((), push, finish) + }, + |(), _, first| results.push(first), + |_| check(), + ); } results }; @@ -49,14 +43,12 @@ pub fn maintain(index: impl RelationWrite, check: impl Fn()) { state }; - for first in firsts { + for first in state { let mut jump_guard = index.write(first, false); - let mut jump_tuple = jump_guard - .get_mut(1) - .expect("data corruption") - .pipe(write_tuple::); + let jump_bytes = jump_guard.get_mut(1).expect("data corruption"); + let mut jump_tuple = JumpTuple::deserialize_mut(jump_bytes); - let mut tape = H0TapeWriter::<_, _>::create(|| { + let mut tape = FrozenTapeWriter::<_, _>::create(|| { if let Some(id) = freepages::fetch(index.clone(), freepage_first) { let mut write = index.write(id, false); write.clear(); @@ -68,80 +60,131 @@ pub fn maintain(index: impl RelationWrite, check: impl Fn()) { let mut trace = Vec::new(); - let first = *jump_tuple.first(); - let mut current = first; - let mut computing = None; - while current != u32::MAX { + let mut callback = |code: (f32, f32, f32, f32, Vec<_>), mean, payload| { + tape.push(Branch { + mean, + dis_u_2: code.0, + factor_ppc: code.1, + factor_ip: code.2, + factor_err: code.3, + signs: code.4, + extra: payload, + }); + }; + let mut step = |id| { check(); - trace.push(current); - let h0_guard = index.read(current); - for i in 1..=h0_guard.len() { - let h0_tuple = h0_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h0_tuple { - H0TupleReader::_0(h0_tuple) => { - if let Some(payload) = h0_tuple.payload() { - tape.push(H0Branch { - mean: h0_tuple.mean(), - dis_u_2: h0_tuple.code().0, - factor_ppc: h0_tuple.code().1, - factor_ip: h0_tuple.code().2, - factor_err: h0_tuple.code().3, - signs: h0_tuple - .code() - .4 - .iter() - .flat_map(|x| { - std::array::from_fn::<_, 64, _>(|i| *x & (1 << i) != 0) - }) - .take(dims as _) - .collect::>(), - payload, - }); - } - } - H0TupleReader::_1(h0_tuple) => { - let computing = &mut computing.take().unwrap_or_else(Vec::new); - computing.extend_from_slice(h0_tuple.elements()); - let unpacked = unpack(computing); - for j in 0..32 { - if let Some(payload) = h0_tuple.payload()[j] { - tape.push(H0Branch { - mean: h0_tuple.mean()[j], - dis_u_2: h0_tuple.metadata().0[j], - factor_ppc: h0_tuple.metadata().1[j], - factor_ip: h0_tuple.metadata().2[j], - factor_err: h0_tuple.metadata().3[j], - signs: unpacked[j] - .iter() - .flat_map(|&x| { - [x & 1 != 0, x & 2 != 0, x & 4 != 0, x & 8 != 0] - }) - .collect(), - payload, - }); - } - } - } - H0TupleReader::_2(h0_tuple) => { - let computing = computing.get_or_insert_with(Vec::new); - computing.extend_from_slice(h0_tuple.elements()); - } - } - } - current = h0_guard.get_opaque().next; - drop(h0_guard); + trace.push(id); + }; + tape::read_frozen_tape( + index.clone(), + *jump_tuple.frozen_first(), + || { + FunctionalAccessor::new( + Vec::<[u8; 16]>::new(), + Vec::<[u8; 16]>::extend_from_slice, + |elements: Vec<_>, input: (&[f32; 32], &[f32; 32], &[f32; 32], &[f32; 32])| { + let unpacked = unpack(&elements); + std::array::from_fn(|i| { + let f = |&x| [x & 1 != 0, x & 2 != 0, x & 4 != 0, x & 8 != 0]; + let signs = unpacked[i].iter().flat_map(f).collect::>(); + (input.0[i], input.1[i], input.2[i], input.3[i], signs) + }) + }, + ) + }, + &mut callback, + &mut step, + ); + tape::read_appendable_tape( + index.clone(), + *jump_tuple.appendable_first(), + |code| { + let signs = code + .4 + .iter() + .flat_map(|x| std::array::from_fn::<_, 64, _>(|i| *x & (1 << i) != 0)) + .take(dims as _) + .collect::>(); + (code.0, code.1, code.2, code.3, signs) + }, + &mut callback, + &mut step, + ); + + let (frozen_tape, branches) = tape.into_inner(); + + let mut appendable_tape = TapeWriter::create(|| index.extend(false)); + + for branch in branches { + appendable_tape.push(AppendableTuple { + mean: branch.mean, + dis_u_2: branch.dis_u_2, + factor_ppc: branch.factor_ppc, + factor_ip: branch.factor_ip, + factor_err: branch.factor_err, + payload: Some(branch.extra), + elements: rabitq::pack_to_u64(&branch.signs), + }); } - let tape = tape.into_inner(); - let new = tape.first(); - drop(tape); + *jump_tuple.frozen_first() = { frozen_tape }.first(); + *jump_tuple.appendable_first() = { appendable_tape }.first(); - *jump_tuple.first() = new; drop(jump_guard); freepages::mark(index.clone(), freepage_first, &trace); } } + +struct FrozenTapeWriter { + tape: TapeWriter, + branches: Vec>, +} + +impl FrozenTapeWriter +where + G: PageGuard + DerefMut, + G::Target: Page, + E: Fn() -> G, +{ + fn create(extend: E) -> Self { + Self { + tape: TapeWriter::create(extend), + branches: Vec::new(), + } + } + fn push(&mut self, branch: Branch) { + self.branches.push(branch); + if self.branches.len() == 32 { + let chunk = std::array::from_fn::<_, 32, _>(|_| self.branches.pop().unwrap()); + let mut remain = padding_pack(chunk.iter().map(|x| rabitq::pack_to_u4(&x.signs))); + loop { + let freespace = self.tape.freespace(); + if FrozenTuple::estimate_size_0(remain.len()) <= freespace as usize { + self.tape.tape_put(FrozenTuple::_0 { + mean: chunk.each_ref().map(|x| x.mean), + dis_u_2: chunk.each_ref().map(|x| x.dis_u_2), + factor_ppc: chunk.each_ref().map(|x| x.factor_ppc), + factor_ip: chunk.each_ref().map(|x| x.factor_ip), + factor_err: chunk.each_ref().map(|x| x.factor_err), + payload: chunk.each_ref().map(|x| Some(x.extra)), + elements: remain, + }); + break; + } + if let Some(w) = FrozenTuple::fit_1(freespace) { + let (left, right) = remain.split_at(std::cmp::min(w, remain.len())); + self.tape.tape_put(FrozenTuple::_1 { + elements: left.to_vec(), + }); + remain = right.to_vec(); + } else { + self.tape.tape_move(); + } + } + } + } + fn into_inner(self) -> (TapeWriter, Vec>) { + (self.tape, self.branches) + } +} diff --git a/crates/algorithm/src/operator.rs b/crates/algorithm/src/operator.rs index e8a57bd..ae80180 100644 --- a/crates/algorithm/src/operator.rs +++ b/crates/algorithm/src/operator.rs @@ -287,6 +287,34 @@ where } } +pub struct FunctionalAccessor { + data: T, + p: P, + f: F, +} + +impl FunctionalAccessor { + pub fn new(data: T, p: P, f: F) -> Self { + Self { data, p, f } + } +} + +impl Accessor1 for FunctionalAccessor +where + P: for<'a> FnMut(&'a mut T, &'a [E]), + F: FnOnce(T, M) -> R, +{ + type Output = R; + + fn push(&mut self, input: &[E]) { + (self.p)(&mut self.data, input); + } + + fn finish(self, input: M) -> Self::Output { + (self.f)(self.data, input) + } +} + pub struct LAccess<'a, E, M, A> { elements: &'a [E], metadata: M, diff --git a/crates/algorithm/src/pipe.rs b/crates/algorithm/src/pipe.rs deleted file mode 100644 index 18cfc37..0000000 --- a/crates/algorithm/src/pipe.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub trait Pipe { - fn pipe(self, f: impl FnOnce(Self) -> T) -> T - where - Self: Sized; -} - -impl Pipe for S { - fn pipe(self, f: impl FnOnce(Self) -> T) -> T - where - Self: Sized, - { - f(self) - } -} diff --git a/crates/algorithm/src/prewarm.rs b/crates/algorithm/src/prewarm.rs index 587f752..9395c06 100644 --- a/crates/algorithm/src/prewarm.rs +++ b/crates/algorithm/src/prewarm.rs @@ -1,12 +1,12 @@ -use crate::operator::Operator; -use crate::pipe::Pipe; +use crate::operator::{FunctionalAccessor, Operator}; use crate::tuples::*; -use crate::{Page, RelationRead, vectors}; +use crate::{Page, RelationRead, tape, vectors}; use std::fmt::Write; pub fn prewarm(index: impl RelationRead, height: i32, check: impl Fn()) -> String { let meta_guard = index.read(0); - let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let meta_bytes = meta_guard.get(1).expect("data corruption"); + let meta_tuple = MetaTuple::deserialize_ref(meta_bytes); let height_of_root = meta_tuple.height_of_root(); let root_mean = meta_tuple.root_mean(); let root_first = meta_tuple.root_first(); @@ -20,96 +20,89 @@ pub fn prewarm(index: impl RelationRead, height: i32, check: impl F } type State = Vec; let mut state: State = { - let mut nodes = Vec::new(); + let mut results = Vec::new(); { - vectors::access_1::(index.clone(), root_mean, ()); - nodes.push(root_first); + vectors::read_for_h1_tuple::(index.clone(), root_mean, ()); + results.push(root_first); } writeln!(message, "------------------------").unwrap(); - writeln!(message, "number of nodes: {}", nodes.len()).unwrap(); - writeln!(message, "number of tuples: {}", 1).unwrap(); + writeln!(message, "number of nodes: {}", results.len()).unwrap(); writeln!(message, "number of pages: {}", 1).unwrap(); - nodes + results }; let mut step = |state: State| { - let mut counter_pages = 0_usize; - let mut counter_tuples = 0_usize; - let mut nodes = Vec::new(); - for list in state { - let mut current = list; - while current != u32::MAX { - counter_pages += 1; - check(); - let h1_guard = index.read(current); - for i in 1..=h1_guard.len() { - counter_tuples += 1; - let h1_tuple = h1_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h1_tuple { - H1TupleReader::_0(h1_tuple) => { - for mean in h1_tuple.mean().iter().copied() { - vectors::access_1::(index.clone(), mean, ()); - } - for first in h1_tuple.first().iter().copied() { - nodes.push(first); - } - } - H1TupleReader::_1(_) => (), + let mut counter = 0_usize; + let mut results = Vec::new(); + for first in state { + tape::read_h1_tape( + index.clone(), + first, + || { + fn push(_: &mut (), _: &[T]) {} + fn finish(_: (), _: (&T, &T, &T, &T)) -> [(); 32] { + [(); 32] } - } - current = h1_guard.get_opaque().next; - } + FunctionalAccessor::new((), push, finish) + }, + |(), mean, first| { + results.push(first); + vectors::read_for_h1_tuple::(index.clone(), mean, ()); + }, + |_| { + check(); + counter += 1; + }, + ); } writeln!(message, "------------------------").unwrap(); - writeln!(message, "number of nodes: {}", nodes.len()).unwrap(); - writeln!(message, "number of tuples: {}", counter_tuples).unwrap(); - writeln!(message, "number of pages: {}", counter_pages).unwrap(); - nodes + writeln!(message, "number of nodes: {}", results.len()).unwrap(); + writeln!(message, "number of pages: {}", counter).unwrap(); + results }; for _ in (std::cmp::max(1, prewarm_max_height)..height_of_root).rev() { state = step(state); } if prewarm_max_height == 0 { - let mut counter_pages = 0_usize; - let mut counter_tuples = 0_usize; - let mut counter_nodes = 0_usize; - for list in state { - let jump_guard = index.read(list); - let jump_tuple = jump_guard - .get(1) - .expect("data corruption") - .pipe(read_tuple::); - let first = jump_tuple.first(); - let mut current = first; - while current != u32::MAX { - counter_pages += 1; - check(); - let h0_guard = index.read(current); - for i in 1..=h0_guard.len() { - counter_tuples += 1; - let h0_tuple = h0_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h0_tuple { - H0TupleReader::_0(_h0_tuple) => { - counter_nodes += 1; - } - H0TupleReader::_1(_h0_tuple) => { - counter_nodes += 32; - } - H0TupleReader::_2(_) => (), + let mut counter = 0_usize; + let mut results = Vec::new(); + for first in state { + let jump_guard = index.read(first); + let jump_bytes = jump_guard.get(1).expect("data corruption"); + let jump_tuple = JumpTuple::deserialize_ref(jump_bytes); + tape::read_frozen_tape( + index.clone(), + jump_tuple.frozen_first(), + || { + fn push(_: &mut (), _: &[T]) {} + fn finish(_: (), _: (&T, &T, &T, &T)) -> [(); 32] { + [(); 32] } - } - current = h0_guard.get_opaque().next; - } + FunctionalAccessor::new((), push, finish) + }, + |(), _, _| { + results.push(()); + }, + |_| { + check(); + counter += 1; + }, + ); + tape::read_appendable_tape( + index.clone(), + jump_tuple.frozen_first(), + |_| (), + |(), _, _| { + results.push(()); + }, + |_| { + check(); + counter += 1; + }, + ); } writeln!(message, "------------------------").unwrap(); - writeln!(message, "number of nodes: {}", counter_nodes).unwrap(); - writeln!(message, "number of tuples: {}", counter_tuples).unwrap(); - writeln!(message, "number of pages: {}", counter_pages).unwrap(); + writeln!(message, "number of nodes: {}", results.len()).unwrap(); + writeln!(message, "number of pages: {}", counter).unwrap(); } message } diff --git a/crates/algorithm/src/rerank.rs b/crates/algorithm/src/rerank.rs index 840fdd4..2e409cb 100644 --- a/crates/algorithm/src/rerank.rs +++ b/crates/algorithm/src/rerank.rs @@ -22,7 +22,7 @@ pub fn rerank_index( std::iter::from_fn(move || { while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(pay_u)) = heap.pop().unwrap(); - if let Some(dis_u) = vectors::access_0::( + if let Some(dis_u) = vectors::read_for_h0_tuple::( index.clone(), mean, pay_u, diff --git a/crates/algorithm/src/search.rs b/crates/algorithm/src/search.rs index 9972e86..7ed279a 100644 --- a/crates/algorithm/src/search.rs +++ b/crates/algorithm/src/search.rs @@ -1,9 +1,7 @@ use crate::linked_vec::LinkedVec; use crate::operator::*; -use crate::pipe::Pipe; -use crate::tape::{access_0, access_1}; use crate::tuples::*; -use crate::{Page, RelationRead, RerankMethod, vectors}; +use crate::{Page, RelationRead, RerankMethod, tape, vectors}; use always_equal::AlwaysEqual; use distance::Distance; use std::cmp::Reverse; @@ -25,7 +23,8 @@ pub fn search( )>, ) { let meta_guard = index.read(0); - let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let meta_bytes = meta_guard.get(1).expect("data corruption"); + let meta_tuple = MetaTuple::deserialize_ref(meta_bytes); let dims = meta_tuple.dims(); let is_residual = meta_tuple.is_residual(); let rerank_in_heap = meta_tuple.rerank_in_heap(); @@ -52,7 +51,7 @@ pub fn search( let mut state: State = vec![{ let mean = root_mean; if is_residual { - let residual_u = vectors::access_1::( + let residual_u = vectors::read_for_h1_tuple::( index.clone(), mean, LAccess::new( @@ -73,7 +72,7 @@ pub fn search( } else { default_lut.as_ref().map(|x| &x.0).unwrap() }; - access_1( + tape::read_h1_tape( index.clone(), first, || { @@ -85,6 +84,7 @@ pub fn search( |lowerbound, mean, first| { results.push((Reverse(lowerbound), AlwaysEqual(mean), AlwaysEqual(first))); }, + |_| (), ); } let mut heap = BinaryHeap::from(results.into_vec()); @@ -93,7 +93,7 @@ pub fn search( while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); if is_residual { - let (dis_u, residual_u) = vectors::access_1::( + let (dis_u, residual_u) = vectors::read_for_h1_tuple::( index.clone(), mean, LAccess::new( @@ -110,7 +110,7 @@ pub fn search( AlwaysEqual(Some(residual_u)), )); } else { - let dis_u = vectors::access_1::( + let dis_u = vectors::read_for_h1_tuple::( index.clone(), mean, LAccess::new( @@ -139,24 +139,29 @@ pub fn search( default_lut.as_ref().unwrap() }; let jump_guard = index.read(first); - let jump_tuple = jump_guard - .get(1) - .expect("data corruption") - .pipe(read_tuple::); - let first = jump_tuple.first(); - access_0( + let jump_bytes = jump_guard.get(1).expect("data corruption"); + let jump_tuple = JumpTuple::deserialize_ref(jump_bytes); + let mut callback = |lowerbound, mean, payload| { + results.push((Reverse(lowerbound), AlwaysEqual(mean), AlwaysEqual(payload))); + }; + tape::read_frozen_tape( index.clone(), - first, + jump_tuple.frozen_first(), || { RAccess::new( (&lut.0.4, (lut.0.0, lut.0.1, lut.0.2, lut.0.3, epsilon)), O::Distance::block_accessor(), ) }, + &mut callback, + |_| (), + ); + tape::read_appendable_tape( + index.clone(), + jump_tuple.appendable_first(), |code| O::Distance::compute_lowerbound_binary(&lut.1, code, epsilon), - |lowerbound, mean, payload| { - results.push((Reverse(lowerbound), AlwaysEqual(mean), AlwaysEqual(payload))); - }, + &mut callback, + |_| (), ); } ( diff --git a/crates/algorithm/src/tape.rs b/crates/algorithm/src/tape.rs index edc1c4e..e90be1f 100644 --- a/crates/algorithm/src/tape.rs +++ b/crates/algorithm/src/tape.rs @@ -1,9 +1,6 @@ use crate::operator::Accessor1; -use crate::pipe::Pipe; use crate::tuples::*; use crate::{Page, PageGuard, RelationRead, RelationWrite}; -use distance::Distance; -use simd::fast_scan::{any_pack, padding_pack}; use std::marker::PhantomData; use std::num::NonZeroU64; use std::ops::DerefMut; @@ -35,10 +32,10 @@ where pub fn first(&self) -> u32 { self.first } - fn freespace(&self) -> u16 { + pub fn freespace(&self) -> u16 { self.head.freespace() } - fn tape_move(&mut self) { + pub fn tape_move(&mut self) { if self.head.len() == 0 { panic!("tuple is too large to fit in a fresh page"); } @@ -56,7 +53,7 @@ where T: Tuple, { pub fn push(&mut self, x: T) -> IndexPointer { - let bytes = serialize(&x); + let bytes = T::serialize(&x); if let Some(i) = self.head.alloc(&bytes) { pair_to_pointer((self.head.id(), i)) } else { @@ -70,8 +67,8 @@ where } } } - fn tape_put(&mut self, x: T) -> IndexPointer { - let bytes = serialize(&x); + pub fn tape_put(&mut self, x: T) -> IndexPointer { + let bytes = T::serialize(&x); if let Some(i) = self.head.alloc(&bytes) { pair_to_pointer((self.head.id(), i)) } else { @@ -80,268 +77,111 @@ where } } -pub struct H1Branch { - pub mean: IndexPointer, - pub dis_u_2: f32, - pub factor_ppc: f32, - pub factor_ip: f32, - pub factor_err: f32, - pub signs: Vec, - pub first: u32, -} - -pub struct H1TapeWriter { - tape: TapeWriter, - branches: Vec, -} - -impl H1TapeWriter -where - G: PageGuard + DerefMut, - G::Target: Page, - E: Fn() -> G, -{ - pub fn create(extend: E) -> Self { - Self { - tape: TapeWriter::create(extend), - branches: Vec::new(), - } - } - pub fn push(&mut self, branch: H1Branch) { - self.branches.push(branch); - if self.branches.len() == 32 { - let chunk = std::array::from_fn::<_, 32, _>(|_| self.branches.pop().unwrap()); - let mut remain = padding_pack(chunk.iter().map(|x| rabitq::pack_to_u4(&x.signs))); - loop { - let freespace = self.tape.freespace(); - match (H1Tuple::fit_0(freespace), H1Tuple::fit_1(freespace)) { - (Some(w), _) if w >= remain.len() => { - self.tape.tape_put(H1Tuple::_0 { - mean: chunk.each_ref().map(|x| x.mean), - dis_u_2: chunk.each_ref().map(|x| x.dis_u_2), - factor_ppc: chunk.each_ref().map(|x| x.factor_ppc), - factor_ip: chunk.each_ref().map(|x| x.factor_ip), - factor_err: chunk.each_ref().map(|x| x.factor_err), - first: chunk.each_ref().map(|x| x.first), - len: chunk.len() as _, - elements: remain, - }); - break; - } - (_, Some(w)) => { - let (left, right) = remain.split_at(std::cmp::min(w, remain.len())); - self.tape.tape_put(H1Tuple::_1 { - elements: left.to_vec(), - }); - remain = right.to_vec(); - } - (_, None) => self.tape.tape_move(), - } - } - } - } - pub fn into_inner(mut self) -> TapeWriter { - let chunk = self.branches; - if !chunk.is_empty() { - let mut remain = padding_pack(chunk.iter().map(|x| rabitq::pack_to_u4(&x.signs))); - loop { - let freespace = self.tape.freespace(); - match (H1Tuple::fit_0(freespace), H1Tuple::fit_1(freespace)) { - (Some(w), _) if w >= remain.len() => { - self.tape.push(H1Tuple::_0 { - mean: any_pack(chunk.iter().map(|x| x.mean)), - dis_u_2: any_pack(chunk.iter().map(|x| x.dis_u_2)), - factor_ppc: any_pack(chunk.iter().map(|x| x.factor_ppc)), - factor_ip: any_pack(chunk.iter().map(|x| x.factor_ip)), - factor_err: any_pack(chunk.iter().map(|x| x.factor_err)), - first: any_pack(chunk.iter().map(|x| x.first)), - len: chunk.len() as _, - elements: remain, - }); - break; - } - (_, Some(w)) => { - let (left, right) = remain.split_at(std::cmp::min(w, remain.len())); - self.tape.tape_put(H1Tuple::_1 { - elements: left.to_vec(), - }); - remain = right.to_vec(); - } - (_, None) => self.tape.tape_move(), - } - } - } - self.tape - } -} - -pub struct H0Branch { - pub mean: IndexPointer, - pub dis_u_2: f32, - pub factor_ppc: f32, - pub factor_ip: f32, - pub factor_err: f32, - pub signs: Vec, - pub payload: NonZeroU64, -} - -pub struct H0TapeWriter { - tape: TapeWriter, - branches: Vec, -} - -impl H0TapeWriter -where - G: PageGuard + DerefMut, - G::Target: Page, - E: Fn() -> G, -{ - pub fn create(extend: E) -> Self { - Self { - tape: TapeWriter::create(extend), - branches: Vec::new(), - } - } - pub fn push(&mut self, branch: H0Branch) { - self.branches.push(branch); - if self.branches.len() == 32 { - let chunk = std::array::from_fn::<_, 32, _>(|_| self.branches.pop().unwrap()); - let mut remain = padding_pack(chunk.iter().map(|x| rabitq::pack_to_u4(&x.signs))); - loop { - let freespace = self.tape.freespace(); - match (H0Tuple::fit_1(freespace), H0Tuple::fit_2(freespace)) { - (Some(w), _) if w >= remain.len() => { - self.tape.push(H0Tuple::_1 { - mean: chunk.each_ref().map(|x| x.mean), - dis_u_2: chunk.each_ref().map(|x| x.dis_u_2), - factor_ppc: chunk.each_ref().map(|x| x.factor_ppc), - factor_ip: chunk.each_ref().map(|x| x.factor_ip), - factor_err: chunk.each_ref().map(|x| x.factor_err), - payload: chunk.each_ref().map(|x| Some(x.payload)), - elements: remain, - }); - break; - } - (_, Some(w)) => { - let (left, right) = remain.split_at(std::cmp::min(w, remain.len())); - self.tape.tape_put(H0Tuple::_2 { - elements: left.to_vec(), - }); - remain = right.to_vec(); - } - (_, None) => self.tape.tape_move(), - } - } - } - } - pub fn into_inner(mut self) -> TapeWriter { - for x in self.branches { - self.tape.push(H0Tuple::_0 { - mean: x.mean, - dis_u_2: x.dis_u_2, - factor_ppc: x.factor_ppc, - factor_ip: x.factor_ip, - factor_err: x.factor_err, - payload: Some(x.payload), - elements: rabitq::pack_to_u64(&x.signs), - }); - } - self.tape - } -} - -pub fn access_1( +pub fn read_h1_tape( index: impl RelationRead, first: u32, - make_block_accessor: impl Fn() -> A + Copy, - mut callback: impl FnMut(Distance, IndexPointer, u32), + accessor: impl Fn() -> A, + mut callback: impl FnMut(T, IndexPointer, u32), + mut step: impl FnMut(u32), ) where A: for<'a> Accessor1< [u8; 16], (&'a [f32; 32], &'a [f32; 32], &'a [f32; 32], &'a [f32; 32]), - Output = [Distance; 32], + Output = [T; 32], >, { assert!(first != u32::MAX); let mut current = first; - let mut computing = None; + let mut x = None; while current != u32::MAX { - let h1_guard = index.read(current); - for i in 1..=h1_guard.len() { - let h1_tuple = h1_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h1_tuple { - H1TupleReader::_0(h1_tuple) => { - let mut compute = computing.take().unwrap_or_else(make_block_accessor); - compute.push(h1_tuple.elements()); - let lowerbounds = compute.finish(h1_tuple.metadata()); - for i in 0..h1_tuple.len() { - callback( - lowerbounds[i as usize], - h1_tuple.mean()[i as usize], - h1_tuple.first()[i as usize], - ); + step(current); + let guard = index.read(current); + for i in 1..=guard.len() { + let bytes = guard.get(i).expect("data corruption"); + let tuple = H1Tuple::deserialize_ref(bytes); + match tuple { + H1TupleReader::_0(tuple) => { + let mut x = x.take().unwrap_or_else(&accessor); + x.push(tuple.elements()); + let values = x.finish(tuple.metadata()); + for (j, value) in values.into_iter().enumerate() { + if j < tuple.len() as usize { + callback(value, tuple.mean()[j], tuple.first()[j]); + } } } - H1TupleReader::_1(h1_tuple) => { - let computing = computing.get_or_insert_with(make_block_accessor); - computing.push(h1_tuple.elements()); + H1TupleReader::_1(tuple) => { + x.get_or_insert_with(&accessor).push(tuple.elements()); } } } - current = h1_guard.get_opaque().next; + current = guard.get_opaque().next; } } -pub fn access_0( +pub fn read_frozen_tape( index: impl RelationRead, first: u32, - make_block_accessor: impl Fn() -> A + Copy, - compute_binary: impl Fn((f32, f32, f32, f32, &[u64])) -> Distance, - mut callback: impl FnMut(Distance, IndexPointer, NonZeroU64), + accessor: impl Fn() -> A, + mut callback: impl FnMut(T, IndexPointer, NonZeroU64), + mut step: impl FnMut(u32), ) where A: for<'a> Accessor1< [u8; 16], (&'a [f32; 32], &'a [f32; 32], &'a [f32; 32], &'a [f32; 32]), - Output = [Distance; 32], + Output = [T; 32], >, { assert!(first != u32::MAX); let mut current = first; - let mut computing = None; + let mut x = None; while current != u32::MAX { - let h0_guard = index.read(current); - for i in 1..=h0_guard.len() { - let h0_tuple = h0_guard - .get(i) - .expect("data corruption") - .pipe(read_tuple::); - match h0_tuple { - H0TupleReader::_0(h0_tuple) => { - let lowerbound = compute_binary(h0_tuple.code()); - if let Some(payload) = h0_tuple.payload() { - callback(lowerbound, h0_tuple.mean(), payload); - } - } - H0TupleReader::_1(h0_tuple) => { - let mut compute = computing.take().unwrap_or_else(make_block_accessor); - compute.push(h0_tuple.elements()); - let lowerbounds = compute.finish(h0_tuple.metadata()); - for j in 0..32 { - if let Some(payload) = h0_tuple.payload()[j] { - callback(lowerbounds[j], h0_tuple.mean()[j], payload); + step(current); + let guard = index.read(current); + for i in 1..=guard.len() { + let bytes = guard.get(i).expect("data corruption"); + let tuple = FrozenTuple::deserialize_ref(bytes); + match tuple { + FrozenTupleReader::_0(tuple) => { + let mut x = x.take().unwrap_or_else(&accessor); + x.push(tuple.elements()); + let values = x.finish(tuple.metadata()); + for (j, value) in values.into_iter().enumerate() { + if let Some(payload) = tuple.payload()[j] { + callback(value, tuple.mean()[j], payload); } } } - H0TupleReader::_2(h0_tuple) => { - let computing = computing.get_or_insert_with(make_block_accessor); - computing.push(h0_tuple.elements()); + FrozenTupleReader::_1(tuple) => { + x.get_or_insert_with(&accessor).push(tuple.elements()); } } } - current = h0_guard.get_opaque().next; + current = guard.get_opaque().next; + } +} + +pub fn read_appendable_tape( + index: impl RelationRead, + first: u32, + mut access: impl FnMut((f32, f32, f32, f32, &[u64])) -> T, + mut callback: impl FnMut(T, IndexPointer, NonZeroU64), + mut step: impl FnMut(u32), +) { + assert!(first != u32::MAX); + let mut current = first; + while current != u32::MAX { + step(current); + let guard = index.read(current); + for i in 1..=guard.len() { + let bytes = guard.get(i).expect("data corruption"); + let tuple = AppendableTuple::deserialize_ref(bytes); + if let Some(payload) = tuple.payload() { + let value = access(tuple.code()); + callback(value, tuple.mean(), payload); + } + } + current = guard.get_opaque().next; } } diff --git a/crates/algorithm/src/tuples.rs b/crates/algorithm/src/tuples.rs index f60d552..60a92dd 100644 --- a/crates/algorithm/src/tuples.rs +++ b/crates/algorithm/src/tuples.rs @@ -7,41 +7,22 @@ use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; pub const ALIGN: usize = 8; pub type Tag = u64; const MAGIC: u64 = u64::from_ne_bytes(*b"vchordrq"); -const VERSION: u64 = 2; +const VERSION: u64 = 3; pub trait Tuple: 'static { - type Reader<'a>: TupleReader<'a, Tuple = Self>; fn serialize(&self) -> Vec; } -pub trait WithWriter: Tuple { - type Writer<'a>: TupleWriter<'a, Tuple = Self>; -} - -pub trait TupleReader<'a>: Copy { - type Tuple: Tuple; - fn deserialize_ref(source: &'a [u8]) -> Self; -} - -pub trait TupleWriter<'a> { - type Tuple: Tuple; - fn deserialize_mut(source: &'a mut [u8]) -> Self; -} - -pub fn serialize(tuple: &T) -> Vec { - Tuple::serialize(tuple) +pub trait WithReader: Tuple { + type Reader<'a>; + fn deserialize_ref(source: &[u8]) -> Self::Reader<'_>; } -pub fn read_tuple(source: &[u8]) -> T::Reader<'_> { - TupleReader::deserialize_ref(source) -} - -pub fn write_tuple(source: &mut [u8]) -> T::Writer<'_> { - TupleWriter::deserialize_mut(source) +pub trait WithWriter: Tuple { + type Writer<'a>; + fn deserialize_mut(source: &mut [u8]) -> Self::Writer<'_>; } -// meta tuple - #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] struct MetaTupleHeader { @@ -72,8 +53,6 @@ pub struct MetaTuple { } impl Tuple for MetaTuple { - type Reader<'a> = MetaTupleReader<'a>; - fn serialize(&self) -> Vec { MetaTupleHeader { magic: MAGIC, @@ -93,14 +72,9 @@ impl Tuple for MetaTuple { } } -#[derive(Debug, Clone, Copy)] -pub struct MetaTupleReader<'a> { - header: &'a MetaTupleHeader, -} - -impl<'a> TupleReader<'a> for MetaTupleReader<'a> { - type Tuple = MetaTuple; - fn deserialize_ref(source: &'a [u8]) -> Self { +impl WithReader for MetaTuple { + type Reader<'a> = MetaTupleReader<'a>; + fn deserialize_ref(source: &[u8]) -> MetaTupleReader<'_> { if source.len() < 16 { panic!("bad bytes") } @@ -114,10 +88,15 @@ impl<'a> TupleReader<'a> for MetaTupleReader<'a> { } let checker = RefChecker::new(source); let header = checker.prefix(0); - Self { header } + MetaTupleReader { header } } } +#[derive(Debug, Clone, Copy)] +pub struct MetaTupleReader<'a> { + header: &'a MetaTupleHeader, +} + impl MetaTupleReader<'_> { pub fn dims(self) -> u32 { self.header.dims @@ -145,8 +124,6 @@ impl MetaTupleReader<'_> { } } -// freepage tuple - #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] struct FreepageTupleHeader { @@ -162,8 +139,6 @@ const _: () = assert!(size_of::() == 4232); pub struct FreepageTuple {} impl Tuple for FreepageTuple { - type Reader<'a> = FreepageTupleReader<'a>; - fn serialize(&self) -> Vec { FreepageTupleHeader { a: std::array::from_fn(|_| 0), @@ -178,21 +153,11 @@ impl Tuple for FreepageTuple { impl WithWriter for FreepageTuple { type Writer<'a> = FreepageTupleWriter<'a>; -} - -#[derive(Debug, Clone, Copy)] -pub struct FreepageTupleReader<'a> { - #[allow(dead_code)] - header: &'a FreepageTupleHeader, -} - -impl<'a> TupleReader<'a> for FreepageTupleReader<'a> { - type Tuple = FreepageTuple; - fn deserialize_ref(source: &'a [u8]) -> Self { - let checker = RefChecker::new(source); + fn deserialize_mut(source: &mut [u8]) -> FreepageTupleWriter<'_> { + let mut checker = MutChecker::new(source); let header = checker.prefix(0); - Self { header } + FreepageTupleWriter { header } } } @@ -200,16 +165,6 @@ pub struct FreepageTupleWriter<'a> { header: &'a mut FreepageTupleHeader, } -impl<'a> TupleWriter<'a> for FreepageTupleWriter<'a> { - type Tuple = FreepageTuple; - - fn deserialize_mut(source: &'a mut [u8]) -> Self { - let mut checker = MutChecker::new(source); - let header = checker.prefix(0); - Self { header } - } -} - impl FreepageTupleWriter<'_> { pub fn mark(&mut self, i: usize) { let c_i = i; @@ -237,8 +192,6 @@ impl FreepageTupleWriter<'_> { } } -// vector tuple - #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] struct VectorTupleHeader0 { @@ -274,8 +227,6 @@ pub enum VectorTuple { } impl Tuple for VectorTuple { - type Reader<'a> = VectorTupleReader<'a, V>; - fn serialize(&self) -> Vec { let mut buffer = Vec::::new(); match self { @@ -286,9 +237,6 @@ impl Tuple for VectorTuple { } => { buffer.extend((0 as Tag).to_ne_bytes()); buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } let metadata_s = buffer.len(); buffer.extend(metadata.as_bytes()); while buffer.len() % ALIGN != 0 { @@ -297,6 +245,9 @@ impl Tuple for VectorTuple { let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); + while buffer.len() % ALIGN != 0 { + buffer.push(0); + } buffer[size_of::()..][..size_of::()].copy_from_slice( VectorTupleHeader0 { payload: *payload, @@ -316,12 +267,12 @@ impl Tuple for VectorTuple { } => { buffer.extend((1 as Tag).to_ne_bytes()); buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); + while buffer.len() % ALIGN != 0 { + buffer.push(0); + } buffer[size_of::()..][..size_of::()].copy_from_slice( VectorTupleHeader1 { payload: *payload, @@ -337,6 +288,34 @@ impl Tuple for VectorTuple { } } +impl WithReader for VectorTuple { + type Reader<'a> = VectorTupleReader<'a, V>; + + fn deserialize_ref(source: &[u8]) -> VectorTupleReader<'_, V> { + let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); + match tag { + 0 => { + let checker = RefChecker::new(source); + let header: &VectorTupleHeader0 = checker.prefix(size_of::()); + let metadata = checker.prefix(header.metadata_s); + let elements = checker.bytes(header.elements_s, header.elements_e); + VectorTupleReader::_0(VectorTupleReader0 { + header, + elements, + metadata, + }) + } + 1 => { + let checker = RefChecker::new(source); + let header: &VectorTupleHeader1 = checker.prefix(size_of::()); + let elements = checker.bytes(header.elements_s, header.elements_e); + VectorTupleReader::_1(VectorTupleReader1 { header, elements }) + } + _ => panic!("bad bytes"), + } + } +} + #[derive(Clone)] pub struct VectorTupleReader0<'a, V: Vector> { header: &'a VectorTupleHeader0, @@ -362,34 +341,6 @@ pub enum VectorTupleReader<'a, V: Vector> { impl Copy for VectorTupleReader<'_, V> {} -impl<'a, V: Vector> TupleReader<'a> for VectorTupleReader<'a, V> { - type Tuple = VectorTuple; - - fn deserialize_ref(source: &'a [u8]) -> Self { - let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); - match tag { - 0 => { - let checker = RefChecker::new(source); - let header: &VectorTupleHeader0 = checker.prefix(size_of::()); - let metadata = checker.prefix(header.metadata_s); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_0(VectorTupleReader0 { - header, - elements, - metadata, - }) - } - 1 => { - let checker = RefChecker::new(source); - let header: &VectorTupleHeader1 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_1(VectorTupleReader1 { header, elements }) - } - _ => panic!("bad bytes"), - } - } -} - impl<'a, V: Vector> VectorTupleReader<'a, V> { pub fn payload(self) -> Option { match self { @@ -411,8 +362,6 @@ impl<'a, V: Vector> VectorTupleReader<'a, V> { } } -// height1tuple - #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] struct H1TupleHeader0 { @@ -454,15 +403,12 @@ pub enum H1Tuple { } impl H1Tuple { - pub fn fit_0(freespace: u16) -> Option { - let mut freespace = freespace as isize; - freespace -= size_of::() as isize; - freespace -= size_of::() as isize; - if freespace >= 0 { - Some(freespace as usize / size_of::<[u8; 16]>()) - } else { - None - } + pub fn estimate_size_0(elements: usize) -> usize { + let mut size = 0_usize; + size += size_of::(); + size += size_of::(); + size += elements * size_of::<[u8; 16]>(); + size } pub fn fit_1(freespace: u16) -> Option { let mut freespace = freespace as isize; @@ -477,8 +423,6 @@ impl H1Tuple { } impl Tuple for H1Tuple { - type Reader<'a> = H1TupleReader<'a>; - fn serialize(&self) -> Vec { let mut buffer = Vec::::new(); match self { @@ -494,9 +438,6 @@ impl Tuple for H1Tuple { } => { buffer.extend((0 as Tag).to_ne_bytes()); buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); @@ -519,9 +460,6 @@ impl Tuple for H1Tuple { Self::_1 { elements } => { buffer.extend((1 as Tag).to_ne_bytes()); buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); @@ -538,47 +476,47 @@ impl Tuple for H1Tuple { } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum H1TupleReader<'a> { - _0(H1TupleReader0<'a>), - _1(H1TupleReader1<'a>), -} - -#[derive(Debug, Clone, Copy, PartialEq)] -pub struct H1TupleReader0<'a> { - header: &'a H1TupleHeader0, - elements: &'a [[u8; 16]], -} - -#[derive(Debug, Clone, Copy, PartialEq)] -pub struct H1TupleReader1<'a> { - header: &'a H1TupleHeader1, - elements: &'a [[u8; 16]], -} - -impl<'a> TupleReader<'a> for H1TupleReader<'a> { - type Tuple = H1Tuple; +impl WithReader for H1Tuple { + type Reader<'a> = H1TupleReader<'a>; - fn deserialize_ref(source: &'a [u8]) -> Self { + fn deserialize_ref(source: &[u8]) -> H1TupleReader<'_> { let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); match tag { 0 => { let checker = RefChecker::new(source); let header: &H1TupleHeader0 = checker.prefix(size_of::()); let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_0(H1TupleReader0 { header, elements }) + H1TupleReader::_0(H1TupleReader0 { header, elements }) } 1 => { let checker = RefChecker::new(source); let header: &H1TupleHeader1 = checker.prefix(size_of::()); let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_1(H1TupleReader1 { header, elements }) + H1TupleReader::_1(H1TupleReader1 { header, elements }) } _ => panic!("bad bytes"), } } } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum H1TupleReader<'a> { + _0(H1TupleReader0<'a>), + _1(H1TupleReader1<'a>), +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct H1TupleReader0<'a> { + header: &'a H1TupleHeader0, + elements: &'a [[u8; 16]], +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct H1TupleReader1<'a> { + header: &'a H1TupleHeader1, + elements: &'a [[u8; 16]], +} + impl<'a> H1TupleReader0<'a> { pub fn len(self) -> u32 { self.header.len @@ -611,30 +549,43 @@ impl<'a> H1TupleReader1<'a> { #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] struct JumpTupleHeader { - first: u32, - _padding_0: [ZeroU8; 4], + frozen_first: u32, + appendable_first: u32, } #[derive(Debug, Clone, PartialEq)] pub struct JumpTuple { - pub first: u32, + pub frozen_first: u32, + pub appendable_first: u32, } impl Tuple for JumpTuple { - type Reader<'a> = JumpTupleReader<'a>; - fn serialize(&self) -> Vec { JumpTupleHeader { - first: self.first, - _padding_0: Default::default(), + frozen_first: self.frozen_first, + appendable_first: self.appendable_first, } .as_bytes() .to_vec() } } +impl WithReader for JumpTuple { + type Reader<'a> = JumpTupleReader<'a>; + fn deserialize_ref(source: &[u8]) -> JumpTupleReader<'_> { + let checker = RefChecker::new(source); + let header: &JumpTupleHeader = checker.prefix(0); + JumpTupleReader { header } + } +} + impl WithWriter for JumpTuple { type Writer<'a> = JumpTupleWriter<'a>; + fn deserialize_mut(source: &mut [u8]) -> JumpTupleWriter<'_> { + let mut checker = MutChecker::new(source); + let header: &mut JumpTupleHeader = checker.prefix(0); + JumpTupleWriter { header } + } } #[derive(Debug, Clone, Copy)] @@ -642,19 +593,12 @@ pub struct JumpTupleReader<'a> { header: &'a JumpTupleHeader, } -impl<'a> TupleReader<'a> for JumpTupleReader<'a> { - type Tuple = JumpTuple; - - fn deserialize_ref(source: &'a [u8]) -> Self { - let checker = RefChecker::new(source); - let header: &JumpTupleHeader = checker.prefix(0); - Self { header } - } -} - impl JumpTupleReader<'_> { - pub fn first(self) -> u32 { - self.header.first + pub fn frozen_first(self) -> u32 { + self.header.frozen_first + } + pub fn appendable_first(self) -> u32 { + self.header.appendable_first } } @@ -663,38 +607,18 @@ pub struct JumpTupleWriter<'a> { header: &'a mut JumpTupleHeader, } -impl<'a> TupleWriter<'a> for JumpTupleWriter<'a> { - type Tuple = JumpTuple; - - fn deserialize_mut(source: &'a mut [u8]) -> Self { - let mut checker = MutChecker::new(source); - let header: &mut JumpTupleHeader = checker.prefix(0); - Self { header } - } -} - impl JumpTupleWriter<'_> { - pub fn first(&mut self) -> &mut u32 { - &mut self.header.first + pub fn frozen_first(&mut self) -> &mut u32 { + &mut self.header.frozen_first + } + pub fn appendable_first(&mut self) -> &mut u32 { + &mut self.header.appendable_first } } #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] -struct H0TupleHeader0 { - mean: IndexPointer, - dis_u_2: f32, - factor_ppc: f32, - factor_ip: f32, - factor_err: f32, - payload: Option, - elements_s: usize, - elements_e: usize, -} - -#[repr(C, align(8))] -#[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] -struct H0TupleHeader1 { +struct FrozenTupleHeader0 { mean: [IndexPointer; 32], dis_u_2: [f32; 32], factor_ppc: [f32; 32], @@ -707,24 +631,15 @@ struct H0TupleHeader1 { #[repr(C, align(8))] #[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] -struct H0TupleHeader2 { +struct FrozenTupleHeader1 { elements_s: usize, elements_e: usize, } #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, PartialEq)] -pub enum H0Tuple { +pub enum FrozenTuple { _0 { - mean: IndexPointer, - dis_u_2: f32, - factor_ppc: f32, - factor_ip: f32, - factor_err: f32, - payload: Option, - elements: Vec, - }, - _1 { mean: [IndexPointer; 32], dis_u_2: [f32; 32], factor_ppc: [f32; 32], @@ -733,26 +648,23 @@ pub enum H0Tuple { payload: [Option; 32], elements: Vec<[u8; 16]>, }, - _2 { + _1 { elements: Vec<[u8; 16]>, }, } -impl H0Tuple { - pub fn fit_1(freespace: u16) -> Option { - let mut freespace = freespace as isize; - freespace -= size_of::() as isize; - freespace -= size_of::() as isize; - if freespace >= 0 { - Some(freespace as usize / size_of::<[u8; 16]>()) - } else { - None - } +impl FrozenTuple { + pub fn estimate_size_0(elements: usize) -> usize { + let mut size = 0_usize; + size += size_of::(); + size += size_of::(); + size += elements * size_of::<[u8; 16]>(); + size } - pub fn fit_2(freespace: u16) -> Option { + pub fn fit_1(freespace: u16) -> Option { let mut freespace = freespace as isize; freespace -= size_of::() as isize; - freespace -= size_of::() as isize; + freespace -= size_of::() as isize; if freespace >= 0 { Some(freespace as usize / size_of::<[u8; 16]>()) } else { @@ -761,13 +673,11 @@ impl H0Tuple { } } -impl Tuple for H0Tuple { - type Reader<'a> = H0TupleReader<'a>; - +impl Tuple for FrozenTuple { fn serialize(&self) -> Vec { let mut buffer = Vec::::new(); match self { - H0Tuple::_0 { + FrozenTuple::_0 { mean, dis_u_2, factor_ppc, @@ -777,15 +687,12 @@ impl Tuple for H0Tuple { elements, } => { buffer.extend((0 as Tag).to_ne_bytes()); - buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } + buffer.extend(std::iter::repeat_n(0, size_of::())); let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); - buffer[size_of::()..][..size_of::()].copy_from_slice( - H0TupleHeader0 { + buffer[size_of::()..][..size_of::()].copy_from_slice( + FrozenTupleHeader0 { mean: *mean, dis_u_2: *dis_u_2, factor_ppc: *factor_ppc, @@ -798,48 +705,14 @@ impl Tuple for H0Tuple { .as_bytes(), ); } - H0Tuple::_1 { - mean, - dis_u_2, - factor_ppc, - factor_ip, - factor_err, - payload, - elements, - } => { + Self::_1 { elements } => { buffer.extend((1 as Tag).to_ne_bytes()); - buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } - let elements_s = buffer.len(); - buffer.extend(elements.as_bytes()); - let elements_e = buffer.len(); - buffer[size_of::()..][..size_of::()].copy_from_slice( - H0TupleHeader1 { - mean: *mean, - dis_u_2: *dis_u_2, - factor_ppc: *factor_ppc, - factor_ip: *factor_ip, - factor_err: *factor_err, - payload: *payload, - elements_s, - elements_e, - } - .as_bytes(), - ); - } - Self::_2 { elements } => { - buffer.extend((2 as Tag).to_ne_bytes()); - buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } + buffer.extend(std::iter::repeat_n(0, size_of::())); let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); - buffer[size_of::()..][..size_of::()].copy_from_slice( - H0TupleHeader2 { + buffer[size_of::()..][..size_of::()].copy_from_slice( + FrozenTupleHeader1 { elements_s, elements_e, } @@ -851,48 +724,65 @@ impl Tuple for H0Tuple { } } -impl WithWriter for H0Tuple { - type Writer<'a> = H0TupleWriter<'a>; -} +impl WithReader for FrozenTuple { + type Reader<'a> = FrozenTupleReader<'a>; -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum H0TupleReader<'a> { - _0(H0TupleReader0<'a>), - _1(H0TupleReader1<'a>), - _2(H0TupleReader2<'a>), + fn deserialize_ref(source: &[u8]) -> FrozenTupleReader<'_> { + let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); + match tag { + 0 => { + let checker = RefChecker::new(source); + let header: &FrozenTupleHeader0 = checker.prefix(size_of::()); + let elements = checker.bytes(header.elements_s, header.elements_e); + FrozenTupleReader::_0(FrozenTupleReader0 { header, elements }) + } + 1 => { + let checker = RefChecker::new(source); + let header: &FrozenTupleHeader1 = checker.prefix(size_of::()); + let elements = checker.bytes(header.elements_s, header.elements_e); + FrozenTupleReader::_1(FrozenTupleReader1 { header, elements }) + } + _ => panic!("bad bytes"), + } + } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub struct H0TupleReader0<'a> { - header: &'a H0TupleHeader0, - elements: &'a [u64], -} +impl WithWriter for FrozenTuple { + type Writer<'a> = FrozenTupleWriter<'a>; -impl<'a> H0TupleReader0<'a> { - pub fn mean(self) -> IndexPointer { - self.header.mean - } - pub fn code(self) -> (f32, f32, f32, f32, &'a [u64]) { - ( - self.header.dis_u_2, - self.header.factor_ppc, - self.header.factor_ip, - self.header.factor_err, - self.elements, - ) - } - pub fn payload(self) -> Option { - self.header.payload + fn deserialize_mut(source: &mut [u8]) -> FrozenTupleWriter<'_> { + let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); + match tag { + 0 => { + let mut checker = MutChecker::new(source); + let header: &mut FrozenTupleHeader0 = checker.prefix(size_of::()); + let elements = checker.bytes(header.elements_s, header.elements_e); + FrozenTupleWriter::_0(FrozenTupleWriter0 { header, elements }) + } + 1 => { + let mut checker = MutChecker::new(source); + let header: &mut FrozenTupleHeader1 = checker.prefix(size_of::()); + let elements = checker.bytes(header.elements_s, header.elements_e); + FrozenTupleWriter::_1(FrozenTupleWriter1 { header, elements }) + } + _ => panic!("bad bytes"), + } } } #[derive(Debug, Clone, Copy, PartialEq)] -pub struct H0TupleReader1<'a> { - header: &'a H0TupleHeader1, +pub enum FrozenTupleReader<'a> { + _0(FrozenTupleReader0<'a>), + _1(FrozenTupleReader1<'a>), +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct FrozenTupleReader0<'a> { + header: &'a FrozenTupleHeader0, elements: &'a [[u8; 16]], } -impl<'a> H0TupleReader1<'a> { +impl<'a> FrozenTupleReader0<'a> { pub fn mean(self) -> &'a [IndexPointer; 32] { &self.header.mean } @@ -913,113 +803,149 @@ impl<'a> H0TupleReader1<'a> { } #[derive(Debug, Clone, Copy, PartialEq)] -pub struct H0TupleReader2<'a> { - header: &'a H0TupleHeader2, +pub struct FrozenTupleReader1<'a> { + header: &'a FrozenTupleHeader1, elements: &'a [[u8; 16]], } -impl<'a> H0TupleReader2<'a> { +impl<'a> FrozenTupleReader1<'a> { pub fn elements(self) -> &'a [[u8; 16]] { self.elements } } -impl<'a> TupleReader<'a> for H0TupleReader<'a> { - type Tuple = H0Tuple; - - fn deserialize_ref(source: &'a [u8]) -> Self { - let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); - match tag { - 0 => { - let checker = RefChecker::new(source); - let header: &H0TupleHeader0 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_0(H0TupleReader0 { header, elements }) - } - 1 => { - let checker = RefChecker::new(source); - let header: &H0TupleHeader1 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_1(H0TupleReader1 { header, elements }) - } - 2 => { - let checker = RefChecker::new(source); - let header: &H0TupleHeader2 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_2(H0TupleReader2 { header, elements }) - } - _ => panic!("bad bytes"), - } - } -} - #[derive(Debug)] -pub enum H0TupleWriter<'a> { - _0(H0TupleWriter0<'a>), - _1(H0TupleWriter1<'a>), +pub enum FrozenTupleWriter<'a> { + _0(FrozenTupleWriter0<'a>), #[allow(dead_code)] - _2(H0TupleWriter2<'a>), + _1(FrozenTupleWriter1<'a>), } #[derive(Debug)] -pub struct H0TupleWriter0<'a> { - header: &'a mut H0TupleHeader0, - #[allow(dead_code)] - elements: &'a mut [u64], -} - -#[derive(Debug)] -pub struct H0TupleWriter1<'a> { - header: &'a mut H0TupleHeader1, +pub struct FrozenTupleWriter0<'a> { + header: &'a mut FrozenTupleHeader0, #[allow(dead_code)] elements: &'a mut [[u8; 16]], } #[derive(Debug)] -pub struct H0TupleWriter2<'a> { +pub struct FrozenTupleWriter1<'a> { #[allow(dead_code)] - header: &'a mut H0TupleHeader2, + header: &'a mut FrozenTupleHeader1, #[allow(dead_code)] elements: &'a mut [[u8; 16]], } -impl<'a> TupleWriter<'a> for H0TupleWriter<'a> { - type Tuple = H0Tuple; +impl FrozenTupleWriter0<'_> { + pub fn payload(&mut self) -> &mut [Option; 32] { + &mut self.header.payload + } +} - fn deserialize_mut(source: &'a mut [u8]) -> Self { - let tag = Tag::from_ne_bytes(std::array::from_fn(|i| source[i])); - match tag { - 0 => { - let mut checker = MutChecker::new(source); - let header: &mut H0TupleHeader0 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_0(H0TupleWriter0 { header, elements }) - } - 1 => { - let mut checker = MutChecker::new(source); - let header: &mut H0TupleHeader1 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_1(H0TupleWriter1 { header, elements }) - } - 2 => { - let mut checker = MutChecker::new(source); - let header: &mut H0TupleHeader2 = checker.prefix(size_of::()); - let elements = checker.bytes(header.elements_s, header.elements_e); - Self::_2(H0TupleWriter2 { header, elements }) +#[repr(C, align(8))] +#[derive(Debug, Clone, PartialEq, FromBytes, IntoBytes, Immutable, KnownLayout)] +struct AppendableTupleHeader { + mean: IndexPointer, + dis_u_2: f32, + factor_ppc: f32, + factor_ip: f32, + factor_err: f32, + payload: Option, + elements_s: usize, + elements_e: usize, +} + +#[allow(clippy::large_enum_variant)] +#[derive(Debug, Clone, PartialEq)] +pub struct AppendableTuple { + pub mean: IndexPointer, + pub dis_u_2: f32, + pub factor_ppc: f32, + pub factor_ip: f32, + pub factor_err: f32, + pub payload: Option, + pub elements: Vec, +} + +impl Tuple for AppendableTuple { + fn serialize(&self) -> Vec { + let mut buffer = Vec::::new(); + buffer.extend(std::iter::repeat_n(0, size_of::())); + let elements_s = buffer.len(); + buffer.extend(self.elements.as_bytes()); + let elements_e = buffer.len(); + buffer[..size_of::()].copy_from_slice( + AppendableTupleHeader { + mean: self.mean, + dis_u_2: self.dis_u_2, + factor_ppc: self.factor_ppc, + factor_ip: self.factor_ip, + factor_err: self.factor_err, + payload: self.payload, + elements_s, + elements_e, } - _ => panic!("bad bytes"), - } + .as_bytes(), + ); + buffer } } -impl H0TupleWriter0<'_> { - pub fn payload(&mut self) -> &mut Option { - &mut self.header.payload +impl WithReader for AppendableTuple { + type Reader<'a> = AppendableTupleReader<'a>; + + fn deserialize_ref(source: &[u8]) -> AppendableTupleReader<'_> { + let checker = RefChecker::new(source); + let header: &AppendableTupleHeader = checker.prefix(0); + let elements = checker.bytes(header.elements_s, header.elements_e); + AppendableTupleReader { header, elements } } } -impl H0TupleWriter1<'_> { - pub fn payload(&mut self) -> &mut [Option; 32] { +impl WithWriter for AppendableTuple { + type Writer<'a> = AppendableTupleWriter<'a>; + + fn deserialize_mut(source: &mut [u8]) -> AppendableTupleWriter<'_> { + let mut checker = MutChecker::new(source); + let header: &mut AppendableTupleHeader = checker.prefix(0); + let elements = checker.bytes(header.elements_s, header.elements_e); + AppendableTupleWriter { header, elements } + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct AppendableTupleReader<'a> { + header: &'a AppendableTupleHeader, + elements: &'a [u64], +} + +impl<'a> AppendableTupleReader<'a> { + pub fn mean(self) -> IndexPointer { + self.header.mean + } + pub fn code(self) -> (f32, f32, f32, f32, &'a [u64]) { + ( + self.header.dis_u_2, + self.header.factor_ppc, + self.header.factor_ip, + self.header.factor_err, + self.elements, + ) + } + pub fn payload(self) -> Option { + self.header.payload + } +} + +#[derive(Debug)] +pub struct AppendableTupleWriter<'a> { + header: &'a mut AppendableTupleHeader, + #[allow(dead_code)] + elements: &'a mut [u64], +} + +impl AppendableTupleWriter<'_> { + pub fn payload(&mut self) -> &mut Option { &mut self.header.payload } } @@ -1201,9 +1127,6 @@ fn aliasing_test() { let elements = (0u32..1111).collect::>(); let mut buffer = Vec::::new(); buffer.extend(std::iter::repeat_n(0, size_of::())); - while buffer.len() % ALIGN != 0 { - buffer.push(0); - } let elements_s = buffer.len(); buffer.extend(elements.as_bytes()); let elements_e = buffer.len(); diff --git a/crates/algorithm/src/vectors.rs b/crates/algorithm/src/vectors.rs index dcc11af..42b83b0 100644 --- a/crates/algorithm/src/vectors.rs +++ b/crates/algorithm/src/vectors.rs @@ -1,11 +1,10 @@ use crate::operator::*; -use crate::pipe::Pipe; use crate::tuples::*; use crate::{Page, PageGuard, RelationRead, RelationWrite, tape}; use std::num::NonZeroU64; use vector::VectorOwned; -pub fn access_1< +pub fn read_for_h1_tuple< O: Operator, A: Accessor1<::Element, ::Metadata>, >( @@ -16,21 +15,19 @@ pub fn access_1< let mut cursor = Err(mean); let mut result = accessor; while let Err(mean) = cursor.map_err(pointer_to_pair) { - let vector_guard = index.read(mean.0); - let vector_tuple = vector_guard - .get(mean.1) - .expect("data corruption") - .pipe(read_tuple::>); - if vector_tuple.payload().is_some() { + let guard = index.read(mean.0); + let bytes = guard.get(mean.1).expect("data corruption"); + let tuple = VectorTuple::::deserialize_ref(bytes); + if tuple.payload().is_some() { panic!("data corruption"); } - result.push(vector_tuple.elements()); - cursor = vector_tuple.metadata_or_pointer(); + result.push(tuple.elements()); + cursor = tuple.metadata_or_pointer(); } result.finish(cursor.expect("data corruption")) } -pub fn access_0< +pub fn read_for_h0_tuple< O: Operator, A: Accessor1<::Element, ::Metadata>, >( @@ -42,18 +39,17 @@ pub fn access_0< let mut cursor = Err(mean); let mut result = accessor; while let Err(mean) = cursor.map_err(pointer_to_pair) { - let vector_guard = index.read(mean.0); - let vector_tuple = vector_guard - .get(mean.1)? - .pipe(read_tuple::>); - if vector_tuple.payload().is_none() { + let guard = index.read(mean.0); + let bytes = guard.get(mean.1)?; + let tuple = VectorTuple::::deserialize_ref(bytes); + if tuple.payload().is_none() { panic!("data corruption"); } - if vector_tuple.payload() != Some(payload) { + if tuple.payload() != Some(payload) { return None; } - result.push(vector_tuple.elements()); - cursor = vector_tuple.metadata_or_pointer(); + result.push(tuple.elements()); + cursor = tuple.metadata_or_pointer(); } Some(result.finish(cursor.ok()?)) } @@ -74,7 +70,7 @@ pub fn append( let (metadata, slices) = O::Vector::vector_split(vector); let mut chain = Ok(metadata); for i in (0..slices.len()).rev() { - let bytes = serialize::>(&match chain { + let bytes = VectorTuple::::serialize(&match chain { Ok(metadata) => VectorTuple::_0 { elements: slices[i].to_vec(), payload: Some(payload),