Skip to content

Commit

Permalink
Merge pull request #14 from fpco/perform-query-builder
Browse files Browse the repository at this point in the history
perform_query follows the builder pattern
  • Loading branch information
snoyberg authored Jun 6, 2024
2 parents e444684 + c6c91bd commit 65fcb65
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 48 deletions.
3 changes: 2 additions & 1 deletion packages/cosmos/src/authz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl Cosmos {
mut grants,
pagination: pag_res,
} = self
.perform_query(req, Action::QueryGranterGrants(granter.get_address()), true)
.perform_query(req, Action::QueryGranterGrants(granter.get_address()))
.run()
.await?
.into_inner();
println!("{grants:?}");
Expand Down
94 changes: 66 additions & 28 deletions packages/cosmos/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,24 @@ impl std::fmt::Debug for Cosmos {
}
}

pub(crate) struct PerformQueryBuilder<'a, Request> {
cosmos: &'a Cosmos,
req: Request,
action: Action,
should_retry: bool,
}

impl<Request: GrpcRequest> PerformQueryBuilder<'_, Request> {
pub(crate) async fn run(self) -> Result<PerformQueryWrapper<Request::Response>, QueryError> {
Cosmos::run_query(self).await
}

pub(crate) fn no_retry(mut self) -> Self {
self.should_retry = false;
self
}
}

