Skip to content

Commit

Permalink
[kv store] bigtable: add support for reversed scans
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Oct 31, 2024
1 parent c92dd1b commit b0876fa
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
52 changes: 50 additions & 2 deletions crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use crate::bigtable::proto::bigtable::v2::bigtable_client::BigtableClient as Big
use crate::bigtable::proto::bigtable::v2::mutate_rows_request::Entry;
use crate::bigtable::proto::bigtable::v2::mutation::SetCell;
use crate::bigtable::proto::bigtable::v2::read_rows_response::cell_chunk::RowStatus;
use crate::bigtable::proto::bigtable::v2::row_range::EndKey;
use crate::bigtable::proto::bigtable::v2::{
mutation, MutateRowsRequest, MutateRowsResponse, Mutation, ReadRowsRequest, RowSet,
mutation, MutateRowsRequest, MutateRowsResponse, Mutation, ReadRowsRequest, RowRange, RowSet,
};
use crate::{Checkpoint, KeyValueStoreReader, KeyValueStoreWriter, TransactionData};
use anyhow::{anyhow, Result};
Expand All @@ -18,8 +19,8 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use sui_types::base_types::TransactionDigest;
use sui_types::digests::CheckpointDigest;
use sui_types::base_types::{ObjectID, TransactionDigest};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Object;
Expand Down Expand Up @@ -234,6 +235,28 @@ impl KeyValueStoreReader for BigTableClient {
}
Ok(None)
}

async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> {
let upper_limit = u64::MAX.to_be_bytes().to_vec();
match self
.reversed_scan(CHECKPOINTS_TABLE, upper_limit)
.await?
.pop()
{
Some((key_bytes, _)) => Ok(u64::from_be_bytes(key_bytes.as_slice().try_into()?)),
None => Ok(0),
}
}

async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>> {
let upper_limit = Self::raw_object_key(&ObjectKey::max_for_id(object_id))?;
if let Some((_, row)) = self.reversed_scan(OBJECTS_TABLE, upper_limit).await?.pop() {
if let Some((_, value)) = row.into_iter().next() {
return Ok(Some(bcs::from_bytes(&value)?));
}
}
Ok(None)
}
}

impl BigTableClient {
Expand Down Expand Up @@ -407,6 +430,28 @@ impl BigTableClient {
Ok(result)
}

async fn reversed_scan(
&mut self,
table_name: &str,
upper_limit: Bytes,
) -> Result<Vec<(Bytes, Vec<(Bytes, Bytes)>)>> {
let range = RowRange {
start_key: None,
end_key: Some(EndKey::EndKeyClosed(upper_limit)),
};
let request = ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
rows_limit: 1,
rows: Some(RowSet {
row_keys: vec![],
row_ranges: vec![range],
}),
reversed: true,
..ReadRowsRequest::default()
};
self.read_rows(request).await
}

fn raw_object_key(object_key: &ObjectKey) -> Result<Vec<u8>> {
let mut raw_key = object_key.0.to_vec();
raw_key.extend(object_key.1.value().to_be_bytes());
Expand Down Expand Up @@ -457,6 +502,9 @@ impl Service<Request<BoxBody>> for AuthChannel {
HeaderValue::from_str(format!("Bearer {}", token_string.as_str()).as_str())?;
request.headers_mut().insert("authorization", header);
}
// enable reverse scan
let header = HeaderValue::from_static("CAE=");
request.headers_mut().insert("bigtable-features", header);
Ok(inner.call(request).await?)
})
}
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Result;
use async_trait::async_trait;
pub use bigtable::client::BigTableClient;
pub use bigtable::worker::KvWorker;
use sui_types::base_types::ObjectID;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::{CheckpointDigest, TransactionDigest};
use sui_types::effects::{TransactionEffects, TransactionEvents};
Expand All @@ -31,6 +32,8 @@ pub trait KeyValueStoreReader {
&mut self,
digest: CheckpointDigest,
) -> Result<Option<Checkpoint>>;
async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber>;
async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>>;
}

#[async_trait]
Expand Down

0 comments on commit b0876fa

Please sign in to comment.