Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jan 17, 2025
1 parent 2664a75 commit 3a7983d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
43 changes: 34 additions & 9 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ impl ValidatorService {
let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();

let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
let signers: NonEmpty<_> = transaction.clone().intent_message().value.signers();
let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
metrics.signature_errors.inc();
})?;
Expand Down Expand Up @@ -537,6 +538,8 @@ impl ValidatorService {
let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
metrics.signature_errors.inc();
})?;
// IMPORTANT: we cannot extract
let signers: NonEmpty<_> = transaction.clone().intent_message().value.signers();
drop(tx_verif_metrics_guard);

// Enable Trace Propagation across spans/processes using tx_digest
Expand Down Expand Up @@ -616,7 +619,14 @@ impl ValidatorService {
include_auxiliary_data: bool,
epoch_store: &Arc<AuthorityPerEpochStore>,
wait_for_effects: bool,
) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
) -> Result<
(
Option<Vec<HandleCertificateResponseV3>>,
Weight,
NonEmpty<SuiAddress>,
),
tonic::Status,
> {
// Validate if cert can be executed
// Fullnode does not serve handle_certificate call.
fp_ensure!(
Expand Down Expand Up @@ -697,14 +707,19 @@ impl ValidatorService {
}
}

let verified_certificates = {
let verified_certificates_and_senders = {
let _timer = self.metrics.cert_verification_latency.start_timer();
epoch_store
let verified_certs = epoch_store
.signature_verifier
.multi_verify_certs(certificates.into())
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?
.collect::<Result<Vec<_>, _>>()?;
let signers = certificates
.iter()
.map(|cert| cert.clone().intent_message().value.signers())
.collect::<Vec<_>>();
verified_certs.into_iter().zip(signers).collect::<Vec<_>>()
};
let consensus_transactions =
NonEmpty::collect(verified_certificates.iter().map(|certificate| {
Expand Down Expand Up @@ -868,7 +883,13 @@ impl ValidatorService {
}
}

type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
struct ServiceResponse<T> {
response: tonic::Response<T>,
weight: Weight,
signers: Option<NonEmpty<SuiAddress>>,
}

type WrappedServiceResponse<T> = Result<ServiceResponse<T>, tonic::Status>;

impl ValidatorService {
async fn transaction_impl(
Expand Down Expand Up @@ -1319,11 +1340,16 @@ impl ValidatorService {
client: Option<IpAddr>,
wrapped_response: WrappedServiceResponse<T>,
) -> Result<tonic::Response<T>, tonic::Status> {
let (error, spam_weight, unwrapped_response) = match wrapped_response {
Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
let (error, spam_weight, signers, unwrapped_response) = match wrapped_response {
Ok(ServiceResponse {
response: result,
weight: spam_weight,
signers,
}) => (None, spam_weight.clone(), signers, Ok(result)),
Err(status) => (
Some(SuiError::from(status.clone())),
Weight::zero(),
None,
Err(status.clone()),
),
};
Expand All @@ -1338,6 +1364,7 @@ impl ValidatorService {
(error_weight, error_type)
}),
spam_weight,
signers,
timestamp: SystemTime::now(),
})
}
Expand Down Expand Up @@ -1401,7 +1428,6 @@ impl Validator for ValidatorService {
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
let validator_service = self.clone();

// Spawns a task which handles the transaction. The task will unconditionally continue
// processing in the event that the client connection is dropped.
spawn_monitored_task!(async move {
Expand All @@ -1418,7 +1444,6 @@ impl Validator for ValidatorService {
request: tonic::Request<HandleTransactionRequestV2>,
) -> Result<tonic::Response<HandleTransactionResponseV2>, tonic::Status> {
let validator_service = self.clone();

// Spawns a task which handles the transaction. The task will unconditionally continue
// processing in the event that the client connection is dropped.
spawn_monitored_task!(async move {
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub struct TrafficTally {
pub through_fullnode: Option<IpAddr>,
pub error_info: Option<(Weight, String)>,
pub spam_weight: Weight,
pub signers: Option<NonEmpty<SuiAddress>>,
pub timestamp: SystemTime,
}

Expand All @@ -237,12 +238,14 @@ impl TrafficTally {
through_fullnode: Option<IpAddr>,
error_info: Option<(Weight, String)>,
spam_weight: Weight,
signers: Option<NonEmpty<SuiAddress>>,
) -> Self {
Self {
direct,
through_fullnode,
error_info,
spam_weight,
signers,
timestamp: SystemTime::now(),
}
}
Expand Down Expand Up @@ -543,20 +546,23 @@ mod tests {
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
error_info: None,
spam_weight: Weight::one(),
signers: None,
timestamp: SystemTime::now(),
};
let bob = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))),
error_info: None,
spam_weight: Weight::one(),
signers: None,
timestamp: SystemTime::now(),
};
let charlie = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))),
error_info: None,
spam_weight: Weight::one(),
signers: None,
timestamp: SystemTime::now(),
};

Expand Down

0 comments on commit 3a7983d

Please sign in to comment.