From f826f4fb147cef52623185f21a33489aee261c42 Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Thu, 1 Jun 2023 21:27:17 +0000 Subject: [PATCH 1/2] Begin implementing token balance trigger --- Cargo.lock | 2 + plugin/Cargo.toml | 1 + plugin/src/events.rs | 15 +++- plugin/src/observers/thread.rs | 72 ++++++++++++++++++- plugin/src/plugin.rs | 9 +++ programs/thread/Cargo.toml | 2 + .../thread/src/instructions/thread_kickoff.rs | 53 ++++++++++++++ programs/thread/src/state/thread.rs | 3 + utils/src/thread.rs | 14 +++- 9 files changed, 166 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 657fa0005..3df00c759 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1206,6 +1206,7 @@ name = "clockwork-thread-program" version = "2.0.17" dependencies = [ "anchor-lang", + "anchor-spl", "chrono", "clockwork-cron", "clockwork-network-program", @@ -1250,6 +1251,7 @@ name = "clockwork_plugin" version = "2.0.17" dependencies = [ "anchor-lang", + "anchor-spl", "async-trait", "async_once", "bincode", diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index 55e3dd2c5..165e603c3 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -20,6 +20,7 @@ name = "clockwork_plugin" [dependencies] anchor-lang = "0.27.0" +anchor-spl = { features = ["token"], version = "0.27.0" } async_once = "0.2.6" async-trait = "0.1.64" bincode = "1.3.3" diff --git a/plugin/src/events.rs b/plugin/src/events.rs index 7c8278fe6..34758ec6e 100644 --- a/plugin/src/events.rs +++ b/plugin/src/events.rs @@ -1,4 +1,5 @@ use anchor_lang::{prelude::AccountInfo, AccountDeserialize, Discriminator}; +use anchor_spl::token::TokenAccount; use bincode::deserialize; use clockwork_thread_program::state::{Thread as ThreadV2, VersionedThread}; use clockwork_thread_program_v1::state::Thread as ThreadV1; @@ -10,7 +11,7 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{ use solana_program::{clock::Clock, pubkey::Pubkey, sysvar}; use static_pubkey::static_pubkey; -static PYTH_ORACLE_PROGRAM_ID_MAINNET: Pubkey = +static PYTH_ORACLE_PROGRAM_ID_MAINNET: Pubkey = static_pubkey!("FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH"); static PYTH_ORACLE_PROGRAM_ID_DEVNET: Pubkey = static_pubkey!("gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s"); @@ -20,6 +21,7 @@ pub enum AccountUpdateEvent { Clock { clock: Clock }, Thread { thread: VersionedThread }, PriceFeed { price_feed: PriceFeed }, + TokenAccount { token_account: TokenAccount }, Webhook { webhook: Webhook }, } @@ -96,6 +98,17 @@ impl TryFrom<&mut ReplicaAccountInfo<'_>> for AccountUpdateEvent { return Ok(AccountUpdateEvent::PriceFeed { price_feed }); } + // If the account belongs to the token program, attempt to parse it. + if owner_pubkey.eq(&anchor_spl::token::ID) { + let token_account = + TokenAccount::try_deserialize(&mut account_info.data).map_err(|_| { + GeyserPluginError::AccountsUpdateError { + msg: "Failed to parse SPL token account".into(), + } + })?; + return Ok(AccountUpdateEvent::TokenAccount { token_account }); + } + // If the account belongs to the webhook program, parse in if owner_pubkey.eq(&clockwork_webhook_program::ID) && account_info.data.len() > 8 { return Ok(AccountUpdateEvent::Webhook { diff --git a/plugin/src/observers/thread.rs b/plugin/src/observers/thread.rs index be8114c17..85a56315d 100644 --- a/plugin/src/observers/thread.rs +++ b/plugin/src/observers/thread.rs @@ -5,9 +5,10 @@ use std::{ sync::{atomic::AtomicU64, Arc}, }; +use anchor_spl::token::TokenAccount; use chrono::{DateTime, NaiveDateTime, Utc}; use clockwork_cron::Schedule; -use clockwork_thread_program::state::{Trigger, TriggerContext, Equality, VersionedThread}; +use clockwork_thread_program::state::{Equality, Trigger, TriggerContext, VersionedThread}; use log::info; use pyth_sdk_solana::PriceFeed; use solana_geyser_plugin_interface::geyser_plugin_interface::{ @@ -43,6 +44,9 @@ pub struct ThreadObserver { // The set of threads with a pyth trigger. pub pyth_threads: RwLock>>, + // The set of threads with a token trigger. + pub token_threads: RwLock>>, + // The set of accounts that have updated. pub updated_accounts: RwLock>, } @@ -54,6 +58,13 @@ pub struct PythThread { pub limit: i64, } +#[derive(Eq, Hash, PartialEq)] +pub struct TokenThread { + pub thread_pubkey: Pubkey, + pub equality: Equality, + pub limit: u64, +} + impl ThreadObserver { pub fn new() -> Self { Self { @@ -65,6 +76,7 @@ impl ThreadObserver { slot_threads: RwLock::new(HashMap::new()), epoch_threads: RwLock::new(HashMap::new()), pyth_threads: RwLock::new(HashMap::new()), + token_threads: RwLock::new(HashMap::new()), updated_accounts: RwLock::new(HashSet::new()), } } @@ -210,6 +222,36 @@ impl ThreadObserver { Ok(()) } + pub async fn observe_token_account( + self: Arc, + account_pubkey: Pubkey, + token_account: TokenAccount, + ) -> PluginResult<()> { + let r_token_threads = self.token_threads.read().await; + if let Some(token_threads) = r_token_threads.get(&account_pubkey) { + for token_thread in token_threads { + match token_thread.equality { + Equality::GreaterThanOrEqual => { + if token_account.amount.ge(&token_thread.limit) { + let mut w_now_threads = self.now_threads.write().await; + w_now_threads.insert(token_thread.thread_pubkey); + drop(w_now_threads); + } + } + Equality::LessThanOrEqual => { + if token_account.amount.le(&token_thread.limit) { + let mut w_now_threads = self.now_threads.write().await; + w_now_threads.insert(token_thread.thread_pubkey); + drop(w_now_threads); + } + } + } + } + } + drop(r_token_threads); + Ok(()) + } + pub async fn observe_thread( self: Arc, thread: VersionedThread, @@ -250,7 +292,7 @@ impl ThreadObserver { drop(w_account_threads); // Threads with account triggers might be immediately executable, - // Thus, we should attempt to execute these threads right away without for an account update. + // Thus, we should attempt to execute these threads right away without for an account update. let mut w_now_threads = self.now_threads.write().await; w_now_threads.insert(thread_pubkey); drop(w_now_threads); @@ -364,6 +406,32 @@ impl ThreadObserver { }); drop(w_pyth_threads); } + Trigger::Token { + token_account, + equality, + limit, + } => { + let mut w_token_threads = self.token_threads.write().await; + w_token_threads + .entry(token_account) + .and_modify(|v| { + v.insert(TokenThread { + thread_pubkey, + equality: equality.clone(), + limit, + }); + }) + .or_insert_with(|| { + let mut v = HashSet::new(); + v.insert(TokenThread { + thread_pubkey, + equality, + limit, + }); + v + }); + drop(w_token_threads); + } } } diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index f978c24a1..2311f9c98 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -133,6 +133,15 @@ impl GeyserPlugin for ClockworkPlugin { .await .ok(); } + AccountUpdateEvent::TokenAccount { token_account } => { + inner + .observers + .thread + .clone() + .observe_token_account(account_pubkey, token_account) + .await + .ok(); + } } } Ok(()) diff --git a/programs/thread/Cargo.toml b/programs/thread/Cargo.toml index 26444929c..d7bb9d0b8 100644 --- a/programs/thread/Cargo.toml +++ b/programs/thread/Cargo.toml @@ -23,11 +23,13 @@ default = [] [dependencies] anchor-lang = "0.27.0" +anchor-spl = { features = ["token"], version = "0.27.0" } chrono = { version = "0.4.19", default-features = false, features = ["alloc"] } clockwork-cron = { path = "../../cron", version = "=2.0.17" } clockwork-network-program = { path = "../network", features = ["cpi"], version = "=2.0.17" } clockwork-thread-program-v1 = { path = "v1", version = "=1.4.4" } clockwork-utils = { path = "../../utils", version = "=2.0.17" } pyth-sdk-solana = "0.7.1" +# spl-token = { version = "3.5.0", features = ["no-entrypoint"] } static-pubkey = "1.0.3" version = "3.0.0" diff --git a/programs/thread/src/instructions/thread_kickoff.rs b/programs/thread/src/instructions/thread_kickoff.rs index 2c94e0c35..76ef6fe5c 100644 --- a/programs/thread/src/instructions/thread_kickoff.rs +++ b/programs/thread/src/instructions/thread_kickoff.rs @@ -5,6 +5,7 @@ use std::{ }; use anchor_lang::prelude::*; +use anchor_spl::token::TokenAccount; use chrono::{DateTime, NaiveDateTime, Utc}; use clockwork_cron::Schedule; use clockwork_network_program::state::{Worker, WorkerAccount}; @@ -249,6 +250,58 @@ pub fn handler(ctx: Context) -> Result<()> { } } } + Trigger::Token { + token_account: token_account_pubkey, + equality, + limit, + } => { + // Verify token balance limit has been reached. + match ctx.remaining_accounts.first() { + None => { + return Err(ClockworkError::TriggerConditionFailed.into()); + } + Some(account_info) => { + require!( + token_account_pubkey.eq(account_info.key), + ClockworkError::TriggerConditionFailed + ); + let mut data: &[u8] = &account_info.try_borrow_data()?; + let token_account = TokenAccount::try_deserialize_unchecked(&mut data)?; + match equality { + Equality::GreaterThanOrEqual => { + require!( + token_account.amount.ge(&limit), + ClockworkError::TriggerConditionFailed + ); + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::Token { + amount: token_account.amount, + }, + }); + } + Equality::LessThanOrEqual => { + require!( + token_account.amount.le(&limit), + ClockworkError::TriggerConditionFailed + ); + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::Token { + amount: token_account.amount, + }, + }); + } + } + } + } + } } // If we make it here, the trigger is active. Update the next instruction and be done. diff --git a/programs/thread/src/state/thread.rs b/programs/thread/src/state/thread.rs index d01f7b126..dd5dc2735 100644 --- a/programs/thread/src/state/thread.rs +++ b/programs/thread/src/state/thread.rs @@ -148,6 +148,9 @@ pub enum TriggerContext { /// The trigger context for threads with a "pyth" trigger. Pyth { price: i64 }, + + /// The trigger context for threads with a "token amount" trigger. + Token { amount: u64 }, } /// The properties of threads which are updatable. diff --git a/utils/src/thread.rs b/utils/src/thread.rs index 5fe237b87..50371a22d 100644 --- a/utils/src/thread.rs +++ b/utils/src/thread.rs @@ -83,11 +83,21 @@ pub enum Trigger { Pyth { /// The address of the price feed to monitor. price_feed: Pubkey, - /// The equality operator (gte or lte) used to compare prices. + /// The equality operator (gte or lte) used to compare prices. equality: Equality, - /// The limit price to compare the Pyth feed to. + /// The limit price to compare the Pyth feed to. limit: i64, }, + + /// Allows a thread to be kicked off according to a token amount. + Token { + /// The address of the token account. + token_account: Pubkey, + /// The equality operator (gte or lte) used to compare balances. + equality: Equality, + /// The limit to compare the token balance to. + limit: u64, + }, } /// Operators for describing how to compare two values to one another. From cc6a4ae9a6f4ded3f0867f521454532dc62b6c8c Mon Sep 17 00:00:00 2001 From: Nick Garfield Date: Thu, 1 Jun 2023 22:17:38 +0000 Subject: [PATCH 2/2] Split between Token and TokenLimit triggers --- plugin/src/observers/thread.rs | 112 +++++++++++------- .../thread/src/instructions/thread_kickoff.rs | 46 ++++++- programs/thread/src/state/thread.rs | 5 +- utils/src/thread.rs | 8 +- 4 files changed, 126 insertions(+), 45 deletions(-) diff --git a/plugin/src/observers/thread.rs b/plugin/src/observers/thread.rs index 85a56315d..9c6bf885d 100644 --- a/plugin/src/observers/thread.rs +++ b/plugin/src/observers/thread.rs @@ -21,6 +21,9 @@ pub struct ThreadObserver { // Map from slot numbers to the sysvar clock data for that slot. pub clocks: RwLock>, + // The set of threads that have an active trigger condition this slot. + pub activated_threads: RwLock>, + // Integer tracking the current epoch. pub current_epoch: AtomicU64, @@ -32,9 +35,6 @@ pub struct ThreadObserver { // Map from unix timestamps to the list of threads scheduled for that moment. pub cron_threads: RwLock>>, - // The set of threads with a now trigger. - pub now_threads: RwLock>, - // The set of threads with a slot trigger. pub slot_threads: RwLock>>, @@ -45,7 +45,10 @@ pub struct ThreadObserver { pub pyth_threads: RwLock>>, // The set of threads with a token trigger. - pub token_threads: RwLock>>, + pub token_threads: RwLock>>, + + // The set of threads with a token limit trigger. + pub token_limit_threads: RwLock>>, // The set of accounts that have updated. pub updated_accounts: RwLock>, @@ -59,7 +62,7 @@ pub struct PythThread { } #[derive(Eq, Hash, PartialEq)] -pub struct TokenThread { +pub struct TokenLimitThread { pub thread_pubkey: Pubkey, pub equality: Equality, pub limit: u64, @@ -72,11 +75,12 @@ impl ThreadObserver { current_epoch: AtomicU64::new(0), account_threads: RwLock::new(HashMap::new()), cron_threads: RwLock::new(HashMap::new()), - now_threads: RwLock::new(HashSet::new()), + activated_threads: RwLock::new(HashSet::new()), slot_threads: RwLock::new(HashMap::new()), epoch_threads: RwLock::new(HashMap::new()), pyth_threads: RwLock::new(HashMap::new()), token_threads: RwLock::new(HashMap::new()), + token_limit_threads: RwLock::new(HashMap::new()), updated_accounts: RwLock::new(HashSet::new()), } } @@ -151,12 +155,12 @@ impl ThreadObserver { drop(w_epoch_threads); // Get the set of immediate threads. - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.iter().for_each(|pubkey| { + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.iter().for_each(|pubkey| { executable_threads.insert(*pubkey); }); - w_now_threads.clear(); - drop(w_now_threads); + w_activated_threads.clear(); + drop(w_activated_threads); Ok(executable_threads) } @@ -199,9 +203,9 @@ impl ThreadObserver { .price .ge(&pyth_thread.limit) { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(pyth_thread.thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(pyth_thread.thread_pubkey); + drop(w_activated_threads); } } Equality::LessThanOrEqual => { @@ -210,9 +214,9 @@ impl ThreadObserver { .price .le(&pyth_thread.limit) { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(pyth_thread.thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(pyth_thread.thread_pubkey); + drop(w_activated_threads); } } } @@ -227,28 +231,40 @@ impl ThreadObserver { account_pubkey: Pubkey, token_account: TokenAccount, ) -> PluginResult<()> { + // Queue up the token threads. let r_token_threads = self.token_threads.read().await; if let Some(token_threads) = r_token_threads.get(&account_pubkey) { + let mut w_activated_threads = self.activated_threads.write().await; for token_thread in token_threads { - match token_thread.equality { + w_activated_threads.insert(*token_thread); + } + drop(w_activated_threads); + } + drop(r_token_threads); + + // Queue up the token limit threads. + let r_token_limit_threads = self.token_limit_threads.read().await; + if let Some(token_limit_threads) = r_token_limit_threads.get(&account_pubkey) { + for token_limit_thread in token_limit_threads { + match token_limit_thread.equality { Equality::GreaterThanOrEqual => { - if token_account.amount.ge(&token_thread.limit) { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(token_thread.thread_pubkey); - drop(w_now_threads); + if token_account.amount.ge(&token_limit_thread.limit) { + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(token_limit_thread.thread_pubkey); + drop(w_activated_threads); } } Equality::LessThanOrEqual => { - if token_account.amount.le(&token_thread.limit) { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(token_thread.thread_pubkey); - drop(w_now_threads); + if token_account.amount.le(&token_limit_thread.limit) { + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(token_limit_thread.thread_pubkey); + drop(w_activated_threads); } } } } } - drop(r_token_threads); + drop(r_token_limit_threads); Ok(()) } @@ -266,9 +282,9 @@ impl ThreadObserver { info!("Indexing thread: {:?} slot: {}", thread_pubkey, slot); if thread.next_instruction().is_some() { // If the thread has a next instruction, index it as executable. - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(thread_pubkey); + drop(w_activated_threads); } else { // Otherwise, index the thread according to its trigger type. match thread.trigger() { @@ -293,9 +309,9 @@ impl ThreadObserver { // Threads with account triggers might be immediately executable, // Thus, we should attempt to execute these threads right away without for an account update. - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(thread_pubkey); + drop(w_activated_threads); } Trigger::Cron { schedule, @@ -348,9 +364,9 @@ impl ThreadObserver { drop(w_cron_threads); } Trigger::Now => { - let mut w_now_threads = self.now_threads.write().await; - w_now_threads.insert(thread_pubkey); - drop(w_now_threads); + let mut w_activated_threads = self.activated_threads.write().await; + w_activated_threads.insert(thread_pubkey); + drop(w_activated_threads); } Trigger::Slot { slot } => { let mut w_slot_threads = self.slot_threads.write().await; @@ -406,16 +422,30 @@ impl ThreadObserver { }); drop(w_pyth_threads); } - Trigger::Token { + Trigger::Token { token_account } => { + let mut w_token_threads = self.token_threads.write().await; + w_token_threads + .entry(token_account) + .and_modify(|v| { + v.insert(thread_pubkey); + }) + .or_insert_with(|| { + let mut v = HashSet::new(); + v.insert(thread_pubkey); + v + }); + drop(w_token_threads); + } + Trigger::TokenLimit { token_account, equality, limit, } => { - let mut w_token_threads = self.token_threads.write().await; - w_token_threads + let mut w_token_limit_threads = self.token_limit_threads.write().await; + w_token_limit_threads .entry(token_account) .and_modify(|v| { - v.insert(TokenThread { + v.insert(TokenLimitThread { thread_pubkey, equality: equality.clone(), limit, @@ -423,14 +453,14 @@ impl ThreadObserver { }) .or_insert_with(|| { let mut v = HashSet::new(); - v.insert(TokenThread { + v.insert(TokenLimitThread { thread_pubkey, equality, limit, }); v }); - drop(w_token_threads); + drop(w_token_limit_threads); } } } diff --git a/programs/thread/src/instructions/thread_kickoff.rs b/programs/thread/src/instructions/thread_kickoff.rs index 76ef6fe5c..f04b0fc92 100644 --- a/programs/thread/src/instructions/thread_kickoff.rs +++ b/programs/thread/src/instructions/thread_kickoff.rs @@ -252,6 +252,48 @@ pub fn handler(ctx: Context) -> Result<()> { } Trigger::Token { token_account: token_account_pubkey, + } => { + // Verify token balance has changed. + match ctx.remaining_accounts.first() { + None => { + return Err(ClockworkError::TriggerConditionFailed.into()); + } + Some(account_info) => { + require!( + token_account_pubkey.eq(account_info.key), + ClockworkError::TriggerConditionFailed + ); + let mut data: &[u8] = &account_info.try_borrow_data()?; + let token_account = TokenAccount::try_deserialize_unchecked(&mut data)?; + + // Verify the current balance is different than the last observed balance. + if let Some(exec_context) = thread.exec_context { + match exec_context.trigger_context { + TriggerContext::Token { + amount: prior_amount, + } => { + require!( + token_account.amount.ne(&prior_amount), + ClockworkError::TriggerConditionFailed + ) + } + _ => return Err(ClockworkError::InvalidThreadState.into()), + } + } + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::Token { + amount: token_account.amount, + }, + }); + } + } + } + Trigger::TokenLimit { + token_account: token_account_pubkey, equality, limit, } => { @@ -278,7 +320,7 @@ pub fn handler(ctx: Context) -> Result<()> { execs_since_reimbursement: 0, execs_since_slot: 0, last_exec_at: clock.slot, - trigger_context: TriggerContext::Token { + trigger_context: TriggerContext::TokenLimit { amount: token_account.amount, }, }); @@ -293,7 +335,7 @@ pub fn handler(ctx: Context) -> Result<()> { execs_since_reimbursement: 0, execs_since_slot: 0, last_exec_at: clock.slot, - trigger_context: TriggerContext::Token { + trigger_context: TriggerContext::TokenLimit { amount: token_account.amount, }, }); diff --git a/programs/thread/src/state/thread.rs b/programs/thread/src/state/thread.rs index dd5dc2735..667e8efb0 100644 --- a/programs/thread/src/state/thread.rs +++ b/programs/thread/src/state/thread.rs @@ -149,8 +149,11 @@ pub enum TriggerContext { /// The trigger context for threads with a "pyth" trigger. Pyth { price: i64 }, - /// The trigger context for threads with a "token amount" trigger. + /// The trigger context for threads with a "token" trigger. Token { amount: u64 }, + + /// The trigger context for threads with a "token limit" trigger. + TokenLimit { amount: u64 }, } /// The properties of threads which are updatable. diff --git a/utils/src/thread.rs b/utils/src/thread.rs index 50371a22d..43623df9d 100644 --- a/utils/src/thread.rs +++ b/utils/src/thread.rs @@ -89,10 +89,16 @@ pub enum Trigger { limit: i64, }, - /// Allows a thread to be kicked off according to a token amount. + /// Allows a thread to be kicked off whenever a token balance changes. Token { /// The address of the token account. token_account: Pubkey, + }, + + /// Allows a thread to be kicked off according to a token amount. + TokenLimit { + /// The address of the token account. + token_account: Pubkey, /// The equality operator (gte or lte) used to compare balances. equality: Equality, /// The limit to compare the token balance to.