Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 15, 2025
1 parent 9ad0c6b commit 22e2ad9
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 87 deletions.
54 changes: 26 additions & 28 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,39 +302,37 @@ impl StateStoreRead for LocalHummockFlushedSnapshotReader {
}

impl StateStoreGet for LocalHummockStorage {
fn on_key_value<O: Send + 'static>(
async fn on_key_value<O: Send + 'static>(
&self,
key: TableKey<Bytes>,
read_options: ReadOptions,
on_key_value_fn: impl KeyValueFn<O>,
) -> impl StorageFuture<'_, Option<O>> {
async move {
assert_eq!(self.table_id, read_options.table_id);
match self.mem_table.buffer.get(&key) {
None => {
LocalHummockFlushedSnapshotReader::get_flushed(
&self.hummock_version_reader,
&self.read_version,
key,
read_options,
on_key_value_fn,
)
.await
}
Some(op) => match op {
KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok({
Some(on_key_value_fn(
FullKey::new_with_gap_epoch(
self.table_id,
key.to_ref(),
self.current_epoch_with_gap(),
),
value.as_ref(),
)?)
}),
KeyOp::Delete(_) => Ok(None),
},
) -> StorageResult<Option<O>> {
assert_eq!(self.table_id, read_options.table_id);
match self.mem_table.buffer.get(&key) {
None => {
LocalHummockFlushedSnapshotReader::get_flushed(
&self.hummock_version_reader,
&self.read_version,
key,
read_options,
on_key_value_fn,
)
.await
}
Some(op) => match op {
KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok({
Some(on_key_value_fn(
FullKey::new_with_gap_epoch(
self.table_id,
key.to_ref(),
self.current_epoch_with_gap(),
),
value.as_ref(),
)?)
}),
KeyOp::Delete(_) => Ok(None),
},
}
}
}
Expand Down
91 changes: 51 additions & 40 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,17 +557,19 @@ impl HummockVersionReader {
) {
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else if let Some(value) = data.into_user_value() {
Some(on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
value.as_ref(),
)?)
} else {
None
data.into_user_value()
.map(|v| {
on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
v.as_ref(),
)
})
.transpose()?
});
}
}
Expand Down Expand Up @@ -600,17 +602,20 @@ impl HummockVersionReader {
let data_epoch = iter.key().epoch_with_gap;
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else if let Some(value) = iter.value().into_user_value() {
Some(on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
value,
)?)
} else {
None
iter.value()
.into_user_value()
.map(|v| {
on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
v,
)
})
.transpose()?
});
}
}
Expand Down Expand Up @@ -647,17 +652,20 @@ impl HummockVersionReader {
let data_epoch = iter.key().epoch_with_gap;
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else if let Some(value) = iter.value().into_user_value() {
Some(on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
value,
)?)
} else {
None
iter.value()
.into_user_value()
.map(|v| {
on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
v,
)
})
.transpose()?
});
}
}
Expand Down Expand Up @@ -693,17 +701,20 @@ impl HummockVersionReader {
let data_epoch = iter.key().epoch_with_gap;
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else if let Some(value) = iter.value().into_user_value() {
Some(on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
value,
)?)
} else {
None
iter.value()
.into_user_value()
.map(|v| {
on_key_value_fn(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.to_ref(),
data_epoch,
),
v,
)
})
.transpose()?
});
}
}
Expand Down
36 changes: 17 additions & 19 deletions src/storage/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,29 +885,27 @@ impl<R: RangeKv> RangeKvLocalStateStore<R> {
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
fn on_key_value<O: Send + 'static>(
async fn on_key_value<O: Send + 'static>(
&self,
key: TableKey<Bytes>,
_read_options: ReadOptions,
on_key_value_fn: impl KeyValueFn<O>,
) -> impl StorageFuture<'_, Option<O>> {
async move {
if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
None => self
.inner
.get_keyed_row_impl(key, self.epoch(), self.table_id)?,
Some(op) => match op {
KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
FullKey::new(self.table_id, key, self.epoch()),
value.clone(),
)),
KeyOp::Delete(_) => None,
},
} {
Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
} else {
Ok(None)
}
) -> StorageResult<Option<O>> {
if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
None => self
.inner
.get_keyed_row_impl(key, self.epoch(), self.table_id)?,
Some(op) => match op {
KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
FullKey::new(self.table_id, key, self.epoch()),
value.clone(),
)),
KeyOp::Delete(_) => None,
},
} {
Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
} else {
Ok(None)
}
}
}
Expand Down

0 comments on commit 22e2ad9

Please sign in to comment.