Skip to content

Commit

Permalink
feat: pinning index in memory when building, second try
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi committed Feb 6, 2025
1 parent 3c082e6 commit 1b321ef
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 5 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ random_orthogonal_matrix = { path = "./crates/random_orthogonal_matrix" }
simd = { path = "./crates/simd" }
vector = { path = "./crates/vector" }

bincode = "1.3.3"
half.workspace = true
paste = "1"
pgrx = { version = "=0.12.9", default-features = false, features = ["cshim"] }
Expand Down
51 changes: 51 additions & 0 deletions crates/algorithm/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::pipe::Pipe;
use crate::tuples::{H1Tuple, H1TupleReader, MetaTuple, read_tuple};
use crate::{Page, RelationRead};

/// Walks the upper levels of the index and returns the ids of every page it reads.
///
/// Starting from the meta page (page `0`), the function reads the root chain and
/// then descends one level per iteration, `height_of_root - 1` times in total. On
/// each level it follows every page chain through the `next` link stored in the
/// page opaque data until the `u32::MAX` terminator, collecting the `first`
/// entries of all `H1TupleReader::_0` tuples as the chain heads of the level
/// below. Finally, the head page of every chain on the last level reached is read
/// once.
///
/// The returned vector lists page ids in the order they were read.
///
/// # Panics
///
/// Panics if the meta page has no tuple at slot `1`, or with `"data corruption"`
/// if a page reports a tuple slot that cannot be fetched.
pub fn cache(index: impl RelationRead) -> Vec<u32> {
    let mut trace: Vec<u32> = Vec::new();
    // All page accesses go through this closure so that each one is recorded.
    let mut read = |id| {
        let guard = index.read(id);
        trace.push(id);
        guard
    };

    let meta_guard = read(0);
    let meta_tuple = meta_guard.get(1).unwrap().pipe(read_tuple::<MetaTuple>);
    let (height_of_root, root_first) = (meta_tuple.height_of_root(), meta_tuple.root_first());
    drop(meta_guard);

    // Chain heads of the level currently being visited.
    let mut heads: Vec<u32> = vec![root_first];
    for _ in 1..height_of_root {
        let mut children = Vec::new();
        for head in heads {
            let mut current = head;
            while current != u32::MAX {
                let h1_guard = read(current);
                for i in 1..=h1_guard.len() {
                    let h1_tuple = h1_guard
                        .get(i)
                        .expect("data corruption")
                        .pipe(read_tuple::<H1Tuple>);
                    match h1_tuple {
                        H1TupleReader::_0(h1_tuple) => {
                            children.extend(h1_tuple.first().iter().copied());
                        }
                        H1TupleReader::_1(_) => {}
                    }
                }
                current = h1_guard.get_opaque().next;
            }
        }
        heads = children;
    }

    // Only the head page of each remaining chain is touched.
    for head in heads {
        drop(read(head));
    }
    trace
}
2 changes: 2 additions & 0 deletions crates/algorithm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

mod build;
mod bulkdelete;
mod cache;
mod freepages;
mod insert;
mod maintain;
Expand All @@ -20,6 +21,7 @@ pub mod types;

