Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Begin implementing token balance trigger #264

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ name = "clockwork_plugin"

[dependencies]
anchor-lang = "0.27.0"
anchor-spl = { features = ["token"], version = "0.27.0" }
async_once = "0.2.6"
async-trait = "0.1.64"
bincode = "1.3.3"
Expand Down
15 changes: 14 additions & 1 deletion plugin/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anchor_lang::{prelude::AccountInfo, AccountDeserialize, Discriminator};
use anchor_spl::token::TokenAccount;
use bincode::deserialize;
use clockwork_thread_program::state::{Thread as ThreadV2, VersionedThread};
use clockwork_thread_program_v1::state::Thread as ThreadV1;
Expand All @@ -10,7 +11,7 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{
use solana_program::{clock::Clock, pubkey::Pubkey, sysvar};
use static_pubkey::static_pubkey;

static PYTH_ORACLE_PROGRAM_ID_MAINNET: Pubkey =
static PYTH_ORACLE_PROGRAM_ID_MAINNET: Pubkey =
static_pubkey!("FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH");
static PYTH_ORACLE_PROGRAM_ID_DEVNET: Pubkey =
static_pubkey!("gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s");
Expand All @@ -20,6 +21,7 @@ pub enum AccountUpdateEvent {
Clock { clock: Clock },
Thread { thread: VersionedThread },
PriceFeed { price_feed: PriceFeed },
TokenAccount { token_account: TokenAccount },
Webhook { webhook: Webhook },
}

Expand Down Expand Up @@ -96,6 +98,17 @@ impl TryFrom<&mut ReplicaAccountInfo<'_>> for AccountUpdateEvent {
return Ok(AccountUpdateEvent::PriceFeed { price_feed });
}

// If the account belongs to the token program, attempt to parse it.
if owner_pubkey.eq(&anchor_spl::token::ID) {
let token_account =
TokenAccount::try_deserialize(&mut account_info.data).map_err(|_| {
GeyserPluginError::AccountsUpdateError {
msg: "Failed to parse SPL token account".into(),
}
})?;
return Ok(AccountUpdateEvent::TokenAccount { token_account });
}

