Skip to content

Commit

Permalink
bigtable: add counters
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Aug 8, 2024
1 parent 1d3a02d commit 6916960
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 0 deletions.
53 changes: 53 additions & 0 deletions storage-bigtable/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
backoff::{future::retry, Error as BackoffError, ExponentialBackoff},
log::*,
std::{
collections::HashMap,
str::FromStr,
time::{Duration, Instant},
},
Expand Down Expand Up @@ -781,6 +782,31 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
.collect())
}

pub async fn get_bincode_cells2<T>(
&mut self,
table: &str,
keys: &[RowKey],
) -> Result<(HashMap<RowKey, Result<T>>, usize)>
where
T: serde::de::DeserializeOwned,
{
let mut size = 0;
let rows = self
.get_multi_row_data(table, keys)
.await?
.into_iter()
.map(|(key, row_data)| {
size += row_data.len();
let key_str = key.to_string();
(
key,
deserialize_bincode_cell_data(&row_data, table, key_str),
)
})
.collect();
Ok((rows, size))
}

pub async fn get_protobuf_cell<P>(&mut self, table: &str, key: RowKey) -> Result<P>
where
P: prost::Message + Default,
Expand Down Expand Up @@ -827,6 +853,33 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
}))
}

pub async fn get_protobuf_or_bincode_cells2<'a, B, P>(
&mut self,
table: &'a str,
row_keys: impl IntoIterator<Item = RowKey>,
) -> Result<impl Iterator<Item = (RowKey, CellData<B, P>, usize)> + 'a>
where
B: serde::de::DeserializeOwned,
P: prost::Message + Default,
{
Ok(self
.get_multi_row_data(
table,
row_keys.into_iter().collect::<Vec<RowKey>>().as_slice(),
)
.await?
.into_iter()
.map(|(key, row_data)| {
let size = row_data.iter().fold(0, |acc, row| acc + row.1.len());
let key_str = key.to_string();
(
key,
deserialize_protobuf_or_bincode_cell_data(&row_data, table, key_str).unwrap(),
size,
)
}))
}

pub async fn put_bincode_cells<T>(
&mut self,
table: &str,
Expand Down
174 changes: 174 additions & 0 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ impl std::convert::From<std::io::Error> for Error {
}
}

impl Error {
pub fn is_rpc_unauthenticated(&self) -> bool {
if let Error::BigTableError(bigtable::Error::Rpc(status)) = self {
status.code() == tonic::Code::Unauthenticated
} else {
false
}
}
}

pub type Result<T> = std::result::Result<T, Error>;

// Convert a slot to its bucket representation whereby lower slots are always lexically ordered
Expand Down Expand Up @@ -764,6 +774,170 @@ impl LedgerStorage {
}
}

// Fetches and gets a vector of confirmed blocks via a multirow fetch
pub async fn get_confirmed_blocks_with_data2<'a>(
&self,
slots: &'a [Slot],
) -> Result<impl Iterator<Item = (Option<(Slot, ConfirmedBlock)>, usize)> + 'a> {
debug!(
"LedgerStorage::get_confirmed_blocks_with_data request received: {:?}",
slots
);
inc_new_counter_debug!("storage-bigtable-query", 1);
let mut bigtable = self.connection.client();
let row_keys = slots.iter().copied().map(slot_to_blocks_key);
Ok(bigtable
.get_protobuf_or_bincode_cells2("blocks", row_keys)
.await?
.map(
|(row_key, block_cell_data, size): (
RowKey,
bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
usize,
)| {
let block = match block_cell_data {
bigtable::CellData::Bincode(block) => block.into(),
bigtable::CellData::Protobuf(block) => match block.try_into() {
Ok(block) => block,
Err(_) => return (None, size),
},
};
(Some((key_to_slot(&row_key).unwrap(), block)), size)
},
))
}

/// Fetch blocks and transactions
pub async fn get_confirmed_blocks_transactions(
&self,
blocks: &[Slot],
transactions: &[String],
transactions_status: &[String],
) -> Result<(
Vec<(Slot, ConfirmedBlock)>,
Vec<ConfirmedTransactionWithStatusMeta>,
HashMap<String, TransactionStatus>,
usize,
)> {
let mut bigtable = self.connection.client();

let mut blocks_resp = Vec::with_capacity(blocks.len());
let mut transactions_resp = Vec::with_capacity(transactions.len());
let mut transactions_status_resp = HashMap::new();
let mut size = 0;

// Collect slots for request
let mut blocks_map: HashMap<Slot, Vec<(u32, String)>> = HashMap::new();
for block in blocks {
blocks_map.entry(*block).or_default();
}

// Fetch transactions info and collect slots
if !transactions.is_empty() || !transactions_status.is_empty() {
let mut keys = Vec::with_capacity(transactions.len() + transactions_status.len());
keys.extend(transactions.iter().cloned());
keys.extend(transactions_status.iter().cloned());

let (mut cells, bt_size) = bigtable
.get_bincode_cells2::<TransactionInfo>("tx", keys.as_slice())
.await?;
size += bt_size;

for signature in transactions_status {
if let Some(Ok(info)) = cells.get(signature) {
transactions_status_resp.insert(
signature.clone(),
TransactionStatus {
slot: info.slot,
confirmations: None,
status: match &info.err {
Some(err) => Err(err.clone()),
None => Ok(()),
},
err: info.err.clone(),
confirmation_status: Some(TransactionConfirmationStatus::Finalized),
},
);
}
}
for signature in transactions {
if let Some((signature, Ok(TransactionInfo { slot, index, .. }))) =
cells.remove_entry(signature)
{
blocks_map.entry(slot).or_default().push((index, signature));
}
}
}

// Fetch blocks
if !blocks_map.is_empty() {
let keys = blocks_map.keys().copied().collect::<Vec<_>>();
let cells = self.get_confirmed_blocks_with_data2(&keys).await?;
for (maybe_slot_block, row_size) in cells {
size += row_size;
if let Some((slot, block)) = maybe_slot_block {
if let Some(entries) = blocks_map.get(&slot) {
for (index, signature) in entries.iter() {
if let Some(tx_with_meta) = block.transactions.get(*index as usize) {
if tx_with_meta.transaction_signature().to_string() != *signature {
warn!(
"Transaction info or confirmed block for {} is corrupt",
signature
);
} else {
transactions_resp.push(ConfirmedTransactionWithStatusMeta {
slot,
tx_with_meta: tx_with_meta.clone(),
block_time: block.block_time,
});
}
}
}
blocks_resp.push((slot, block));
}
}
}
}

Ok((
blocks_resp,
transactions_resp,
transactions_status_resp,
size,
))
}

/// Fetch TX index for transactions
pub async fn get_txindex(
&self,
transactions: &[String],
) -> Result<(Vec<Option<(Slot, u32)>>, usize)> {
let mut bigtable = self.connection.client();

let mut response = Vec::with_capacity(transactions.len());
let mut size = 0;

// Fetch transactions info and collect slots
if transactions.is_empty() {
response.resize(transactions.len(), None);
} else {
let (cells, bt_size) = bigtable
.get_bincode_cells2::<TransactionInfo>("tx", transactions)
.await?;
size += bt_size;

for signature in transactions {
if let Some(Ok(TransactionInfo { slot, index, .. })) = cells.get(signature) {
response.push(Some((*slot, *index)));
} else {
response.push(None);
}
}
}

Ok((response, size))
}

/// Get confirmed signatures for the provided address, in descending ledger order
///
/// address: address to search for
Expand Down

0 comments on commit 6916960

Please sign in to comment.