Skip to content

Commit

Permalink
Merge branch 'fix/retrieve-existing-threads-at-startup' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Aursen committed Oct 8, 2024
2 parents 77f0f01 + e662b2a commit 4f38229
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 18 deletions.
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"cSpell.words": [
"pdas",
"RUSTC",
"Sablier"
]
}
59 changes: 59 additions & 0 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::{fmt::Debug, sync::Arc};

use anchor_lang::{AccountDeserialize, Discriminator};
use log::info;
use sablier_thread_program::state::{Thread, VersionedThread};
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, RpcFilterType},
};
use solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus,
};
use solana_sdk::pubkey::Pubkey;
use tokio::runtime::{Builder, Runtime};

use crate::{
config::PluginConfig,
error::PluginError,
events::{AccountUpdate, AccountUpdateEvent},
executors::Executors,
observers::Observers,
Expand Down Expand Up @@ -47,6 +56,56 @@ impl GeyserPlugin for SablierPlugin {
info!("Loading snapshot...");
let config = PluginConfig::read_from(config_file)?;
*self = SablierPlugin::new_from_config(config);

// Goal of this is to catch up on any existing threads that were created before the plugin was loaded.
{
info!("Loading previously existing Threads..");
let observers = self.inner.observers.clone();
self.inner.clone().spawn(|inner| async move {
info!("Fetch existing Thread pdas...");
let rpc_client = &inner.executors.client;
let program_id = sablier_thread_program::ID;

// Filter to retrieve only Thread PDAs
let account_type_filter =
RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &Thread::discriminator()));
let config = RpcProgramAccountsConfig {
filters: Some([vec![account_type_filter]].concat()),
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
};

// Fetch Thread pdas
let thread_pdas = rpc_client
.get_program_accounts_with_config(&program_id, config)
.await
.map_err(PluginError::from)?;

let versioned_thread_pdas: Vec<(Pubkey, VersionedThread)> = thread_pdas
.into_iter()
.filter_map(|(pubkey, account)| {
VersionedThread::try_deserialize(&mut account.data.as_slice())
.ok()
.map(|thread| (pubkey, thread))
})
.collect();

info!("Add fetched Thread pdas to observers...");
for (pubkey, thread) in versioned_thread_pdas {
observers
.thread
.clone()
.observe_thread(thread, pubkey, 0)
.await
.ok();
}
Ok(())
});
}

Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions programs/network/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ pub enum SablierError {

#[msg("The worker cannot rotate into the pool right now")]
PoolFull,

#[msg("The provided authority does not match the thread's authority")]
InvalidThreadAuthority,

#[msg("The provided thread account is not a valid Thread account")]
InvalidThreadAccount,
}
101 changes: 83 additions & 18 deletions programs/thread/src/instructions/thread_delete.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,107 @@
use {
crate::{constants::*, state::*},
anchor_lang::prelude::*,
crate::{constants::SEED_THREAD, state::*},
anchor_lang::{prelude::*, solana_program::system_program},
sablier_network_program::errors::SablierError,
};

/// Accounts required by the `thread_delete` instruction.
#[derive(Accounts)]
pub struct ThreadDelete<'info> {
/// The authority (owner) of the thread.
#[account(
constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key())
// constraint = authority.key().eq(&thread.authority) || authority.key().eq(&thread.key())
)]
pub authority: Signer<'info>,

/// The address to return the data rent lamports to.
#[account(mut)]
pub close_to: SystemAccount<'info>,

/// The thread to be delete.
#[account(
mut,
seeds = [
SEED_THREAD,
thread.authority.as_ref(),
thread.id.as_slice(),
thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
],
bump = thread.bump,
)]
pub thread: Account<'info, Thread>,
/// The thread to be deleted.
/// CHECKS: made during the instruction processing
#[account(mut)]
pub thread: UncheckedAccount<'info>,
// #[account(
// mut,
// seeds = [
// SEED_THREAD,
// thread.authority.as_ref(),
// thread.id.as_slice(),
// thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
// ],
// bump = thread.bump,
// )]
// pub thread: Account<'info, Thread>,
}

pub fn handler(ctx: Context<ThreadDelete>) -> Result<()> {
let thread = &ctx.accounts.thread;
let close_to = &ctx.accounts.close_to;

let thread_lamports = thread.get_lamports();
thread.sub_lamports(thread_lamports)?;
close_to.add_lamports(thread_lamports)?;
// We want this instruction not to fail if the thread is already deleted or inexistent.
// As such, all checks are done in the code that than in anchor (see commented code above)
// First, must try to deserialize the thread.

// Get either V1 or V2 thread - If the provided thread does not exist, print an error message and return Ok.
let thread = match Thread::try_deserialize_unchecked(&mut thread.data.borrow_mut().as_ref()) {
Ok(t) => t,
Err(_) => {
msg!("Not a thread or account does not exist");
return Ok(());
}
};

// Preliminary checks
{
// Verify the authority
let authority_key = ctx.accounts.authority.key;
let thread_key = ctx.accounts.thread.key;

require!(
thread.authority.eq(authority_key) || authority_key.eq(thread_key),
SablierError::InvalidThreadAuthority
);

// Verify the account provided
let thread_account = &ctx.accounts.thread;
{
// Verify the account is initialized
require!(
thread_account.owner != &system_program::ID && thread_account.lamports() > 0,
SablierError::InvalidThreadAccount
);

// Verify the account is owned by the program
require!(
thread_account.owner == &crate::ID,
SablierError::InvalidThreadAccount
);

// Verify the seed derivation
let default_vec = Vec::new();
let thread_bump = thread.bump.to_le_bytes();
let seed = [
SEED_THREAD,
thread.authority.as_ref(),
thread.id.as_slice(),
thread.domain.as_ref().unwrap_or(&default_vec).as_slice(),
thread_bump.as_ref(),
];
let expected_thread_key = Pubkey::create_program_address(&seed, &crate::ID)
.map_err(|_| SablierError::InvalidThreadAccount)?;
require!(
expected_thread_key == *thread_key,
SablierError::InvalidThreadAccount
);
}
}

// Transfer lamports out (implicit close)
{
let thread_account = &ctx.accounts.thread;
let thread_lamports = thread_account.get_lamports();
thread_account.sub_lamports(thread_lamports)?;
close_to.add_lamports(thread_lamports)?;
}
Ok(())
}

0 comments on commit 4f38229

Please sign in to comment.