
Commit

Merge pull request #10 from Asurar0/database_part2
Finishing transition to LMDB (Rayon, Async, Zstd, Dynamic resizing)
ReCore-sys authored Sep 12, 2024
2 parents 5df6a4f + cd1b948 commit 23890b9
Showing 4 changed files with 178 additions and 81 deletions.
223 changes: 154 additions & 69 deletions src/database/chunks.rs
@@ -1,116 +1,195 @@
use std::borrow::Cow;
use std::future::Future;
use std::marker::PhantomData;

use bincode::{Decode, Encode, config::standard};
use byteorder::LE;
use zstd::bulk::compress as zstd_compress;
use zstd::bulk::decompress as zstd_decompress;
use heed::types::{Bytes, U64};
use heed::Env;
use tokio::task::spawn_blocking;
use tracing::{trace, warn};

use crate::database::Database;
use crate::utils::error::Error;
use crate::utils::hash::hash;
use crate::world::chunk_format::Chunk;

// use crate::utils::binary_utils::{bzip_compress, bzip_decompress};
use bincode::config::standard;
use bincode::{decode_from_slice, encode_to_vec};
use futures::channel::oneshot::{self, Canceled};
use heed::{types::U64, BytesDecode, BytesEncode, Env, MdbError};
use tracing::{info, trace, warn};

use crate::{
database::Database,
utils::error::Error,
utils::hash::hash,
world::chunk_format::Chunk
};

use super::{LMDB_PAGE_SIZE, LMDB_PAGE_SIZE_INCREMENT, LMDB_READER_SYNC, LMDB_THREADPOOL};

pub struct Zstd<T>(PhantomData<T>);

impl<'a, T: Encode + 'a> BytesEncode<'a> for Zstd<T> {
type EItem = T;

fn bytes_encode(item: &'a Self::EItem) -> Result<Cow<'a, [u8]>, heed::BoxedError> {

// Compress
let mut bytes = Vec::new();
let mut compressor = zstd::Encoder::new(&mut bytes, 6)?;
bincode::encode_into_std_write(item, &mut compressor, standard())?;
compressor.finish()?;

Ok(Cow::Owned(bytes))
}
}

impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
type DItem = T;

fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, heed::BoxedError> {

let mut decompressor = zstd::Decoder::new(bytes)?;
let decoded = bincode::decode_from_std_read(&mut decompressor, standard())?;
Ok(decoded)
}
}
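// Illustrative sketch (not part of this commit): `Zstd<T>` is a heed codec, so a
// value round-trips through zstd-compressed bincode roughly like this, assuming a
// hypothetical type `Example` that derives bincode's traits:
//
//     use heed::{BytesDecode, BytesEncode};
//
//     #[derive(bincode::Encode, bincode::Decode, Debug, PartialEq)]
//     struct Example { x: i32, z: i32 }
//
//     let value = Example { x: 4, z: -2 };
//     let bytes = Zstd::<Example>::bytes_encode(&value).expect("compress + encode");
//     let back = Zstd::<Example>::bytes_decode(&bytes).expect("decompress + decode");
//     assert_eq!(value, back);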

/// LMDB follows linear growth, as opposed to MDBX,
/// which uses geometric growth.
pub(super) fn new_page_size(old_size: usize) -> usize {
old_size + LMDB_PAGE_SIZE_INCREMENT
}
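// Illustrative sketch (not part of this commit): assuming, purely for the example,
// that LMDB_PAGE_SIZE_INCREMENT were 1 GiB, three resizes starting from 1 GiB would
// grow the map linearly to 4 GiB (1 -> 2 -> 3 -> 4), whereas MDBX-style geometric
// doubling would reach 8 GiB:
//
//     let mut size = 1usize << 30;        // hypothetical 1 GiB starting size
//     for _ in 0..3 {
//         size = new_page_size(size);     // adds LMDB_PAGE_SIZE_INCREMENT each call
//     }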

// Delegates a database operation to the database threadpool
pub(super) fn spawn_blocking_db<F, R>(db: Env, f: F) -> impl Future<Output = Result<Result<R,heed::Error>,Canceled>>
where
F: Fn() -> Result<R,heed::Error> + Send + 'static,
R: Send + 'static + std::fmt::Debug,
{
let (tx,res) = oneshot::channel::<Result<R,heed::Error>>();

let pool = LMDB_THREADPOOL.get().unwrap();
pool.spawn(move || {

let read_lock = LMDB_READER_SYNC.read()
.expect("Database RWLock has been poisoned. A thread should have crashed somewhere.");

let mut res = f();
if let Err(heed::Error::Mdb(MdbError::MapFull)) = res {

tracing::warn!("Database page is full. Resizing...");

drop(read_lock);

let _resize_guard = LMDB_READER_SYNC.write()
.expect("Database RWLock has been poisoned. A thread should have crashed somewhere.");

let mut global_size_lock = LMDB_PAGE_SIZE.lock().unwrap();
let old_size = *global_size_lock;
*global_size_lock = new_page_size(old_size);
unsafe { db.resize(*global_size_lock).expect("Unable to resize LMDB environment.") };

tracing::info!("Successfully resized LMDB page from {} MiB to {} MiB", (old_size / 1024usize.pow(2)), (*global_size_lock / 1024usize.pow(2)));

drop(global_size_lock);
drop(_resize_guard);

res = f();
} else {
drop(read_lock)
}

if tx.send(res).is_err() {
tracing::warn!("A database task has been unable to send its result because the receiver at other end have closed.")
}
});

res
}
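// Illustrative sketch (not part of this commit): the returned future resolves to a
// nested result, so call sites handle the oneshot `Canceled` case first and then the
// `heed::Error` produced by the closure, e.g.:
//
//     let chunk = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
//         .await      // Result<Result<Option<Chunk>, heed::Error>, Canceled>
//         .unwrap()?; // panic if the worker dropped its channel, then propagate the error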

impl Database {

// Close the database
pub fn close(self) {
let token = self.db.prepare_for_closing();
token.wait();
}

/// Fetch chunk from database
fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, Error> {
fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
// Initialize read transaction and open chunks table
let ro_tx = db.read_txn()?;
let database = db
.open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
.open_database::<U64<LE>, Zstd<Chunk>>(&ro_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Attempt to fetch chunk from table
if let Ok(data) = database.get(&ro_tx, key) {
Ok(data.map(|encoded_chunk| {
// let decompressed =
// bzip_decompress(&encoded_chunk).expect("Failed to decompress chunk");
let decompressed = zstd_decompress(&encoded_chunk, 1024*1024*64).expect("Failed to decompress chunk");
let chunk: (Chunk, usize) = decode_from_slice(&*decompressed, standard())
.expect("Failed to decode chunk from database");
chunk.0
}))
} else {
Err(Error::DatabaseError("Failed to get chunk".into()))
}
database.get(&ro_tx, key)
//.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
}

/// Insert a single chunk into database
fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), Error> {
fn insert_chunk_into_database(db: &Env, chunk: &Chunk) -> Result<(), heed::Error> {
// Initialize write transaction and open chunks table
let mut rw_tx = db.write_txn()?;
let database = db
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Encode chunk
let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");
// let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
// Calculate key
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

// Insert chunk
let res = database.put(&mut rw_tx, &key, &compressed);
rw_tx.commit().map_err(|err| {
Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
})?;

if let Err(err) = res {
Err(Error::DatabaseError(format!(
"Failed to insert or update chunk: {err}"
)))
} else {
Ok(())
}
let res = database.put(&mut rw_tx, &key, chunk);
rw_tx.commit()?;
// .map_err(|err| {
// Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
// })?;

res
// if let Err(err) = res {
// Err(Error::DatabaseError(format!(
// "Failed to insert or update chunk: {err}"
// )))
// } else {
// Ok(())
// }
}

/// Insert multiple chunks into database
/// TODO: Find better name/disambiguation
fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), Error> {
fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), heed::Error> {
// Initialize write transaction and open chunks table
let mut rw_tx = db.write_txn()?;
let database = db
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Update page
for chunk in chunks {
// Encode chunk
let encoded_chunk = encode_to_vec(chunk, standard()).expect("Failed to encode chunk");

// let compressed = bzip_compress(&encoded_chunk).expect("Failed to compress chunk");
let compressed = zstd_compress(&encoded_chunk, 3).expect("Failed to compress chunk");
// Calculate key
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

// Insert chunk
database.put(&mut rw_tx, &key, &compressed).map_err(|err| {
Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
})?;
database.put(&mut rw_tx, &key, chunk)?
// .map_err(|err| {
// Error::DatabaseError(format!("Failed to insert or update chunk: {err}"))
// })?;
}

// Commit changes
rw_tx.commit().map_err(|err| {
Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
})?;
rw_tx.commit()?;
// .map_err(|err| {
// Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
// })?;
Ok(())
}

async fn load_into_cache(&self, key: u64) -> Result<(), Error> {
let db = self.db.clone();
let tsk_db = self.db.clone();
let cache = self.cache.clone();

tokio::task::spawn(async move {

// Check cache
if cache.contains_key(&key) {
trace!("Chunk already exists in cache: {:X}", key);
}
// If not in cache then search in database
else if let Ok(chunk) =
spawn_blocking(move || Self::get_chunk_from_database(&db, &key))
spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
.await
.unwrap()
{
@@ -158,7 +237,8 @@ impl Database {
// Insert chunk into persistent database
let chunk = value.clone();
let db = self.db.clone();
spawn_blocking(move || Self::insert_chunk_into_database(&db, &chunk))
let tsk_db = self.db.clone();
spawn_blocking_db(tsk_db, move || Self::insert_chunk_into_database(&db, &chunk))
.await
.unwrap()?;

@@ -195,14 +275,15 @@ impl Database {
) -> Result<Option<Chunk>, Error> {
// Calculate key of this chunk and clone database pointer
let key = hash((dimension, x, z));
let tsk_db = self.db.clone();
let db = self.db.clone();

// First check cache
if self.cache.contains_key(&key) {
Ok(self.cache.get(&key).await)
}
// Attempt to get chunk from persistent database
else if let Some(chunk) = spawn_blocking(move || Self::get_chunk_from_database(&db, &key))
else if let Some(chunk) = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
.await
.unwrap()?
{
@@ -236,14 +317,15 @@ impl Database {
pub async fn chunk_exists(&self, x: i32, z: i32, dimension: String) -> Result<bool, Error> {
// Calculate key and copy database pointer
let key = hash((dimension, x, z));
let tsk_db = self.db.clone();
let db = self.db.clone();

// Check first cache
if self.cache.contains_key(&key) {
Ok(true)
// Else check persistent database and load it into cache
} else {
let res = spawn_blocking(move || Self::get_chunk_from_database(&db, &key)).await?;
let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key)).await.unwrap();

// WARNING: The previous logic requested that the chunk be loaded into cache whether or not it existed.
// This has been replaced by directly loading the queried chunk into cache
@@ -255,7 +337,7 @@ impl Database {
}
Ok(exist)
}
Err(err) => Err(err),
Err(err) => Err(Error::LmdbError(err)),
}
}
}
@@ -286,7 +368,8 @@ impl Database {
// Insert new chunk state into persistent database
let chunk = value.clone();
let db = self.db.clone();
spawn_blocking(move || Self::insert_chunk_into_database(&db, &chunk)).await??;
let tsk_db = self.db.clone();
spawn_blocking_db(tsk_db, move || Self::insert_chunk_into_database(&db, &chunk)).await.unwrap()?;

// Insert new chunk state into cache
self.cache.insert(key, value).await;
@@ -373,11 +456,12 @@ impl Database {

// Clone database pointer
let db = self.db.clone();
let tsk_db = self.db.clone();

// Calculate all keys
let keys = values
.iter()
.map(|v| hash((v.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos).as_str()), v.x_pos, v.z_pos)))
.map(|v| hash((v.dimension.as_ref().unwrap_or_else(|| panic!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos)), v.x_pos, v.z_pos)))
.collect::<Vec<u64>>();

// WARNING: The previous logic was to first insert into the database and then insert into the cache using the load_into_cache fn.
@@ -387,11 +471,12 @@ impl Database {
self.cache.insert(key, chunk.clone()).await;
self.load_into_cache(key).await?;
}

// Then insert into persistent database
spawn_blocking(move || Self::insert_chunks_into_database(&db, &values))
spawn_blocking_db(tsk_db, move || Self::insert_chunks_into_database(&db, &values))
.await
.unwrap()?;

Ok(())
}
}
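
For orientation, here is a minimal call-site sketch against the one public method whose full signature is visible in this diff, Database::chunk_exists. The surrounding setup (an already-initialized Database value named db) and the dimension name are assumptions, not part of the commit:

// Hedged sketch: `db` is an assumed, already-initialized Database.
async fn example(db: &Database) -> Result<(), Error> {
    // chunk_exists checks the in-memory cache first, then falls back to LMDB via
    // the Rayon-backed threadpool (spawn_blocking_db), caching any hit it finds.
    let dimension = String::from("overworld"); // hypothetical dimension name
    if db.chunk_exists(0, 0, dimension).await? {
        tracing::info!("chunk (0, 0) is already persisted");
    }
    Ok(())
}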