pub use build::build;
pub use bulkdelete::bulkdelete;
pub use cache::cache;
pub use insert::insert;
pub use maintain::maintain;
pub use prewarm::prewarm;
Expand Down
175 changes: 170 additions & 5 deletions src/index/am/am_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use crate::datatype::typmod::Typmod;
use crate::index::am::{Reloption, ctid_to_pointer};
use crate::index::opclass::{Opfamily, opfamily};
use crate::index::projection::RandomProject;
use crate::index::storage::PostgresRelation;
use crate::index::storage::{PostgresPage, PostgresRelation};
use algorithm::operator::{Dot, L2, Op, Vector};
use algorithm::types::*;
use algorithm::{PageGuard, RelationRead, RelationWrite};
use half::f16;
use pgrx::pg_sys::Datum;
use rand::Rng;
use simd::Floating;
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::ops::Deref;
use std::sync::Arc;
use vector::vect::VectOwned;
use vector::{VectorBorrowed, VectorOwned};
Expand Down Expand Up @@ -208,9 +211,24 @@ pub unsafe extern "C" fn ambuild(
map_structures(structures, |x| InternalBuild::build_from_vecf32(&x)),
),
}
if let Some(leader) =
unsafe { VchordrqLeader::enter(heap_relation, index_relation, (*index_info).ii_Concurrent) }
{
let cache = {
let trace = algorithm::cache(index.clone());
let mut dir = HashMap::<u32, usize>::new();
let mut pages = Vec::<Box<PostgresPage>>::new();
for i in trace {
dir.insert(i, pages.len());
pages.push(index.read(i).clone_into_boxed());
}
(dir, pages)
};
if let Some(leader) = unsafe {
VchordrqLeader::enter(
heap_relation,
index_relation,
(*index_info).ii_Concurrent,
&cache,
)
} {
unsafe {
parallel_build(
index_relation,
Expand All @@ -219,6 +237,8 @@ pub unsafe extern "C" fn ambuild(
leader.tablescandesc,
leader.vchordrqshared,
Some(reporter),
&*leader.cache_0,
&*leader.cache_1,
);
leader.wait();
let nparticipants = leader.nparticipants;
Expand Down Expand Up @@ -312,6 +332,8 @@ struct VchordrqShared {
heaprelid: pgrx::pg_sys::Oid,
indexrelid: pgrx::pg_sys::Oid,
isconcurrent: bool,
est_cache_0: usize,
est_cache_1: usize,

/* Worker progress */
workersdonecv: pgrx::pg_sys::ConditionVariable,
Expand All @@ -338,14 +360,19 @@ struct VchordrqLeader {
vchordrqshared: *mut VchordrqShared,
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
snapshot: pgrx::pg_sys::Snapshot,
cache_0: *const [u8],
cache_1: *const [u8],
}

impl VchordrqLeader {
pub unsafe fn enter(
heap_relation: pgrx::pg_sys::Relation,
index_relation: pgrx::pg_sys::Relation,
isconcurrent: bool,
cache: &(HashMap<u32, usize>, Vec<Box<PostgresPage>>),
) -> Option<Self> {
let cache_mapping: Vec<u8> = bincode::serialize(&cache.0).unwrap();

unsafe fn compute_parallel_workers(
heap_relation: pgrx::pg_sys::Relation,
index_relation: pgrx::pg_sys::Relation,
Expand Down Expand Up @@ -402,6 +429,8 @@ impl VchordrqLeader {
}
let est_tablescandesc =
unsafe { pgrx::pg_sys::table_parallelscan_estimate(heap_relation, snapshot) };
let est_cache_0 = cache_mapping.len();
let est_cache_1 = cache.1.len() * size_of::<PostgresPage>();
unsafe {
estimate_chunk(&mut (*pcxt).estimator, size_of::<VchordrqShared>());
estimate_keys(&mut (*pcxt).estimator, 1);
Expand Down Expand Up @@ -433,6 +462,8 @@ impl VchordrqLeader {
mutex: std::mem::zeroed(),
nparticipantsdone: 0,
indtuples: 0,
est_cache_0,
est_cache_1,
});
pgrx::pg_sys::ConditionVariableInit(&raw mut (*vchordrqshared).workersdonecv);
pgrx::pg_sys::SpinLockInit(&raw mut (*vchordrqshared).mutex);
Expand All @@ -446,9 +477,29 @@ impl VchordrqLeader {
tablescandesc
};

let cache_0 = unsafe {
let cache_0 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_0).cast::<u8>();
std::ptr::copy(cache_mapping.as_ptr(), cache_0, est_cache_0);
core::ptr::slice_from_raw_parts(cache_0, est_cache_0)
};

let cache_1 = unsafe {
let cache_1 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_1).cast::<u8>();
for i in 0..cache.1.len() {
std::ptr::copy(
(cache.1[i].deref() as *const PostgresPage).cast::<u8>(),
cache_1.cast::<PostgresPage>().add(i).cast(),
size_of::<PostgresPage>(),
);
}
core::ptr::slice_from_raw_parts(cache_1, est_cache_1)
};

unsafe {
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000001, vchordrqshared.cast());
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000002, tablescandesc.cast());
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000003, cache_0 as _);
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000004, cache_1 as _);
}

unsafe {
Expand All @@ -474,6 +525,8 @@ impl VchordrqLeader {
nparticipants: nworkers_launched + 1,
vchordrqshared,
tablescandesc,
cache_0,
cache_1,
snapshot,
})
}
Expand Down Expand Up @@ -513,6 +566,18 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000002, false)
.cast::<pgrx::pg_sys::ParallelTableScanDescData>()
};
let cache_0 = unsafe {
std::slice::from_raw_parts(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000003, false).cast::<u8>(),
(*vchordrqshared).est_cache_0,
)
};
let cache_1 = unsafe {
std::slice::from_raw_parts(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000004, false).cast::<u8>(),
(*vchordrqshared).est_cache_1,
)
};
let heap_lockmode;
let index_lockmode;
if unsafe { !(*vchordrqshared).isconcurrent } {
Expand All @@ -530,7 +595,16 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main(
}

unsafe {
parallel_build(index, heap, index_info, tablescandesc, vchordrqshared, None);
parallel_build(
index,
heap,
index_info,
tablescandesc,
vchordrqshared,
None,
cache_0,
cache_1,
);
}

unsafe {
Expand All @@ -546,8 +620,24 @@ unsafe fn parallel_build(
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
vchordrqshared: *mut VchordrqShared,
mut reporter: Option<PostgresReporter>,
cache_0: &[u8],
cache_1: &[u8],
) {
let index = unsafe { PostgresRelation::new(index_relation) };
let index = CachingRelation {
cache: {
let cache_0: HashMap<u32, usize> = bincode::deserialize(cache_0).unwrap();
assert!(cache_1.len() % size_of::<PostgresPage>() == 0);
let n = cache_1.len() / size_of::<PostgresPage>();
let cache_1 = unsafe {
(0..n)
.map(|i| &*cache_1.as_ptr().cast::<PostgresPage>().add(i))
.collect::<Vec<&PostgresPage>>()
};
&(cache_0, cache_1)
},
relation: index,
};

let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap_relation, tablescandesc) };
let opfamily = unsafe { opfamily(index_relation) };
Expand Down Expand Up @@ -948,3 +1038,78 @@ impl InternalBuild for VectOwned<f16> {
Self::new(f16::vector_from_f32(x))
}
}