pub(crate) struct PerformQueryWrapper<Res> {
pub(crate) grpc_url: Arc<String>,
pub(crate) tonic: tonic::Response<Res>,
Expand Down Expand Up @@ -292,26 +310,41 @@ impl Cosmos {
Ok(base_account)
}

pub(crate) async fn perform_query<Request: GrpcRequest>(
pub(crate) fn perform_query<Request: GrpcRequest>(
&self,
req: Request,
action: Action,
should_retry: bool,
) -> PerformQueryBuilder<Request> {
PerformQueryBuilder {
cosmos: self,
req,
action,
should_retry: true,
}
}

async fn run_query<Request: GrpcRequest>(
PerformQueryBuilder {
cosmos,
req,
action,
should_retry,
}: PerformQueryBuilder<'_, Request>,
) -> Result<PerformQueryWrapper<Request::Response>, QueryError> {
let mut attempt = 0;
loop {
let (err, can_retry, grpc_url) = match self.pool.get().await {
let (err, can_retry, grpc_url) = match cosmos.pool.get().await {
Err(err) => (
QueryErrorDetails::ConnectionError(err),
true,
self.get_cosmos_builder().grpc_url_arc().clone(),
cosmos.get_cosmos_builder().grpc_url_arc().clone(),
),
Ok(mut guard) => {
let cosmos_inner = guard.get_inner_mut();
if self.pool.builder.get_log_requests() {
if cosmos.pool.builder.get_log_requests() {
tracing::info!("{action}");
}
match self.perform_query_inner(req.clone(), cosmos_inner).await {
match cosmos.perform_query_inner(req.clone(), cosmos_inner).await {
Ok(x) => {
cosmos_inner.log_query_result(QueryResult::Success);
break Ok(PerformQueryWrapper {
Expand All @@ -333,20 +366,20 @@ impl Cosmos {
}
}
};
if attempt >= self.pool.builder.query_retries() || !should_retry || !can_retry {
if attempt >= cosmos.pool.builder.query_retries() || !should_retry || !can_retry {
break Err(QueryError {
action,
builder: self.pool.builder.clone(),
height: self.height,
builder: cosmos.pool.builder.clone(),
height: cosmos.height,
query: err,
grpc_url,
node_health: self.pool.node_chooser.health_report(),
node_health: cosmos.pool.node_chooser.health_report(),
});
} else {
attempt += 1;
tracing::debug!(
"Error performing a query, retrying. Attempt {attempt} of {}. {err:?}",
self.pool.builder.query_retries()
cosmos.pool.builder.query_retries()
);
}
}
Expand Down Expand Up @@ -554,7 +587,9 @@ impl CosmosBuilder {
let cosmos = self.build_lazy()?;

let resp = cosmos
.perform_query(GetLatestBlockRequest {}, Action::SanityCheck, false)
.perform_query(GetLatestBlockRequest {}, Action::SanityCheck)
.no_retry()
.run()
.await
.map_err(|source| BuilderError::SanityQueryFailed { source })?;

Expand Down Expand Up @@ -646,8 +681,8 @@ impl Cosmos {
address: address.get_address_string(),
},
action.clone(),
true,
)
.run()
.await?
.into_inner();

Expand Down Expand Up @@ -701,8 +736,8 @@ impl Cosmos {
pagination: pagination.take(),
},
Action::QueryAllBalances(address),
true,
)
.run()
.await?
.into_inner();
coins.append(&mut res.balances);
Expand All @@ -723,11 +758,8 @@ impl Cosmos {

pub(crate) async fn code_info(&self, code_id: u64) -> Result<Vec<u8>, crate::Error> {
let res = self
.perform_query(
QueryCodeRequest { code_id },
Action::CodeInfo(code_id),
true,
)
.perform_query(QueryCodeRequest { code_id }, Action::CodeInfo(code_id))
.run()
.await?;
Ok(res.into_inner().data)
}
Expand Down Expand Up @@ -772,8 +804,8 @@ impl Cosmos {
hash: txhash.clone(),
},
action.clone(),
true,
)
.run()
.await?
.into_inner();
Self::txres_to_pair(txres, action)
Expand All @@ -797,8 +829,8 @@ impl Cosmos {
hash: txhash.clone(),
},
action.clone(),
true,
)
.run()
.await;
match res {
Ok(txres) => Self::txres_to_pair(txres.into_inner(), action),
Expand Down Expand Up @@ -849,8 +881,9 @@ impl Cosmos {
action
.clone()
.unwrap_or_else(|| Action::WaitForTransaction(txhash.clone())),
false,
)
.no_retry()
.run()
.await;
match txres {
Ok(txres) => {
Expand Down Expand Up @@ -903,7 +936,8 @@ impl Cosmos {
page: page.unwrap_or(1),
limit: limit.unwrap_or(10),
};
self.perform_query(req, Action::ListTransactionsFor(address), true)
self.perform_query(req, Action::ListTransactionsFor(address))
.run()
.await
.map(|x| {
x.into_inner()
Expand Down Expand Up @@ -934,7 +968,8 @@ impl Cosmos {
pub async fn get_block_info(&self, height: i64) -> Result<BlockInfo, crate::Error> {
let action = Action::GetBlock(height);
let res = self
.perform_query(GetBlockByHeightRequest { height }, action.clone(), true)
.perform_query(GetBlockByHeightRequest { height }, action.clone())
.run()
.await?
.into_inner();
BlockInfo::new(action, res.block_id, res.sdk_block, res.block, Some(height))
Expand All @@ -947,7 +982,8 @@ impl Cosmos {
) -> Result<BlockInfo, crate::Error> {
let action = Action::GetBlock(height);
let res = self
.perform_query(GetBlockByHeightRequest { height }, action.clone(), true)
.perform_query(GetBlockByHeightRequest { height }, action.clone())
.run()
.await
.map(|x| x.into_inner());
match res {
Expand Down Expand Up @@ -997,7 +1033,8 @@ impl Cosmos {
pub async fn get_latest_block_info(&self) -> Result<BlockInfo, crate::Error> {
let action = Action::GetLatestBlock;
let res = self
.perform_query(GetLatestBlockRequest {}, action.clone(), true)
.perform_query(GetLatestBlockRequest {}, action.clone())
.run()
.await?
.into_inner();
BlockInfo::new(action, res.block_id, res.sdk_block, res.block, None)
Expand Down Expand Up @@ -1435,7 +1472,8 @@ impl TxBuilder {

let action = Action::Simulate(self.clone());
let simres = cosmos
.perform_query(simulate_req, action.clone(), true)
.perform_query(simulate_req, action.clone())
.run()
.await?
.into_inner();

Expand Down Expand Up @@ -1538,8 +1576,8 @@ impl TxBuilder {
mode: BroadcastMode::Sync as i32,
},
mk_action(),
true,
)
.run()
.await?;
let res = tonic.into_inner().tx_response.ok_or_else(|| {
crate::Error::InvalidChainResponse {
Expand Down
10 changes: 5 additions & 5 deletions packages/cosmos/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ impl Contract {
contract: self.address,
key: key.into(),
},
true,
)
.run()
.await?
.into_inner()
.data)
Expand Down Expand Up @@ -261,8 +261,8 @@ impl Contract {
contract: self.address,
message: msg.into(),
},
true,
)
.run()
.await?
.into_inner();
Ok(res.data)
Expand Down Expand Up @@ -294,8 +294,8 @@ impl Contract {
query_data: msg,
},
action.clone(),
true,
)
.run()
.await?
.into_inner();
serde_json::from_slice(&res.data).map_err(|source| crate::Error::JsonDeserialize {
Expand Down Expand Up @@ -341,8 +341,8 @@ impl Contract {
address: self.address.into(),
},
action.clone(),
true,
)
.run()
.await?
.into_inner()
.contract_info
Expand All @@ -362,8 +362,8 @@ impl Contract {
pagination: None,
},
Action::ContractHistory(self.address),
true,
)
.run()
.await?
.into_inner())
}
Expand Down
22 changes: 8 additions & 14 deletions packages/cosmos/src/osmosis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,20 @@ impl Cosmos {
///
/// Note that this query will fail if called on chains besides Osmosis Mainnet.
pub async fn get_osmosis_epoch_info(&self) -> Result<EpochsInfo, QueryError> {
self.perform_query(
epochs::QueryEpochsInfoRequest {},
Action::OsmosisEpochsInfo,
true,
)
.await
.map(|res| EpochsInfo {
epochs: res.into_inner().epochs,
})
self.perform_query(epochs::QueryEpochsInfoRequest {}, Action::OsmosisEpochsInfo)
.run()
.await
.map(|res| EpochsInfo {
epochs: res.into_inner().epochs,
})
}
/// Get the Osmosis txfees information.
///
/// Note that this query will fail if called on chains besides Osmosis Mainnet.
pub async fn get_osmosis_txfees_info(&self) -> Result<TxFeesInfo, Error> {
let eip_base_fee = self
.perform_query(
txfees::QueryEipBaseFeeRequest {},
Action::OsmosisTxFeesInfo,
true,
)
.perform_query(txfees::QueryEipBaseFeeRequest {}, Action::OsmosisTxFeesInfo)
.run()
.await
.map(|res| res.into_inner())?;

Expand Down

0 comments on commit 65fcb65

Please sign in to comment.