From bfea2c740f942cdbf478acca17b5723be2581771 Mon Sep 17 00:00:00 2001 From: ZelionD Date: Thu, 15 Aug 2024 11:50:20 -0400 Subject: [PATCH] add profiles fetcher --- Cargo.lock | 41 ++++++- Cargo.toml | 1 + profiles-fetcher/Cargo.toml | 36 ++++++ profiles-fetcher/src/main.rs | 219 +++++++++++++++++++++++++++++++++++ 4 files changed, 296 insertions(+), 1 deletion(-) create mode 100644 profiles-fetcher/Cargo.toml create mode 100644 profiles-fetcher/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 1d4a64a..3bc42c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,25 @@ dependencies = [ "url", ] +[[package]] +name = "airdao-gov-portal-profiles-fetcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "backtrace", + "chrono", + "dotenv", + "ethereum-types", + "futures-util", + "log", + "reqwest 0.11.27", + "serde", + "serde_json", + "shared", + "tokio", + "tracing", +] + [[package]] name = "airdao-gov-user-verifier" version = "0.1.0" @@ -1381,6 +1400,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.28", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.26.0" @@ -2284,6 +2317,7 @@ version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ + "async-compression", "base64 0.21.7", "bytes", "encoding_rs", @@ -2293,6 +2327,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", + "hyper-rustls 0.24.2", "hyper-tls 0.5.0", "ipnet", "js-sys", @@ -2302,6 +2337,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.12", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -2310,11 +2346,14 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-rustls 0.24.1", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots 0.25.4", "winreg 0.50.0", ] @@ -2335,7 +2374,7 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.3.1", - "hyper-rustls", + "hyper-rustls 0.26.0", "hyper-tls 0.6.0", "hyper-util", "ipnet", diff --git a/Cargo.toml b/Cargo.toml index eba9358..c75ecf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "gov-portal-db", "shared", "mailer", + "profiles-fetcher", ] resolver = "2" diff --git a/profiles-fetcher/Cargo.toml b/profiles-fetcher/Cargo.toml new file mode 100644 index 0000000..9f7a7b1 --- /dev/null +++ b/profiles-fetcher/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "airdao-gov-portal-profiles-fetcher" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# Local +shared = { path = "../shared/" } + +# Async +tokio = { workspace = true } +futures-util = { workspace = true } + +# Ethereum +ethereum-types = { workspace = true } + +# Web +reqwest = { workspace = true } + +# SerDe +serde = { workspace = true } +serde_json = { workspace = true } + +# Logging +tracing = { workspace = true } +log = { workspace = true } + +# Misc +backtrace = { workspace = true } +chrono = { workspace = true } +anyhow = { workspace = true } +dotenv = { workspace = true } + +[dev-dependencies] diff --git a/profiles-fetcher/src/main.rs b/profiles-fetcher/src/main.rs new file mode 100644 index 0000000..5fa934c --- /dev/null +++ b/profiles-fetcher/src/main.rs @@ -0,0 +1,219 @@ +use chrono::{DateTime, Utc}; +use ethereum_types::Address; +use serde::{Deserialize, Serialize}; +use shared::{logger, utils}; +use std::{fs::File, io::prelude::*}; +use tracing::{info, warn}; + +const REQUEST_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(60); +const REQUEST_LIMIT: usize = 4; +const MAX_PARALLEL_REQUESTS: usize = 5; +const BATCH_SIZE: usize = REQUEST_LIMIT * MAX_PARALLEL_REQUESTS; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +enum SBTReportKind { + Success(SBTReport), + Failure(SBTReportError), + Unavailable, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct SBTReport { + name: String, + address: Address, + issued_at: DateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct SBTReportError { + name: String, + address: Address, + error: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct User { + wallet: Address, + reports: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct AllUsersReport { + next: usize, + users: Vec, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + logger::init(); + utils::set_heavy_panic(); + + // Try load environment variables from `.env` if provided + dotenv::dotenv().ok(); + + let token = std::env::var("TOKEN")?; + let mut start = 0; + let mut users = vec![]; + + if let Ok(mut file) = File::open("./reports.json") { + let mut json = String::new(); + + file.read_to_string(&mut json)?; + + let all_users = serde_json::from_str::(&json)?; + + start = all_users.next; + users = all_users.users; + } + + for batch_start in (start..).step_by(BATCH_SIZE).into_iter() { + let mut requests = (batch_start..batch_start + BATCH_SIZE) + .step_by(REQUEST_LIMIT) + .into_iter() + .collect::>(); + + info!(start = ?batch_start, limit = ?REQUEST_LIMIT, "Waiting for a batch"); + + let mut end = false; + let client = reqwest::Client::builder() + .pool_max_idle_per_host(0) + .timeout(REQUEST_TIMEOUT) + .build()?; + + let mut fetched_users_count = 0; + + loop { + if requests.is_empty() { + break; + } + + let prepared = requests.iter().map(|start| { + let client = client.clone(); + let token = token.as_str(); + + tokio::time::timeout(REQUEST_TIMEOUT, async move { + client + .post("https://gov-portal-verifier-api.ambrosus.io/db/sbt-report") + .json(&serde_json::json!({ + "token": token, + "start": start, + "limit": REQUEST_LIMIT, + })) + .send() + .await + }) + }); + + let results = futures_util::future::join_all(prepared) + .await + .into_iter() + .zip(requests); + + info!(count = ?results.len(), "Fetched results"); + + requests = vec![]; + + for (result, start) in results { + match result { + Err(_) => { + warn!(?start, "Fetch a batch timed out"); + requests.push(start); + } + Ok(Err(error)) => { + warn!(?start, ?error, "Failed to fetch a batch"); + requests.push(start); + } + Ok(Ok(response)) => { + let (text, parsed_users) = match response.text().await { + Err(error) => { + warn!(?start, ?error, "Failed to get response for a batch"); + requests.push(start); + continue; + } + Ok(text) => { + let parsed = serde_json::from_str::>(&text); + (text, parsed) + } + }; + + let Ok(fetched_users) = parsed_users else { + warn!(?text, "Failed to parse users response"); + requests.push(start); + continue; + }; + + info!(count = ?fetched_users.len(), "Fetched users"); + + fetched_users_count += fetched_users.len(); + + if fetched_users.is_empty() { + end = true; + continue; + } + + users.extend(fetched_users.into_iter().filter_map(|user| { + let reports = user + .reports + .into_iter() + .filter_map(|report| { + if let SBTReportKind::Unavailable = report { + None + } else { + Some(report) + } + }) + .collect::>(); + + if reports.is_empty() { + None + } else { + Some(User { + wallet: user.wallet, + reports, + }) + } + })); + } + } + } + + if !requests.is_empty() { + info!(count = ?requests.len(), "Retry requests..."); + } + } + + info!(count = ?users.len(), "Total fetched profiles"); + + match File::create("./reports.json.tmp") { + Ok(mut file) => { + if let Err(error) = file.write_all( + serde_json::to_string_pretty(&AllUsersReport { + next: batch_start + fetched_users_count, + users: users.clone(), + })? + .as_bytes(), + ) { + warn!(?error, "Failed to write file"); + } + } + Err(error) => { + warn!(?error, "Failed to open file"); + } + } + + if let Err(error) = std::fs::rename("./reports.json.tmp", "./reports.json") { + warn!(?error, "Failed to replace file"); + } + + if end { + break; + } + } + + Ok(()) +}