The code is absolutely garbage, but it works & That's enough for today ladies and gentlemen. Some other day, we'll meet again. I'm going crazy.
Sweattypalms committed Sep 12, 2024
1 parent bc6f119 commit 4e2ac0b
Showing 4 changed files with 144 additions and 31 deletions.
81 changes: 67 additions & 14 deletions src/database/chunks.rs
@@ -6,7 +6,10 @@ use bincode::{Decode, Encode, config::standard};
use byteorder::LE;
use futures::channel::oneshot::{self, Canceled};
use heed::{types::U64, BytesDecode, BytesEncode, Env, MdbError};
use heed::types::Bytes;
use moka::future::Cache;
use tokio::runtime::Handle;
use tokio::task::block_in_place;
use tracing::{trace, warn};

use crate::{
@@ -15,7 +18,7 @@ use crate::{
utils::hash::hash,
world::chunk_format::Chunk
};

use crate::world::importing::SerializedChunk;
use super::{LMDB_PAGE_SIZE, LMDB_PAGE_SIZE_INCREMENT, LMDB_READER_SYNC, LMDB_THREADPOOL};

pub struct Zstd<T>(PhantomData<T>);
@@ -48,6 +51,30 @@ impl<'a, T: Decode + 'a> BytesDecode<'a> for Zstd<T> {
}
}

pub struct ZstdCodec;

impl ZstdCodec {
/// Bincode-encode the value and zstd-compress it (level 3) on a blocking thread.
pub async fn compress_data<T: Encode + Send + 'static>(data: T) -> crate::Result<Vec<u8>> {
tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
let mut compressor = zstd::Encoder::new(&mut bytes, 3)?;
bincode::encode_into_std_write(&data, &mut compressor, standard())?;
compressor.finish()?;
Ok(bytes)
}).await?
}

/// Decode stored bytes back into a value on a blocking thread.
/// NOTE: this bincode-decodes the bytes as-is; there is no zstd decompression step here.
pub async fn decompress_data<T: Decode + Send + 'static>(data: Vec<u8>) -> crate::Result<T> {
tokio::task::spawn_blocking(move || {
let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
Ok(decoded.0)
}).await?
}
}
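As written, decompress_data bincode-decodes the stored bytes directly and never undoes the zstd framing that compress_data adds, so the two do not round-trip. A symmetric decoder could look like the sketch below (a sketch only, assuming the crate's Error type is in scope here and can carry the intermediate zstd I/O failure, e.g. through the Generic variant used in importing.rs):

pub async fn decompress_data<T: Decode + Send + 'static>(data: Vec<u8>) -> crate::Result<T> {
    tokio::task::spawn_blocking(move || {
        // Undo the zstd framing first, then bincode-decode the plain bytes.
        let raw = zstd::decode_all(data.as_slice())
            .map_err(|e| Error::Generic(format!("zstd decode failed: {e}")))?;
        let (value, _len) = bincode::decode_from_slice(raw.as_slice(), standard())?;
        Ok(value)
    })
    .await?
}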

/// LMDB will follow a linear growth as opposed to MDBX which
/// uses a geometric growth.
pub(super) fn new_page_size(old_size: usize) -> usize {
Expand Down Expand Up @@ -114,11 +141,23 @@ impl Database {
// Initialize read transaction and open chunks table
let ro_tx = db.read_txn()?;
let database = db
.open_database::<U64<LE>, Zstd<Chunk>>(&ro_tx, Some("chunks"))?
.open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Attempt to fetch chunk from table
database.get(&ro_tx, key)
let data = database.get(&ro_tx, key)?;
let chunk = match data {
Some(data) => {
// let chunk = ZstdCodec::decompress_data::<Chunk>(data.to_vec()).expect("Failed to decompress chunk");
let chunk = Handle::current().block_on(async {
ZstdCodec::decompress_data::<Chunk>(data.to_vec()).await.expect("Failed to decompress chunk")
});
Some(chunk)
}
None => None,
};

Ok(chunk)
//.map_err(|err| Error::DatabaseError(format!("Failed to get chunk: {err}")))
}
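Handle::current().block_on will panic if this ever runs on a tokio worker thread, so it only holds up while get_chunk_from_database is reached from a plain blocking thread. If it can also be hit from async code on the multi-threaded runtime, one possible variant (a sketch, and presumably why block_in_place is imported at the top of this file) wraps the call like this inside the Some(data) arm:

Some(data) => {
    // block_in_place moves this worker off the scheduler, so block_on is allowed inside it.
    let chunk = block_in_place(|| {
        Handle::current().block_on(ZstdCodec::decompress_data::<Chunk>(data.to_vec()))
    })
    .expect("Failed to decompress chunk");
    Some(chunk)
}

The same consideration applies to the compress_data call in the insert path below.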

@@ -127,14 +166,23 @@
// Initialize write transaction and open chunks table
let mut rw_tx = db.write_txn()?;
let database = db
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Calculate key
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

// let chunk = Handle::current().block_on(ZstdCodec::compress_data(chunk)).expect("Failed to compress chunk");
let chunk = chunk.clone();
let chunk = Handle::current().block_on(async {
ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
});
/*
ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
});*/

// Insert chunk
let res = database.put(&mut rw_tx, &key, chunk);
let res = database.put(&mut rw_tx, &key, chunk.as_slice());
rw_tx.commit()?;
// .map_err(|err| {
// Error::DatabaseError(format!("Unable to commit changes to database: {err}"))
@@ -152,20 +200,20 @@

/// Insert multiple chunks into database
/// TODO: Find better name/disambiguation
fn insert_chunks_into_database(db: &Env, chunks: &[Chunk]) -> Result<(), heed::Error> {
fn insert_chunks_into_database(db: &Env, chunks: &[SerializedChunk]) -> Result<(), heed::Error> {
// Initialize write transaction and open chunks table
let mut rw_tx = db.write_txn()?;
let database = db
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))?
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Update page
for chunk in chunks {
// Calculate key
let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));
// let key = hash((chunk.dimension.as_ref().unwrap(), chunk.x_pos, chunk.z_pos));

// Insert chunk
database.put(&mut rw_tx, &key, chunk)?;
database.put(&mut rw_tx, &chunk.hash(), chunk.data())?;
}
// Commit changes
rw_tx.commit()?;
@@ -392,33 +440,38 @@ impl Database {
/// }
///
/// ```
pub async fn batch_insert(&self, values: Vec<Chunk>) -> Result<(), Error> {
pub async fn batch_insert(&self, values: Vec<SerializedChunk>) -> Result<(), Error> {
// Clone database pointer
let db = self.db.clone();
let tsk_db = self.db.clone();

// Calculate all keys
let keys = values
/* let keys = values
.iter()
.map(|v| hash((v.dimension.as_ref().unwrap_or_else(|| panic!("Invalid chunk @ ({},{})", v.x_pos, v.z_pos)), v.x_pos, v.z_pos)))
.collect::<Vec<u64>>();
*/
// let keys = values.iter().map(|v| v.hash()).collect::<Vec<u64>>();

// WARNING: The previous logic was to insert into the database first and then populate the cache via the load_into_cache fn.
// This has been changed to avoid querying the database when we already have the data available.
// First insert into cache

for (key, chunk) in keys.into_iter().zip(&values) {
// TODO: Re-enable the cache. Currently disabled because we only receive the serialized bytes
// and the hash to save in the database.
/*for (chunk) in values.iter() {
let cache = self.cache.clone();
let db = self.db.clone();
let chunk = chunk.clone();
let key = chunk.hash();
let chunk = chunk.data().clone();
tokio::spawn(async move {
cache.insert(key, chunk).await;
if let Err(e) = Database::load_into_cache_standalone(db, cache, key).await {
warn!("Error inserting chunk into database: {:?}", e);
}
});
}

*/
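If the cache should hold decoded chunks again, one possible shape for re-enabling the block above (a sketch, assuming the moka cache still maps u64 hashes to Chunk values and that decompress_data actually reverses compress_data, per the note in the ZstdCodec section) is to decode each SerializedChunk before inserting it:

for chunk in values.iter() {
    let cache = self.cache.clone();
    let key = chunk.hash();
    let data = chunk.data().clone();
    tokio::spawn(async move {
        // Decode the zstd+bincode payload back into a Chunk before caching it.
        match ZstdCodec::decompress_data::<Chunk>(data).await {
            Ok(decoded) => cache.insert(key, decoded).await,
            Err(e) => warn!("Could not decode chunk {} for the cache: {:?}", key, e),
        }
    });
}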
// Then insert into persistent database
spawn_blocking_db(tsk_db, move || Self::insert_chunks_into_database(&db, &values))
.await
12 changes: 6 additions & 6 deletions src/database/mod.rs
@@ -2,7 +2,7 @@ use byteorder::LE;
use chunks::Zstd;
use deepsize::DeepSizeOf;
use futures::FutureExt;
use heed::types::U64;
use heed::types::{Bytes, U64};
use heed::{Env as LMDBDatabase, EnvFlags, EnvOpenOptions};
use moka::notification::{ListenerFuture, RemovalCause};
use rayon::{ThreadPool, ThreadPoolBuilder};
@@ -95,18 +95,18 @@ pub async fn start_database() -> Result<Database, Error> {
});

// Check if database is built. Otherwise, initialize it
let mut rw_tx = lmdb.write_txn().unwrap();
let mut rw_tx = lmdb.write_txn()?;
if lmdb
.open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
.unwrap()
// .open_database::<U64<LE>, Zstd<Chunk>>(&rw_tx, Some("chunks"))
.open_database::<U64<LE>, Bytes>(&rw_tx, Some("chunks"))?
.is_none()
{
lmdb.create_database::<U64<LE>, Zstd<Chunk>>(&mut rw_tx, Some("chunks"))
lmdb.create_database::<U64<LE>, Bytes>(&mut rw_tx, Some("chunks"))
.expect("Unable to create database");
}
// `entities` table to be added, but needs the type to do so

rw_tx.commit().unwrap();
rw_tx.commit()?;

info!("Database started");

4 changes: 4 additions & 0 deletions src/utils/error.rs
@@ -93,6 +93,10 @@ pub enum Error {

#[error("Database error: {0}")]
LmdbError(#[from] heed::Error),
#[error("(bincode) Encode error")]
BincodeEncodeError(#[from] bincode::error::EncodeError),
#[error("(bincode) Decode error")]
BincodeDecodeError(#[from] bincode::error::DecodeError),
}

impl From<Infallible> for Error {
78 changes: 67 additions & 11 deletions src/world/importing.rs
@@ -1,20 +1,41 @@
use crate::utils::prelude::*;
use fastanvil::{ChunkData, Region};
use indicatif::{ProgressBar, ProgressStyle};
use nbt_lib::NBTDeserializeBytes;
use nbt_lib::{NBTDeserializeBytes, NBTSerialize};
use rayon::prelude::*;
use std::env;
use std::fs::File;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Arc;
use bincode::config::standard;
use bincode::Encode;
use tokio::runtime::Handle;
use tracing::{debug, info, warn};

use crate::database::chunks::ZstdCodec;
use crate::state::GlobalState;
use crate::utils::hash::hash;
use crate::world::chunk_format::Chunk;

const DEFAULT_BATCH_SIZE: u8 = 150;

/// A serialized chunk is a tuple of the chunk's hash and the compressed chunk data
/// (hash, compressed_chunk_data)
pub struct SerializedChunk(u64, Vec<u8>);

impl SerializedChunk {
pub fn new(hash: u64, data: Vec<u8>) -> Self {
Self(hash, data)
}
pub fn hash(&self) -> u64 {
self.0
}

pub fn data(&self) -> &Vec<u8> {
self.1.as_ref()
}
}

fn get_batch_size() -> i32 {
let batch_size = env::args()
.find(|x| x.starts_with("--batch_size="))
@@ -73,22 +94,31 @@ async fn get_total_chunks(dir: &PathBuf) -> Result<usize> {
Ok(regions.into_par_iter().map(|mut region| region.iter().count()).sum())
}

fn process_chunk(chunk_data: Vec<u8>, file_name: &str, bar: Arc<ProgressBar>) -> Result<Chunk> {
let mut final_chunk = Chunk::read_from_bytes(&mut Cursor::new(chunk_data))
async fn process_chunk(chunk_data: Vec<u8>, file_name: &str, bar: Arc<ProgressBar>) -> Result<SerializedChunk> {
let mut chunk = Chunk::read_from_bytes(&mut Cursor::new(chunk_data))
.map_err(|e| {
bar.abandon_with_message(format!("Chunk {} failed to import", file_name));
Error::Generic(format!("Could not read chunk {} {}", e, file_name))
})?;

final_chunk.convert_to_net_mode()
chunk.convert_to_net_mode()
.map_err(|e| {
bar.abandon_with_message(format!("Chunk {} {} failed to import", final_chunk.x_pos, final_chunk.z_pos));
Error::Generic(format!("Could not convert chunk {} {} to network mode: {}", final_chunk.x_pos, final_chunk.z_pos, e))
bar.abandon_with_message(format!("Chunk {} {} failed to import", chunk.x_pos, chunk.z_pos));
Error::Generic(format!("Could not convert chunk {} {} to network mode: {}", chunk.x_pos, chunk.z_pos, e))
})?;

final_chunk.dimension = Some("overworld".to_string());
chunk.dimension = Some("overworld".to_string());

Ok(final_chunk)
// let chunk_data = bincode::encode_to_vec(final_chunk, standard())?;
// let chunk_data = ZstdCodec::compress_data(final_chunk);
let hash = hash((chunk.dimension.as_ref().expect(format!("Invalid chunk @ ({},{})", chunk.x_pos, chunk.z_pos).as_str()), chunk.x_pos, chunk.z_pos));
/*let chunk_data = tokio::task::(async move {
ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk")
});*/
// let chunk_data = Handle::current().block_on(ZstdCodec::compress_data(chunk)).expect("Failed to compress chunk");
let chunk_data = ZstdCodec::compress_data(chunk).await.expect("Failed to compress chunk");

Ok(SerializedChunk::new(hash, chunk_data))
}

//noinspection RsBorrowChecker
@@ -122,7 +152,7 @@ pub async fn import_regions(state: GlobalState) -> Result<()> {
while !chunks.is_empty() {
let chunk_batch: Vec<ChunkData> = chunks.drain(..std::cmp::min(batch_size, chunks.len())).collect();

let processed_chunks: Vec<Chunk> = chunk_batch.into_par_iter()
/*let processed_chunks: Vec<SerializedChunk> = chunk_batch.into_par_iter()
.filter_map(|chunk| {
let data = chunk.data.clone();
match process_chunk(data, file_name, Arc::clone(&bar)) {
@@ -137,6 +167,32 @@
}
}
})
.collect();*/

let processed_chunks_futures: Vec<_> = chunk_batch.into_iter()
.map(|chunk| {
let data = chunk.data.clone();
let bar_clone = Arc::clone(&bar);
let file_name = file_name.to_string();
tokio::spawn(async move {
match process_chunk(data, &file_name, Arc::clone(&bar_clone)).await {
Ok(processed) => {
bar_clone.inc(1);
Some(processed)
}
Err(e) => {
warn!("Failed to process chunk: {}. Skipping.", e);
None
}
}
})
})
.collect();

let processed_chunks: Vec<SerializedChunk> = futures::future::join_all(processed_chunks_futures)
.await
.into_iter()
.filter_map(|result| result.ok().flatten())
.collect();
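Spawning one task per chunk is unbounded within a batch; if batch sizes grow, a bounded-concurrency variant of the same loop could replace the spawn plus join_all pair (a sketch using futures' buffer_unordered with a hypothetical cap of 32 in-flight chunks; the heavy compression still runs on blocking threads inside process_chunk):

use futures::{stream, StreamExt};

let processed_chunks: Vec<SerializedChunk> = stream::iter(chunk_batch)
    .map(|chunk| {
        let file_name = file_name.to_string();
        let bar = Arc::clone(&bar);
        async move {
            match process_chunk(chunk.data, &file_name, bar.clone()).await {
                Ok(processed) => {
                    bar.inc(1);
                    Some(processed)
                }
                Err(e) => {
                    warn!("Failed to process chunk: {}. Skipping.", e);
                    None
                }
            }
        }
    })
    .buffer_unordered(32) // hypothetical limit on concurrently processed chunks
    .filter_map(|maybe_chunk| async move { maybe_chunk })
    .collect()
    .await;

Note that buffer_unordered does not preserve input order, which is fine here since chunks are keyed by their hash.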

// Insert the batch of processed chunks
@@ -193,7 +249,7 @@ fn create_progress_bar(total_chunks: usize) -> ProgressBar {
bar
}

async fn insert_chunks(state: &GlobalState, queued_chunks: Vec<Chunk>, bar: &ProgressBar) -> Result<()> {
async fn insert_chunks(state: &GlobalState, queued_chunks: Vec<SerializedChunk>, bar: &ProgressBar) -> Result<()> {
state.database.batch_insert(queued_chunks).await
.map_err(|e| {
bar.abandon_with_message("Chunk insertion failed".to_string());
