diff --git a/src/database/chunks.rs b/src/database/chunks.rs
index 1ba44fbc..c39960aa 100644
--- a/src/database/chunks.rs
+++ b/src/database/chunks.rs
@@ -21,29 +21,45 @@ impl Database {
     }
 
     /// Fetch chunk from database
-    fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
-        // Initialize read transaction and open chunks table
-        let ro_tx = db.read_txn()?;
-        let database = db
-            .open_database::<U64<BigEndian>, Bytes>(&ro_tx, Some("chunks"))?
-            .expect("No table \"chunks\" found. The database should have been initialized");
+    async fn get_chunk_from_database(db: &Env, key: &u64) -> Result<Option<Chunk>, heed::Error> {
+        let data = {
+            // Initialize read transaction and open chunks table
+            let ro_tx = db.read_txn()?;
+            let database = db
+                .open_database::<U64<BigEndian>, Bytes>(&ro_tx, Some("chunks"))?
+                .expect("No table \"chunks\" found. The database should have been initialized");
+
+            // Attempt to fetch chunk from table
+            let data = database.get(&ro_tx, key)?;
+
+            data.map(|data| data.to_vec())
+        };
+
+        // Now, proceed with the async operation without holding `ro_tx`
+        if let Some(data) = data {
+            let chunk = ZstdCodec::decompress_data::<Chunk>(data).await.expect("Failed to decompress chunk");
+            Ok(Some(chunk))
+        } else {
+            Ok(None)
+        }
+
+        /*
+        drop(ro_tx);
-        // Attempt to fetch chunk from table
-        let data = database.get(&ro_tx, key)?;
         let chunk = match data {
             Some(data) => {
-                // let chunk = ZstdCodec::decompress_data::<Chunk>(data.to_vec()).expect("Failed to decompress chunk");
-                let chunk = Handle::current().block_on(async {
-                    ZstdCodec::decompress_data::<Chunk>(data.to_vec())
-                        .await
-                        .expect("Failed to decompress chunk")
-                });
+                let chunk = ZstdCodec::decompress_data::<Chunk>(data)
+                    .await
+                    .expect("Failed to decompress chunk");
                 Some(chunk)
             }
             None => None,
         };
-        Ok(chunk)
+        Ok(chunk)*/
     }
 
     /// Insert a single chunk into database
@@ -106,18 +122,20 @@ impl Database {
         cache: Arc<Cache<u64, Chunk>>,
         key: u64,
     ) -> Result<(), Error> {
-        let tsk_db = db.clone();
+        // let tsk_db = db.clone();
+        let db = db.clone();
         tokio::task::spawn(async move {
+            // Check cache
             if cache.contains_key(&key) {
                 trace!("Chunk already exists in cache: {:X}", key);
             }
             // If not in cache then search in database
-            else if let Ok(chunk) =
-                spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
+            else if let Ok(chunk) = Self::get_chunk_from_database(&db, &key).await
+            /*spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
                     .await
-                    .unwrap()
+                    .unwrap()*/
             {
                 if let Some(chunk) = chunk {
                     cache.insert(key, chunk).await;
@@ -202,7 +220,6 @@ impl Database {
     ) -> Result<Option<Chunk>, Error> {
         // Calculate key of this chunk and clone database pointer
         let key = hash((dimension, x, z));
-        let tsk_db = self.db.clone();
         let db = self.db.clone();
 
         // First check cache
@@ -210,10 +227,11 @@ impl Database {
             Ok(self.cache.get(&key).await)
         }
         // Attempt to get chunk from persistent database
-        else if let Some(chunk) =
+        else if let Some(chunk) = Self::get_chunk_from_database(&db, &key).await?
+        /*
            spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
                .await
-                .unwrap()?
+                .unwrap()?*/
        {
            self.cache.insert(key, chunk.clone()).await;
            Ok(Some(chunk))
        }
@@ -245,7 +263,6 @@ impl Database {
     pub async fn chunk_exists(&self, x: i32, z: i32, dimension: String) -> Result<bool, Error> {
         // Calculate key and copy database pointer
         let key = hash((dimension, x, z));
-        let tsk_db = self.db.clone();
         let db = self.db.clone();
 
         // Check first cache
@@ -253,13 +270,21 @@ impl Database {
             Ok(true)
         // Else check persistent database and load it into cache
         } else {
-            let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
+            /*let res = spawn_blocking_db(tsk_db, move || Self::get_chunk_from_database(&db, &key))
                 .await
-                .unwrap();
+                .unwrap();*/
+            let Some(res) = Self::get_chunk_from_database(&db, &key).await? else {
+                return Ok(false);
+            };
             // WARNING: The previous logic was to order the chunk to be loaded into cache whether it existed or not.
             // This has been replaced by directly loading the queried chunk into cache
-            match res {
+
+            // Load chunk into cache
+            self.cache.insert(key, res.clone()).await;
+            Ok(true)
+
+            /* match res {
                 Ok(opt) => {
                     let exist = opt.is_some();
                     if let Some(chunk) = opt {
@@ -268,7 +293,7 @@ impl Database {
                     Ok(exist)
                 }
                 Err(err) => Err(Error::LmdbError(err)),
-            }
+            }*/
         }
     }
 
diff --git a/src/database/encoding.rs b/src/database/encoding.rs
index fe718511..3ddf3dc7 100644
--- a/src/database/encoding.rs
+++ b/src/database/encoding.rs
@@ -41,8 +41,10 @@ impl ZstdCodec {
     }
     pub async fn decompress_data<T: Decode + Send + 'static>(data: Vec<u8>) -> crate::Result<T> {
         tokio::task::spawn_blocking(move || {
-            let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
-            Ok(decoded.0)
+            let mut decoder = zstd::Decoder::new(data.as_slice())?;
+            // let decoded = bincode::decode_from_slice(data.as_slice(), standard())?;
+            let decoded = bincode::decode_from_std_read(&mut decoder, standard())?;
+            Ok(decoded)
         })
         .await?
     }
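For reference, below is a minimal, self-contained round-trip sketch of the streaming decode path that encoding.rs now uses (a zstd::Decoder feeding bincode::decode_from_std_read inside spawn_blocking). It assumes tokio (rt + macros), bincode 2 with the derive and std features, zstd, and anyhow as dependencies; the Blob type and decompress_blob helper are illustrative stand-ins for the crate's Chunk and ZstdCodec::decompress_data, not the repository's actual API.

use bincode::config::standard;

// Illustrative stand-in for the crate's `Chunk` type (assumption).
#[derive(bincode::Encode, bincode::Decode, Debug, PartialEq)]
struct Blob {
    key: u64,
    payload: Vec<u8>,
}

// Mirrors the shape of the new decode path: CPU-bound work is moved onto the
// blocking pool, and the zstd frame is streamed straight into bincode.
async fn decompress_blob(data: Vec<u8>) -> anyhow::Result<Blob> {
    tokio::task::spawn_blocking(move || {
        let mut decoder = zstd::Decoder::new(data.as_slice())?;
        let decoded = bincode::decode_from_std_read(&mut decoder, standard())?;
        Ok(decoded)
    })
    .await?
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let original = Blob { key: 0xDEAD_BEEF, payload: vec![1, 2, 3] };

    // Encode with bincode, then zstd-compress the encoded bytes, which is the
    // on-disk layout the decode path above expects.
    let encoded = bincode::encode_to_vec(&original, standard())?;
    let compressed = zstd::encode_all(encoded.as_slice(), 3)?;

    let decoded = decompress_blob(compressed).await?;
    assert_eq!(decoded, original);
    Ok(())
}

Streaming the compressed bytes through the decoder avoids materialising an intermediate decompressed buffer before bincode decoding, which is presumably the motivation for decode_from_std_read over decode_from_slice here.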