From 7e2f86660058ffd1c97b2ddc43554a5be9e6bf53 Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 18:56:52 +0800 Subject: [PATCH 01/17] WIP --- .vscode/settings.json | 7 +++++++ plugin/src/plugin.rs | 41 ++++++++++++++++++++++++++++++++++++++ plugin/utils/src/config.rs | 6 ++++++ 3 files changed, 54 insertions(+) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9d394997 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "cSpell.words": [ + "pdas", + "RUSTC", + "Sablier" + ] +} \ No newline at end of file diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 98e7eb57..602fd683 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -1,9 +1,13 @@ use std::{fmt::Debug, sync::Arc}; +use anchor_lang::AccountDeserialize; use log::info; +use sablier_thread_program::state::VersionedThread; +use solana_client::rpc_client::RpcClient; use solana_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus, }; +use solana_sdk::pubkey::Pubkey; use tokio::runtime::{Builder, Runtime}; use crate::{ @@ -47,6 +51,24 @@ impl GeyserPlugin for SablierPlugin { info!("Loading snapshot..."); let config = PluginConfig::read_from(config_file)?; *self = SablierPlugin::new_from_config(config); + + info!("Fetch existing Thread pdas..."); + let existing_thread_pdas = self.fetch_existing_threads()?; + + info!("Add fetched Thread pdas to observers..."); + let observers = self.inner.observers.clone(); + self.inner.clone().spawn(move |inner| async move { + for (pubkey, thread) in existing_thread_pdas { + observers + .thread + .clone() + .observe_thread(thread, pubkey, 0) + .await + .ok(); + } + Ok(()) + }); + Ok(()) } @@ -142,6 +164,25 @@ impl SablierPlugin { }), } } + + /// Fetch existing threads from the chain, and return them as a list of (pubkey, thread) pairs. + /// Goal of this is to catch up on any existing threads that were created before the plugin was loaded. + fn fetch_existing_threads(&self) -> PluginResult> { + let rpc_client = RpcClient::new(self.inner.config.rpc_url.clone()); + let program_id = sablier_thread_program::ID; + + let accounts = rpc_client.get_program_accounts(&program_id)?; + + accounts + .into_iter() + .filter_map(|(pubkey, account)| { + VersionedThread::try_deserialize(&account.data) + .ok() + .map(|thread| (pubkey, thread)) + }) + .collect::>() + .pipe(Ok) + } } impl Default for SablierPlugin { diff --git a/plugin/utils/src/config.rs b/plugin/utils/src/config.rs index 1236ffcb..002f9f18 100644 --- a/plugin/utils/src/config.rs +++ b/plugin/utils/src/config.rs @@ -17,6 +17,9 @@ pub struct PluginConfig { pub thread_count: usize, pub transaction_timeout_threshold: u64, pub worker_id: u64, + pub rpc_url: String, + pub rpc_ws_url: String, + pub keypair: String, } impl Default for PluginConfig { @@ -27,6 +30,9 @@ impl Default for PluginConfig { transaction_timeout_threshold: DEFAULT_TRANSACTION_TIMEOUT_THRESHOLD, thread_count: DEFAULT_THREAD_COUNT, worker_id: 0, + rpc_url: "".to_string(), + rpc_ws_url: "".to_string(), + keypair: "".to_string(), } } } From e2cb6aa43e4609f54e013c2c2843e42c37a1d82e Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 18:59:42 +0800 Subject: [PATCH 02/17] Use collect() --- plugin/src/plugin.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 602fd683..d1682ebc 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -173,15 +173,14 @@ impl SablierPlugin { let accounts = rpc_client.get_program_accounts(&program_id)?; - accounts + Ok(accounts .into_iter() .filter_map(|(pubkey, account)| { VersionedThread::try_deserialize(&account.data) .ok() .map(|thread| (pubkey, thread)) }) - .collect::>() - .pipe(Ok) + .collect()) } } From 6c2e26af64da6b9f78ceb2893a180ff43891160b Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 19:27:58 +0800 Subject: [PATCH 03/17] Fix plugin error --- plugin/src/plugin.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index d1682ebc..789fe286 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -12,6 +12,7 @@ use tokio::runtime::{Builder, Runtime}; use crate::{ config::PluginConfig, + error::PluginError, events::{AccountUpdate, AccountUpdateEvent}, executors::Executors, observers::Observers, @@ -171,12 +172,14 @@ impl SablierPlugin { let rpc_client = RpcClient::new(self.inner.config.rpc_url.clone()); let program_id = sablier_thread_program::ID; - let accounts = rpc_client.get_program_accounts(&program_id)?; + let accounts = rpc_client + .get_program_accounts(&program_id) + .map_err(|e| PluginError::from(e))?; Ok(accounts .into_iter() .filter_map(|(pubkey, account)| { - VersionedThread::try_deserialize(&account.data) + VersionedThread::try_deserialize(&mut account.data.as_slice()) .ok() .map(|thread| (pubkey, thread)) }) From fb92f84170738cc06723e58e867471bce8911d97 Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 19:31:43 +0800 Subject: [PATCH 04/17] Use localhost --- plugin/src/plugin.rs | 3 ++- plugin/utils/src/config.rs | 6 ------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 789fe286..e78871b4 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -169,7 +169,8 @@ impl SablierPlugin { /// Fetch existing threads from the chain, and return them as a list of (pubkey, thread) pairs. /// Goal of this is to catch up on any existing threads that were created before the plugin was loaded. fn fetch_existing_threads(&self) -> PluginResult> { - let rpc_client = RpcClient::new(self.inner.config.rpc_url.clone()); + // Using localhost will use the RPC client from the validator. + let rpc_client = RpcClient::new("http://localhost:8899"); let program_id = sablier_thread_program::ID; let accounts = rpc_client diff --git a/plugin/utils/src/config.rs b/plugin/utils/src/config.rs index 002f9f18..1236ffcb 100644 --- a/plugin/utils/src/config.rs +++ b/plugin/utils/src/config.rs @@ -17,9 +17,6 @@ pub struct PluginConfig { pub thread_count: usize, pub transaction_timeout_threshold: u64, pub worker_id: u64, - pub rpc_url: String, - pub rpc_ws_url: String, - pub keypair: String, } impl Default for PluginConfig { @@ -30,9 +27,6 @@ impl Default for PluginConfig { transaction_timeout_threshold: DEFAULT_TRANSACTION_TIMEOUT_THRESHOLD, thread_count: DEFAULT_THREAD_COUNT, worker_id: 0, - rpc_url: "".to_string(), - rpc_ws_url: "".to_string(), - keypair: "".to_string(), } } } From 63b499fd33d1ec726111a35380136c53122990bb Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 19:37:00 +0800 Subject: [PATCH 05/17] Const for LOCAL_RPC_URL --- plugin/src/plugin.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index e78871b4..13b3fb43 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -28,6 +28,8 @@ impl Debug for SablierPlugin { } } +static LOCAL_RPC_URL: &str = "http://127.0.0.1:8899"; + #[derive(Debug)] pub struct Inner { pub config: PluginConfig, @@ -170,7 +172,7 @@ impl SablierPlugin { /// Goal of this is to catch up on any existing threads that were created before the plugin was loaded. fn fetch_existing_threads(&self) -> PluginResult> { // Using localhost will use the RPC client from the validator. - let rpc_client = RpcClient::new("http://localhost:8899"); + let rpc_client = RpcClient::new(LOCAL_RPC_URL.to_string()); let program_id = sablier_thread_program::ID; let accounts = rpc_client From bb1a2ff293b9c6748bb2af2552131249d81c5e62 Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 19:48:12 +0800 Subject: [PATCH 06/17] Working untested --- plugin/src/plugin.rs | 74 +++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 13b3fb43..14b17ae8 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -3,7 +3,6 @@ use std::{fmt::Debug, sync::Arc}; use anchor_lang::AccountDeserialize; use log::info; use sablier_thread_program::state::VersionedThread; -use solana_client::rpc_client::RpcClient; use solana_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus, }; @@ -28,8 +27,6 @@ impl Debug for SablierPlugin { } } -static LOCAL_RPC_URL: &str = "http://127.0.0.1:8899"; - #[derive(Debug)] pub struct Inner { pub config: PluginConfig, @@ -55,22 +52,42 @@ impl GeyserPlugin for SablierPlugin { let config = PluginConfig::read_from(config_file)?; *self = SablierPlugin::new_from_config(config); - info!("Fetch existing Thread pdas..."); - let existing_thread_pdas = self.fetch_existing_threads()?; - - info!("Add fetched Thread pdas to observers..."); - let observers = self.inner.observers.clone(); - self.inner.clone().spawn(move |inner| async move { - for (pubkey, thread) in existing_thread_pdas { - observers - .thread - .clone() - .observe_thread(thread, pubkey, 0) + // Fetch existing threads from the chain, and return them as a list of (pubkey, thread) pairs. + // Goal of this is to catch up on any existing threads that were created before the plugin was loaded. + { + info!("Loading previously existing Threads.."); + let observers = self.inner.observers.clone(); + self.inner.clone().spawn(|inner| async move { + info!("Fetch existing Thread pdas..."); + let rpc_client = &inner.executors.client; + let program_id = sablier_thread_program::ID; + + let accounts = rpc_client + .get_program_accounts(&program_id) .await - .ok(); - } - Ok(()) - }); + .map_err(|e| PluginError::from(e))?; + + let existing_thread_pdas: Vec<(Pubkey, VersionedThread)> = accounts + .into_iter() + .filter_map(|(pubkey, account)| { + VersionedThread::try_deserialize(&mut account.data.as_slice()) + .ok() + .map(|thread| (pubkey, thread)) + }) + .collect(); + + info!("Add fetched Thread pdas to observers..."); + for (pubkey, thread) in existing_thread_pdas { + observers + .thread + .clone() + .observe_thread(thread, pubkey, 0) + .await + .ok(); + } + Ok(()) + }); + } Ok(()) } @@ -167,27 +184,6 @@ impl SablierPlugin { }), } } - - /// Fetch existing threads from the chain, and return them as a list of (pubkey, thread) pairs. - /// Goal of this is to catch up on any existing threads that were created before the plugin was loaded. - fn fetch_existing_threads(&self) -> PluginResult> { - // Using localhost will use the RPC client from the validator. - let rpc_client = RpcClient::new(LOCAL_RPC_URL.to_string()); - let program_id = sablier_thread_program::ID; - - let accounts = rpc_client - .get_program_accounts(&program_id) - .map_err(|e| PluginError::from(e))?; - - Ok(accounts - .into_iter() - .filter_map(|(pubkey, account)| { - VersionedThread::try_deserialize(&mut account.data.as_slice()) - .ok() - .map(|thread| (pubkey, thread)) - }) - .collect()) - } } impl Default for SablierPlugin { From 3c04fc51362998aaa3006e02af5a3aa3fab1b4f3 Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 19:59:53 +0800 Subject: [PATCH 07/17] Update comment --- plugin/src/plugin.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 14b17ae8..81420c49 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -52,7 +52,6 @@ impl GeyserPlugin for SablierPlugin { let config = PluginConfig::read_from(config_file)?; *self = SablierPlugin::new_from_config(config); - // Fetch existing threads from the chain, and return them as a list of (pubkey, thread) pairs. // Goal of this is to catch up on any existing threads that were created before the plugin was loaded. { info!("Loading previously existing Threads.."); From 410f93580378b69549f40fbff46fcc29ab9ec33c Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 20:06:12 +0800 Subject: [PATCH 08/17] Filter at the request level --- plugin/src/plugin.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 81420c49..f07c63f5 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -1,8 +1,13 @@ use std::{fmt::Debug, sync::Arc}; -use anchor_lang::AccountDeserialize; +use anchor_lang::{AccountDeserialize, Discriminator}; use log::info; -use sablier_thread_program::state::VersionedThread; +use sablier_thread_program::state::{Thread, VersionedThread}; +use solana_account_decoder::UiAccountEncoding; +use solana_client::{ + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_filter::{Memcmp, RpcFilterType}, +}; use solana_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus, }; @@ -61,12 +66,25 @@ impl GeyserPlugin for SablierPlugin { let rpc_client = &inner.executors.client; let program_id = sablier_thread_program::ID; - let accounts = rpc_client - .get_program_accounts(&program_id) + // Filter to retrieve only Thread PDAs + let account_type_filter = + RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &Thread::discriminator())); + let config = RpcProgramAccountsConfig { + filters: Some([vec![account_type_filter]].concat()), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }; + + // Fetch Thread pdas + let thread_pdas = rpc_client + .get_program_accounts_with_config(&program_id, config) .await .map_err(|e| PluginError::from(e))?; - let existing_thread_pdas: Vec<(Pubkey, VersionedThread)> = accounts + let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas .into_iter() .filter_map(|(pubkey, account)| { VersionedThread::try_deserialize(&mut account.data.as_slice()) @@ -76,7 +94,7 @@ impl GeyserPlugin for SablierPlugin { .collect(); info!("Add fetched Thread pdas to observers..."); - for (pubkey, thread) in existing_thread_pdas { + for (pubkey, thread) in versioned_thread_pdas { observers .thread .clone() From a371e231e1ad0e20802fc83b1ed07eb7496bac39 Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 21:41:49 +0800 Subject: [PATCH 09/17] Add thread delette modifications --- programs/network/src/errors.rs | 6 ++ .../thread/src/instructions/thread_delete.rs | 82 +++++++++++++++---- 2 files changed, 70 insertions(+), 18 deletions(-) diff --git a/programs/network/src/errors.rs b/programs/network/src/errors.rs index 405e9078..2d3b08cd 100644 --- a/programs/network/src/errors.rs +++ b/programs/network/src/errors.rs @@ -22,4 +22,10 @@ pub enum SablierError { #[msg("The worker cannot rotate into the pool right now")] PoolFull, + + #[msg("The provided authority does not match the thread's authority")] + InvalidThreadAuthority, + + #[msg("The provided thread account is not a valid Thread account")] + InvalidThreadAccount, } diff --git a/programs/thread/src/instructions/thread_delete.rs b/programs/thread/src/instructions/thread_delete.rs index 4f468a75..fc5e3c6c 100644 --- a/programs/thread/src/instructions/thread_delete.rs +++ b/programs/thread/src/instructions/thread_delete.rs @@ -1,6 +1,7 @@ use { - crate::{constants::*, state::*}, - anchor_lang::prelude::*, + crate::state::*, + anchor_lang::{prelude::*, solana_program::system_program}, + sablier_network_program::errors::SablierError, }; /// Accounts required by the `thread_delete` instruction. @@ -8,7 +9,7 @@ use { pub struct ThreadDelete<'info> { /// The authority (owner) of the thread. #[account( - constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key()) + // constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key()) )] pub authority: Signer<'info>, @@ -16,27 +17,72 @@ pub struct ThreadDelete<'info> { #[account(mut)] pub close_to: SystemAccount<'info>, - /// The thread to be delete. - #[account( - mut, - seeds = [ - SEED_THREAD, - thread.authority.as_ref(), - thread.id.as_slice(), - thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice() - ], - bump = thread.bump, - )] - pub thread: Account<'info, Thread>, + /// The thread to be deleted. + #[account(mut)] + pub thread: UncheckedAccount<'info>, + // #[account( + // mut, + // seeds = [ + // SEED_THREAD, + // thread.authority.as_ref(), + // thread.id.as_slice(), + // thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice() + // ], + // bump = thread.bump, + // )] + // pub thread: Account<'info, Thread>, } pub fn handler(ctx: Context) -> Result<()> { let thread = &ctx.accounts.thread; let close_to = &ctx.accounts.close_to; - let thread_lamports = thread.get_lamports(); - thread.sub_lamports(thread_lamports)?; - close_to.add_lamports(thread_lamports)?; + // We want this instruction not to fail if the thread is already deleted or inexistent. + // As such, all checks are done in the code that than in anchor (see commented code above) + // First, must try to deserialize the thread. + + // Get either V1 or V2 thread - If the provided thread does not exist, print an error message and return Ok. + let thread = match Thread::try_deserialize_unchecked(&mut thread.data.borrow_mut().as_ref()) { + Ok(t) => t, + Err(_) => { + msg!("Not a thread or account does not exist"); + return Ok(()); + } + }; + + // Preliminary checks + { + // Verify the authority + let authority_key = ctx.accounts.authority.key; + let thread_key = ctx.accounts.thread.key; + + require!( + thread.authority.eq(authority_key) || authority_key.eq(thread_key), + SablierError::InvalidThreadAuthority + ); + + // Verify the account provided + let thread_account = &ctx.accounts.thread; + + // Verify the account is initialized + require!( + thread_account.owner != &system_program::ID && thread_account.lamports() > 0, + SablierError::InvalidThreadAccount + ); + + // Verify the account is owned by the program + require!( + thread_account.owner == &crate::ID, + SablierError::InvalidThreadAccount + ); + } + // Transfer lamports out (implicit close) + { + let thread_account = &ctx.accounts.thread; + let thread_lamports = thread_account.get_lamports(); + thread_account.sub_lamports(thread_lamports)?; + close_to.add_lamports(thread_lamports)?; + } Ok(()) } From dd0d07eb947648ee0f4f6bc77ac672997238a85f Mon Sep 17 00:00:00 2001 From: Corto Date: Mon, 7 Oct 2024 21:54:01 +0800 Subject: [PATCH 10/17] Check seed --- .../thread/src/instructions/thread_delete.rs | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/programs/thread/src/instructions/thread_delete.rs b/programs/thread/src/instructions/thread_delete.rs index fc5e3c6c..043290a8 100644 --- a/programs/thread/src/instructions/thread_delete.rs +++ b/programs/thread/src/instructions/thread_delete.rs @@ -1,5 +1,5 @@ use { - crate::state::*, + crate::{constants::SEED_THREAD, state::*}, anchor_lang::{prelude::*, solana_program::system_program}, sablier_network_program::errors::SablierError, }; @@ -63,18 +63,36 @@ pub fn handler(ctx: Context) -> Result<()> { // Verify the account provided let thread_account = &ctx.accounts.thread; + { + // Verify the account is initialized + require!( + thread_account.owner != &system_program::ID && thread_account.lamports() > 0, + SablierError::InvalidThreadAccount + ); - // Verify the account is initialized - require!( - thread_account.owner != &system_program::ID && thread_account.lamports() > 0, - SablierError::InvalidThreadAccount - ); + // Verify the account is owned by the program + require!( + thread_account.owner == &crate::ID, + SablierError::InvalidThreadAccount + ); - // Verify the account is owned by the program - require!( - thread_account.owner == &crate::ID, - SablierError::InvalidThreadAccount - ); + // Verify the seed derivation + let default_vec = Vec::new(); + let thread_bump = thread.bump.to_le_bytes(); + let seed = [ + SEED_THREAD, + thread.authority.as_ref(), + thread.id.as_slice(), + thread.domain.as_ref().unwrap_or(&default_vec).as_slice(), + thread_bump.as_ref(), + ]; + let expected_thread_key = Pubkey::create_program_address(&seed, &crate::ID) + .map_err(|_| SablierError::InvalidThreadAccount)?; + require!( + expected_thread_key == *thread_key, + SablierError::InvalidThreadAccount + ); + } } // Transfer lamports out (implicit close) From e662b2aeee294d64ac98181f074d2003917a4716 Mon Sep 17 00:00:00 2001 From: Jean Marchand Date: Mon, 7 Oct 2024 20:52:38 +0700 Subject: [PATCH 11/17] Fix test and lints --- plugin/src/plugin.rs | 2 +- programs/thread/src/instructions/thread_delete.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index f07c63f5..b93588df 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -82,7 +82,7 @@ impl GeyserPlugin for SablierPlugin { let thread_pdas = rpc_client .get_program_accounts_with_config(&program_id, config) .await - .map_err(|e| PluginError::from(e))?; + .map_err(PluginError::from)?; let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas .into_iter() diff --git a/programs/thread/src/instructions/thread_delete.rs b/programs/thread/src/instructions/thread_delete.rs index 043290a8..f0cb9a94 100644 --- a/programs/thread/src/instructions/thread_delete.rs +++ b/programs/thread/src/instructions/thread_delete.rs @@ -18,6 +18,7 @@ pub struct ThreadDelete<'info> { pub close_to: SystemAccount<'info>, /// The thread to be deleted. + /// CHECKS: made during the instruction processing #[account(mut)] pub thread: UncheckedAccount<'info>, // #[account( From 77f0f01c1f5d9004bcdd20805c4f625e9934f63b Mon Sep 17 00:00:00 2001 From: Jean Marchand Date: Tue, 8 Oct 2024 13:48:57 +0700 Subject: [PATCH 12/17] thread: Temp fix kickoff --- programs/thread/src/instructions/mod.rs | 2 ++ .../src/instructions/thread_reset_next.rs | 33 +++++++++++++++++++ programs/thread/src/lib.rs | 4 +++ 3 files changed, 39 insertions(+) create mode 100644 programs/thread/src/instructions/thread_reset_next.rs diff --git a/programs/thread/src/instructions/mod.rs b/programs/thread/src/instructions/mod.rs index 533d9450..571407d7 100644 --- a/programs/thread/src/instructions/mod.rs +++ b/programs/thread/src/instructions/mod.rs @@ -7,6 +7,7 @@ pub mod thread_instruction_remove; pub mod thread_kickoff; pub mod thread_pause; pub mod thread_reset; +pub mod thread_reset_next; pub mod thread_resume; pub mod thread_update; pub mod thread_withdraw; @@ -20,6 +21,7 @@ pub use thread_instruction_remove::*; pub use thread_kickoff::*; pub use thread_pause::*; pub use thread_reset::*; +pub use thread_reset_next::*; pub use thread_resume::*; pub use thread_update::*; pub use thread_withdraw::*; diff --git a/programs/thread/src/instructions/thread_reset_next.rs b/programs/thread/src/instructions/thread_reset_next.rs new file mode 100644 index 00000000..b3920727 --- /dev/null +++ b/programs/thread/src/instructions/thread_reset_next.rs @@ -0,0 +1,33 @@ +use {crate::state::*, anchor_lang::prelude::*, sablier_network_program::state::Config}; + +/// Accounts required by the `thread_reset` instruction. +#[derive(Accounts)] +pub struct ThreadResetNext<'info> { + #[account(has_one = admin)] + pub config: AccountLoader<'info, Config>, + pub admin: Signer<'info>, + /// The thread to be paused. + #[account(mut)] + pub thread: Account<'info, Thread>, +} + +pub fn handler(ctx: Context, timestamp: i64) -> Result<()> { + // Get accounts + let thread = &mut ctx.accounts.thread; + let clock = Clock::get()?; + + // Full reset the thread state. + thread.exec_context = Some(ExecContext { + exec_index: 0, + execs_since_reimbursement: 0, + execs_since_slot: 0, + last_exec_at: clock.slot, + trigger_context: TriggerContext::Periodic { + started_at: timestamp, + }, + }); + thread.trigger = Trigger::Periodic { delay: 21600 }; + thread.next_instruction = None; + + Ok(()) +} diff --git a/programs/thread/src/lib.rs b/programs/thread/src/lib.rs index fa543897..7f05ed54 100644 --- a/programs/thread/src/lib.rs +++ b/programs/thread/src/lib.rs @@ -97,4 +97,8 @@ pub mod thread_program { pub fn thread_withdraw(ctx: Context, amount: u64) -> Result<()> { thread_withdraw::handler(ctx, amount) } + + pub fn thread_reset_next(ctx: Context, timestamp: i64) -> Result<()> { + thread_reset_next::handler(ctx, timestamp) + } } From 42475e29fd1afa983a9f003b1cb0ca106a840485 Mon Sep 17 00:00:00 2001 From: Corto Date: Tue, 8 Oct 2024 18:32:48 +0800 Subject: [PATCH 13/17] tag to test hot reload --- plugin/src/plugin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index b93588df..26cf45db 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -53,7 +53,7 @@ impl GeyserPlugin for SablierPlugin { env!("GEYSER_INTERFACE_VERSION"), env!("RUSTC_VERSION") ); - info!("Loading snapshot..."); + info!("Loading snapshot... 8----"); let config = PluginConfig::read_from(config_file)?; *self = SablierPlugin::new_from_config(config); From 194c428e44886ced17a5b07eb843c5108654631b Mon Sep 17 00:00:00 2001 From: Corto Date: Tue, 8 Oct 2024 19:23:08 +0800 Subject: [PATCH 14/17] Logs --- plugin/src/plugin.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 26cf45db..f1203ba7 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -53,7 +53,7 @@ impl GeyserPlugin for SablierPlugin { env!("GEYSER_INTERFACE_VERSION"), env!("RUSTC_VERSION") ); - info!("Loading snapshot... 8----"); + info!("Loading snapshot..."); let config = PluginConfig::read_from(config_file)?; *self = SablierPlugin::new_from_config(config); @@ -83,6 +83,7 @@ impl GeyserPlugin for SablierPlugin { .get_program_accounts_with_config(&program_id, config) .await .map_err(PluginError::from)?; + info!(" - Fetched {} Thread PDAs", thread_pdas.len()); let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas .into_iter() @@ -92,8 +93,9 @@ impl GeyserPlugin for SablierPlugin { .map(|thread| (pubkey, thread)) }) .collect(); + info!(" - after deserialization: {} Thread PDAs left", versioned_thread_pdas.len()); - info!("Add fetched Thread pdas to observers..."); + info!("Adding {} fetched Thread pdas to observers...", versioned_thread_pdas.len()); for (pubkey, thread) in versioned_thread_pdas { observers .thread From 7065058cdb933639683ceedcf1f024bd8b44064d Mon Sep 17 00:00:00 2001 From: Corto Date: Wed, 9 Oct 2024 08:46:09 +0800 Subject: [PATCH 15/17] more logs --- plugin/src/plugin.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index f1203ba7..36efe9d8 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -59,7 +59,7 @@ impl GeyserPlugin for SablierPlugin { // Goal of this is to catch up on any existing threads that were created before the plugin was loaded. { - info!("Loading previously existing Threads.."); + info!("Loading previously existing Threads..."); let observers = self.inner.observers.clone(); self.inner.clone().spawn(|inner| async move { info!("Fetch existing Thread pdas..."); @@ -69,6 +69,7 @@ impl GeyserPlugin for SablierPlugin { // Filter to retrieve only Thread PDAs let account_type_filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &Thread::discriminator())); + info!("here"); let config = RpcProgramAccountsConfig { filters: Some([vec![account_type_filter]].concat()), account_config: RpcAccountInfoConfig { @@ -77,7 +78,7 @@ impl GeyserPlugin for SablierPlugin { }, ..RpcProgramAccountsConfig::default() }; - + info!("here2"); // Fetch Thread pdas let thread_pdas = rpc_client .get_program_accounts_with_config(&program_id, config) @@ -93,10 +94,17 @@ impl GeyserPlugin for SablierPlugin { .map(|thread| (pubkey, thread)) }) .collect(); - info!(" - after deserialization: {} Thread PDAs left", versioned_thread_pdas.len()); - - info!("Adding {} fetched Thread pdas to observers...", versioned_thread_pdas.len()); + info!( + " - after deserialization: {} Thread PDAs left", + versioned_thread_pdas.len() + ); + + info!( + "Adding {} fetched Thread pdas to observers...", + versioned_thread_pdas.len() + ); for (pubkey, thread) in versioned_thread_pdas { + info!("here3"); observers .thread .clone() From bacf5aeac4cdbe5eca5b6b59afacab92ee163835 Mon Sep 17 00:00:00 2001 From: Corto Date: Wed, 9 Oct 2024 09:03:29 +0800 Subject: [PATCH 16/17] more logs --- plugin/src/plugin.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 36efe9d8..15b1e255 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -83,7 +83,10 @@ impl GeyserPlugin for SablierPlugin { let thread_pdas = rpc_client .get_program_accounts_with_config(&program_id, config) .await - .map_err(PluginError::from)?; + .map_err(|err| { + info!("Error fetching Thread PDAs: {}", err); + PluginError::from(err) + })?; info!(" - Fetched {} Thread PDAs", thread_pdas.len()); let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas From e366982d5a89e75b478a0c7e8b22a058dcc7f9ac Mon Sep 17 00:00:00 2001 From: Corto Date: Wed, 9 Oct 2024 09:21:20 +0800 Subject: [PATCH 17/17] Update logs --- plugin/src/plugin.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugin/src/plugin.rs b/plugin/src/plugin.rs index 15b1e255..d163cb98 100644 --- a/plugin/src/plugin.rs +++ b/plugin/src/plugin.rs @@ -63,13 +63,12 @@ impl GeyserPlugin for SablierPlugin { let observers = self.inner.observers.clone(); self.inner.clone().spawn(|inner| async move { info!("Fetch existing Thread pdas..."); - let rpc_client = &inner.executors.client; + let client = inner.executors.client.clone(); let program_id = sablier_thread_program::ID; // Filter to retrieve only Thread PDAs let account_type_filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &Thread::discriminator())); - info!("here"); let config = RpcProgramAccountsConfig { filters: Some([vec![account_type_filter]].concat()), account_config: RpcAccountInfoConfig { @@ -78,9 +77,9 @@ impl GeyserPlugin for SablierPlugin { }, ..RpcProgramAccountsConfig::default() }; - info!("here2"); + // Fetch Thread pdas - let thread_pdas = rpc_client + let thread_pdas = client .get_program_accounts_with_config(&program_id, config) .await .map_err(|err| {