From 22e2ad9926b133b612f9b688bca9512dad35feba Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 15 Jan 2025 16:32:02 +0800 Subject: [PATCH] refine --- .../hummock/store/local_hummock_storage.rs | 54 ++++++----- src/storage/src/hummock/store/version.rs | 91 +++++++++++-------- src/storage/src/memory.rs | 36 ++++---- 3 files changed, 94 insertions(+), 87 deletions(-) diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 770aaebe9c97f..17a9314eced61 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -302,39 +302,37 @@ impl StateStoreRead for LocalHummockFlushedSnapshotReader { } impl StateStoreGet for LocalHummockStorage { - fn on_key_value( + async fn on_key_value( &self, key: TableKey, read_options: ReadOptions, on_key_value_fn: impl KeyValueFn, - ) -> impl StorageFuture<'_, Option> { - async move { - assert_eq!(self.table_id, read_options.table_id); - match self.mem_table.buffer.get(&key) { - None => { - LocalHummockFlushedSnapshotReader::get_flushed( - &self.hummock_version_reader, - &self.read_version, - key, - read_options, - on_key_value_fn, - ) - .await - } - Some(op) => match op { - KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok({ - Some(on_key_value_fn( - FullKey::new_with_gap_epoch( - self.table_id, - key.to_ref(), - self.current_epoch_with_gap(), - ), - value.as_ref(), - )?) - }), - KeyOp::Delete(_) => Ok(None), - }, + ) -> StorageResult> { + assert_eq!(self.table_id, read_options.table_id); + match self.mem_table.buffer.get(&key) { + None => { + LocalHummockFlushedSnapshotReader::get_flushed( + &self.hummock_version_reader, + &self.read_version, + key, + read_options, + on_key_value_fn, + ) + .await } + Some(op) => match op { + KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok({ + Some(on_key_value_fn( + FullKey::new_with_gap_epoch( + self.table_id, + key.to_ref(), + self.current_epoch_with_gap(), + ), + value.as_ref(), + )?) + }), + KeyOp::Delete(_) => Ok(None), + }, } } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 2ae748d9e9e92..0f13fd47820aa 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -557,17 +557,19 @@ impl HummockVersionReader { ) { return Ok(if data_epoch.pure_epoch() < min_epoch { None - } else if let Some(value) = data.into_user_value() { - Some(on_key_value_fn( - FullKey::new_with_gap_epoch( - read_options.table_id, - table_key.to_ref(), - data_epoch, - ), - value.as_ref(), - )?) } else { - None + data.into_user_value() + .map(|v| { + on_key_value_fn( + FullKey::new_with_gap_epoch( + read_options.table_id, + table_key.to_ref(), + data_epoch, + ), + v.as_ref(), + ) + }) + .transpose()? }); } } @@ -600,17 +602,20 @@ impl HummockVersionReader { let data_epoch = iter.key().epoch_with_gap; return Ok(if data_epoch.pure_epoch() < min_epoch { None - } else if let Some(value) = iter.value().into_user_value() { - Some(on_key_value_fn( - FullKey::new_with_gap_epoch( - read_options.table_id, - table_key.to_ref(), - data_epoch, - ), - value, - )?) } else { - None + iter.value() + .into_user_value() + .map(|v| { + on_key_value_fn( + FullKey::new_with_gap_epoch( + read_options.table_id, + table_key.to_ref(), + data_epoch, + ), + v, + ) + }) + .transpose()? }); } } @@ -647,17 +652,20 @@ impl HummockVersionReader { let data_epoch = iter.key().epoch_with_gap; return Ok(if data_epoch.pure_epoch() < min_epoch { None - } else if let Some(value) = iter.value().into_user_value() { - Some(on_key_value_fn( - FullKey::new_with_gap_epoch( - read_options.table_id, - table_key.to_ref(), - data_epoch, - ), - value, - )?) } else { - None + iter.value() + .into_user_value() + .map(|v| { + on_key_value_fn( + FullKey::new_with_gap_epoch( + read_options.table_id, + table_key.to_ref(), + data_epoch, + ), + v, + ) + }) + .transpose()? }); } } @@ -693,17 +701,20 @@ impl HummockVersionReader { let data_epoch = iter.key().epoch_with_gap; return Ok(if data_epoch.pure_epoch() < min_epoch { None - } else if let Some(value) = iter.value().into_user_value() { - Some(on_key_value_fn( - FullKey::new_with_gap_epoch( - read_options.table_id, - table_key.to_ref(), - data_epoch, - ), - value, - )?) } else { - None + iter.value() + .into_user_value() + .map(|v| { + on_key_value_fn( + FullKey::new_with_gap_epoch( + read_options.table_id, + table_key.to_ref(), + data_epoch, + ), + v, + ) + }) + .transpose()? }); } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index e5060856597b0..47efd4286d9f2 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -885,29 +885,27 @@ impl RangeKvLocalStateStore { } impl StateStoreGet for RangeKvLocalStateStore { - fn on_key_value( + async fn on_key_value( &self, key: TableKey, _read_options: ReadOptions, on_key_value_fn: impl KeyValueFn, - ) -> impl StorageFuture<'_, Option> { - async move { - if let Some((key, value)) = match self.mem_table.buffer.get(&key) { - None => self - .inner - .get_keyed_row_impl(key, self.epoch(), self.table_id)?, - Some(op) => match op { - KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some(( - FullKey::new(self.table_id, key, self.epoch()), - value.clone(), - )), - KeyOp::Delete(_) => None, - }, - } { - Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?)) - } else { - Ok(None) - } + ) -> StorageResult> { + if let Some((key, value)) = match self.mem_table.buffer.get(&key) { + None => self + .inner + .get_keyed_row_impl(key, self.epoch(), self.table_id)?, + Some(op) => match op { + KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some(( + FullKey::new(self.table_id, key, self.epoch()), + value.clone(), + )), + KeyOp::Delete(_) => None, + }, + } { + Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?)) + } else { + Ok(None) } } }