diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..1cc2a4c --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,51 @@ +use std::collections::HashSet; + +use anyhow::Result; +use hyper::{Body, Response}; +use serde::Serialize; +use sha2::{Digest, Sha256}; + +// Extract accounts, +// returns: account, is lockup, master account +pub fn get_accounts_and_lockups(accounts: &str) -> HashSet<(String, Option)> { + let mut accounts: HashSet<(String, Option)> = accounts + .split(',') + .map(String::from) + .filter(|account| account != "near" && account != "system") + .map(|account| (account, None)) + .collect(); + + for a in accounts.clone() { + let lockup_account = get_associated_lockup(&a.0, "near"); + accounts.insert((lockup_account, Some(a.0.clone()))); + } + + accounts +} + +// Consolidate results and return a Response +pub fn results_to_response(results: Vec) -> Result, csv::Error> { + let mut wtr = csv::Writer::from_writer(Vec::new()); + for row in results { + wtr.serialize(row)?; + } + wtr.flush()?; + Ok(Response::builder() + .header("Content-Type", "text/csv") + .body(Body::from(wtr.into_inner().unwrap())) + .unwrap()) +} + +pub fn get_associated_lockup(account_id: &str, master_account_id: &str) -> String { + format!( + "{}.lockup.{}", + &sha256(account_id)[0..40], + master_account_id + ) +} + +fn sha256(value: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(value.as_bytes()); + format!("{:x}", hasher.finalize()) +} diff --git a/src/main.rs b/src/main.rs index 5f73b8b..59440f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,6 +33,7 @@ use tokio::{spawn, sync::Semaphore}; use tracing::*; use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, FmtSubscriber}; use tta::tta_impl::TTA; +use tta_rust::{get_accounts_and_lockups, results_to_response}; use crate::tta::{ft_metadata::FtService, sql::sql_queries::SqlClient}; @@ -125,7 +126,9 @@ async fn router() -> anyhow::Result { .route("/likelyBlockId", get(get_closest_block_id)) .with_state(sql_client.clone()) .route("/balances", get(get_balances)) - .with_state((sql_client, ft_service.clone())) + .with_state((sql_client.clone(), ft_service.clone())) + .route("/staking", get(get_staking_report)) + .with_state((sql_client, ft_service)) .layer(middleware)) } @@ -255,17 +258,12 @@ async fn get_balances( let start_block_id = sql_client.get_closest_block_id(start_nanos).await?; let end_block_id = sql_client.get_closest_block_id(end_nanos).await?; - let accounts: HashSet = params - .accounts - .split(',') - .map(String::from) - .filter(|account| account != "near" && account != "system") - .collect(); + let accounts = get_accounts_and_lockups(¶ms.accounts); let client = reqwest::Client::new(); let mut handles = vec![]; - for account in accounts { + for (account, _) in accounts { let client = client.clone(); let ft_service = ft_service.clone(); let start_block_id = start_block_id; @@ -396,15 +394,148 @@ async fn get_balances( } }); - let mut wtr = csv::Writer::from_writer(Vec::new()); - for row in rows { - wtr.serialize(row).unwrap(); + let r = results_to_response(rows)?; + Ok(r) +} + +#[derive(Debug, Deserialize)] +struct GetStaking { + pub date: String, + pub accounts: String, +} + +#[derive(Debug, Serialize, Clone)] +struct StakingReportRow { + pub account: String, + pub staking_pool: String, + pub amount_staked: f64, + pub amount_unstaked: f64, + pub ready_for_withdraw: bool, + pub lockup_of: Option, + pub date: String, + pub block_id: u128, +} + +#[derive(Debug, Deserialize, Clone)] +struct StakingDeposit { + pub deposit: String, + pub validator_id: String, +} + +async fn get_staking_report( + Query(params): Query, + State((sql_client, ft_service)): State<(SqlClient, FtService)>, +) -> Result, AppError> { + let date: DateTime = DateTime::parse_from_rfc3339(¶ms.date).unwrap().into(); + let start_nanos = date.timestamp_nanos() as u128; + + let block_id = sql_client.get_closest_block_id(start_nanos).await?; + + let accounts = get_accounts_and_lockups(¶ms.accounts); + + // todo add support for lockup accounts + + let client = reqwest::Client::new(); + let mut handles = vec![]; + + for (account, master_account) in accounts { + let client = client.clone(); + let ft_service = ft_service.clone(); + let block_id = block_id; + + let handle = spawn(async move { + info!("Getting staking for {}", account); + let mut rows: Vec = vec![]; + + let staking_deposits = client + .get(format!( + "https://api.kitwallet.app/staking-deposits/{account}" + )) + .send() + .await? + .json::>() + .await?; + info!( + "Account {} staking deposits: {:?}", + account, staking_deposits + ); + + let handles: Vec<_> = staking_deposits + .iter() + .map(|staking_deposit| { + let staking_deposit = staking_deposit.clone(); + let account = account.clone(); + let ft_service = ft_service.clone(); + let master_account = master_account.clone(); + async move { + let staking_details = match ft_service + .get_staking_details( + &staking_deposit.validator_id, + &account, + block_id as u64, + ) + .await + { + Ok(v) => v, + Err(e) => { + debug!("{}: {}", account, e); + return Err(e); + } + }; + + if staking_details.0 == 0.0 && staking_details.1 == 0.0 { + return Ok(None); + } + + let record = StakingReportRow { + account, + staking_pool: staking_deposit.validator_id.clone(), + amount_staked: staking_details.0, + amount_unstaked: staking_details.1, + ready_for_withdraw: staking_details.2, + lockup_of: master_account, + date: date.to_rfc3339(), + block_id, + }; + Ok(Some(record)) + } + }) + .collect(); + + let results: Vec<_> = join_all(handles).await; + for result in results { + match result { + Ok(record) => { + if let Some(record) = record { + rows.push(record) + } + } + Err(e) => { + error!("staking error: {:?}", e); + } + } + } + + anyhow::Ok(rows) + }); + handles.push(handle); } - wtr.flush()?; - let response = Response::builder() - .header("Content-Type", "text/csv") - .body(Body::from(wtr.into_inner().unwrap()))?; - Ok(response) + + let mut rows = vec![]; + join_all(handles).await.iter().for_each(|row| match row { + Ok(result) => match result { + Ok(res) => rows.extend(res.iter().cloned()), + Err(e) => { + println!("{:?}", e) + } + }, + Err(e) => { + warn!("{:?}", e) + } + }); + + let r = results_to_response(rows)?; + Ok(r) } struct AppError(anyhow::Error); diff --git a/src/tta/ft_metadata.rs b/src/tta/ft_metadata.rs index 44dd847..80db06c 100644 --- a/src/tta/ft_metadata.rs +++ b/src/tta/ft_metadata.rs @@ -19,7 +19,7 @@ use std::{ num::{NonZeroU32, NonZeroUsize}, sync::Arc, }; -use tokio::sync::RwLock; +use tokio::{join, sync::RwLock}; use tracing::{debug, info}; use std::hash::{Hash, Hasher}; @@ -233,6 +233,122 @@ impl FtService { let amount = safe_divide_u128(amount, 24); Ok(amount) } + + pub async fn get_staking_details( + &self, + staking_pool: &str, + account_id: &str, + block_id: u64, + ) -> Result<(f64, f64, bool)> { + let args = json!({ "account_id": account_id }).to_string().into_bytes(); + + let unstaked_balance_future = self.get_unstaked_balance(staking_pool, &args, block_id); + let staked_balance_future = self.get_staked_balance(staking_pool, &args, block_id); + let unstaked_balance_available_future = + self.is_unstaked_balance_available(staking_pool, &args, block_id); + + let (unstaked_balance, staked_balance, unstaked_balance_available) = join!( + unstaked_balance_future, + staked_balance_future, + unstaked_balance_available_future + ); + + Ok(( + safe_divide_u128(staked_balance?, 24), + safe_divide_u128(unstaked_balance?, 24), + unstaked_balance_available?, + )) + } + + async fn get_unstaked_balance( + &self, + staking_pool: &str, + args: &[u8], + block_id: u64, + ) -> Result { + self.archival_rate_limiter.write().await.until_ready().await; + let result = view_function_call( + &self.near_client, + QueryRequest::CallFunction { + account_id: staking_pool.parse()?, + method_name: "get_account_unstaked_balance".to_string(), + args: FunctionArgs::from(args.to_vec()), + }, + BlockReference::BlockId(Height(block_id)), + ) + .await; + + match result { + Ok(v) => Ok(serde_json::from_slice::(&v)?.parse::()?), + Err(e) => { + bail!( + "Error getting staking details for staking pool: {}, error: {:?}", + staking_pool, + e + ); + } + } + } + + async fn get_staked_balance( + &self, + staking_pool: &str, + args: &[u8], + block_id: u64, + ) -> Result { + self.archival_rate_limiter.write().await.until_ready().await; + let result = view_function_call( + &self.near_client, + QueryRequest::CallFunction { + account_id: staking_pool.parse()?, + method_name: "get_account_staked_balance".to_string(), + args: FunctionArgs::from(args.to_vec()), + }, + BlockReference::BlockId(Height(block_id)), + ) + .await; + + match result { + Ok(v) => Ok(serde_json::from_slice::(&v)?.parse::()?), + Err(e) => { + bail!( + "Error getting staking details for staking pool: {}, error: {:?}", + staking_pool, + e + ); + } + } + } + + async fn is_unstaked_balance_available( + &self, + staking_pool: &str, + args: &[u8], + block_id: u64, + ) -> Result { + self.archival_rate_limiter.write().await.until_ready().await; + let result = view_function_call( + &self.near_client, + QueryRequest::CallFunction { + account_id: staking_pool.parse()?, + method_name: "is_account_unstaked_balance_available".to_string(), + args: FunctionArgs::from(args.to_vec()), + }, + BlockReference::BlockId(Height(block_id)), + ) + .await; + + match result { + Ok(v) => Ok(serde_json::from_slice::(&v)?), + Err(e) => { + bail!( + "Error getting staking details for staking pool: {}, error: {:?}", + staking_pool, + e + ); + } + } + } } pub async fn view_function_call(