// If the account belongs to the webhook program, parse in
if owner_pubkey.eq(&clockwork_webhook_program::ID) && account_info.data.len() > 8 {
return Ok(AccountUpdateEvent::Webhook {
Expand Down
148 changes: 123 additions & 25 deletions plugin/src/observers/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::{
sync::{atomic::AtomicU64, Arc},
};

use anchor_spl::token::TokenAccount;
use chrono::{DateTime, NaiveDateTime, Utc};
use clockwork_cron::Schedule;
use clockwork_thread_program::state::{Trigger, TriggerContext, Equality, VersionedThread};
use clockwork_thread_program::state::{Equality, Trigger, TriggerContext, VersionedThread};
use log::info;
use pyth_sdk_solana::PriceFeed;
use solana_geyser_plugin_interface::geyser_plugin_interface::{
Expand All @@ -20,6 +21,9 @@ pub struct ThreadObserver {
// Map from slot numbers to the sysvar clock data for that slot.
pub clocks: RwLock<HashMap<u64, Clock>>,

// The set of threads that have an active trigger condition this slot.
pub activated_threads: RwLock<HashSet<Pubkey>>,

// Integer tracking the current epoch.
pub current_epoch: AtomicU64,

Expand All @@ -31,9 +35,6 @@ pub struct ThreadObserver {
// Map from unix timestamps to the list of threads scheduled for that moment.
pub cron_threads: RwLock<HashMap<i64, HashSet<Pubkey>>>,

// The set of threads with a now trigger.
pub now_threads: RwLock<HashSet<Pubkey>>,

// The set of threads with a slot trigger.
pub slot_threads: RwLock<HashMap<u64, HashSet<Pubkey>>>,

Expand All @@ -43,6 +44,12 @@ pub struct ThreadObserver {
// The set of threads with a pyth trigger.
pub pyth_threads: RwLock<HashMap<Pubkey, HashSet<PythThread>>>,

// The set of threads with a token trigger.
pub token_threads: RwLock<HashMap<Pubkey, HashSet<Pubkey>>>,

// The set of threads with a token limit trigger.
pub token_limit_threads: RwLock<HashMap<Pubkey, HashSet<TokenLimitThread>>>,

// The set of accounts that have updated.
pub updated_accounts: RwLock<HashSet<Pubkey>>,
}
Expand All @@ -54,17 +61,26 @@ pub struct PythThread {
pub limit: i64,
}

#[derive(Eq, Hash, PartialEq)]
pub struct TokenLimitThread {
pub thread_pubkey: Pubkey,
pub equality: Equality,
pub limit: u64,
}

impl ThreadObserver {
pub fn new() -> Self {
Self {
clocks: RwLock::new(HashMap::new()),
current_epoch: AtomicU64::new(0),
account_threads: RwLock::new(HashMap::new()),
cron_threads: RwLock::new(HashMap::new()),
now_threads: RwLock::new(HashSet::new()),
activated_threads: RwLock::new(HashSet::new()),
slot_threads: RwLock::new(HashMap::new()),
epoch_threads: RwLock::new(HashMap::new()),
pyth_threads: RwLock::new(HashMap::new()),
token_threads: RwLock::new(HashMap::new()),
token_limit_threads: RwLock::new(HashMap::new()),
updated_accounts: RwLock::new(HashSet::new()),
}
}
Expand Down Expand Up @@ -139,12 +155,12 @@ impl ThreadObserver {
drop(w_epoch_threads);

// Get the set of immediate threads.
let mut w_now_threads = self.now_threads.write().await;
w_now_threads.iter().for_each(|pubkey| {
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.iter().for_each(|pubkey| {
executable_threads.insert(*pubkey);
});
w_now_threads.clear();
drop(w_now_threads);
w_activated_threads.clear();
drop(w_activated_threads);

Ok(executable_threads)
}
Expand Down Expand Up @@ -187,9 +203,9 @@ impl ThreadObserver {
.price
.ge(&pyth_thread.limit)
{
let mut w_now_threads = self.now_threads.write().await;
w_now_threads.insert(pyth_thread.thread_pubkey);
drop(w_now_threads);
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(pyth_thread.thread_pubkey);
drop(w_activated_threads);
}
}
Equality::LessThanOrEqual => {
Expand All @@ -198,9 +214,9 @@ impl ThreadObserver {
.price
.le(&pyth_thread.limit)
{
let mut w_now_threads = self.now_threads.write().await;
w_now_threads.insert(pyth_thread.thread_pubkey);
drop(w_now_threads);
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(pyth_thread.thread_pubkey);
drop(w_activated_threads);
}
}
}
Expand All @@ -210,6 +226,48 @@ impl ThreadObserver {
Ok(())
}

pub async fn observe_token_account(
self: Arc<Self>,
account_pubkey: Pubkey,
token_account: TokenAccount,
) -> PluginResult<()> {
// Queue up the token threads.
let r_token_threads = self.token_threads.read().await;
if let Some(token_threads) = r_token_threads.get(&account_pubkey) {
let mut w_activated_threads = self.activated_threads.write().await;
for token_thread in token_threads {
w_activated_threads.insert(*token_thread);
}
drop(w_activated_threads);
}
drop(r_token_threads);

// Queue up the token limit threads.
let r_token_limit_threads = self.token_limit_threads.read().await;
if let Some(token_limit_threads) = r_token_limit_threads.get(&account_pubkey) {
for token_limit_thread in token_limit_threads {
match token_limit_thread.equality {
Equality::GreaterThanOrEqual => {
if token_account.amount.ge(&token_limit_thread.limit) {
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(token_limit_thread.thread_pubkey);
drop(w_activated_threads);
}
}
Equality::LessThanOrEqual => {
if token_account.amount.le(&token_limit_thread.limit) {
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(token_limit_thread.thread_pubkey);
drop(w_activated_threads);
}
}
}
}
}
drop(r_token_limit_threads);
Ok(())
}

pub async fn observe_thread(
self: Arc<Self>,
thread: VersionedThread,
Expand All @@ -224,9 +282,9 @@ impl ThreadObserver {
info!("Indexing thread: {:?} slot: {}", thread_pubkey, slot);
if thread.next_instruction().is_some() {
// If the thread has a next instruction, index it as executable.
let mut w_now_threads = self.now_threads.write().await;
w_now_threads.insert(thread_pubkey);
drop(w_now_threads);
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(thread_pubkey);
drop(w_activated_threads);
} else {
// Otherwise, index the thread according to its trigger type.
match thread.trigger() {
Expand All @@ -250,10 +308,10 @@ impl ThreadObserver {
drop(w_account_threads);

// Threads with account triggers might be immediately executable,
// Thus, we should attempt to execute these threads right away without for an account update.
let mut w_now_threads = self.now_threads.write().await;
w_now_threads.insert(thread_pubkey);
drop(w_now_threads);
// Thus, we should attempt to execute these threads right away without for an account update.
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(thread_pubkey);
drop(w_activated_threads);
}
Trigger::Cron {
schedule,
Expand Down Expand Up @@ -306,9 +364,9 @@ impl ThreadObserver {
drop(w_cron_threads);
}
Trigger::Now => {
let mut w_now_threads = self.now_threads.write().await;
w_now_threads.insert(thread_pubkey);
drop(w_now_threads);
let mut w_activated_threads = self.activated_threads.write().await;
w_activated_threads.insert(thread_pubkey);
drop(w_activated_threads);
}
Trigger::Slot { slot } => {
let mut w_slot_threads = self.slot_threads.write().await;
Expand Down Expand Up @@ -364,6 +422,46 @@ impl ThreadObserver {
});
drop(w_pyth_threads);
}
Trigger::Token { token_account } => {
let mut w_token_threads = self.token_threads.write().await;
w_token_threads
.entry(token_account)
.and_modify(|v| {
v.insert(thread_pubkey);
})
.or_insert_with(|| {
let mut v = HashSet::new();
v.insert(thread_pubkey);
v
});
drop(w_token_threads);
}
Trigger::TokenLimit {
token_account,
equality,
limit,
} => {
let mut w_token_limit_threads = self.token_limit_threads.write().await;
w_token_limit_threads
.entry(token_account)
.and_modify(|v| {
v.insert(TokenLimitThread {
thread_pubkey,
equality: equality.clone(),
limit,
});
})
.or_insert_with(|| {
let mut v = HashSet::new();
v.insert(TokenLimitThread {
thread_pubkey,
equality,
limit,
});
v
});
drop(w_token_limit_threads);
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ impl GeyserPlugin for ClockworkPlugin {
.await
.ok();
}
AccountUpdateEvent::TokenAccount { token_account } => {
inner
.observers
.thread
.clone()
.observe_token_account(account_pubkey, token_account)
.await
.ok();
}
}
}
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions programs/thread/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ default = []

[dependencies]
anchor-lang = "0.27.0"
anchor-spl = { features = ["token"], version = "0.27.0" }
chrono = { version = "0.4.19", default-features = false, features = ["alloc"] }
clockwork-cron = { path = "../../cron", version = "=2.0.17" }
clockwork-network-program = { path = "../network", features = ["cpi"], version = "=2.0.17" }
clockwork-thread-program-v1 = { path = "v1", version = "=1.4.4" }
clockwork-utils = { path = "../../utils", version = "=2.0.17" }
pyth-sdk-solana = "0.7.1"
# spl-token = { version = "3.5.0", features = ["no-entrypoint"] }
static-pubkey = "1.0.3"
version = "3.0.0"
Loading