diff --git a/.sqlx/query-3b2d3a3d4290034df659a36d02bcdb377777d10f001bd22f037edea42416574f.json b/.sqlx/query-3b2d3a3d4290034df659a36d02bcdb377777d10f001bd22f037edea42416574f.json new file mode 100644 index 000000000..62a6daac5 --- /dev/null +++ b/.sqlx/query-3b2d3a3d4290034df659a36d02bcdb377777d10f001bd22f037edea42416574f.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id, receipt\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND $2::numrange @> timestamp_ns\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "receipt", + "type_info": "Json" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "NumRange" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "3b2d3a3d4290034df659a36d02bcdb377777d10f001bd22f037edea42416574f" +} diff --git a/.sqlx/query-61f6287e64013007ee9806e7deb1db4b71a9db2096f6b2c9716c9116280b6352.json b/.sqlx/query-61f6287e64013007ee9806e7deb1db4b71a9db2096f6b2c9716c9116280b6352.json new file mode 100644 index 000000000..daabc9da5 --- /dev/null +++ b/.sqlx/query-61f6287e64013007ee9806e7deb1db4b71a9db2096f6b2c9716c9116280b6352.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO scalar_tap_receipts (allocation_id, timestamp_ns, receipt)\n VALUES ($1, $2, $3)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Numeric", + "Json" + ] + }, + "nullable": [ + false + ] + }, + "hash": "61f6287e64013007ee9806e7deb1db4b71a9db2096f6b2c9716c9116280b6352" +} diff --git a/.sqlx/query-eb01ccbdf3095a66341d72dd22d5557de5c78cc99d160ac17cc3eb965623638b.json b/.sqlx/query-eb01ccbdf3095a66341d72dd22d5557de5c78cc99d160ac17cc3eb965623638b.json new file mode 100644 index 000000000..da58ce480 --- /dev/null +++ b/.sqlx/query-eb01ccbdf3095a66341d72dd22d5557de5c78cc99d160ac17cc3eb965623638b.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM scalar_tap_receipts\n WHERE $1::numrange @> timestamp_ns\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "NumRange" + ] + }, + "nullable": [] + }, + "hash": "eb01ccbdf3095a66341d72dd22d5557de5c78cc99d160ac17cc3eb965623638b" +} diff --git a/Cargo.lock b/Cargo.lock index 2c9820520..843be1c75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,16 +150,15 @@ dependencies = [ [[package]] name = "anstream" -version = "0.3.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", - "is-terminal", "utf8parse", ] @@ -189,9 +188,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -871,33 +870,31 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.1" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ed2379f8603fa2b7509891660e802b88c70a79a6427a70abb5968054de2c28" +checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" dependencies = [ "clap_builder", "clap_derive", - "once_cell", ] [[package]] name = "clap_builder" -version = "4.3.1" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72394f3339a76daf211e57d4bcb374410f3965dcc606dd0e03738c7888766980" +checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" dependencies = [ "anstream", "anstyle", - "bitflags 1.3.2", "clap_lex", "strsim", ] [[package]] name = "clap_derive" -version = "4.3.1" +version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e9ef9a08ee1c0e1f2e162121665ac45ac3783b0f897db7244ae75ad9a8f65b" +checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873" dependencies = [ "heck", "proc-macro2", @@ -2690,6 +2687,34 @@ dependencies = [ "wiremock", ] +[[package]] +name = "indexer_tap_agent" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "anyhow", + "async-trait", + "clap", + "confy", + "dotenvy", + "ethereum-types", + "ethers-signers", + "faux", + "indexer-common", + "log", + "reqwest", + "rstest 0.18.2", + "serde", + "serde_json", + "sqlx", + "tap_core", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -4105,6 +4130,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "relative-path" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca" + [[package]] name = "rend" version = "0.4.0" @@ -4276,7 +4307,19 @@ checksum = "de1bb486a691878cd320c2f0d319ba91eeaa2e894066d8b5f8f117c000e9d962" dependencies = [ "futures", "futures-timer", - "rstest_macros", + "rstest_macros 0.17.0", + "rustc_version", +] + +[[package]] +name = "rstest" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros 0.18.2", "rustc_version", ] @@ -4294,6 +4337,23 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rstest_macros" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +dependencies = [ + "cfg-if", + "glob", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.28", + "unicode-ident", +] + [[package]] name = "ruint" version = "1.10.1" @@ -5342,7 +5402,7 @@ dependencies = [ "ethers-core", "rand 0.8.5", "rand_core 0.6.4", - "rstest", + "rstest 0.17.0", "serde", "strum 0.24.1", "strum_macros 0.24.3", diff --git a/Cargo.toml b/Cargo.toml index c0b504471..61b4060f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "common", "service", + "tap_agent", ] resolver = "2" diff --git a/common/src/escrow_monitor.rs b/common/src/escrow_monitor.rs index 0c2e6ac0e..dfb5ac3db 100644 --- a/common/src/escrow_monitor.rs +++ b/common/src/escrow_monitor.rs @@ -169,6 +169,10 @@ impl EscrowMonitor { self.inner.sender_accounts.read().await } + pub async fn get_account_balance(&self, address: &Address) -> Option { + self.inner.sender_accounts.read().await.get(address).copied() + } + /// Returns true if the given address has a non-zero balance in the escrow contract. /// /// Note that this method does not take into account the outstanding TAP balance (Escrow balance - TAP receipts). diff --git a/migrations/20230915230734_tap_ravs.down.sql b/migrations/20230915230734_tap_ravs.down.sql new file mode 100644 index 000000000..31dca1195 --- /dev/null +++ b/migrations/20230915230734_tap_ravs.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; diff --git a/migrations/20230915230734_tap_ravs.up.sql b/migrations/20230915230734_tap_ravs.up.sql new file mode 100644 index 000000000..b7c4ba566 --- /dev/null +++ b/migrations/20230915230734_tap_ravs.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS scalar_tap_ravs ( + allocation_id CHAR(40) PRIMARY KEY, + timestamp_ns NUMERIC(20) NOT NULL, + rav JSON NOT NULL +); diff --git a/service/src/main.rs b/service/src/main.rs index d93ca1abf..e0f279ecf 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -30,9 +30,6 @@ mod server; mod tap_manager; mod util; -#[cfg(test)] -mod test_vectors; - /// Create Indexer service App /// /// Initialization for server and Query processor diff --git a/tap_agent/Cargo.toml b/tap_agent/Cargo.toml new file mode 100644 index 000000000..70e476284 --- /dev/null +++ b/tap_agent/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "indexer_tap_agent" +version = "0.1.0" +edition = "2021" +publish = false + +[[bin]] +name = "indexer_tap_agent" +path = "src/main.rs" + +[dependencies] +alloy-primitives = "0.3.3" +alloy-sol-types = "0.3.2" +anyhow = "1.0.72" +async-trait = "0.1.72" +clap = { version = "4.4.3", features = ["derive", "env"] } +confy = "0.5.1" +dotenvy = "0.15.7" +ethereum-types = "0.14.1" +indexer-common = { version = "0.1.0", path = "../common" } +log = "0.4.19" +reqwest = "0.11.20" +serde = "1.0.188" +serde_json = "1.0.104" +sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal"] } +tap_core = "0.5.1" +thiserror = "1.0.44" +tokio = { version = "1.29.1", features = ["rt"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3", features = [ + "env-filter", + "ansi", + "fmt", + "std", + "json", +] } + +[dev-dependencies] +ethers-signers = "2.0.8" +faux = "0.1.10" +indexer-common = { path = "../common", features = ["mock"] } +rstest = "0.18.1" diff --git a/tap_agent/src/agent.rs b/tap_agent/src/agent.rs new file mode 100644 index 000000000..7d835271d --- /dev/null +++ b/tap_agent/src/agent.rs @@ -0,0 +1,14 @@ +use indexer_common::prelude::{GraphNodeInstance, EscrowMonitor}; + +use crate::config; + +pub async fn start_agent(config: config::Cli) { + let graph_node = GraphNodeInstance::new(&config.indexer_infrastructure.graph_node_query_endpoint); + + let escrow_monitor = EscrowMonitor::new( + graph_node.clone(), + config.escrow_subgraph.escrow_subgraph_deployment, + config.ethereum.indexer_address, + config.escrow_subgraph.escrow_syncing_interval + ); +} diff --git a/tap_agent/src/config.rs b/tap_agent/src/config.rs new file mode 100644 index 000000000..ccadc00df --- /dev/null +++ b/tap_agent/src/config.rs @@ -0,0 +1,343 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use clap::{command, Args, Parser, ValueEnum}; + +use alloy_primitives::Address; +use dotenvy::dotenv; +use serde::{Deserialize, Serialize}; +use tracing::subscriber::{set_global_default, SetGlobalDefaultError}; +use tracing_subscriber::{EnvFilter, FmtSubscriber}; + +// TODO: DEDUPLICATE? + +#[derive(Clone, Debug, Parser, Serialize, Deserialize, Default)] +#[clap( + name = "indexer-tap-agent", + about = "Agent that manages Timeline Aggregation Protocol (TAP) receipts as well as Receipt Aggregate Voucher \ + (RAV) requests for an indexer." +)] +#[command(author, version, about, long_about = None, arg_required_else_help = true)] +pub struct Cli { + #[command(flatten)] + pub ethereum: Ethereum, + #[command(flatten)] + pub indexer_infrastructure: IndexerInfrastructure, + #[command(flatten)] + pub postgres: Postgres, + #[command(flatten)] + pub network_subgraph: NetworkSubgraph, + #[command(flatten)] + pub escrow_subgraph: EscrowSubgraph, + + #[arg( + short, + value_name = "config", + env = "CONFIG", + help = "Indexer service configuration file (YAML format)" + )] + config: Option, +} + +#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] +#[group(required = true, multiple = true)] +pub struct Ethereum { + #[clap( + long, + value_name = "ethereum-node-provider", + env = "ETH_NODE", + help = "Ethereum node or provider URL" + )] + pub ethereum: String, + #[clap( + long, + value_name = "ethereum-polling-interval", + env = "ETHEREUM_POLLING_INTERVAL", + default_value_t = 4000, + help = "Polling interval for the Ethereum provider (ms)" + )] + pub ethereum_polling_interval: usize, + #[clap( + long, + value_name = "mnemonic", + env = "MNEMONIC", + help = "Mnemonic for the operator wallet" + )] + pub mnemonic: String, + #[clap( + long, + value_name = "indexer-address", + env = "INDEXER_ADDRESS", + help = "Ethereum address of the indexer" + )] + pub indexer_address: Address, +} + +#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] +#[group(required = true, multiple = true)] +pub struct IndexerInfrastructure { + #[clap( + long, + value_name = "port", + env = "PORT", + default_value_t = 7600, + help = "Port to serve queries at" + )] + pub port: u32, + #[clap( + long, + value_name = "metrics-port", + env = "METRICS_PORT", + default_value_t = 7300, + help = "Port to serve Prometheus metrics at" + )] + pub metrics_port: u16, + #[clap( + long, + value_name = "graph-node-query-endpoint", + env = "GRAPH_NODE_QUERY_ENDPOINT", + default_value_t = String::from("http://0.0.0.0:8000"), + help = "Graph node GraphQL HTTP service endpoint" + )] + pub graph_node_query_endpoint: String, + #[clap( + long, + value_name = "graph-node-status-endpoint", + env = "GRAPH_NODE_STATUS_ENDPOINT", + default_value_t = String::from("http://0.0.0.0:8030"), + help = "Graph node endpoint for the index node server" + )] + pub graph_node_status_endpoint: String, + #[clap( + long, + value_name = "log-level", + env = "LOG_LEVEL", + value_enum, + help = "Log level in RUST_LOG format" + )] + pub log_level: Option, + #[clap( + long, + value_name = "gcloud-profiling", + env = "GCLOUD_PROFILING", + default_value_t = false, + help = "Whether to enable Google Cloud profiling" + )] + pub gcloud_profiling: bool, + #[clap( + long, + value_name = "free-query-auth-token", + env = "FREE_QUERY_AUTH_TOKEN", + help = "Auth token that clients can use to query for free" + )] + pub free_query_auth_token: Option, +} + +#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] +#[group(required = true, multiple = true)] +pub struct Postgres { + #[clap( + long, + value_name = "postgres-host", + env = "POSTGRES_HOST", + default_value_t = String::from("http://0.0.0.0/"), + help = "Postgres host" + )] + pub postgres_host: String, + #[clap( + long, + value_name = "postgres-port", + env = "POSTGRES_PORT", + default_value_t = 5432, + help = "Postgres port" + )] + pub postgres_port: usize, + #[clap( + long, + value_name = "postgres-database", + env = "POSTGRES_DATABASE", + help = "Postgres database name" + )] + pub postgres_database: String, + #[clap( + long, + value_name = "postgres-username", + env = "POSTGRES_USERNAME", + default_value_t = String::from("postgres"), + help = "Postgres username" + )] + pub postgres_username: String, + #[clap( + long, + value_name = "postgres-password", + env = "POSTGRES_PASSWORD", + default_value_t = String::from(""), + help = "Postgres password" + )] + pub postgres_password: String, +} + +#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] +#[group(required = true, multiple = true)] +pub struct NetworkSubgraph { + #[clap( + long, + value_name = "network-subgraph-deployment", + env = "NETWORK_SUBGRAPH_DEPLOYMENT", + help = "Network subgraph deployment" + )] + pub network_subgraph_deployment: Option, + #[clap( + long, + value_name = "network-subgraph-endpoint", + env = "NETWORK_SUBGRAPH_ENDPOINT", + default_value_t = String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"), + help = "Endpoint to query the network subgraph from" + )] + pub network_subgraph_endpoint: String, + #[clap( + long, + value_name = "network-subgraph-auth-token", + env = "NETWORK_SUBGRAPH_AUTH_TOKEN", + help = "Bearer token to require for /network queries" + )] + pub network_subgraph_auth_token: Option, + #[clap( + long, + value_name = "serve-network-subgraph", + env = "SERVE_NETWORK_SUBGRAPH", + default_value_t = false, + help = "Whether to serve the network subgraph at /network" + )] + pub serve_network_subgraph: bool, + #[clap( + long, + value_name = "allocation-syncing-interval", + env = "ALLOCATION_SYNCING_INTERVAL", + default_value_t = 120_000, + help = "Interval (in ms) for syncing indexer allocations from the network" + )] + pub allocation_syncing_interval: u64, + #[clap( + long, + value_name = "client-signer-address", + env = "CLIENT_SIGNER_ADDRESS", + help = "Address that signs query fee receipts from a known client" + )] + pub client_signer_address: Option, +} + +#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] +#[group(required = true, multiple = true)] +pub struct EscrowSubgraph { + #[clap( + long, + value_name = "escrow-subgraph-deployment", + env = "ESCROW_SUBGRAPH_DEPLOYMENT", + help = "Escrow subgraph deployment" + )] + pub escrow_subgraph_deployment: String, + // TODO: + // + // #[clap( + // long, + // value_name = "escrow-subgraph-endpoint", + // env = "ESCROW_SUBGRAPH_ENDPOINT", + // // TODO: + // // default_value_t = String::from("https://api.thegraph.com/subgraphs/name/?????????????"), + // help = "Endpoint to query the network subgraph from" + // )] + // pub escrow_subgraph_endpoint: Option, + // #[clap( + // long, + // value_name = "escrow-subgraph-auth-token", + // env = "ESCROW_SUBGRAPH_AUTH_TOKEN", + // help = "Bearer token to require for /network queries" + // )] + // pub escrow_subgraph_auth_token: Option, + // #[clap( + // long, + // value_name = "serve-escrow-subgraph", + // env = "SERVE_ESCROW_SUBGRAPH", + // default_value_t = false, + // help = "Whether to serve the escrow subgraph at /escrow" + // )] + // pub serve_escrow_subgraph: bool, + // #[clap( + // long, + // value_name = "escrow-syncing-interval", + // env = "ESCROW_SYNCING_INTERVAL", + // default_value_t = 120_000, + // help = "Interval (in ms) for syncing indexer escrow accounts from the escrow subgraph" + // )] + pub escrow_syncing_interval: u64, +} + +/// Sets up tracing, allows log level to be set from the environment variables +fn init_tracing(format: String) -> Result<(), SetGlobalDefaultError> { + let filter = EnvFilter::from_default_env(); + + let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< + tracing_subscriber::fmt::format::DefaultFields, + tracing_subscriber::fmt::format::Format, + EnvFilter, + > = FmtSubscriber::builder().with_env_filter(filter); + + match format.as_str() { + "json" => set_global_default(subscriber_builder.json().finish()), + "full" => set_global_default(subscriber_builder.finish()), + "compact" => set_global_default(subscriber_builder.compact().finish()), + _ => set_global_default(subscriber_builder.with_ansi(true).pretty().finish()), + } +} + +impl Cli { + /// Parse config arguments + /// If environmental variable for config is set to a valid config file path, then parse from config + /// Otherwise parse from command line arguments + pub fn args() -> Self { + dotenv().ok(); + + let cli = if let Ok(file_path) = std::env::var("config") { + confy::load_path::(file_path.clone()) + .unwrap_or_else(|_| panic!("Parse config file at {}", file_path.clone())) + } else { + Cli::parse() + // Potentially store it for the user + // let _ = confy::store_path("./args.toml", cli.clone()); + }; + + // Enables tracing under RUST_LOG variable + if let Some(log_setting) = &cli.indexer_infrastructure.log_level { + std::env::set_var("RUST_LOG", log_setting); + }; + // add a LogFormat to config + init_tracing("pretty".to_string()).expect("Could not set up global default subscriber for logger, check environmental variable `RUST_LOG` or the CLI input `log-level`"); + cli + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("Validate the input: {0}")] + ValidateInput(String), + #[error("Generate JSON representation of the config file: {0}")] + GenerateJson(serde_json::Error), + #[error("Toml file error: {0}")] + ReadStr(std::io::Error), + #[error("Unknown error: {0}")] + Other(anyhow::Error), +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Serialize, Deserialize, Default, +)] +pub enum LogLevel { + Trace, + #[default] + Debug, + Info, + Warn, + Error, + Fatal, +} diff --git a/tap_agent/src/database.rs b/tap_agent/src/database.rs new file mode 100644 index 000000000..5f0f7879c --- /dev/null +++ b/tap_agent/src/database.rs @@ -0,0 +1,36 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +// TODO: DEDUPLICATE + +use sqlx::{postgres::PgPoolOptions, PgPool}; + +use std::time::Duration; +use tracing::debug; + +use crate::config; + +pub async fn connect(config: &config::Postgres) -> PgPool { + let url = format!( + "postgresql://{}:{}@{}:{}/{}", + config.postgres_username, + config.postgres_password, + config.postgres_host, + config.postgres_port, + config.postgres_database + ); + + debug!( + postgres_host = tracing::field::debug(&config.postgres_host), + postgres_port = tracing::field::debug(&config.postgres_port), + postgres_database = tracing::field::debug(&config.postgres_database), + "Connecting to database" + ); + + PgPoolOptions::new() + .max_connections(50) + .acquire_timeout(Duration::from_secs(3)) + .connect(&url) + .await + .expect("Could not connect to DATABASE_URL") +} diff --git a/tap_agent/src/main.rs b/tap_agent/src/main.rs new file mode 100644 index 000000000..8b5cac7ee --- /dev/null +++ b/tap_agent/src/main.rs @@ -0,0 +1,39 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use log::{debug, info}; +use tokio::signal::unix::{signal, SignalKind}; + +use crate::config::Cli; + +mod config; +mod database; +mod agent; +mod tap; + +#[tokio::main] +async fn main() -> Result<()> { + // Parse basic configurations, also initializes logging. + let config = Cli::args(); + + debug!("Config: {:?}", config); + + // START THE SERVER + + // Have tokio wait for SIGTERM or SIGINT. + let mut signal_sigint = signal(SignalKind::interrupt())?; + let mut signal_sigterm = signal(SignalKind::terminate())?; + tokio::select! { + _ = signal_sigint.recv() => debug!("Received SIGINT."), + _ = signal_sigterm.recv() => debug!("Received SIGTERM."), + } + + // If we're here, we've received a signal to exit. + info!("Shutting down..."); + + // Stop the server and wait for it to finish gracefully. + + debug!("Goodbye!"); + Ok(()) +} diff --git a/tap_agent/src/tap/escrow_adapter.rs b/tap_agent/src/tap/escrow_adapter.rs new file mode 100644 index 000000000..344d527a4 --- /dev/null +++ b/tap_agent/src/tap/escrow_adapter.rs @@ -0,0 +1,169 @@ +/// TODO: Implement the escrow adapter. This is only a basic mock implementation. +use std::{collections::HashMap, sync::Arc}; + +use alloy_primitives::Address; +use async_trait::async_trait; +use thiserror::Error; + +use tap_core::adapters::escrow_adapter::EscrowAdapter as EscrowAdapterTrait; +use tokio::sync::RwLock; +use indexer_common::prelude::EscrowMonitor; + +/// This is Arc internally, so it can be cloned and shared between threads. +#[cfg_attr(test, faux::create)] +#[derive(Clone, Debug)] +pub struct EscrowAdapter { + escrow_monitor: EscrowMonitor, + gateway_pending_fees: Arc>>, +} + +#[derive(Debug, Error)] +pub enum AdapterError { + #[error("something went wrong: {error}")] + AdapterError { error: String }, +} + +// TODO: Implement escrow subgraph polling. +#[cfg_attr(test, faux::methods)] +impl EscrowAdapter { + pub fn new(escrow_monitor: EscrowMonitor + ) -> Self { + Self { + escrow_monitor, + gateway_pending_fees: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +#[cfg_attr(test, faux::methods)] +#[async_trait] +impl EscrowAdapterTrait for EscrowAdapter { + type AdapterError = AdapterError; + + async fn get_available_escrow(&self, gateway_id: Address) -> Result { + let balance = self + .escrow_monitor.get_account_balance(&gateway_id).await + .ok_or(AdapterError::AdapterError { + error: format!( + "Gateway {} not found in escrow balances map, could not get available escrow.", + gateway_id + ) + .to_string(), + })?; + let balance: u128 = balance.try_into().map_err(|_| AdapterError::AdapterError { + error: format!( + "Gateway {} escrow balance is too large to fit in u128, could not get available escrow.", + gateway_id + ) + .to_string(), + })?; + let fees = self + .gateway_pending_fees + .read() + .await + .get(&gateway_id) + .copied() + .ok_or(AdapterError::AdapterError { + error: format!( + "Gateway {} not found in pending fees map, could not get available escrow.", + gateway_id + ) + .to_string(), + })?; + + Ok(balance - fees) + } + + async fn subtract_escrow(&self, gateway_id: Address, value: u128) -> Result<(), AdapterError> { + let current_available_escrow = self.get_available_escrow(gateway_id).await?; + + let mut fees_write = self.gateway_pending_fees.write().await; + + let fees = fees_write + .get_mut(&gateway_id) + .ok_or(AdapterError::AdapterError { + error: format!( + "Gateway {} not found in pending fees map, could not subtract available escrow.", + gateway_id + ) + .to_string(), + })?; + + if current_available_escrow < value { + return Err(AdapterError::AdapterError { + error: format!( + "Gateway {} does not have enough escrow to subtract {} from {}.", + gateway_id, value, *fees + ) + .to_string(), + }); + } + + *fees += value; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use ethereum_types::U256; + + use super::*; + + #[tokio::test] + async fn test_subtract_escrow() { + let gateway_id = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); + let gateway_pending_fees = Arc::new(RwLock::new(HashMap::new())); + let mut escrow_monitor_mock = EscrowMonitor::faux(); + + faux::when!(escrow_monitor_mock.get_account_balance(gateway_id)).then_return(Some(U256::from(1000))); + gateway_pending_fees.write().await.insert(gateway_id, 500); + + let adapter = _FauxOriginal_EscrowAdapter { + escrow_monitor: escrow_monitor_mock, + gateway_pending_fees: gateway_pending_fees.clone(), + }; + + adapter + .subtract_escrow(gateway_id, 500) + .await + .expect("Subtract escrow."); + + let available_escrow = adapter + .get_available_escrow(gateway_id) + .await + .expect("Get available escrow."); + assert_eq!(available_escrow, 0); + } + + #[tokio::test] + async fn test_subtract_escrow_overflow() { + let gateway_id = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); + let gateway_pending_fees = Arc::new(RwLock::new(HashMap::new())); + let mut escrow_monitor_mock = EscrowMonitor::faux(); + + faux::when!(escrow_monitor_mock.get_account_balance(gateway_id)).then_return(Some(U256::from(1000))); + gateway_pending_fees.write().await.insert(gateway_id, 500); + + let adapter = _FauxOriginal_EscrowAdapter { + escrow_monitor: escrow_monitor_mock, + gateway_pending_fees: gateway_pending_fees.clone(), + }; + + adapter + .subtract_escrow(gateway_id, 250) + .await + .expect("Subtract escrow."); + + assert!(adapter.subtract_escrow(gateway_id, 251).await.is_err()); + + let available_escrow = adapter + .get_available_escrow(gateway_id) + .await + .expect("Get available escrow."); + assert_eq!(available_escrow, 250); + } +} diff --git a/tap_agent/src/tap/mod.rs b/tap_agent/src/tap/mod.rs new file mode 100644 index 000000000..37b49f677 --- /dev/null +++ b/tap_agent/src/tap/mod.rs @@ -0,0 +1,9 @@ +pub use tap_core as core; + +pub mod escrow_adapter; +// pub mod rav_storage_adapter; +// pub mod receipt_checks_adapter; +pub mod receipt_storage_adapter; + +#[cfg(test)] +pub mod test_utils; diff --git a/tap_agent/src/tap/rav_storage_adapter.rs b/tap_agent/src/tap/rav_storage_adapter.rs new file mode 100644 index 000000000..3fad475b0 --- /dev/null +++ b/tap_agent/src/tap/rav_storage_adapter.rs @@ -0,0 +1,192 @@ +use async_trait::async_trait; +use log::debug; +use std::sync::Arc; + +use alloy_primitives::Address; +use anyhow::Result; +use sqlx::postgres::PgListener; +use sqlx::PgPool; +use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; +use tap_core::tap_manager::SignedRAV; +use thiserror::Error; +use tokio::sync::RwLock; + +#[derive(Debug)] +pub struct RAVStorageAdapter { + pgpool: PgPool, + local_rav_storage: Arc>>, + allocation_id: Address, + #[allow(dead_code)] // Silence "field is never read" + rav_notifications_watcher_handle: tokio::task::JoinHandle>, +} + +#[derive(Debug, Error)] +pub enum AdapterError { + #[error("something went wrong: {error}")] + AdapterError { error: String }, +} + +#[async_trait] +impl RAVStorageAdapterTrait for RAVStorageAdapter { + type AdapterError = AdapterError; + + async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> { + let _fut = sqlx::query!( + r#" + INSERT INTO scalar_tap_latest_rav (allocation_id, latest_rav) + VALUES ($1, $2) + ON CONFLICT (allocation_id) + DO UPDATE SET latest_rav = $2 + "#, + self.allocation_id.to_string(), + serde_json::to_value(rav).map_err(|e| AdapterError::AdapterError { + error: e.to_string() + })? + ) + .execute(&self.pgpool) + .await + .map_err(|e| AdapterError::AdapterError { + error: e.to_string(), + })?; + + Ok(()) + } + async fn last_rav(&self) -> Result, Self::AdapterError> { + Ok(self.local_rav_storage.read().await.clone()) + } +} + +impl RAVStorageAdapter { + /// Static version of `retrieve_last_rav` that can be used by a `tokio::spawn`ed task. + async fn retrieve_last_rav_static( + pgpool: PgPool, + allocation_id: Address, + local_rav_storage: Arc>>, + ) -> Result<()> { + let latest_rav = sqlx::query!( + r#" + SELECT latest_rav + FROM scalar_tap_latest_rav + WHERE allocation_id = $1 + "#, + allocation_id.to_string() + ) + .fetch_optional(&pgpool) + .await + .map(|r| r.map(|r| r.latest_rav))?; + + if let Some(latest_rav) = latest_rav { + let latest_rav: SignedRAV = serde_json::from_value(latest_rav)?; + local_rav_storage.write().await.replace(latest_rav); + } + + Ok(()) + } + + pub async fn retrieve_last_rav(&self) -> Result<()> { + RAVStorageAdapter::retrieve_last_rav_static( + self.pgpool.clone(), + self.allocation_id, + self.local_rav_storage.clone(), + ) + .await + } + + /// This function is meant to be spawned as a task that listens for new RAV notifications from the database. + async fn rav_notifications_watcher( + pgpool: PgPool, + allocation_id: Address, + local_rav_storage: Arc>>, + ) -> Result<()> { + // TODO: make this async thread more robust with a retry mechanism and a backoff + let mut listener = PgListener::connect_with(&pgpool).await?; + listener.listen("scalar_tap_rav_notification").await?; + loop { + let notification = listener.recv().await?; + debug!("Received notification: {:?}", notification); + RAVStorageAdapter::retrieve_last_rav_static( + pgpool.clone(), + allocation_id, + local_rav_storage.clone(), + ) + .await?; + } + } + + pub async fn new(pgpool: PgPool, allocation_id: Address) -> Result { + let local_rav_storage: Arc>> = Arc::new(RwLock::new(None)); + + let rav_storage_adapter = RAVStorageAdapter { + pgpool: pgpool.clone(), + local_rav_storage: local_rav_storage.clone(), + allocation_id, + rav_notifications_watcher_handle: tokio::spawn( + RAVStorageAdapter::rav_notifications_watcher( + pgpool.clone(), + allocation_id, + local_rav_storage.clone(), + ), + ), + }; + + rav_storage_adapter.retrieve_last_rav().await.unwrap(); + + Ok(rav_storage_adapter) + } +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; + + use crate::test_utils::create_rav; + + use super::*; + + #[sqlx::test] + async fn update_and_retrieve_rav(pool: PgPool) { + let allocation_id = + Address::from_str("0xabababababababababababababababababababab").unwrap(); + let timestamp_ns = u64::MAX - 10; + let value_aggregate = u128::MAX; + let rav_storage_adapter = RAVStorageAdapter::new(pool.clone(), allocation_id) + .await + .unwrap(); + + // Insert a rav + let mut new_rav = create_rav(allocation_id, timestamp_ns, value_aggregate).await; + rav_storage_adapter + .update_last_rav(new_rav.clone()) + .await + .unwrap(); + + // Wait for the Postgres RAV notification to be processed + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Should trigger a retrieve_last_rav + // So eventually the last rav should be the one we inserted + let last_rav = rav_storage_adapter.last_rav().await.unwrap(); + assert_eq!(new_rav, last_rav.unwrap()); + + // Update the RAV 3 times in quick succession + for i in 0..3 { + new_rav = create_rav( + allocation_id, + timestamp_ns + i, + value_aggregate - (i as u128), + ) + .await; + rav_storage_adapter + .update_last_rav(new_rav.clone()) + .await + .unwrap(); + } + + // Check that the last rav is the last one we inserted + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let last_rav = rav_storage_adapter.last_rav().await.unwrap(); + assert_eq!(new_rav, last_rav.unwrap()); + } +} diff --git a/tap_agent/src/tap/receipt_checks_adapter.rs b/tap_agent/src/tap/receipt_checks_adapter.rs new file mode 100644 index 000000000..75f709a70 --- /dev/null +++ b/tap_agent/src/tap/receipt_checks_adapter.rs @@ -0,0 +1,175 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use alloy_primitives::Address; +use async_trait::async_trait; +use sqlx::PgPool; +use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait; +use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; +use thiserror::Error; +use tokio::sync::RwLock; + +use crate::escrow_adapter::EscrowAdapter; + +#[derive(Debug)] +pub struct ReceiptChecksAdapter { + pgpool: PgPool, + query_appraisals: Option>>>, + allocation_ids: Arc>>, + escrow_adapter: EscrowAdapter, +} + +impl ReceiptChecksAdapter { + pub fn new( + pgpool: PgPool, + query_appraisals: Option>>>, + allocation_ids: Arc>>, + escrow_adapter: EscrowAdapter, + ) -> Self { + Self { + pgpool, + query_appraisals, + allocation_ids, + escrow_adapter, + } + } +} + +#[derive(Debug, Error)] +pub enum AdapterError { + #[error("something went wrong: {error}")] + AdapterError { error: String }, +} + +#[async_trait] +impl ReceiptChecksAdapterTrait for ReceiptChecksAdapter { + type AdapterError = AdapterError; + + async fn is_unique( + &self, + receipt: &EIP712SignedMessage, + receipt_id: u64, + ) -> Result { + let record = sqlx::query!( + r#" + SELECT id + FROM scalar_tap_receipts + WHERE id != $1 and signature = $2 + LIMIT 1 + "#, + TryInto::::try_into(receipt_id).map_err(|e| AdapterError::AdapterError { + error: e.to_string(), + })?, + receipt.signature.to_string() + ) + .fetch_optional(&self.pgpool) + .await + .map_err(|e| AdapterError::AdapterError { + error: e.to_string(), + })?; + + Ok(record.is_none()) + } + + async fn is_valid_allocation_id( + &self, + allocation_id: Address, + ) -> Result { + let allocation_ids = self.allocation_ids.read().await; + Ok(allocation_ids.contains(&allocation_id)) + } + + async fn is_valid_value(&self, value: u128, query_id: u64) -> Result { + let query_appraisals = self.query_appraisals.as_ref().expect( + "Query appraisals should be initialized. The opposite should never happen when receipts value checking is enabled." + ); + let query_appraisals_read = query_appraisals.read().await; + let appraised_value = + query_appraisals_read + .get(&query_id) + .ok_or_else(|| AdapterError::AdapterError { + error: "No appraised value found for query".to_string(), + })?; + + if value != *appraised_value { + return Ok(false); + } + Ok(true) + } + + async fn is_valid_gateway_id(&self, gateway_id: Address) -> Result { + Ok(self.escrow_adapter.is_valid_gateway_id(gateway_id).await) + } +} + +#[cfg(test)] +mod test { + use std::collections::{HashMap, HashSet}; + use std::str::FromStr; + + use faux::when; + use tap_core::adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait; + + use crate::escrow_adapter; + use crate::receipt_storage_adapter::ReceiptStorageAdapter; + use crate::test_utils::{create_received_receipt, keys}; + + use super::*; + + #[sqlx::test] + async fn is_unique(pgpool: PgPool) { + let allocation_id = + Address::from_str("0xabababababababababababababababababababab").unwrap(); + let allocation_ids = Arc::new(RwLock::new(HashSet::new())); + allocation_ids.write().await.insert(allocation_id); + let (_, address) = keys(); + + let query_appraisals: Arc>> = + Arc::new(RwLock::new(HashMap::new())); + + let gateway_escrow_balance = Arc::new(RwLock::new(HashMap::new())); + gateway_escrow_balance.write().await.insert(address, 10000); + let mut escrow_adapter = escrow_adapter::EscrowAdapter::faux(); + when!(escrow_adapter.is_valid_gateway_id(address)).then_return(true); + + let rav_storage_adapter = ReceiptStorageAdapter::new(pgpool.clone(), allocation_id); + let receipt_checks_adapter = ReceiptChecksAdapter::new( + pgpool.clone(), + Some(query_appraisals), + allocation_ids, + escrow_adapter, + ); + + // Insert 3 unique receipts + for i in 0..3 { + let received_receipt = create_received_receipt(allocation_id, i, i, i as u128, i).await; + let receipt_id = rav_storage_adapter + .store_receipt(received_receipt.clone()) + .await + .unwrap(); + + assert!(receipt_checks_adapter + .is_unique(&received_receipt.signed_receipt(), receipt_id) + .await + .unwrap()); + } + + // Insert a duplicate receipt + let received_receipt = create_received_receipt(allocation_id, 1, 1, 1, 3).await; + let receipt_id = rav_storage_adapter + .store_receipt(received_receipt.clone()) + .await + .unwrap(); + assert!( + !(receipt_checks_adapter + .is_unique(&received_receipt.signed_receipt(), receipt_id) + .await + .unwrap()) + ); + } +} diff --git a/tap_agent/src/tap/receipt_storage_adapter.rs b/tap_agent/src/tap/receipt_storage_adapter.rs new file mode 100644 index 000000000..27330f4f7 --- /dev/null +++ b/tap_agent/src/tap/receipt_storage_adapter.rs @@ -0,0 +1,819 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + num::TryFromIntError, + ops::{Bound, RangeBounds}, +}; + +use alloy_primitives::Address; +use async_trait::async_trait; +use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; +use tap_core::{adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait, tap_manager::SignedReceipt, tap_receipt::get_full_list_of_checks}; +use tap_core::tap_receipt::ReceivedReceipt; +use thiserror::Error; + + +#[derive(Debug)] +pub struct ReceiptStorageAdapter { + pgpool: PgPool, + allocation_id: Address, +} + +#[derive(Debug, Error)] +pub enum AdapterError { + #[error("something went wrong: {error}")] + AdapterError { error: String }, +} + +impl From for AdapterError { + fn from(error: TryFromIntError) -> Self { + AdapterError::AdapterError { + error: error.to_string(), + } + } +} +impl From for AdapterError { + fn from(error: sqlx::Error) -> Self { + AdapterError::AdapterError { + error: error.to_string(), + } + } +} +impl From for AdapterError { + fn from(error: serde_json::Error) -> Self { + AdapterError::AdapterError { + error: error.to_string(), + } + } +} + +// convert Bound to Bound +fn u64_bound_to_bigdecimal_bound(bound: Bound<&u64>) -> Bound { + match bound { + Bound::Included(val) => Bound::Included(BigDecimal::from(*val)), + Bound::Excluded(val) => Bound::Excluded(BigDecimal::from(*val)), + Bound::Unbounded => Bound::Unbounded, + } +} + +// convert RangeBounds to PgRange +fn rangebounds_to_pgrange>(range: R) -> PgRange { + PgRange::::from(( + u64_bound_to_bigdecimal_bound(range.start_bound()), + u64_bound_to_bigdecimal_bound(range.end_bound()), + )) +} + +#[async_trait] +impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { + type AdapterError = AdapterError; + + async fn store_receipt(&self, _receipt: ReceivedReceipt) -> Result { + panic!("Not implemented"); + // let signed_receipt = receipt.signed_receipt(); + + // let record = sqlx::query!( + // r#" + // INSERT INTO scalar_tap_receipts (signature, allocation_id, timestamp_ns, receipt) + // VALUES ($1, $2, $3, $4) + // RETURNING id + // "#, + // signed_receipt.signature.to_string(), + // self.allocation_id.to_string(), + // BigDecimal::from(signed_receipt.message.timestamp_ns), + // serde_json::to_value(receipt)? + // ).fetch_one(&self.pgpool).await?; + + // // id is BIGSERIAL, so it should be safe to cast to u64. + // let id: u64 = record.id.try_into()?; + // Ok(id) + } + + async fn retrieve_receipts_in_timestamp_range + Send>( + &self, + timestamp_range_ns: R, + ) -> Result, Self::AdapterError> { + let records = sqlx::query!( + r#" + SELECT id, receipt + FROM scalar_tap_receipts + WHERE allocation_id = $1 AND $2::numrange @> timestamp_ns + "#, + self.allocation_id.to_string(), + rangebounds_to_pgrange(timestamp_range_ns), + ) + .fetch_all(&self.pgpool) + .await?; + + records + .into_iter() + .map(|record| { + let id: u64 = record.id.try_into()?; + let signed_receipt: SignedReceipt = + serde_json::from_value(record.receipt)?; + let received_receipt = ReceivedReceipt::new(signed_receipt, id, &get_full_list_of_checks()); + Ok((id, received_receipt)) + }) + .collect() + } + + async fn update_receipt_by_id( + &self, + _receipt_id: u64, + _receipt: ReceivedReceipt, + ) -> Result<(), Self::AdapterError> { + panic!("Not implemented"); + + // let _signed_receipt = receipt.signed_receipt(); + + // let _record = sqlx::query!( + // r#" + // UPDATE scalar_tap_receipts + // SET receipt = $1 + // WHERE id = $2 + // RETURNING id + // "#, + // serde_json::to_value(receipt)?, + // TryInto::::try_into(receipt_id)? + // ) + // .fetch_one(&self.pgpool) + // .await?; + + // Ok(()) + } + + async fn remove_receipts_in_timestamp_range + Send>( + &self, + timestamp_ns: R, + ) -> Result<(), Self::AdapterError> { + sqlx::query!( + r#" + DELETE FROM scalar_tap_receipts + WHERE $1::numrange @> timestamp_ns + "#, + rangebounds_to_pgrange(timestamp_ns), + ) + .execute(&self.pgpool) + .await?; + Ok(()) + } +} + +impl ReceiptStorageAdapter { + pub fn new(pgpool: PgPool, allocation_id: Address) -> Self { + Self { + pgpool, + allocation_id, + } + } +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use anyhow::Result; + use sqlx::PgPool; + + use crate::tap::test_utils::create_received_receipt; + + use super::*; + + // fixture + async fn store_receipt(pgpool: &PgPool, allocation_id: &Address, signed_receipt: SignedReceipt) -> Result { + let record = sqlx::query!( + r#" + INSERT INTO scalar_tap_receipts (allocation_id, timestamp_ns, receipt) + VALUES ($1, $2, $3) + RETURNING id + "#, + allocation_id.to_string(), + BigDecimal::from(signed_receipt.message.timestamp_ns), + serde_json::to_value(signed_receipt)? + ).fetch_one(pgpool).await?; + + // id is BIGSERIAL, so it should be safe to cast to u64. + let id: u64 = record.id.try_into()?; + Ok(id) + } + + // #[sqlx::test] + // async fn store_receipt(pgpool: PgPool) { + // let allocation_id = + // Address::from_str("0xabababababababababababababababababababab").unwrap(); + // let received_receipt = create_received_receipt(allocation_id, 0, 42, 124, 0).await; + + // let storage_adapter = ReceiptStorageAdapter::new(pgpool, allocation_id); + + // let receipt_id = storage_adapter + // .store_receipt(received_receipt.clone()) + // .await + // .unwrap(); + + // let recovered_received_receipt_vec = storage_adapter + // .retrieve_receipts_in_timestamp_range(..) + // .await + // .unwrap(); + // assert_eq!(recovered_received_receipt_vec.len(), 1); + + // let (recovered_receipt_id, recovered_received_receipt) = &recovered_received_receipt_vec[0]; + // assert_eq!(*recovered_receipt_id, receipt_id); + // // Check that the recovered receipt is the same as the original receipt using serde_json. + // assert_eq!( + // serde_json::to_value(recovered_received_receipt).unwrap(), + // serde_json::to_value(received_receipt).unwrap() + // ); + // } + + /// This function compares a local receipts vector filter by timestamp range (we assume that the stdlib + /// implementation is correct) with the receipts vector retrieved from the database using + /// retrieve_receipts_in_timestamp_range. + async fn retrieve_range_and_check + Send>( + storage_adapter: &ReceiptStorageAdapter, + received_receipt_vec: &[(u64, ReceivedReceipt)], + range: R, + ) -> Result<()> { + // Filtering the received receipts by timestamp range + let received_receipt_vec: Vec<(u64, ReceivedReceipt)> = received_receipt_vec + .iter() + .filter(|(_, received_receipt)| { + range.contains(&received_receipt.signed_receipt().message.timestamp_ns) + }) + .cloned() + .collect(); + + // Retrieving receipts in timestamp range from the database + let mut recovered_received_receipt_vec = storage_adapter + .retrieve_receipts_in_timestamp_range(range) + .await?; + + // Sorting the recovered receipts by id + recovered_received_receipt_vec.sort_by(|(id1, _), (id2, _)| id1.cmp(id2)); + + // Checking + for (received_receipt, recovered_received_receipt) in received_receipt_vec + .iter() + .zip(recovered_received_receipt_vec.iter()) + { + if serde_json::to_value(recovered_received_receipt)? + != serde_json::to_value(received_receipt)? + { + return Err(anyhow::anyhow!("Receipts do not match")); + } + } + + Ok(()) + } + + async fn remove_range_and_check + Send>( + storage_adapter: &ReceiptStorageAdapter, + received_receipt_vec: &[ReceivedReceipt], + range: R, + ) -> Result<()> { + // Storing the receipts + let mut received_receipt_id_vec = Vec::new(); + for received_receipt in received_receipt_vec.iter() { + received_receipt_id_vec.push( + store_receipt(&storage_adapter.pgpool, &storage_adapter.allocation_id, received_receipt.signed_receipt()) + .await + .unwrap(), + ); + } + + // zip the 2 vectors together + let received_receipt_vec = received_receipt_id_vec + .into_iter() + .zip(received_receipt_vec.iter()) + .collect::>(); + + // Remove the received receipts by timestamp range + let received_receipt_vec: Vec<(u64, &ReceivedReceipt)> = received_receipt_vec + .iter() + .filter(|(_, received_receipt)| { + !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) + }) + .cloned() + .collect(); + + // Removing the received receipts in timestamp range from the database + storage_adapter + .remove_receipts_in_timestamp_range(range) + .await?; + + // Retrieving all receipts + let mut recovered_received_receipt_vec = storage_adapter + .retrieve_receipts_in_timestamp_range(..) + .await?; + + // Sorting the recovered receipts by id + recovered_received_receipt_vec.sort_by(|(id1, _), (id2, _)| id1.cmp(id2)); + + // Checking + for (received_receipt, recovered_received_receipt) in received_receipt_vec + .iter() + .zip(recovered_received_receipt_vec.iter()) + { + if serde_json::to_value(recovered_received_receipt)? + != serde_json::to_value(received_receipt)? + { + return Err(anyhow::anyhow!("Receipts do not match")); + } + } + + // Removing the rest of the receipts + storage_adapter + .remove_receipts_in_timestamp_range(..) + .await?; + + // Checking that there are no receipts left + let recovered_received_receipt_vec = storage_adapter + .retrieve_receipts_in_timestamp_range(..) + .await?; + assert_eq!(recovered_received_receipt_vec.len(), 0); + + Ok(()) + } + + #[sqlx::test] + async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { + let allocation_id = + Address::from_str("0xabababababababababababababababababababab").unwrap(); + let storage_adapter = ReceiptStorageAdapter::new(pgpool, allocation_id); + + // Creating 10 receipts with timestamps 42 to 51 + let mut received_receipt_vec = Vec::new(); + for i in 0..10 { + received_receipt_vec.push( + create_received_receipt(allocation_id, i + 684, i + 42, (i + 124).into(), i).await, + ); + } + + // Storing the receipts + let mut received_receipt_id_vec = Vec::new(); + for received_receipt in received_receipt_vec.iter() { + received_receipt_id_vec.push( + store_receipt(&storage_adapter.pgpool, &storage_adapter.allocation_id, received_receipt.signed_receipt()) + .await + .unwrap(), + ); + } + + // zip the 2 vectors together + let received_receipt_vec = received_receipt_id_vec + .into_iter() + .zip(received_receipt_vec.into_iter()) + .collect::>(); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..45) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..51) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..=45) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..=51) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, ..=75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..=45) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..=45) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..=51) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..=51) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..=75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..=75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 51..=75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 70..=75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..45) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..45) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..51) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..51) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 51..75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 70..75) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 21..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 45..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 51..) + .await + .is_ok() + ); + + assert!( + retrieve_range_and_check(&storage_adapter, &received_receipt_vec, 70..) + .await + .is_ok() + ); + } + + #[sqlx::test] + async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { + let allocation_id = + Address::from_str("0xabababababababababababababababababababab").unwrap(); + let storage_adapter = ReceiptStorageAdapter::new(pgpool, allocation_id); + + // Creating 10 receipts with timestamps 42 to 51 + let mut received_receipt_vec = Vec::new(); + for i in 0..10 { + received_receipt_vec.push( + create_received_receipt(allocation_id, i + 684, i + 42, (i + 124).into(), i).await, + ); + } + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..45) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..51) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..=45) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..=51) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, ..=75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..=45) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..=45) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..=51) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..=51) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..=75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..=75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 51..=75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 70..=75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..45) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..45) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..51) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..51) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 51..75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 70..75) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 21..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 45..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 51..) + .await + .is_ok() + ); + + assert!( + remove_range_and_check(&storage_adapter, &received_receipt_vec, 70..) + .await + .is_ok() + ); + } + + // #[sqlx::test] + // async fn update_receipt_by_id(pgpool: PgPool) { + // let allocation_id = + // Address::from_str("0xabababababababababababababababababababab").unwrap(); + // let storage_adapter = ReceiptStorageAdapter::new(pgpool, allocation_id); + + // // Creating 10 receipts with timestamps 42 to 51 + // let mut received_receipt_vec = Vec::new(); + // for i in 0..10 { + // received_receipt_vec.push( + // create_received_receipt(allocation_id, i + 684, i + 42, (i + 124).into(), i).await, + // ); + // } + + // // Storing the receipts + // let mut received_receipt_id_vec = Vec::new(); + // for received_receipt in received_receipt_vec.iter() { + // received_receipt_id_vec.push( + // storage_adapter + // .store_receipt(received_receipt.clone()) + // .await + // .unwrap(), + // ); + // } + + // // zip the 2 vectors together + // let mut received_receipt_vec = received_receipt_id_vec + // .into_iter() + // .zip(received_receipt_vec.into_iter()) + // .collect::>(); + + // // updating a receipt using an non-existing id + // assert!(storage_adapter + // .update_receipt_by_id(123456, received_receipt_vec[0].1.clone()) + // .await + // .is_err()); + + // // updating a receipt using an existing id + // storage_adapter + // .update_receipt_by_id(received_receipt_vec[4].0, received_receipt_vec[1].1.clone()) + // .await + // .unwrap(); + // // doing the same in received_receipt_vec + // received_receipt_vec[4].1 = received_receipt_vec[1].1.clone(); + + // // compare the local vector with the one in the database + // let mut recovered_received_receipt_vec = storage_adapter + // .retrieve_receipts_in_timestamp_range(..) + // .await + // .unwrap(); + // // Sorting the recovered receipts by id + // recovered_received_receipt_vec.sort_by(|(id1, _), (id2, _)| id1.cmp(id2)); + // // Check + // for (received_receipt, recovered_received_receipt) in received_receipt_vec + // .iter() + // .zip(recovered_received_receipt_vec.iter()) + // { + // assert_eq!( + // serde_json::to_value(recovered_received_receipt).unwrap(), + // serde_json::to_value(received_receipt).unwrap() + // ) + // } + // } +} diff --git a/tap_agent/src/tap/test_utils.rs b/tap_agent/src/tap/test_utils.rs new file mode 100644 index 000000000..1cbcbb68a --- /dev/null +++ b/tap_agent/src/tap/test_utils.rs @@ -0,0 +1,75 @@ +use alloy_primitives::Address; +use alloy_sol_types::{eip712_domain, Eip712Domain}; +use ethers_signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; +use tap_core::receipt_aggregate_voucher::ReceiptAggregateVoucher; +use tap_core::tap_manager::SignedRAV; +use tap_core::tap_receipt::ReceivedReceipt; +use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; + +/// Fixture to generate a wallet and address +pub fn keys() -> (LocalWallet, Address) { + let wallet: LocalWallet = MnemonicBuilder::::default() + .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") + .build() + .unwrap(); + let address = wallet.address(); + + (wallet, Address::from_slice(address.as_bytes())) +} + +pub fn domain() -> Eip712Domain { + eip712_domain! { + name: "TAP", + version: "1", + chain_id: 1, + verifying_contract: Address::from([0x11u8; 20]), + } +} + +/// Fixture to generate a signed receipt using the wallet from `keys()` +/// and the given `query_id` and `value` +pub async fn create_received_receipt( + allocation_id: Address, + nonce: u64, + timestamp_ns: u64, + value: u128, + query_id: u64, +) -> ReceivedReceipt { + let (wallet, _) = keys(); + + let receipt = EIP712SignedMessage::new( + &domain(), + Receipt { + allocation_id, + nonce, + timestamp_ns, + value, + }, + &wallet, + ) + .await + .unwrap(); + + ReceivedReceipt::new(receipt, query_id, &[]) +} + +/// Fixture to generate a RAV using the wallet from `keys()` +pub async fn create_rav( + allocation_id: Address, + timestamp_ns: u64, + value_aggregate: u128, +) -> SignedRAV { + let (wallet, _) = keys(); + + EIP712SignedMessage::new( + &domain(), + ReceiptAggregateVoucher { + allocation_id, + timestamp_ns, + value_aggregate, + }, + &wallet, + ) + .await + .unwrap() +}