diff --git a/Cargo.lock b/Cargo.lock index 62d7b79..73ec150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,15 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.70.1" @@ -1513,6 +1522,7 @@ name = "vchord" version = "0.0.0" dependencies = [ "algorithm", + "bincode", "distance", "half 2.4.1", "k_means", diff --git a/Cargo.toml b/Cargo.toml index 55cf4c2..40dfd5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ random_orthogonal_matrix = { path = "./crates/random_orthogonal_matrix" } simd = { path = "./crates/simd" } vector = { path = "./crates/vector" } +bincode = "1.3.3" half.workspace = true paste = "1" pgrx = { version = "=0.12.9", default-features = false, features = ["cshim"] } diff --git a/crates/algorithm/src/cache.rs b/crates/algorithm/src/cache.rs new file mode 100644 index 0000000..0405708 --- /dev/null +++ b/crates/algorithm/src/cache.rs @@ -0,0 +1,51 @@ +use crate::pipe::Pipe; +use crate::tuples::{H1Tuple, H1TupleReader, MetaTuple, read_tuple}; +use crate::{Page, RelationRead}; + +pub fn cache(index: impl RelationRead) -> Vec { + let mut trace = Vec::::new(); + let mut read = |id| { + let result = index.read(id); + trace.push(id); + result + }; + let meta_guard = read(0); + let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::); + let height_of_root = meta_tuple.height_of_root(); + let root_first = meta_tuple.root_first(); + drop(meta_guard); + type State = Vec; + let mut state: State = vec![root_first]; + let mut step = |state: State| { + let mut results = Vec::new(); + for first in state { + let mut current = first; + while current != u32::MAX { + let h1_guard = read(current); + for i in 1..=h1_guard.len() { + let h1_tuple = h1_guard + .get(i) + .expect("data corruption") + .pipe(read_tuple::); + match h1_tuple { + H1TupleReader::_0(h1_tuple) => { + for first in h1_tuple.first().iter().copied() { + results.push(first); + } + } + H1TupleReader::_1(_) => (), + } + } + current = h1_guard.get_opaque().next; + } + } + results + }; + for _ in (1..height_of_root).rev() { + state = step(state); + } + for first in state { + let _ = read(first); + } + trace +} diff --git a/crates/algorithm/src/lib.rs b/crates/algorithm/src/lib.rs index dccc459..171365a 100644 --- a/crates/algorithm/src/lib.rs +++ b/crates/algorithm/src/lib.rs @@ -5,6 +5,7 @@ mod build; mod bulkdelete; +mod cache; mod freepages; mod insert; mod maintain; @@ -20,6 +21,7 @@ pub mod types; pub use build::build; pub use bulkdelete::bulkdelete; +pub use cache::cache; pub use insert::insert; pub use maintain::maintain; pub use prewarm::prewarm; diff --git a/src/index/am/am_build.rs b/src/index/am/am_build.rs index e7d6558..55b9eba 100644 --- a/src/index/am/am_build.rs +++ b/src/index/am/am_build.rs @@ -2,14 +2,17 @@ use crate::datatype::typmod::Typmod; use crate::index::am::{Reloption, ctid_to_pointer}; use crate::index::opclass::{Opfamily, opfamily}; use crate::index::projection::RandomProject; -use crate::index::storage::PostgresRelation; +use crate::index::storage::{PostgresPage, PostgresRelation}; use algorithm::operator::{Dot, L2, Op, Vector}; use algorithm::types::*; +use algorithm::{PageGuard, RelationRead, RelationWrite}; use half::f16; use pgrx::pg_sys::Datum; use rand::Rng; use simd::Floating; +use std::collections::HashMap; use std::num::NonZeroU64; +use std::ops::Deref; use std::sync::Arc; use vector::vect::VectOwned; use vector::{VectorBorrowed, VectorOwned}; @@ -208,9 +211,24 @@ pub unsafe extern "C" fn ambuild( map_structures(structures, |x| InternalBuild::build_from_vecf32(&x)), ), } - if let Some(leader) = - unsafe { VchordrqLeader::enter(heap_relation, index_relation, (*index_info).ii_Concurrent) } - { + let cache = { + let trace = algorithm::cache(index.clone()); + let mut dir = HashMap::::new(); + let mut pages = Vec::>::new(); + for i in trace { + dir.insert(i, pages.len()); + pages.push(index.read(i).clone_into_boxed()); + } + (dir, pages) + }; + if let Some(leader) = unsafe { + VchordrqLeader::enter( + heap_relation, + index_relation, + (*index_info).ii_Concurrent, + &cache, + ) + } { unsafe { parallel_build( index_relation, @@ -219,6 +237,8 @@ pub unsafe extern "C" fn ambuild( leader.tablescandesc, leader.vchordrqshared, Some(reporter), + &*leader.cache_0, + &*leader.cache_1, ); leader.wait(); let nparticipants = leader.nparticipants; @@ -312,6 +332,8 @@ struct VchordrqShared { heaprelid: pgrx::pg_sys::Oid, indexrelid: pgrx::pg_sys::Oid, isconcurrent: bool, + est_cache_0: usize, + est_cache_1: usize, /* Worker progress */ workersdonecv: pgrx::pg_sys::ConditionVariable, @@ -338,6 +360,8 @@ struct VchordrqLeader { vchordrqshared: *mut VchordrqShared, tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData, snapshot: pgrx::pg_sys::Snapshot, + cache_0: *const [u8], + cache_1: *const [u8], } impl VchordrqLeader { @@ -345,7 +369,10 @@ impl VchordrqLeader { heap_relation: pgrx::pg_sys::Relation, index_relation: pgrx::pg_sys::Relation, isconcurrent: bool, + cache: &(HashMap, Vec>), ) -> Option { + let cache_mapping: Vec = bincode::serialize(&cache.0).unwrap(); + unsafe fn compute_parallel_workers( heap_relation: pgrx::pg_sys::Relation, index_relation: pgrx::pg_sys::Relation, @@ -402,6 +429,8 @@ impl VchordrqLeader { } let est_tablescandesc = unsafe { pgrx::pg_sys::table_parallelscan_estimate(heap_relation, snapshot) }; + let est_cache_0 = cache_mapping.len(); + let est_cache_1 = cache.1.len() * size_of::(); unsafe { estimate_chunk(&mut (*pcxt).estimator, size_of::()); estimate_keys(&mut (*pcxt).estimator, 1); @@ -433,6 +462,8 @@ impl VchordrqLeader { mutex: std::mem::zeroed(), nparticipantsdone: 0, indtuples: 0, + est_cache_0, + est_cache_1, }); pgrx::pg_sys::ConditionVariableInit(&raw mut (*vchordrqshared).workersdonecv); pgrx::pg_sys::SpinLockInit(&raw mut (*vchordrqshared).mutex); @@ -446,9 +477,29 @@ impl VchordrqLeader { tablescandesc }; + let cache_0 = unsafe { + let cache_0 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_0).cast::(); + std::ptr::copy(cache_mapping.as_ptr(), cache_0, est_cache_0); + core::ptr::slice_from_raw_parts(cache_0, est_cache_0) + }; + + let cache_1 = unsafe { + let cache_1 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_1).cast::(); + for i in 0..cache.1.len() { + std::ptr::copy( + (cache.1[i].deref() as *const PostgresPage).cast::(), + cache_1.cast::().add(i).cast(), + size_of::(), + ); + } + core::ptr::slice_from_raw_parts(cache_1, est_cache_1) + }; + unsafe { pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000001, vchordrqshared.cast()); pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000002, tablescandesc.cast()); + pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000003, cache_0 as _); + pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000004, cache_1 as _); } unsafe { @@ -474,6 +525,8 @@ impl VchordrqLeader { nparticipants: nworkers_launched + 1, vchordrqshared, tablescandesc, + cache_0, + cache_1, snapshot, }) } @@ -513,6 +566,18 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main( pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000002, false) .cast::() }; + let cache_0 = unsafe { + std::slice::from_raw_parts( + pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000003, false).cast::(), + (*vchordrqshared).est_cache_0, + ) + }; + let cache_1 = unsafe { + std::slice::from_raw_parts( + pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000004, false).cast::(), + (*vchordrqshared).est_cache_1, + ) + }; let heap_lockmode; let index_lockmode; if unsafe { !(*vchordrqshared).isconcurrent } { @@ -530,7 +595,16 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main( } unsafe { - parallel_build(index, heap, index_info, tablescandesc, vchordrqshared, None); + parallel_build( + index, + heap, + index_info, + tablescandesc, + vchordrqshared, + None, + cache_0, + cache_1, + ); } unsafe { @@ -546,8 +620,24 @@ unsafe fn parallel_build( tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData, vchordrqshared: *mut VchordrqShared, mut reporter: Option, + cache_0: &[u8], + cache_1: &[u8], ) { let index = unsafe { PostgresRelation::new(index_relation) }; + let index = CachingRelation { + cache: { + let cache_0: HashMap = bincode::deserialize(cache_0).unwrap(); + assert!(cache_1.len() % size_of::() == 0); + let n = cache_1.len() / size_of::(); + let cache_1 = unsafe { + (0..n) + .map(|i| &*cache_1.as_ptr().cast::().add(i)) + .collect::>() + }; + &(cache_0, cache_1) + }, + relation: index, + }; let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap_relation, tablescandesc) }; let opfamily = unsafe { opfamily(index_relation) }; @@ -948,3 +1038,78 @@ impl InternalBuild for VectOwned { Self::new(f16::vector_from_f32(x)) } } + +struct CachingRelation<'a, R: RelationRead> { + cache: &'a (HashMap, Vec<&'a R::Page>), + relation: R, +} + +impl Clone for CachingRelation<'_, R> { + fn clone(&self) -> Self { + Self { + cache: self.cache, + relation: self.relation.clone(), + } + } +} + +enum CachingRelationReadGuard<'a, G: Deref> { + Wrapping(G), + Cached(u32, &'a G::Target), +} + +impl PageGuard for CachingRelationReadGuard<'_, G> { + fn id(&self) -> u32 { + match self { + CachingRelationReadGuard::Wrapping(x) => x.id(), + CachingRelationReadGuard::Cached(id, _) => *id, + } + } +} + +impl Deref for CachingRelationReadGuard<'_, G> { + type Target = G::Target; + + fn deref(&self) -> &Self::Target { + match self { + CachingRelationReadGuard::Wrapping(x) => x, + CachingRelationReadGuard::Cached(_, page) => page, + } + } +} + +impl RelationRead for CachingRelation<'_, R> { + type Page = R::Page; + + type ReadGuard<'a> + = CachingRelationReadGuard<'a, R::ReadGuard<'a>> + where + Self: 'a; + + fn read(&self, id: u32) -> Self::ReadGuard<'_> { + if let Some(&x) = self.cache.0.get(&id) { + CachingRelationReadGuard::Cached(id, self.cache.1[x]) + } else { + CachingRelationReadGuard::Wrapping(self.relation.read(id)) + } + } +} + +impl RelationWrite for CachingRelation<'_, R> { + type WriteGuard<'a> + = R::WriteGuard<'a> + where + Self: 'a; + + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.relation.write(id, tracking_freespace) + } + + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.relation.extend(tracking_freespace) + } + + fn search(&self, freespace: usize) -> Option> { + self.relation.search(freespace) + } +} diff --git a/src/index/storage.rs b/src/index/storage.rs index a190ac0..8ac1c64 100644 --- a/src/index/storage.rs +++ b/src/index/storage.rs @@ -42,6 +42,13 @@ impl PostgresPage { assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize); this } + pub fn clone_into_boxed(&self) -> Box { + let mut result = Box::new_uninit(); + unsafe { + std::ptr::copy(self as *const Self, result.as_mut_ptr(), 1); + result.assume_init() + } + } } impl Page for PostgresPage {