diff --git a/Cargo.lock b/Cargo.lock index 657fa0005..3df00c759 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1206,6 +1206,7 @@ name = "clockwork-thread-program" version = "2.0.17" dependencies = [ "anchor-lang", + "anchor-spl", "chrono", "clockwork-cron", "clockwork-network-program", @@ -1250,6 +1251,7 @@ name = "clockwork_plugin" version = "2.0.17" dependencies = [ "anchor-lang", + "anchor-spl", "async-trait", "async_once", "bincode", diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index 55e3dd2c5..165e603c3 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -20,6 +20,7 @@ name = "clockwork_plugin" [dependencies] anchor-lang = "0.27.0" +anchor-spl = { features = ["token"], version = "0.27.0" } async_once = "0.2.6" async-trait = "0.1.64" bincode = "1.3.3" diff --git a/plugin/src/events.rs b/plugin/src/events.rs index 7c8278fe6..34758ec6e 100644 --- a/plugin/src/events.rs +++ b/plugin/src/events.rs @@ -1,4 +1,5 @@ use anchor_lang::{prelude::AccountInfo, AccountDeserialize, Discriminator}; +use anchor_spl::token::TokenAccount; use bincode::deserialize; use clockwork_thread_program::state::{Thread as ThreadV2, VersionedThread}; use clockwork_thread_program_v1::state::Thread as ThreadV1; @@ -10,7 +11,7 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{ use solana_program::{clock::Clock, pubkey::Pubkey, sysvar}; use static_pubkey::static_pubkey; -static PYTH_ORACLE_PROGRAM_ID_MAINNET: Pubkey = +static PYTH_ORACLE_PROGRAM_ID_MAINNET: Pubkey = static_pubkey!("FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH"); static PYTH_ORACLE_PROGRAM_ID_DEVNET: Pubkey = static_pubkey!("gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s"); @@ -20,6 +21,7 @@ pub enum AccountUpdateEvent { Clock { clock: Clock }, Thread { thread: VersionedThread }, PriceFeed { price_feed: PriceFeed }, + TokenAccount { token_account: TokenAccount }, Webhook { webhook: Webhook }, } @@ -96,6 +98,17 @@ impl TryFrom<&mut ReplicaAccountInfo<'_>> for AccountUpdateEvent { return Ok(AccountUpdateEvent::PriceFeed { price_feed }); } + // If the account belongs to the token program, attempt to parse it. + if owner_pubkey.eq(&anchor_spl::token::ID) { + let token_account = + TokenAccount::try_deserialize(&mut account_info.data).map_err(|_| { + GeyserPluginError::AccountsUpdateError { + msg: "Failed to parse SPL token account".into(), + } + })?; + return Ok(AccountUpdateEvent::TokenAccount { token_account }); + } + // If the account belongs to the webhook program, parse in if owner_pubkey.eq(&clockwork_webhook_program::ID) && account_info.data.len() > 8 { return Ok(AccountUpdateEvent::Webhook { diff --git a/plugin/src/observers/thread.rs b/plugin/src/observers/thread.rs index be8114c17..9c6bf885d 100644 --- a/plugin/src/observers/thread.rs +++ b/plugin/src/observers/thread.rs @@ -5,9 +5,10 @@ use std::{ sync::{atomic::AtomicU64, Arc}, }; +use anchor_spl::token::TokenAccount; use chrono::{DateTime, NaiveDateTime, Utc}; use clockwork_cron::Schedule; -use clockwork_thread_program::state::{Trigger, TriggerContext, Equality, VersionedThread}; +use clockwork_thread_program::state::{Equality, Trigger, TriggerContext, VersionedThread}; use log::info; use pyth_sdk_solana::PriceFeed; use solana_geyser_plugin_interface::geyser_plugin_interface::{ @@ -20,6 +21,9 @@ pub struct ThreadObserver { // Map from slot numbers to the sysvar clock data for that slot. pub clocks: RwLock>, + // The set of threads that have an active trigger condition this slot. + pub activated_threads: RwLock>, + // Integer tracking the current epoch. pub current_epoch: AtomicU64, @@ -31,9 +35,6 @@ pub struct ThreadObserver { // Map from unix timestamps to the list of threads scheduled for that moment. pub cron_threads: RwLock>>, - // The set of threads with a now trigger. - pub now_threads: RwLock>, - // The set of threads with a slot trigger. pub slot_threads: RwLock>>, @@ -43,6 +44,12 @@ pub struct ThreadObserver { // The set of threads with a pyth trigger. pub pyth_threads: RwLock>>, + // The set of threads with a token trigger. + pub token_threads: RwLock>>, + + // The set of threads with a token limit trigger. + pub token_limit_threads: RwLock>>, + // The set of accounts that have updated. pub updated_accounts: RwLock>, } @@ -54,6 +61,13 @@ pub struct PythThread { pub limit: i64, } +#[derive(Eq, Hash, PartialEq)] +pub struct TokenLimitThread { + pub thread_pubkey: Pubkey, + pub equality: Equality, + pub limit: u64, +} + impl ThreadObserver { pub fn new() -> Self { Self { @@ -61,10 +75,12 @@ impl ThreadObserver { current_epoch: AtomicU64::new(0), account_threads: RwLock::new(HashMap::new()), cron_threads: RwLock::new(HashMap::new()), - now_threads: RwLock::new(HashSet::new()), + activated_threads: RwLock::new(HashSet::new()), slot_threads: RwLock::new(HashMap::new()), epoch_threads: RwLock::new(HashMap::new()), pyth_threads: RwLock::new(HashMap::new()), + token_threads: RwLock::new(HashMap::new()), + token_limit_threads: RwLock::new(HashMap::new()), updated_accounts: RwLock::new(HashSet::new()), } } @@ -139,12 +155,12 @@ impl ThreadObserver { drop(w_epoch_threads); // Get the set of immediate threads. - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.iter().for_each(|pubkey| { + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.iter().for_each(|pubkey| { executable_threads.insert(*pubkey); }); - w_now_threads.clear(); - drop(w_now_threads); + w_activated_threads.clear(); + drop(w_activated_threads); Ok(executable_threads) } @@ -187,9 +203,9 @@ impl ThreadObserver { .price .ge(&pyth_thread.limit) { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(pyth_thread.thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(pyth_thread.thread_pubkey); + drop(w_activated_threads); } } Equality::LessThanOrEqual => { @@ -198,9 +214,9 @@ impl ThreadObserver { .price .le(&pyth_thread.limit) { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(pyth_thread.thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(pyth_thread.thread_pubkey); + drop(w_activated_threads); } } } @@ -210,6 +226,48 @@ impl ThreadObserver { Ok(()) } + pub async fn observe_token_account( + self: Arc, + account_pubkey: Pubkey, + token_account: TokenAccount, + ) -> PluginResult<()> { + // Queue up the token threads. + let r_token_threads = self.token_threads.read().await; + if let Some(token_threads) = r_token_threads.get(&account_pubkey) { + let mut w_activated_threads = self.activated_threads.write().await; + for token_thread in token_threads { + w_activated_threads.insert(*token_thread); + } + drop(w_activated_threads); + } + drop(r_token_threads); + + // Queue up the token limit threads. + let r_token_limit_threads = self.token_limit_threads.read().await; + if let Some(token_limit_threads) = r_token_limit_threads.get(&account_pubkey) { + for token_limit_thread in token_limit_threads { + match token_limit_thread.equality { + Equality::GreaterThanOrEqual => { + if token_account.amount.ge(&token_limit_thread.limit) { + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(token_limit_thread.thread_pubkey); + drop(w_activated_threads); + } + } + Equality::LessThanOrEqual => { + if token_account.amount.le(&token_limit_thread.limit) { + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(token_limit_thread.thread_pubkey); + drop(w_activated_threads); + } + } + } + } + } + drop(r_token_limit_threads); + Ok(()) + } + pub async fn observe_thread( self: Arc, thread: VersionedThread, @@ -224,9 +282,9 @@ impl ThreadObserver { info!("Indexing thread: {:?} slot: {}", thread_pubkey, slot); if thread.next_instruction().is_some() { // If the thread has a next instruction, index it as executable. - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(thread_pubkey); + drop(w_activated_threads); } else { // Otherwise, index the thread according to its trigger type. match thread.trigger() { @@ -250,10 +308,10 @@ impl ThreadObserver { drop(w_account_threads); // Threads with account triggers might be immediately executable, - // Thus, we should attempt to execute these threads right away without for an account update. - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(thread_pubkey); - drop(w_now_threads); + // Thus, we should attempt to execute these threads right away without for an account update. + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(thread_pubkey); + drop(w_activated_threads); } Trigger::Cron { schedule, @@ -306,9 +364,9 @@ impl ThreadObserver { drop(w_cron_threads); } Trigger::Now => { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(thread_pubkey); + drop(w_activated_threads); } Trigger::Slot { slot } => { let mut w_slot_threads = self.slot_threads.write().await; @@ -364,6 +422,46 @@ impl ThreadObserver { }); drop(w_pyth_threads); } + Trigger::Token { token_account } => { + let mut w_token_threads = self.token_threads.write().await; + w_token_threads + .entry(token_account) + .and_modify(|v| { + v.insert(thread_pubkey); + }) + .or_insert_with(|| { + let mut v = HashSet::new(); + v.insert(thread_pubkey); + v + }); + drop(w_token_threads); + } + Trigger::TokenLimit { + token_account, + equality, + limit, + } => { + let mut w_token_limit_threads = self.token_limit_threads.write().await; + w_token_limit_threads + .entry(token_account) + .and_modify(|v| { + v.insert(TokenLimitThread { + thread_pubkey, + equality: equality.clone(), + limit, + }); + }) + .or_insert_with(|| { + let mut v = HashSet::new(); + v.insert(TokenLimitThread { + thread_pubkey, + equality, + limit, + }); + v + }); + drop(w_token_limit_threads); + } } } diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index f978c24a1..2311f9c98 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -133,6 +133,15 @@ impl GeyserPlugin for ClockworkPlugin { .await .ok(); } + AccountUpdateEvent::TokenAccount { token_account } => { + inner + .observers + .thread + .clone() + .observe_token_account(account_pubkey, token_account) + .await + .ok(); + } } } Ok(()) diff --git a/programs/thread/Cargo.toml b/programs/thread/Cargo.toml index 26444929c..d7bb9d0b8 100644 --- a/programs/thread/Cargo.toml +++ b/programs/thread/Cargo.toml @@ -23,11 +23,13 @@ default = [] [dependencies] anchor-lang = "0.27.0" +anchor-spl = { features = ["token"], version = "0.27.0" } chrono = { version = "0.4.19", default-features = false, features = ["alloc"] } clockwork-cron = { path = "../../cron", version = "=2.0.17" } clockwork-network-program = { path = "../network", features = ["cpi"], version = "=2.0.17" } clockwork-thread-program-v1 = { path = "v1", version = "=1.4.4" } clockwork-utils = { path = "../../utils", version = "=2.0.17" } pyth-sdk-solana = "0.7.1" +# spl-token = { version = "3.5.0", features = ["no-entrypoint"] } static-pubkey = "1.0.3" version = "3.0.0" diff --git a/programs/thread/src/instructions/thread_kickoff.rs b/programs/thread/src/instructions/thread_kickoff.rs index 2c94e0c35..f04b0fc92 100644 --- a/programs/thread/src/instructions/thread_kickoff.rs +++ b/programs/thread/src/instructions/thread_kickoff.rs @@ -5,6 +5,7 @@ use std::{ }; use anchor_lang::prelude::*; +use anchor_spl::token::TokenAccount; use chrono::{DateTime, NaiveDateTime, Utc}; use clockwork_cron::Schedule; use clockwork_network_program::state::{Worker, WorkerAccount}; @@ -249,6 +250,100 @@ pub fn handler(ctx: Context) -> Result<()> { } } } + Trigger::Token { + token_account: token_account_pubkey, + } => { + // Verify token balance has changed. + match ctx.remaining_accounts.first() { + None => { + return Err(ClockworkError::TriggerConditionFailed.into()); + } + Some(account_info) => { + require!( + token_account_pubkey.eq(account_info.key), + ClockworkError::TriggerConditionFailed + ); + let mut data: &[u8] = &account_info.try_borrow_data()?; + let token_account = TokenAccount::try_deserialize_unchecked(&mut data)?; + + // Verify the current balance is different than the last observed balance. + if let Some(exec_context) = thread.exec_context { + match exec_context.trigger_context { + TriggerContext::Token { + amount: prior_amount, + } => { + require!( + token_account.amount.ne(&prior_amount), + ClockworkError::TriggerConditionFailed + ) + } + _ => return Err(ClockworkError::InvalidThreadState.into()), + } + } + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::Token { + amount: token_account.amount, + }, + }); + } + } + } + Trigger::TokenLimit { + token_account: token_account_pubkey, + equality, + limit, + } => { + // Verify token balance limit has been reached. + match ctx.remaining_accounts.first() { + None => { + return Err(ClockworkError::TriggerConditionFailed.into()); + } + Some(account_info) => { + require!( + token_account_pubkey.eq(account_info.key), + ClockworkError::TriggerConditionFailed + ); + let mut data: &[u8] = &account_info.try_borrow_data()?; + let token_account = TokenAccount::try_deserialize_unchecked(&mut data)?; + match equality { + Equality::GreaterThanOrEqual => { + require!( + token_account.amount.ge(&limit), + ClockworkError::TriggerConditionFailed + ); + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::TokenLimit { + amount: token_account.amount, + }, + }); + } + Equality::LessThanOrEqual => { + require!( + token_account.amount.le(&limit), + ClockworkError::TriggerConditionFailed + ); + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::TokenLimit { + amount: token_account.amount, + }, + }); + } + } + } + } + } } // If we make it here, the trigger is active. Update the next instruction and be done. diff --git a/programs/thread/src/state/thread.rs b/programs/thread/src/state/thread.rs index d01f7b126..667e8efb0 100644 --- a/programs/thread/src/state/thread.rs +++ b/programs/thread/src/state/thread.rs @@ -148,6 +148,12 @@ pub enum TriggerContext { /// The trigger context for threads with a "pyth" trigger. Pyth { price: i64 }, + + /// The trigger context for threads with a "token" trigger. + Token { amount: u64 }, + + /// The trigger context for threads with a "token limit" trigger. + TokenLimit { amount: u64 }, } /// The properties of threads which are updatable. diff --git a/utils/src/thread.rs b/utils/src/thread.rs index 5fe237b87..43623df9d 100644 --- a/utils/src/thread.rs +++ b/utils/src/thread.rs @@ -83,11 +83,27 @@ pub enum Trigger { Pyth { /// The address of the price feed to monitor. price_feed: Pubkey, - /// The equality operator (gte or lte) used to compare prices. + /// The equality operator (gte or lte) used to compare prices. equality: Equality, - /// The limit price to compare the Pyth feed to. + /// The limit price to compare the Pyth feed to. limit: i64, }, + + /// Allows a thread to be kicked off whenever a token balance changes. + Token { + /// The address of the token account. + token_account: Pubkey, + }, + + /// Allows a thread to be kicked off according to a token amount. + TokenLimit { + /// The address of the token account. + token_account: Pubkey, + /// The equality operator (gte or lte) used to compare balances. + equality: Equality, + /// The limit to compare the token balance to. + limit: u64, + }, } /// Operators for describing how to compare two values to one another.