Shared memory hash table #1379

Open · wants to merge 15 commits into base: develop

Changes from 1 commit
2 changes: 1 addition & 1 deletion pgrx-examples/shmem/src/lib.rs
@@ -40,7 +40,7 @@ static HASH: PgLwLock<heapless::FnvIndexMap<i32, i32, 4>> = PgLwLock::new();
static STRUCT: PgLwLock<Pgtest> = PgLwLock::new();
static PRIMITIVE: PgLwLock<i32> = PgLwLock::new();
static ATOMIC: PgAtomic<std::sync::atomic::AtomicBool> = PgAtomic::new();
static HASH_TABLE: PgHashMap<i64, i64> = PgHashMap::new(250);
static HASH_TABLE: ShmemHashMap<i64, i64> = ShmemHashMap::new(250);

#[pg_guard]
pub extern "C" fn _PG_init() {
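
For context, a usage sketch of the renamed static (illustrative only, not part of this diff): the map is registered with pg_shmem_init! inside the _PG_init() shown above, and the SQL-callable helpers below are hypothetical.

    // Inside the existing _PG_init(), alongside the other shared statics:
    //     pg_shmem_init!(HASH_TABLE);

    // Hypothetical SQL-callable helpers exercising the shared-memory map.
    #[pg_extern]
    fn hash_table_insert(key: i64, value: i64) -> bool {
        // insert() errors only when the fixed-size table is full.
        HASH_TABLE.insert(key, value).is_ok()
    }

    #[pg_extern]
    fn hash_table_get(key: i64) -> Option<i64> {
        HASH_TABLE.get(key)
    }
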
63 changes: 37 additions & 26 deletions pgrx/src/shmem_hash.rs
@@ -7,20 +7,20 @@ use uuid::Uuid;

#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum Error {
pub enum ShmemHashMapError {
/// Hash table can't have more entries due to fixed allocation size.
HashTableFull,
}

#[derive(Copy, Clone, Debug)]
struct PgHashMapInner {
struct ShmemHashMapInner {
htab: *mut pg_sys::HTAB,
elements: u64,
elements: i64,
}

unsafe impl PGRXSharedMemory for PgHashMapInner {}
unsafe impl Send for PgHashMapInner {}
unsafe impl Sync for PgHashMapInner {}
unsafe impl PGRXSharedMemory for ShmemHashMapInner {}
unsafe impl Send for ShmemHashMapInner {}
unsafe impl Sync for ShmemHashMapInner {}

#[repr(C)]
#[derive(Copy, Clone, Debug)]
@@ -39,7 +39,7 @@ struct Value<K, V> {
value: V,
}

impl Default for PgHashMapInner {
impl Default for ShmemHashMapInner {
fn default() -> Self {
Self { htab: std::ptr::null_mut(), elements: 0 }
}
@@ -48,12 +48,12 @@ impl Default for PgHashMapInner {
/// A shared memory HashMap using Postgres' `HTAB`.
/// This HashMap is used for `pg_stat_statements` and Postgres
/// internals to store key/value pairs in shared memory.
pub struct PgHashMap<K: Copy + Clone, V: Copy + Clone> {
pub struct ShmemHashMap<K: Copy + Clone, V: Copy + Clone> {
Review comment (Member): Please weaken the bounds for the struct and instead keep them on the implementation. It should be made clear why certain functions require the bounds they do in order to be sound, instead of plastering the bounds across the entire API. Aside from leaking implementation details, it makes it unclear where the soundness requirements actually start, which means later refactoring that attempts to weaken the bounds can be made unsound.

Review comment (Member): Note that it's fine if the bounds are overeagerly applied in cases where it's not clear, but e.g. I have done a lot of this refactoring on Array<'_, T> and List<T> recently, and in general I tried to pull the scope tighter.
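
Sketched out, the bounds-weakening asked for above would look roughly like this (illustrative only, not part of the diff):

    // No bounds on the type itself; PhantomData keeps K and V in the signature.
    pub struct ShmemHashMap<K, V> {
        htab: OnceCell<PgSpinLock<ShmemHashMapInner>>,
        size: i64,
        _phantom_key: std::marker::PhantomData<K>,
        _phantom_value: std::marker::PhantomData<V>,
    }

    // Copy (which implies Clone) is required here because keys and values are
    // written into, and read back out of, Postgres-managed shared memory by value.
    impl<K: Copy, V: Copy> ShmemHashMap<K, V> {
        // insert/get/remove/len as in this diff ...
    }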

/// HTAB protected by a SpinLock.
htab: OnceCell<PgSpinLock<PgHashMapInner>>,
htab: OnceCell<PgSpinLock<ShmemHashMapInner>>,

/// Max size, allocated at server start.
size: u64,
size: i64,

// Markers for key/value types.
_phantom_key: std::marker::PhantomData<K>,
@@ -82,21 +82,30 @@ macro_rules! value_ptr {
}};
}

impl<K: Copy + Clone, V: Copy + Clone> PgHashMap<K, V> {
/// Create new `PgHashMap`. This still needs to be allocated with
impl<K: Copy + Clone, V: Copy + Clone> ShmemHashMap<K, V> {
/// Create new `ShmemHashMap`. This still needs to be allocated with
/// `pg_shmem_init!` just like any other shared memory structure.
pub const fn new(size: u64) -> PgHashMap<K, V> {
PgHashMap {
///
/// # Arguments
///
/// * `size` - Maximum number of elements in the HashMap. This is allocated
/// at server start and cannot be changed. `i64` is the expected type
/// for `pg_sys::ShmemInitHash`, so we don't attempt runtime conversions
/// unnecessarily.
Review comment (Member) on lines +91 to +94: ...okay, "you only can allocate this once ever" is pretty coherent as to why we need it, yeah. Though I believe a runtime conversion that happens once ever is... fine?
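
For reference, the alternative the reviewer alludes to would look roughly like this (hypothetical sketch: keep the constructor's size as u64 and convert a single time inside shmem_init):

    // One-time conversion at server start, inside shmem_init():
    let size: std::os::raw::c_long =
        self.size.try_into().expect("hash table size exceeds platform long");
    // ...then pass `size` to pg_sys::ShmemInitHash for both init and max size.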

///
pub const fn new(size: i64) -> ShmemHashMap<K, V> {
ShmemHashMap {
htab: OnceCell::new(),
size,
_phantom_key: std::marker::PhantomData,
_phantom_value: std::marker::PhantomData,
}
}

/// Insert a key and value into the `PgHashMap`. If the key is already
/// present, it will be replaced and returned. If the `PgHashMap` is full, return an error.
pub fn insert(&self, key: K, value: V) -> Result<Option<V>, Error> {
/// Insert a key and value into the `ShmemHashMap`. If the key is already
/// present, it will be replaced and returned. If the `ShmemHashMap` is full,
/// an error is returned.
pub fn insert(&self, key: K, value: V) -> Result<Option<V>, ShmemHashMapError> {
let mut found = false;
let mut htab = self.htab.get().unwrap().lock();
let (key_ptr, hash_value) = key!(key, htab);
@@ -122,7 +131,7 @@ impl<K: Copy + Clone, V: Copy + Clone> PgHashMap<K, V> {
// If we don't do this check, pg will overwrite
// some random entry with our key/value pair...
if entry.is_null() && htab.elements == self.size {
return Err(Error::HashTableFull);
return Err(ShmemHashMapError::HashTableFull);
}

let entry = unsafe {
@@ -148,12 +157,12 @@ impl<K: Copy + Clone, V: Copy + Clone> PgHashMap<K, V> {
Ok(return_value)
} else {
// OOM. We pre-allocate at server start, so this should never be an issue.
return Err(Error::HashTableFull);
return Err(ShmemHashMapError::HashTableFull);
}
}

/// Get a value from the HashMap using the key.
/// If the key doesn't exist, return None.
/// If the key doesn't exist, return `None`.
pub fn get(&self, key: K) -> Option<V> {
let htab = self.htab.get().unwrap().lock();
let (key_ptr, hash_value) = key!(key, htab);
@@ -177,7 +186,7 @@ impl<K: Copy + Clone, V: Copy + Clone> PgHashMap<K, V> {
}
}

/// Remove the value from the `PgHashMap` and return it.
/// Remove the value from the `ShmemHashMap` and return it.
/// If the key doesn't exist, return None.
pub fn remove(&self, key: K) -> Option<V> {
if let Some(value) = self.get(key) {
Review comment (Member): This should probably use let-else, like so:

Suggested change:
if let Some(value) = self.get(key) {
let Some(value) = self.get(key) else { return None };

@@ -203,15 +212,17 @@ impl<K: Copy + Clone, V: Copy + Clone> PgHashMap<K, V> {
}

/// Get the number of elements in the HashMap.
pub fn len(&self) -> u64 {
pub fn len(&self) -> i64 {
Review comment (Member): This reminds me, this needs capacity(&self) as well.

let htab = self.htab.get().unwrap().lock();
htab.elements
}
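
A sketch of the capacity() method asked for in the comment above (hypothetical, mirroring len()):

    /// Get the fixed capacity the hash table was allocated with.
    pub fn capacity(&self) -> i64 {
        self.size
    }
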
}

impl<K: Copy + Clone, V: Copy + Clone> PgSharedMemoryInitialization for PgHashMap<K, V> {
impl<K: Copy + Clone, V: Copy + Clone> PgSharedMemoryInitialization for ShmemHashMap<K, V> {
fn pg_init(&'static self) {
self.htab.set(PgSpinLock::new(PgHashMapInner::default())).expect("htab cell is not empty");
self.htab
.set(PgSpinLock::new(ShmemHashMapInner::default()))
.expect("htab cell is not empty");
}

fn shmem_init(&'static self) {
@@ -227,8 +238,8 @@ impl<K: Copy + Clone, V: Copy + Clone> PgSharedMemoryInitialization for PgHashMa
let htab_ptr = unsafe {
pg_sys::ShmemInitHash(
shm_name.into_raw(),
self.size.try_into().unwrap(),
self.size.try_into().unwrap(),
self.size,
self.size,
&mut hash_ctl,
(pg_sys::HASH_ELEM | pg_sys::HASH_BLOBS).try_into().unwrap(),
)
8 changes: 7 additions & 1 deletion pgrx/src/spinlock.rs
@@ -9,6 +9,7 @@
//LICENSE Use of this source code is governed by the MIT license that can be found in the LICENSE file.
use crate::pg_sys;
use core::mem::MaybeUninit;
use std::fmt;
use std::{cell::UnsafeCell, marker::PhantomData};

/// A Rust locking mechanism which uses a PostgreSQL `slock_t` to lock the data.
@@ -28,7 +29,6 @@ use std::{cell::UnsafeCell, marker::PhantomData};
/// [`storage/spin.h`]:
/// https://github.com/postgres/postgres/blob/1f0c4fa255253d223447c2383ad2b384a6f05854/src/include/storage/spin.h
#[doc(alias = "slock_t")]
#[derive(Debug)]
pub struct PgSpinLock<T> {
item: UnsafeCell<T>,
lock: UnsafeCell<pg_sys::slock_t>,
@@ -39,6 +39,12 @@ pub struct PgSpinLock<T> {
unsafe impl<T: Send> Send for PgSpinLock<T> {}
unsafe impl<T: Send> Sync for PgSpinLock<T> {}

impl<T> fmt::Debug for PgSpinLock<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PgSpinLock").finish()
}
}

impl<T> PgSpinLock<T> {
/// Create a new [`PgSpinLock`]. See the type documentation for more info.
#[inline]