Skip to content

Commit

Permalink
add total rewards to /rewards endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill-K-1 committed Nov 15, 2024
1 parent 4068008 commit 87b5198
Show file tree
Hide file tree
Showing 12 changed files with 585 additions and 119 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ anyhow = "1"
dotenv = "0"
sendgrid = "0"
clap = { version = "4", features = ["derive"] }
parking_lot = "0.12"
21 changes: 17 additions & 4 deletions event-indexer/src/gov_db_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use serde::Deserialize;

use airdao_gov_portal_db::session_manager::{SessionConfig, SessionManager};
use shared::common::{
RewardInfo, SBTInfo, UpdateRewardKind, UpdateRewardRequest, UpdateSBTKind, UpdateUserSBTRequest,
BatchId, RewardInfo, SBTInfo, UpdateRewardKind, UpdateRewardRequest, UpdateSBTKind,
UpdateUserSBTRequest,
};

use crate::auto_refresh_token::AutoRefreshToken;
Expand Down Expand Up @@ -137,6 +138,7 @@ impl GovDbProvider {

pub async fn claim_reward(
&mut self,
block_number: u64,
wallet: Address,
id: u64,
) -> anyhow::Result<axum::Json<()>> {
Expand All @@ -147,7 +149,11 @@ impl GovDbProvider {
.post([&self.config.url, "update-reward"].concat())
.json(&UpdateRewardRequest {
token,
kind: UpdateRewardKind::Claim { wallet, id },
kind: UpdateRewardKind::Claim {
block_number,
wallet,
id: BatchId(id),
},
})
.send()
.await?
Expand All @@ -166,15 +172,22 @@ impl GovDbProvider {
Ok(json)
}

pub async fn revert_reward(&mut self, id: u64) -> anyhow::Result<axum::Json<()>> {
pub async fn revert_reward(
&mut self,
block_number: u64,
id: u64,
) -> anyhow::Result<axum::Json<()>> {
let token = self.auto_refresh_token.acquire_token()?.clone();

let bytes = self
.client
.post([&self.config.url, "update-reward"].concat())
.json(&UpdateRewardRequest {
token,
kind: UpdateRewardKind::Revert { id },
kind: UpdateRewardKind::Revert {
block_number,
id: BatchId(id),
},
})
.send()
.await?
Expand Down
18 changes: 12 additions & 6 deletions event-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use event_listener::{EventListener, GovEvent, GovEventNotification};
use gov_db_provider::GovDbProvider;
use indexer_state_redis_cache::IndexerStateRedisCache;
use shared::{
common::{RewardInfo, RewardStatus, SBTInfo},
common::{BatchId, RewardInfo, RewardStatus, SBTInfo, TimestampSeconds},
logger, utils,
};

Expand All @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut indexer_state_redis_cache =
IndexerStateRedisCache::new(chain_id, &config.redis, config.block_number).await?;

indexer_state_redis_cache.block_number = 2513099;
let mut gov_db_provider = GovDbProvider::new(config.db.clone())?;

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<GovEventNotification>();
Expand Down Expand Up @@ -88,11 +88,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
GovEvent::Reward(event) => {
gov_db_provider
.insert_reward(RewardInfo {
id: event_notification.block_number,
id: BatchId(event_notification.block_number),
grantor: event.grantor,
wallet: event.wallet,
amount: event.amount,
timestamp: event.timestamp.as_u64(),
timestamp: TimestampSeconds(event.timestamp.as_u64()),
event_name: event.name.clone(),
region: event.region.clone(),
community: Some(event.community.clone()).filter(|v| !v.is_empty()),
Expand All @@ -102,9 +102,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await
}
GovEvent::ClaimReward(wallet, id) => {
gov_db_provider.claim_reward(*wallet, *id).await
gov_db_provider
.claim_reward(event_notification.block_number, *wallet, *id)
.await
}
GovEvent::RevertReward(id) => {
gov_db_provider
.revert_reward(event_notification.block_number, *id)
.await
}
GovEvent::RevertReward(id) => gov_db_provider.revert_reward(*id).await,
};

if let Ok(axum::Json(())) = result {
Expand Down
1 change: 1 addition & 0 deletions gov-portal-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ anyhow = { workspace = true }
dotenv = { workspace = true }
rand = { workspace = true }
clap = { workspace = true }
parking_lot = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions gov-portal-db/src/rewards_manager/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use tokio::sync::mpsc;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Not authorized")]
Expand All @@ -14,4 +16,12 @@ pub enum Error {
Timeout(#[from] tokio::time::error::Elapsed),
#[error("Internal error: {0}")]
Internal(#[from] anyhow::Error),
#[error("Send error: {0}")]
Send(String),
}

impl<T: std::fmt::Debug> From<mpsc::error::SendError<T>> for Error {
fn from(error: mpsc::error::SendError<T>) -> Self {
Self::Send(error.to_string())
}
}
127 changes: 110 additions & 17 deletions gov-portal-db/src/rewards_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
pub mod client;
pub mod error;
mod rewards_cache;

use bson::{doc, Document};
use ethereum_types::Address;
use ethereum_types::{Address, U256};
use futures_util::TryStreamExt;
use mongodb::options::{CountOptions, FindOptions, UpdateOptions};
use rewards_cache::{RewardsCache, RewardsDelta};
use serde::Deserialize;

use shared::common::{
RewardDbEntry, RewardInfo, RewardStatus, Rewards, RewardsDbEntry, UpdateRewardKind,
BatchId, RewardDbEntry, RewardInfo, RewardStatus, Rewards, RewardsDbEntry, UpdateRewardKind,
};

use crate::mongo_client::{MongoClient, MongoConfig};
Expand All @@ -30,6 +32,7 @@ pub struct RewardsManagerConfig {
pub struct RewardsManager {
pub db_client: RewardsDbClient,
pub config: RewardsManagerConfig,
pub rewards_cache: RewardsCache,
}

impl RewardsManager {
Expand All @@ -40,14 +43,37 @@ impl RewardsManager {
) -> anyhow::Result<Self> {
let db_client = RewardsDbClient::new(mongo_config, &config.collection).await?;

Ok(Self { db_client, config })
let mut rewards = Vec::with_capacity(MAX_GET_REWARDS_LIMIT as usize);

loop {
let fetched = Self::load_all_rewards(
&db_client,
Some(rewards.len() as u64),
Some(MAX_GET_REWARDS_LIMIT),
)
.await?;

if fetched.is_empty() {
break;
}

rewards.extend(fetched);
}

let rewards_cache = RewardsCache::init(rewards)?;

Ok(Self {
db_client,
config,
rewards_cache,
})
}

/// Update reward struct at MongoDB by reward id [`u64`]
/// Update reward struct at MongoDB by reward id [`BatchId`]
pub async fn update_reward(&self, update_kind: UpdateRewardKind) -> Result<(), error::Error> {
let (query, set_doc) = match update_kind {
let (query, set_doc, rewards_delta) = match update_kind {
UpdateRewardKind::Grant(reward) => {
let (id, wallet, reward_entry) = <(u64, Address, RewardDbEntry)>::from(reward);
let (id, wallet, reward_entry) = <(BatchId, Address, RewardDbEntry)>::from(reward);

let query = doc! {
"id": bson::to_bson(&id)?,
Expand All @@ -57,9 +83,18 @@ impl RewardsManager {
format!("wallets.0x{}", hex::encode(wallet)): bson::to_bson(&reward_entry)?,
};

(query, set_doc)
(
query,
set_doc,
// [`BatchId`] is a block number in which it was created
RewardsDelta::Grant(id.0, id, wallet, reward_entry.amount),
)
}
UpdateRewardKind::Claim { wallet, id } => {
UpdateRewardKind::Claim {
block_number,
wallet,
id,
} => {
let query = doc! {
"id": bson::to_bson(&id)?,
};
Expand All @@ -68,9 +103,13 @@ impl RewardsManager {
format!("wallets.0x{}.status", hex::encode(wallet)): bson::to_bson(&RewardStatus::Claimed)?,
};

(query, set_doc)
(
query,
set_doc,
RewardsDelta::Claim(block_number, id, wallet),
)
}
UpdateRewardKind::Revert { id } => {
UpdateRewardKind::Revert { block_number, id } => {
let query = doc! {
"id": bson::to_bson(&id)?,
};
Expand All @@ -79,7 +118,7 @@ impl RewardsManager {
"status": bson::to_bson(&RewardStatus::Reverted)?,
};

(query, set_doc)
(query, set_doc, RewardsDelta::RevertBatch(block_number, id))
}
};

Expand All @@ -99,11 +138,60 @@ impl RewardsManager {

match upsert_result {
// Upserts a Reward entry with an id
Ok(_) => Ok(()),
Ok(_) => self.rewards_cache.push_rewards_delta(rewards_delta),
Err(e) => Err(e.into()),
}
}

async fn load_all_rewards(
db_client: &RewardsDbClient,
start: Option<u64>,
limit: Option<u64>,
) -> Result<Vec<RewardsDbEntry>, error::Error> {
let start = start.unwrap_or_default();
let limit = limit
.unwrap_or(DEFAULT_GET_REWARDS_LIMIT)
.clamp(1, MAX_GET_REWARDS_LIMIT) as i64;

let find_options = FindOptions::builder()
.max_time(db_client.req_timeout)
.skip(start)
.limit(limit)
.build();

tokio::time::timeout(db_client.req_timeout, async {
let mut results = Vec::with_capacity(limit as usize);
let mut stream = db_client.collection().find(doc! {}, find_options).await?;
while let Ok(Some(doc)) = stream.try_next().await {
let rewards =
bson::from_document::<RewardsDbEntry>(doc).map_err(error::Error::from)?;
results.push(rewards);
}
Ok(results)
})
.await?
}

pub fn get_total_rewards(
&self,
requestor: &Address,
wallet: &Address,
) -> Result<U256, error::Error> {
let requires_moderator_access_rights = wallet != requestor;

if requires_moderator_access_rights
&& !self
.config
.moderators
.iter()
.any(|wallet| wallet == requestor)
{
return Err(error::Error::Unauthorized);
}

Ok(self.rewards_cache.get_total_rewards(wallet))
}

/// Counts all rewards allocated by requestor within MongoDB by provided wallet EVM-like address [`Address`]
pub async fn count_rewards(
&self,
Expand All @@ -121,7 +209,7 @@ impl RewardsManager {
return Err(error::Error::Unauthorized);
}

let expr = self.build_rewards_filter_expr(from, to, community)?;
let expr = Self::build_rewards_filter_expr(from, to, community)?;
let filter = doc! {
"$expr": bson::to_bson(&expr)?,
};
Expand Down Expand Up @@ -164,13 +252,16 @@ impl RewardsManager {
return Err(error::Error::Unauthorized);
}

let expr = self.build_rewards_filter_expr(from, to, community)?;
let expr = Self::build_rewards_filter_expr(from, to, community)?;
let filter = doc! {
"$expr": expr
};

let find_options = FindOptions::builder()
.max_time(self.db_client.req_timeout)
// .sort(doc! {
// "timestamp": -1
// })
.skip(start)
.limit(limit)
.build();
Expand Down Expand Up @@ -241,7 +332,7 @@ impl RewardsManager {
return Err(error::Error::Unauthorized);
}

let expr = self.build_rewards_filter_expr(from, to, community)?;
let expr = Self::build_rewards_filter_expr(from, to, community)?;
let filter = doc! {
format!("wallets.0x{}", hex::encode(wallet)): { "$exists": true },
"$expr": bson::to_bson(&expr)?,
Expand Down Expand Up @@ -290,14 +381,17 @@ impl RewardsManager {
return Err(error::Error::Unauthorized);
}

let expr = self.build_rewards_filter_expr(from, to, community)?;
let expr = Self::build_rewards_filter_expr(from, to, community)?;
let filter = doc! {
format!("wallets.0x{}", hex::encode(wallet)): { "$exists": true },
"$expr": bson::to_bson(&expr)?,
};

let find_options = FindOptions::builder()
.max_time(self.db_client.req_timeout)
// .sort(doc! {
// "timestamp": -1
// })
.skip(start)
.limit(limit)
.projection(doc! {
Expand Down Expand Up @@ -353,7 +447,6 @@ impl RewardsManager {
}

fn build_rewards_filter_expr(
&self,
from: Option<u64>,
to: Option<u64>,
community: Option<&str>,
Expand Down
Loading

0 comments on commit 87b5198

Please sign in to comment.