Skip to content

Commit

Permalink
Fixed problems with chunk loading.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sweattypalms committed Sep 13, 2024
1 parent 4e61002 commit e7dd061
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 29 deletions.
79 changes: 52 additions & 27 deletions src/database/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,45 @@ impl Database {
}

/// Fetch chunk from database
fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
// Initialize read transaction and open chunks table
let ro_tx = db.read_txn()?;
let database = db
.open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");
async fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
let data = {
// Initialize read transaction and open chunks table
let ro_tx = db.read_txn()?;
let database = db
.open_database::<U64<LE>, Bytes>(&ro_tx, Some("chunks"))?
.expect("No table \"chunks\" found. The database should have been initialized");

// Attempt to fetch chunk from table
let data = database.get(&ro_tx, key)?;


data.map(|data| data.to_vec())
};

// Now, proceed with the async operation without holding `ro_tx`
if let Some(data) = data {
let chunk = ZstdCodec::decompress_data::<Chunk>(data).await.expect("Failed to decompress chunk");
Ok(Some(chunk))
} else {
Ok(None)
}


/*
drop(ro_tx);
// Attempt to fetch chunk from table
let data = database.get(&ro_tx, key)?;
let chunk = match data {
Some(data) => {
// let chunk = ZstdCodec::decompress_data::<Chunk>(data.to_vec()).expect("Failed to decompress chunk");
let chunk = Handle::current().block_on(async {
ZstdCodec::decompress_data::<Chunk>(data.to_vec())
.await
.expect("Failed to decompress chunk")
});
let chunk = ZstdCodec::decompress_data::<Chunk>(data)
.await
.expect("Failed to decompress chunk");
Some(chunk)
}
None => None,
};
Ok(chunk)
Ok(chunk)*/
}

/// Insert a single chunk into database
Expand Down Expand Up @@ -106,18 +122,20 @@ impl Database {
cache: Arc<Cache<u64, Chunk>>,
key: u64,
) -> Result<(), Error> {
let tsk_db = db.clone();
// let tsk_db = db.clone();

let db = db.clone();
tokio::task::spawn(async move {

// Check cache
if cache.contains_key(&key) {
trace!("Chunk already exists in cache: {:X}", key);
}
// If not in cache then search in database
else if let Ok(chunk) =
spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
else if let Ok(chunk) = Self::get_chunk_from_database(&db, &key).await
/*spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
.await
.unwrap()
.unwrap()*/
{
if let Some(chunk) = chunk {
cache.insert(key, chunk).await;
Expand Down Expand Up @@ -202,18 +220,18 @@ impl Database {
) -> Result<Option<Chunk>, Error> {
// Calculate key of this chunk and clone database pointer
let key = hash((dimension, x, z));
let tsk_db = self.db.clone();
let db = self.db.clone();

// First check cache
if self.cache.contains_key(&key) {
Ok(self.cache.get(&key).await)
}
// Attempt to get chunk from persistent database
else if let Some(chunk) =
else if let Some(chunk) = Self::get_chunk_from_database(&db, &key).await?
/*
spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
.await
.unwrap()?
.unwrap()?*/
{
self.cache.insert(key, chunk.clone()).await;
Ok(Some(chunk))
Expand Down Expand Up @@ -245,21 +263,28 @@ impl Database {
pub async fn chunk_exists(&self, x: i32, z: i32, dimension: String) -> Result<bool, Error> {
// Calculate key and copy database pointer
let key = hash((dimension, x, z));
let tsk_db = self.db.clone();
let db = self.db.clone();

// Check first cache
if self.cache.contains_key(&key) {
Ok(true)
// Else check persistent database and load it into cache
} else {
let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
/*let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
.await
.unwrap();
.unwrap();*/
let Some(res) = Self::get_chunk_from_database(&db, &key).await? else {
return Ok(false);
};

// WARNING: The previous logic was to order the chunk to be loaded into cache whether it existed or not.
// This has been replaced by directly loading the queried chunk into cache
match res {

// Load chunk into cache
self.cache.insert(key, res.clone()).await;
Ok(true)

/* match res {
Ok(opt) => {
let exist = opt.is_some();
if let Some(chunk) = opt {
Expand All @@ -268,7 +293,7 @@ impl Database {
Ok(exist)
}
Err(err) => Err(Error::LmdbError(err)),
}
}*/
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/database/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ impl ZstdCodec {
}
pub async fn decompress_data<T: Decode + Send + 'static>(data: Vec<u8>) -> crate::Result<T> {
tokio::task::spawn_blocking(move || {
let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
Ok(decoded.0)
let mut decoder = zstd::Decoder::new(data.as_slice())?;
// let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
let decoded = bincode::decode_from_std_read(&mut decoder, standard())?;
Ok(decoded)
})
.await?
}
Expand Down

0 comments on commit e7dd061

Please sign in to comment.