From 996a1eee1396c03a2c7a213d2a7586b31a697f5e Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Tue, 11 Apr 2023 22:15:39 -0700 Subject: [PATCH] Implement rate limiting for Key Image requests in Fog Ledger Router --- Cargo.lock | 81 +++++++++- fog/ledger/connection/src/lib.rs | 2 +- fog/ledger/connection/src/router_client.rs | 21 ++- fog/ledger/server/Cargo.toml | 1 + fog/ledger/server/src/config.rs | 13 +- fog/ledger/server/src/router_handlers.rs | 49 ++++-- fog/ledger/server/src/router_server.rs | 2 + fog/ledger/server/src/router_service.rs | 13 ++ fog/ledger/server/tests/router_connection.rs | 12 +- fog/ledger/server/tests/router_integration.rs | 146 +++++++++++++++++- util/grpc/src/lib.rs | 16 ++ util/parse/src/lib.rs | 7 + 12 files changed, 340 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe2a85a319..25748e68c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1499,6 +1499,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -1559,7 +1565,7 @@ dependencies = [ "cfg-if 1.0.0", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -1605,6 +1611,24 @@ dependencies = [ "serde", ] +[[package]] +name = "governor" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c390a940a5d157878dd057c78680a33ce3415bcd05b4799509ea44210914b4d5" +dependencies = [ + "cfg-if 1.0.0", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.0", + "quanta", + "rand", + "smallvec", +] + [[package]] name = "grpcio" version = "0.12.1" @@ -2193,6 +2217,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "maplit" version = "1.0.2" @@ -3845,6 +3878,7 @@ dependencies = [ "clap 4.1.11", "displaydoc", "futures", + "governor", "grpcio", "itertools", "lazy_static", @@ -5995,7 +6029,7 @@ checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.42.0", ] @@ -6052,6 +6086,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.0" @@ -6063,6 +6103,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -6709,6 +6755,22 @@ dependencies = [ "tempfile", ] +[[package]] +name = "quanta" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.2+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -6795,6 +6857,15 @@ dependencies = [ "rand_core", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.7.0" @@ -8474,6 +8545,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/fog/ledger/connection/src/lib.rs b/fog/ledger/connection/src/lib.rs index 538c3fc7a4..25593a4e5d 100644 --- a/fog/ledger/connection/src/lib.rs +++ b/fog/ledger/connection/src/lib.rs @@ -20,4 +20,4 @@ mod untrusted; pub use untrusted::FogUntrustedLedgerGrpcClient; mod router_client; -pub use router_client::LedgerGrpcClient; +pub use router_client::{Error as RouterClientError, LedgerGrpcClient}; diff --git a/fog/ledger/connection/src/router_client.rs b/fog/ledger/connection/src/router_client.rs index 9d8a64653c..c4e914bc91 100644 --- a/fog/ledger/connection/src/router_client.rs +++ b/fog/ledger/connection/src/router_client.rs @@ -47,7 +47,7 @@ pub struct LedgerGrpcClient { response_receiver: ClientDuplexReceiver, /// Low-lever ledger API client - _client: LedgerApiClient, + client: LedgerApiClient, } impl LedgerGrpcClient { @@ -76,7 +76,7 @@ impl LedgerGrpcClient { Self { logger, attest_cipher: None, - _client: client, + client, request_sender, response_receiver, uri, @@ -84,6 +84,19 @@ impl LedgerGrpcClient { } } + /// Need ability to reconnect in case of rate limiting + pub fn reconnect(&mut self) { + self.deattest(); + + let (request_sender, response_receiver) = self + .client + .request() + .expect("Could not retrieve grpc sender and receiver."); + + self.request_sender = request_sender; + self.response_receiver = response_receiver; + } + fn is_attested(&self) -> bool { self.attest_cipher.is_some() } @@ -136,7 +149,6 @@ impl LedgerGrpcClient { &mut self, key_images: &[KeyImage], ) -> Result { - log::trace!(self.logger, "Check key images was called"); if !self.is_attested() { let verification_report = self.attest().await; verification_report?; @@ -202,7 +214,8 @@ impl LedgerGrpcClient { impl Drop for LedgerGrpcClient { fn drop(&mut self) { - block_on(self.request_sender.close()).expect("Couldn't close the router request sender"); + // closing streams that have received an error result will fail, but that's OK + let _ = block_on(self.request_sender.close()); } } diff --git a/fog/ledger/server/Cargo.toml b/fog/ledger/server/Cargo.toml index 605e9666fc..a84bbf1ac3 100644 --- a/fog/ledger/server/Cargo.toml +++ b/fog/ledger/server/Cargo.toml @@ -55,6 +55,7 @@ mc-fog-uri = { path = "../../uri" } clap = { version = "4.1", features = ["derive", "env"] } displaydoc = { version = "0.2", default-features = false } futures = "0.3" +governor = "0.5" grpcio = "0.12.1" itertools = "0.10" lazy_static = "1.4" diff --git a/fog/ledger/server/src/config.rs b/fog/ledger/server/src/config.rs index 78ad2dc2ec..4b49c92423 100644 --- a/fog/ledger/server/src/config.rs +++ b/fog/ledger/server/src/config.rs @@ -9,10 +9,10 @@ use clap::Parser; use mc_attest_core::ProviderId; use mc_common::ResponderId; use mc_fog_uri::{FogLedgerUri, KeyImageStoreUri}; -use mc_util_parse::parse_duration_in_seconds; +use mc_util_parse::{parse_duration_in_milliseconds, parse_duration_in_seconds}; use mc_util_uri::AdminUri; use serde::Serialize; -use std::{path::PathBuf, str::FromStr, time::Duration}; +use std::{num::NonZeroU32, path::PathBuf, str::FromStr, time::Duration}; /// Configuration parameters for the Fog Ledger Router service. #[derive(Clone, Parser, Serialize)] @@ -65,6 +65,15 @@ pub struct LedgerRouterConfig { #[clap(long, default_value = "86400", value_parser = parse_duration_in_seconds, env = "MC_CLIENT_AUTH_TOKEN_MAX_LIFETIME")] pub client_auth_token_max_lifetime: Duration, + /// Rate limiting burst period, in milliseconds. Defaults to 10000 (10 sec) + #[clap(long, default_value = "10000", value_parser = parse_duration_in_milliseconds, env = "MC_RATE_LIMIT_BURST_PERIOD")] + pub rate_limit_burst_period: Duration, + + /// Rate limiting maximum burst. Defaults to 80 requests. + #[clap(long, default_value = "80", env = "MC_RATE_LIMIT_MAX_BURST")] + pub rate_limit_max_burst: NonZeroU32, + + /// Path to ledger db (lmdb) /// Path to ledger db (lmdb) #[clap(long, env = "MC_LEDGER_DB")] pub ledger_db: PathBuf, diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index e7ce6a2768..c853335151 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -5,6 +5,11 @@ use crate::{ SVC_COUNTERS, }; use futures::{future::try_join_all, SinkExt, TryStreamExt}; +use governor::{ + clock::{Clock, DefaultClock}, + state::keyed::DefaultKeyedStateStore, + RateLimiter, +}; use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags}; use mc_attest_api::attest; use mc_attest_enclave_api::{EnclaveMessage, NonceSession}; @@ -21,7 +26,9 @@ use mc_fog_api::{ }; use mc_fog_ledger_enclave::LedgerEnclaveProxy; use mc_fog_uri::{ConnectionUri, KeyImageStoreUri}; -use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel, ResponseStatus}; +use mc_util_grpc::{ + rpc_invalid_arg_error, rpc_resource_exhausted_error, ConnectionUriGrpcioChannel, ResponseStatus, +}; use mc_util_metrics::GrpcMethodName; use mc_util_telemetry::{create_context, tracer, BoxedTracer, FutureExt, Tracer}; use std::{collections::BTreeMap, str::FromStr, sync::Arc}; @@ -35,6 +42,7 @@ pub async fn handle_requests( mut requests: RequestStream, mut responses: DuplexSink, query_retries: usize, + rate_limit_context: Arc, DefaultKeyedStateStore>, DefaultClock>>, logger: Logger, ) -> Result<(), grpcio::Error> where @@ -52,6 +60,7 @@ where shard_clients.clone(), enclave.clone(), query_retries, + rate_limit_context.clone(), logger.clone(), ) .await; @@ -76,6 +85,7 @@ pub async fn handle_request( shard_clients: Vec>, enclave: E, query_retries: usize, + rate_limit_context: Arc, DefaultKeyedStateStore>, DefaultClock>>, logger: Logger, ) -> Result where @@ -87,16 +97,33 @@ where tracer.in_span("auth", |_cx| handle_auth_request(enclave, request, logger)) } Some(LedgerRequest_oneof_request_data::check_key_images(request)) => { - handle_query_request( - request, - enclave, - shard_clients, - query_retries, - logger, - &tracer, - ) - .with_context(create_context(&tracer, "check_key_images")) - .await + match rate_limit_context.check_key(&request.channel_id) { + Ok(()) => { + handle_query_request( + request, + enclave, + shard_clients, + query_retries, + logger, + &tracer, + ) + .with_context(create_context(&tracer, "check_key_images")) + .await + } + Err(not_until) => { + let rpc_status = rpc_resource_exhausted_error( + "Key image rate limit exceeded", + format!( + "Try again in {} milliseconds", + not_until + .wait_time_from(DefaultClock::default().now()) + .as_millis() + ), + &logger, + ); + Err(rpc_status) + } + } } None => { let rpc_status = rpc_invalid_arg_error( diff --git a/fog/ledger/server/src/router_server.rs b/fog/ledger/server/src/router_server.rs index adcf42576c..723c80da4d 100644 --- a/fog/ledger/server/src/router_server.rs +++ b/fog/ledger/server/src/router_server.rs @@ -99,6 +99,8 @@ where enclave.clone(), ledger_store_grpc_clients.clone(), config.query_retries, + config.rate_limit_burst_period, + config.rate_limit_max_burst, logger.clone(), ); diff --git a/fog/ledger/server/src/router_service.rs b/fog/ledger/server/src/router_service.rs index 1d364c69bc..da488e5fcf 100644 --- a/fog/ledger/server/src/router_service.rs +++ b/fog/ledger/server/src/router_service.rs @@ -5,6 +5,7 @@ use crate::{ SVC_COUNTERS, }; use futures::{FutureExt, TryFutureExt}; +use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter}; use grpcio::{DuplexSink, RequestStream, RpcContext, UnarySink}; use mc_attest_api::attest::{AuthMessage, Message}; use mc_common::logger::{log, Logger}; @@ -20,7 +21,9 @@ use mc_util_telemetry::tracer; use std::{ collections::HashMap, + num::NonZeroU32, sync::{Arc, RwLock}, + time::Duration, }; #[derive(Clone)] @@ -31,6 +34,7 @@ where enclave: E, shards: Arc>>>, query_retries: usize, + rate_limit_context: Arc, DefaultKeyedStateStore>, DefaultClock>>, logger: Logger, } @@ -41,12 +45,20 @@ impl LedgerRouterService { enclave: E, shards: Arc>>>, query_retries: usize, + burst_period: Duration, + max_burst: NonZeroU32, logger: Logger, ) -> Self { + let rate_limiter = RateLimiter::keyed( + Quota::with_period(burst_period / max_burst.get()) + .unwrap() + .allow_burst(max_burst), + ); Self { enclave, shards, query_retries, + rate_limit_context: Arc::new(rate_limiter), logger, } } @@ -80,6 +92,7 @@ where requests, responses, self.query_retries, + self.rate_limit_context.clone(), logger.clone(), ) .map_err(move |err| log::error!(&logger, "failed to reply: {}", err)) diff --git a/fog/ledger/server/tests/router_connection.rs b/fog/ledger/server/tests/router_connection.rs index dfea289f92..a35ff6f6dd 100644 --- a/fog/ledger/server/tests/router_connection.rs +++ b/fog/ledger/server/tests/router_connection.rs @@ -36,7 +36,7 @@ use mc_util_grpc::{GrpcRetryConfig, CHAIN_ID_MISMATCH_ERR_MSG}; use mc_util_test_helper::{CryptoRng, RngCore, RngType, SeedableRng}; use mc_util_uri::AdminUri; use mc_watcher::watcher_db::WatcherDB; -use std::{path::PathBuf, str::FromStr, sync::Arc, thread::sleep, time::Duration}; +use std::{num::NonZeroU32, path::PathBuf, str::FromStr, sync::Arc, thread::sleep, time::Duration}; use tempfile::TempDir; use url::Url; @@ -135,6 +135,8 @@ fn fog_ledger_merkle_proofs_test(logger: Logger) { ias_api_key: Default::default(), client_auth_token_secret: None, client_auth_token_max_lifetime: Default::default(), + rate_limit_burst_period: Duration::from_secs(10), + rate_limit_max_burst: NonZeroU32::new(80).unwrap(), query_retries: 3, omap_capacity: OMAP_CAPACITY, }; @@ -415,6 +417,8 @@ fn fog_ledger_key_images_test(logger: Logger) { ias_api_key: Default::default(), client_auth_token_secret: None, client_auth_token_max_lifetime: Default::default(), + rate_limit_burst_period: Duration::from_secs(10), + rate_limit_max_burst: NonZeroU32::new(80).unwrap(), query_retries: 3, omap_capacity: OMAP_CAPACITY, }; @@ -615,6 +619,8 @@ fn fog_ledger_blocks_api_test(logger: Logger) { ias_api_key: Default::default(), client_auth_token_secret: None, client_auth_token_max_lifetime: Default::default(), + rate_limit_burst_period: Duration::from_secs(10), + rate_limit_max_burst: NonZeroU32::new(80).unwrap(), query_retries: 3, omap_capacity: OMAP_CAPACITY, }; @@ -780,6 +786,8 @@ fn fog_ledger_untrusted_tx_out_api_test(logger: Logger) { ias_api_key: Default::default(), client_auth_token_secret: None, client_auth_token_max_lifetime: Default::default(), + rate_limit_burst_period: Duration::from_secs(10), + rate_limit_max_burst: NonZeroU32::new(80).unwrap(), query_retries: 3, omap_capacity: OMAP_CAPACITY, }; @@ -995,6 +1003,8 @@ fn fog_router_unary_key_image_test(logger: Logger) { ias_api_key: Default::default(), client_auth_token_secret: None, client_auth_token_max_lifetime: Default::default(), + rate_limit_burst_period: Duration::from_secs(10), + rate_limit_max_burst: NonZeroU32::new(80).unwrap(), query_retries: 3, omap_capacity: OMAP_CAPACITY, }; diff --git a/fog/ledger/server/tests/router_integration.rs b/fog/ledger/server/tests/router_integration.rs index e5b8fa2a71..c3e447e222 100644 --- a/fog/ledger/server/tests/router_integration.rs +++ b/fog/ledger/server/tests/router_integration.rs @@ -1,5 +1,6 @@ // Copyright (c) 2018-2023 The MobileCoin Foundation +use grpcio::Error::RpcFailure; use mc_account_keys::{AccountKey, PublicAddress}; use mc_api::watcher::TimestampResultCode; use mc_attest_net::{Client as AttestClient, RaClient}; @@ -10,7 +11,9 @@ use mc_common::{ logger::{log, Logger}, time::SystemTimeProvider, }; -use mc_fog_ledger_connection::{KeyImageResultExtension, LedgerGrpcClient}; +use mc_fog_ledger_connection::{ + KeyImageResultExtension, LedgerGrpcClient, RouterClientError::Grpc, +}; use mc_fog_ledger_enclave::LedgerSgxEnclave; use mc_fog_ledger_server::{ sharding_strategy::EpochShardingStrategy, KeyImageStoreServer, LedgerRouterConfig, @@ -31,9 +34,11 @@ use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, + num::NonZeroU32, path::{Path, PathBuf}, str::FromStr, sync::Arc, + time::{Duration, Instant}, }; use tempfile::TempDir; use url::Url; @@ -224,6 +229,12 @@ fn create_router( ias_api_key: Default::default(), client_auth_token_secret: None, client_auth_token_max_lifetime: Default::default(), + rate_limit_burst_period: test_config + .rate_limit_period + .unwrap_or(Duration::from_secs(10)), + rate_limit_max_burst: test_config + .rate_limit_max + .unwrap_or(NonZeroU32::new(80).unwrap()), query_retries: 3, omap_capacity: test_config.omap_capacity, }; @@ -341,6 +352,8 @@ struct TestEnvironmentConfig { router_address: SocketAddr, router_admin_address: SocketAddr, shards: Vec, + rate_limit_period: Option, + rate_limit_max: Option, omap_capacity: u64, } @@ -398,6 +411,8 @@ async fn smoke_test() { router_address: free_sockaddr(), router_admin_address: free_sockaddr(), shards: shards_config, + rate_limit_period: None, + rate_limit_max: None, omap_capacity: 1000, }; @@ -485,7 +500,6 @@ async fn smoke_test() { #[tokio::test(flavor = "multi_thread")] async fn overlapping_stores() { let logger = logger::create_test_logger("overlapping_stores".to_string()); - log::info!(logger, "test"); // Three shards, three stores each, correct config, each stores three blocks, // each has three users with three keys each - but the blocks overlap (so // total of 5 blocks) @@ -515,6 +529,8 @@ async fn overlapping_stores() { router_address: free_sockaddr(), router_admin_address: free_sockaddr(), shards: shards_config, + rate_limit_period: None, + rate_limit_max: None, omap_capacity: 1000, }; @@ -598,3 +614,129 @@ async fn overlapping_stores() { TimestampResultCode::TimestampFound as u32 ); } + +#[tokio::test(flavor = "multi_thread")] +async fn rate_limit() { + let logger = logger::create_test_logger("rate_limit".to_string()); + let rate_limit_period_secs = 1; + let rate_limit_max = 2; + // One shard with one store that owns all blocks. Three blocks. + let num_shards = 1; + let stores_per_shard = 1; + let blocks_per_shard = 3; + let mut rng = RngType::from_seed([0u8; 32]); + let mut shards_config = vec![]; + for i in 0..num_shards { + let mut stores_config = vec![]; + for _ in 0..stores_per_shard { + let store = StoreConfig { + address: free_sockaddr(), + block_range: None, + omap_capacity: 1000, + }; + stores_config.push(store); + } + let shard = ShardConfig { + address: free_sockaddr(), + block_range: BlockRange::new_from_length(i + 1, blocks_per_shard), + stores: stores_config, + }; + shards_config.push(shard); + } + let config = TestEnvironmentConfig { + router_address: free_sockaddr(), + router_admin_address: free_sockaddr(), + shards: shards_config, + rate_limit_period: Some(Duration::from_secs(rate_limit_period_secs)), + rate_limit_max: Some(NonZeroU32::new(rate_limit_max).unwrap()), + omap_capacity: 1000, + }; + + let mut blocks_config = vec![]; + let mut key_index = 0; + let num_blocks = 3; + let users_per_block = 3; + let keys_per_user = 3; + for _ in 0..num_blocks { + let mut block = HashMap::new(); + for _ in 0..users_per_block { + let account = AccountKey::random_with_fog(&mut rng); + let mut keys = vec![]; + for _ in 0..keys_per_user { + keys.push(KeyImage::from(key_index)); + key_index += 1; + } + block.insert(account.default_subaddress(), keys); + } + blocks_config.push(block); + } + + let grpc_env = Arc::new(grpcio::EnvBuilder::new().build()); + + let mut test_environment = create_env(config, blocks_config, grpc_env, logger.clone()); + + // Check that we can get all the key images from the shard + let keys_per_block = users_per_block * keys_per_user; + let keys: Vec<_> = (0..key_index).map(KeyImage::from).collect(); + let response = test_environment + .router_client + .check_key_images(&keys) + .await + .expect("check_key_images failed"); + assert_eq!(response.results.len(), key_index as usize); + for i in 0..key_index { + let key = KeyImage::from(i); + assert_eq!(response.results[i as usize].key_image, key); + assert_eq!( + response.results[i as usize].status(), + Ok(Some((i / keys_per_block) + 1)) + ); + assert_eq!( + response.results[i as usize].timestamp_result_code, + TimestampResultCode::TimestampFound as u32 + ); + } + + // Trigger the rate limit (hammer as hard as possible for 3 seconds) + let start = Instant::now(); + let mut limited = false; + while Instant::now().duration_since(start) < Duration::from_secs(rate_limit_period_secs * 3) { + if let Err(Grpc(RpcFailure(e))) = + test_environment.router_client.check_key_images(&keys).await + { + if e.code() == grpcio::RpcStatusCode::RESOURCE_EXHAUSTED { + limited = true; + break; + } + } + } + + assert!(limited, "failed to trigger the rate limit"); + + // Reconnect to clear rate limit condition + test_environment.router_client.reconnect(); + + // Wait 3s for the rate limit to recover + tokio::time::sleep(Duration::from_secs(3)).await; + + // Confirm that we can still get all the key images from the shard + let keys: Vec<_> = (0..key_index).map(KeyImage::from).collect(); + let response = test_environment + .router_client + .check_key_images(&keys) + .await + .expect("check_key_images failed"); + assert_eq!(response.results.len(), key_index as usize); + for i in 0..key_index { + let key = KeyImage::from(i); + assert_eq!(response.results[i as usize].key_image, key); + assert_eq!( + response.results[i as usize].status(), + Ok(Some((i / keys_per_block) + 1)) + ); + assert_eq!( + response.results[i as usize].timestamp_result_code, + TimestampResultCode::TimestampFound as u32 + ); + } +} diff --git a/util/grpc/src/lib.rs b/util/grpc/src/lib.rs index e7a3d8761c..e99ef7aac9 100644 --- a/util/grpc/src/lib.rs +++ b/util/grpc/src/lib.rs @@ -266,6 +266,22 @@ pub fn rpc_unavailable_error( ) } +/// Resource exhausted error may be returned if a rate limit is exceeded. +#[inline] +pub fn rpc_resource_exhausted_error( + context: S, + err: E, + logger: &Logger, +) -> RpcStatus { + report_err_with_code!( + context, + err, + RpcStatusCode::RESOURCE_EXHAUSTED, + logger, + Level::Debug + ) +} + /// Converts a serialization Error to an RpcStatus error. pub fn ser_to_rpc_err(error: mc_util_serial::encode::Error, logger: &Logger) -> RpcStatus { rpc_internal_error("Serialization", error, logger) diff --git a/util/parse/src/lib.rs b/util/parse/src/lib.rs index fa9dd96d80..54681f171c 100644 --- a/util/parse/src/lib.rs +++ b/util/parse/src/lib.rs @@ -19,6 +19,13 @@ pub fn parse_duration_in_seconds(src: &str) -> Result Result { + Ok(Duration::from_millis(u64::from_str(src)?)) +} + /// Load a CSS file from disk. This represents a signature over an enclave, /// and contains attestation parameters like MRENCLAVE and MRSIGNER as well /// as other stuff.