diff --git a/src/database/chunks.rs b/src/database/chunks.rs
index 934cde2d..046b5fda 100644
--- a/src/database/chunks.rs
+++ b/src/database/chunks.rs
@@ -6,7 +6,10 @@ use bincode::{Decode, Encode, config::standard};
 use byteorder::LE;
 use futures::channel::oneshot::{self, Canceled};
 use heed::{types::U64, BytesDecode, BytesEncode, Env, MdbError};
+use heed::types::Bytes;
 use moka::future::Cache;
+use tokio::runtime::Handle;
+use tokio::task::block_in_place;
 use tracing::{trace, warn};

 use crate::{
@@ -15,7 +18,7 @@ use crate::{
     utils::hash::hash,
     world::chunk_format::Chunk
 };
-
+use crate::world::importing::SerializedChunk;
 use super::{LMDB_PAGE_SIZE, LMDB_PAGE_SIZE_INCREMENT, LMDB_READER_SYNC, LMDB_THREADPOOL};

 pub struct Zstd<T>(PhantomData<T>);
@@ -48,6 +51,30 @@ impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
     }
 }

+pub struct ZstdCodec;
+
+impl ZstdCodec {
+    pub async fn compress_data<T: Encode + Send + 'static>(data: T) -> crate::Result<Vec<u8>> {
+        tokio::task::spawn_blocking(
+            move || {
+                let mut bytes = Vec::new();
+                let mut compressor = zstd::Encoder::new(&mut bytes, 3)?;
+                bincode::encode_into_std_write(&data, &mut compressor, standard())?;
+                compressor.finish()?;
+                Ok(bytes)
+            }
+        ).await?
+    }
+    pub async fn decompress_data<T: Decode + Send + 'static>(data: Vec<u8>) -> crate::Result<T> {
+        tokio::task::spawn_blocking(
+            move || {
+                let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
+                Ok(decoded.0)
+            }
+        ).await?
+    }
+}
+
 /// LMDB will follow a linear growth as opposed to MDBX which
 /// uses a geometric growth.
 pub(super) fn new_page_size(old_size: usize) -> usize {
@@ -114,11 +141,23 @@ impl Database {
         // Initialize read transaction and open chunks table
         let ro_tx = db.read_txn()?;
         let database = db
-            .open_database::<U64<LE>, Zstd<Chunk>>(&ro_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");

         // Attempt to fetch chunk from table
-        database.get(&ro_tx, key)
+        let data = database.get(&ro_tx, key)?;
+        let chunk = match data {
+            Some(data) => {
+                // let chunk = ZstdCodec::decompress_data::<Chunk>(data.to_vec()).expect("Failed to decompress chunk");
+                let chunk = Handle::current().block_on(async {
+                    ZstdCodec::decompress_data::<Chunk>(data.to_vec()).await.expect("Failed to decompress chunk")
+                });
+                Some(chunk)
+            }
+            None => None,
+        };
+
+        Ok(chunk)
             //.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
     }

@@ -127,14 +166,23 @@ impl Database {
         // Initialize write transaction and open chunks table
         let mut rw_tx = db.write_txn()?;
         let database = db
-            .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");

         // Calculate key
         let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
+        // let chunk = Handle::current().block_on(ZstdCodec::compress_data(chunk)).expect("Failed to compress chunk");
+        let chunk = chunk.clone();
+        let chunk = Handle::current().block_on(async {
+            ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
+        });
+        /*
+            ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
+        });*/
+

         // Insert chunk
-        let res = database.put(&mut rw_tx, &key, chunk);
+        let res = database.put(&mut rw_tx, &key, chunk.as_slice());
         rw_tx.commit()?;
         // .map_err(|err| {
         //     Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
@@ -152,20 +200,20 @@ impl Database {

     /// Insert multiple chunks into database
     /// TODO: Find better name/disambiguation
-    fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), heed::Error> {
+    fn insert_chunks_into_database(db: &Env, chunks: &[SerializedChunk]) -> Result<(), heed::Error> {
         // Initialize write transaction and open chunks table
         let mut rw_tx = db.write_txn()?;
         let database = db
-            .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
+            .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
             .expect("No table \"chunks\" found. The database should have been initialized");

         // Update page
         for chunk in chunks {
             // Calculate key
-            let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
+            // let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

             // Insert chunk
-            database.put(&mut rw_tx, &key, chunk)?;
+            database.put(&mut rw_tx, &chunk.hash(), chunk.data())?;
         }
         // Commit changes
         rw_tx.commit()?;
@@ -392,25 +440,30 @@ impl Database {
     /// }
     ///
     /// ```
-    pub async fn batch_insert(&self, values: Vec<Chunk>) -> Result<(), Error> {
+    pub async fn batch_insert(&self, values: Vec<SerializedChunk>) -> Result<(), Error> {
         // Clone database pointer
         let db = self.db.clone();
         let tsk_db = self.db.clone();

         // Calculate all keys
-        let keys = values
+        /* let keys = values
             .iter()
             .map(|v| hash((v.dimension.as_ref().unwrap_or_else(|| panic!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos)), v.x_pos, v.z_pos)))
             .collect::<Vec<u64>>();
+*/
+        // let keys = values.iter().map(|v| v.hash()).collect::<Vec<u64>>();

         // WARNING: The previous logic was to first insert in database and then insert in cache using load_into_cache fn.
         // This has been modified to avoid having to query database while we already have the data available.

         // First insert into cache
-        for (key, chunk) in keys.into_iter().zip(&values) {
+        // TODO: Re-enable cache. Currently disabled because we only get serialized bytes with the hash.
+        // to save in the database
+        /*for (chunk) in values.iter() {
             let cache = self.cache.clone();
             let db = self.db.clone();
-            let chunk = chunk.clone();
+            let key = chunk.hash();
+            let chunk = chunk.data().clone();
             tokio::spawn(async move {
                 cache.insert(key, chunk).await;
                 if let Err(e) = Database::load_into_cache_standalone(db, cache, key).await {
@@ -418,7 +471,7 @@ impl Database {
                 }
             });
         }
-
+        */
         // Then insert into persistent database
         spawn_blocking_db(tsk_db, move || Self::insert_chunks_into_database(&db, &values))
             .await
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 89e9ccce..adb1b7c8 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -2,7 +2,7 @@ use byteorder::LE;
 use chunks::Zstd;
 use deepsize::DeepSizeOf;
 use futures::FutureExt;
-use heed::types::U64;
+use heed::types::{Bytes, U64};
 use heed::{Env as LMDBDatabase, EnvFlags, EnvOpenOptions};
 use moka::notification::{ListenerFuture, RemovalCause};
 use rayon::{ThreadPool, ThreadPoolBuilder};
@@ -95,18 +95,18 @@ pub async fn start_database() -> Result<Database> {
     });

     // Check if database is built. Otherwise, initialize it
-    let mut rw_tx = lmdb.write_txn().unwrap();
+    let mut rw_tx = lmdb.write_txn()?;
     if lmdb
-        .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
-        .unwrap()
+        // .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
+        .open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
         .is_none()
     {
-        lmdb.create_database::<U64<LE>, Zstd<Chunk>>(&mut rw_tx, Some("chunks"))
+        lmdb.create_database::<U64<LE>, Bytes>(&mut rw_tx, Some("chunks"))
             .expect("Unable to create database");
     }

     // `entities` table to be added, but needs the type to do so
-    rw_tx.commit().unwrap();
+    rw_tx.commit()?;

     info!("Database started");
diff --git a/src/utils/error.rs b/src/utils/error.rs
index 1f61434a..8e7553f8 100644
--- a/src/utils/error.rs
+++ b/src/utils/error.rs
@@ -93,6 +93,10 @@ pub enum Error {

     #[error("Database error: {0}")]
     LmdbError(#[from] heed::Error),
+    #[error("(bincode) Encode error")]
+    BincodeEncodeError(#[from] bincode::error::EncodeError),
+    #[error("(bincode) Decode error")]
+    BincodeDecodeError(#[from] bincode::error::DecodeError),
 }

 impl From for Error {
diff --git a/src/world/importing.rs b/src/world/importing.rs
index 8ac122e2..383530a2 100644
--- a/src/world/importing.rs
+++ b/src/world/importing.rs
@@ -1,20 +1,41 @@
 use crate::utils::prelude::*;
 use fastanvil::{ChunkData, Region};
 use indicatif::{ProgressBar, ProgressStyle};
-use nbt_lib::NBTDeserializeBytes;
+use nbt_lib::{NBTDeserializeBytes, NBTSerialize};
 use rayon::prelude::*;
 use std::env;
 use std::fs::File;
 use std::io::Cursor;
 use std::path::PathBuf;
 use std::sync::Arc;
+use bincode::config::standard;
+use bincode::Encode;
+use tokio::runtime::Handle;
 use tracing::{debug, info, warn};
-
+use crate::database::chunks::ZstdCodec;
 use crate::state::GlobalState;
+use crate::utils::hash::hash;
 use crate::world::chunk_format::Chunk;

 const DEFAULT_BATCH_SIZE: u8 = 150;

+/// A serialized chunk is a tuple of the chunk's hash and the compressed chunk data
+/// (hash, compressed_chunk_data)
+pub struct SerializedChunk(u64, Vec<u8>);
+
+impl SerializedChunk {
+    pub fn new(hash: u64, data: Vec<u8>) -> Self {
+        Self(hash, data)
+    }
+    pub fn hash(&self) -> u64 {
+        self.0
+    }
+
+    pub fn data(&self) -> &Vec<u8> {
+        self.1.as_ref()
+    }
+}
+
 fn get_batch_size() -> i32 {
     let batch_size = env::args()
         .find(|x| x.starts_with("--batch_size="))
@@ -73,22 +94,31 @@ async fn get_total_chunks(dir: &PathBuf) -> Result<usize> {
     Ok(regions.into_par_iter().map(|mut region| region.iter().count()).sum())
 }

-fn process_chunk(chunk_data: Vec<u8>, file_name: &str, bar: Arc<ProgressBar>) -> Result<Chunk> {
-    let mut final_chunk = Chunk::read_from_bytes(&mut Cursor::new(chunk_data))
+async fn process_chunk(chunk_data: Vec<u8>, file_name: &str, bar: Arc<ProgressBar>) -> Result<SerializedChunk> {
+    let mut chunk = Chunk::read_from_bytes(&mut Cursor::new(chunk_data))
         .map_err(|e| {
             bar.abandon_with_message(format!("Chunk {} failed to import", file_name));
             Error::Generic(format!("Could not read chunk {} {}", e, file_name))
         })?;

-    final_chunk.convert_to_net_mode()
+    chunk.convert_to_net_mode()
         .map_err(|e| {
-            bar.abandon_with_message(format!("Chunk {} {} failed to import", final_chunk.x_pos, final_chunk.z_pos));
-            Error::Generic(format!("Could not convert chunk {} {} to network mode: {}", final_chunk.x_pos, final_chunk.z_pos, e))
+            bar.abandon_with_message(format!("Chunk {} {} failed to import", chunk.x_pos, chunk.z_pos));
+            Error::Generic(format!("Could not convert chunk {} {} to network mode: {}", chunk.x_pos, chunk.z_pos, e))
         })?;

-    final_chunk.dimension = Some("overworld".to_string());
+    chunk.dimension = Some("overworld".to_string());

-    Ok(final_chunk)
+    // let chunk_data = bincode::encode_to_vec(final_chunk, standard())?;
+    // let chunk_data = ZstdCodec::compress_data(final_chunk);
+    let hash = hash((chunk.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos).as_str()), chunk.x_pos, chunk.z_pos));
+    /*let chunk_data = tokio::task::(async move {
+        ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
+    });*/
+    // let chunk_data = Handle::current().block_on(ZstdCodec::compress_data(chunk)).expect("Failed to compress chunk");
+    let chunk_data = ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk");
+
+    Ok(SerializedChunk::new(hash, chunk_data))
 }

 //noinspection RsBorrowChecker
@@ -122,7 +152,7 @@ pub async fn import_regions(state: GlobalState) -> Result<()> {
     while !chunks.is_empty() {
         let chunk_batch: Vec<ChunkData> = chunks.drain(..std::cmp::min(batch_size, chunks.len())).collect();

-        let processed_chunks: Vec<Chunk> = chunk_batch.into_par_iter()
+        /*let processed_chunks: Vec<Chunk> = chunk_batch.into_par_iter()
             .filter_map(|chunk| {
                 let data = chunk.data.clone();
                 match process_chunk(data, file_name, Arc::clone(&bar)) {
@@ -137,6 +167,32 @@ pub async fn import_regions(state: GlobalState) -> Result<()> {
                     }
                 }
             })
+            .collect();*/
+
+        let processed_chunks_futures: Vec<_> = chunk_batch.into_iter()
+            .map(|chunk| {
+                let data = chunk.data.clone();
+                let bar_clone = Arc::clone(&bar);
+                let file_name = file_name.to_string();
+                tokio::spawn(async move {
+                    match process_chunk(data, &file_name, Arc::clone(&bar_clone)).await {
+                        Ok(processed) => {
+                            bar_clone.inc(1);
+                            Some(processed)
+                        }
+                        Err(e) => {
+                            warn!("Failed to process chunk: {}. Skipping.", e);
+                            None
+                        }
+                    }
+                })
+            })
+            .collect();
+
+        let processed_chunks: Vec<SerializedChunk> = futures::future::join_all(processed_chunks_futures)
+            .await
+            .into_iter()
+            .filter_map(|result| result.ok().flatten())
             .collect();

         // Insert the batch of processed chunks
@@ -193,7 +249,7 @@ fn create_progress_bar(total_chunks: usize) -> ProgressBar {
     bar
 }

-async fn insert_chunks(state: &GlobalState, queued_chunks: Vec<Chunk>, bar: &ProgressBar) -> Result<()> {
+async fn insert_chunks(state: &GlobalState, queued_chunks: Vec<SerializedChunk>, bar: &ProgressBar) -> Result<()> {
     state.database.batch_insert(queued_chunks).await
         .map_err(|e| {
             bar.abandon_with_message("Chunk insertion failed".to_string());
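
Usage sketch, not part of the patch itself: how the new ZstdCodec and SerializedChunk types introduced above are expected to round-trip a chunk. The Chunk type, the hash() helper, and the crate::Result alias come from the diff; the roundtrip function and the assumed bounds on Chunk (Encode + Decode + Send + 'static) are illustrative only.

    use crate::database::chunks::ZstdCodec;
    use crate::utils::hash::hash;
    use crate::world::chunk_format::Chunk;
    use crate::world::importing::SerializedChunk;

    // Compress a chunk into a SerializedChunk keyed by its position hash,
    // then decode the stored bytes back into a Chunk (hypothetical helper).
    async fn roundtrip(chunk: Chunk) -> crate::Result<Chunk> {
        let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
        // compress_data runs bincode + zstd on the blocking pool, so awaiting it
        // inside async code does not stall the runtime.
        let bytes = ZstdCodec::compress_data(chunk).await?;
        let serialized = SerializedChunk::new(key, bytes);
        // These are the (hash, data) values that insert_chunks_into_database
        // writes via database.put(&chunk.hash(), chunk.data()).
        ZstdCodec::decompress_data::<Chunk>(serialized.data().clone()).await
    }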