/// A relation wrapper that serves reads of selected pages from borrowed copies.
///
/// `cache.0` maps a page id to a position in `cache.1`, and `cache.1` holds the
/// borrowed pages themselves. Any page id absent from the map is read from
/// `relation`.
struct CachingRelation<'a, R>
where
    R: RelationRead,
{
    /// Page-id directory paired with the borrowed pages it indexes into.
    cache: &'a (HashMap<u32, usize>, Vec<&'a R::Page>),
    /// Underlying relation that backs every access not answered by `cache`.
    relation: R,
}

impl<R> Clone for CachingRelation<'_, R>
where
    R: RelationRead,
{
    /// Clones the wrapped relation; the cache itself is shared, since only the
    /// reference to it is copied.
    fn clone(&self) -> Self {
        let relation = self.relation.clone();
        Self {
            cache: self.cache,
            relation,
        }
    }
}

/// Read guard that is either a real guard of type `G` or a borrowed page.
enum CachingRelationReadGuard<'a, G>
where
    G: Deref,
{
    /// A guard of the wrapped type, used as-is.
    Wrapping(G),
    /// A page id together with a borrowed page standing in for that id.
    Cached(u32, &'a G::Target),
}

impl<G> PageGuard for CachingRelationReadGuard<'_, G>
where
    G: PageGuard + Deref,
{
    /// Returns the page id: the stored id for a cached page, otherwise the id
    /// reported by the wrapped guard.
    fn id(&self) -> u32 {
        match self {
            Self::Cached(id, _) => *id,
            Self::Wrapping(guard) => guard.id(),
        }
    }
}

impl<G: Deref> Deref for CachingRelationReadGuard<'_, G> {
type Target = G::Target;

fn deref(&self) -> &Self::Target {
match self {
CachingRelationReadGuard::Wrapping(x) => x,
CachingRelationReadGuard::Cached(_, page) => page,
}
}
}

impl<R> RelationRead for CachingRelation<'_, R>
where
    R: RelationRead,
{
    type Page = R::Page;

    type ReadGuard<'a>
        = CachingRelationReadGuard<'a, R::ReadGuard<'a>>
    where
        Self: 'a;

    /// Reads page `id`, answering from the cache when the id is listed in the
    /// directory and falling back to the wrapped relation otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the directory maps `id` to a position outside the page list.
    fn read(&self, id: u32) -> Self::ReadGuard<'_> {
        let (directory, pages) = self.cache;
        match directory.get(&id) {
            Some(&slot) => CachingRelationReadGuard::Cached(id, pages[slot]),
            None => CachingRelationReadGuard::Wrapping(self.relation.read(id)),
        }
    }
}

impl<R> RelationWrite for CachingRelation<'_, R>
where
    R: RelationWrite,
{
    type WriteGuard<'a>
        = R::WriteGuard<'a>
    where
        Self: 'a;

    /// Forwards to the wrapped relation; the cache is not consulted.
    fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> {
        let inner = &self.relation;
        inner.write(id, tracking_freespace)
    }

    /// Forwards to the wrapped relation; the cache is not consulted.
    fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> {
        let inner = &self.relation;
        inner.extend(tracking_freespace)
    }

    /// Forwards to the wrapped relation; the cache is not consulted.
    fn search(&self, freespace: usize) -> Option<Self::WriteGuard<'_>> {
        let inner = &self.relation;
        inner.search(freespace)
    }
}
7 changes: 7 additions & 0 deletions src/index/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ impl PostgresPage {
assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize);
this
}
/// Returns a heap-allocated bitwise copy of this page.
pub fn clone_into_boxed(&self) -> Box<Self> {
    let mut boxed = Box::<Self>::new_uninit();
    // SAFETY: `self` is a valid reference to one `Self`, and `boxed` is a fresh
    // allocation sized for one `Self`, so the two regions cannot overlap. The
    // copy writes every byte of the allocation before `assume_init` is called.
    // NOTE(review): presumes a bitwise copy of a page is itself a valid page
    // value — confirm against the type's definition.
    unsafe {
        std::ptr::copy_nonoverlapping(self as *const Self, boxed.as_mut_ptr(), 1);
        boxed.assume_init()
    }
}
}

impl Page for PostgresPage {
Expand Down

0 comments on commit 1b321ef

Please sign in to comment.