Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store failed receipts and RAVs #108

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions migrations/20230912220523_tap_receipts.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ DROP TRIGGER IF EXISTS receipt_update ON scalar_tap_receipts CASCADE;
DROP FUNCTION IF EXISTS scalar_tap_receipt_notify() CASCADE;

DROP TABLE IF EXISTS scalar_tap_receipts CASCADE;
DROP TABLE IF EXISTS scalar_tap_receipts_invalid CASCADE;
11 changes: 11 additions & 0 deletions migrations/20230912220523_tap_receipts.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE

CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id);
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);

-- This table is used to store invalid receipts (receipts that fail at least one of the checks in the tap-agent).
-- Used for logging and debugging purposes.
CREATE TABLE IF NOT EXISTS scalar_tap_receipts_invalid (
id BIGSERIAL PRIMARY KEY,
allocation_id CHAR(40) NOT NULL,
sender_address CHAR(40) NOT NULL,
timestamp_ns NUMERIC(20) NOT NULL,
value NUMERIC(39) NOT NULL,
received_receipt JSON NOT NULL
);
1 change: 1 addition & 0 deletions migrations/20230915230734_tap_ravs.down.sql
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DROP TABLE IF EXISTS scalar_tap_ravs CASCADE;
DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE;
11 changes: 11 additions & 0 deletions migrations/20230915230734_tap_ravs.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,14 @@ CREATE TABLE IF NOT EXISTS scalar_tap_ravs (
final BOOLEAN DEFAULT FALSE NOT NULL,
PRIMARY KEY (allocation_id, sender_address)
);

-- This table is used to store failed RAV requests.
-- Used for logging and debugging purposes.
CREATE TABLE IF NOT EXISTS scalar_tap_rav_requests_failed (
id BIGSERIAL PRIMARY KEY,
allocation_id CHAR(40) NOT NULL,
sender_address CHAR(40) NOT NULL,
expected_rav JSON NOT NULL,
rav_response JSON NOT NULL,
reason TEXT NOT NULL
);
130 changes: 120 additions & 10 deletions tap-agent/src/tap/sender_allocation_relationship.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::ensure;
use anyhow::{anyhow, ensure};
use ethereum_types::U256;
use eventuals::Eventual;
use indexer_common::prelude::SubgraphClient;
Expand All @@ -14,8 +14,9 @@ use sqlx::{types::BigDecimal, PgPool};
use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse;
use tap_core::{
eip_712_signed_message::EIP712SignedMessage,
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest,
tap_receipt::ReceiptCheck,
receipt_aggregate_voucher::ReceiptAggregateVoucher,
tap_manager::RAVRequest,
tap_receipt::{ReceiptCheck, ReceivedReceipt},
};
use tokio::{
sync::{Mutex, MutexGuard},
Expand Down Expand Up @@ -286,7 +287,7 @@ impl SenderAllocationRelationship {
let RAVRequest {
valid_receipts,
previous_rav,
invalid_receipts: _,
invalid_receipts,
expected_rav,
} = inner
.tap_manager
Expand All @@ -297,6 +298,20 @@ impl SenderAllocationRelationship {
)
.await?;

// Invalid receipts
if !invalid_receipts.is_empty() {
warn!(
"Found {} invalid receipts for allocation {} and sender {}.",
invalid_receipts.len(),
inner.allocation_id,
inner.sender
);

// Save invalid receipts to the database for logs.
// TODO: consider doing that in a spawned task?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick note, maybe we want to use channels with the spawned task for the future instead of spawning tasks on demand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

Self::store_invalid_receipts(inner, &invalid_receipts).await?;
}

// TODO: Request compression and response decompression. Also a fancy user agent?
let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(
Expand All @@ -319,12 +334,39 @@ impl SenderAllocationRelationship {
warn!("Warnings from sender's TAP aggregator: {:?}", warnings);
}

inner
match inner
.tap_manager
.verify_and_store_rav(expected_rav, response.data)
.await?;

// TODO: Handle invalid receipts
.verify_and_store_rav(expected_rav.clone(), response.data.clone())
.await
{
Ok(_) => {}

// Adapter errors are local software errors. Shouldn't be a problem with the sender.
Err(tap_core::Error::AdapterError { source_error: e }) => {
anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e)
}

// The 3 errors below signal an invalid RAV, which should be about problems with the
// sender. The sender could be malicious.
Err(
e @ tap_core::Error::InvalidReceivedRAV {
expected_rav: _,
received_rav: _,
}
| e @ tap_core::Error::SignatureError(_)
| e @ tap_core::Error::InvalidRecoveredSigner { address: _ },
) => {
Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string())
.await?;
anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e);
}

// All relevant errors should be handled above. If we get here, we forgot to handle
// an error case.
Err(e) => {
anyhow::bail!("Error while verifying and storing RAV: {:?}", e);
}
}

// This is not the fastest way to do this, but it's the easiest.
// Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt
Expand Down Expand Up @@ -395,6 +437,74 @@ impl SenderAllocationRelationship {
pub async fn state(&self) -> State {
*self.inner.state.lock().await
}

async fn store_invalid_receipts(
inner: &Inner,
receipts: &[ReceivedReceipt],
) -> anyhow::Result<()> {
for received_receipt in receipts.iter() {
sqlx::query!(
r#"
INSERT INTO scalar_tap_receipts_invalid (
allocation_id,
sender_address,
timestamp_ns,
value,
received_receipt
)
VALUES ($1, $2, $3, $4, $5)
"#,
inner
.allocation_id
.to_string()
.trim_start_matches("0x")
.to_owned(),
inner.sender.to_string().trim_start_matches("0x").to_owned(),
BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns),
BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?,
serde_json::to_value(received_receipt)?
)
.execute(&inner.pgpool)
.await
.map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?;
}

Ok(())
}

async fn store_failed_rav(
inner: &Inner,
expected_rav: &ReceiptAggregateVoucher,
rav: &EIP712SignedMessage<ReceiptAggregateVoucher>,
reason: &str,
) -> anyhow::Result<()> {
sqlx::query!(
r#"
INSERT INTO scalar_tap_rav_requests_failed (
allocation_id,
sender_address,
expected_rav,
rav_response,
reason
)
VALUES ($1, $2, $3, $4, $5)
"#,
inner
.allocation_id
.to_string()
.trim_start_matches("0x")
.to_owned(),
inner.sender.to_string().trim_start_matches("0x").to_owned(),
serde_json::to_value(expected_rav)?,
serde_json::to_value(rav)?,
reason
)
.execute(&inner.pgpool)
.await
.map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?;

Ok(())
}
}

impl Drop for SenderAllocationRelationship {
Expand Down