Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Versioned Tx Support for Thread Execs #244

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
simple-error = "0.2.3"
solana-account-decoder = "=1.14.16"
solana-address-lookup-table-program = "1.10.1"
solana-client = "=1.14.16"
solana-geyser-plugin-interface = "=1.14.16"
solana-logger = "=1.14.16"
Expand Down
33 changes: 23 additions & 10 deletions plugin/src/builders/pool_rotation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::sync::Arc;

use anchor_lang::{
solana_program::instruction::Instruction,
InstructionData, ToAccountMetas
};
use anchor_lang::{solana_program::instruction::Instruction, InstructionData, ToAccountMetas};
use clockwork_network_program::state::{Config, Pool, Registry, Snapshot, SnapshotFrame, Worker};
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{signature::Keypair, signer::Signer, transaction::Transaction};
use solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError;
use solana_program::message::{v0, VersionedMessage};
use solana_sdk::{signature::Keypair, signer::Signer, transaction::VersionedTransaction};

use crate::pool_position::PoolPosition;

Expand All @@ -19,7 +18,7 @@ pub async fn build_pool_rotation_tx<'a>(
snapshot: Snapshot,
snapshot_frame: SnapshotFrame,
worker_id: u64,
) -> Option<Transaction> {
) -> Option<VersionedTransaction> {
info!("nonce: {:?} total_stake: {:?} current_position: {:?} stake_offset: {:?} stake_amount: {:?}",
registry.nonce.checked_rem(snapshot.total_stake),
snapshot.total_stake,
Expand Down Expand Up @@ -76,12 +75,26 @@ pub async fn build_pool_rotation_tx<'a>(
snapshot: snapshot_pubkey,
snapshot_frame: SnapshotFrame::pubkey(snapshot_pubkey, worker_id),
worker: Worker::pubkey(worker_id),
}.to_account_metas(Some(false)),
}
.to_account_metas(Some(false)),
data: clockwork_network_program::instruction::PoolRotate {}.data(),
};

// Build and sign tx.
let mut tx = Transaction::new_with_payer(&[ix.clone()], Some(&keypair.pubkey()));
tx.sign(&[keypair], client.get_latest_blockhash().await.unwrap());
return Some(tx);
let blockhash = client.get_latest_blockhash().await.unwrap();

let tx = match v0::Message::try_compile(&keypair.pubkey(), &[ix.clone()], &[], blockhash) {
Err(_) => Err(GeyserPluginError::Custom(
format!("Failed to compile to v0 message ").into(),
)),
Ok(message) => {
match VersionedTransaction::try_new(VersionedMessage::V0(message), &[keypair]) {
Err(_) => Err(GeyserPluginError::Custom(
format!("Failed to create versioned transaction ").into(),
)),
Ok(tx) => Ok(tx),
}
}
};
return tx.ok();
}
56 changes: 46 additions & 10 deletions plugin/src/builders/thread_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use anchor_lang::{InstructionData, ToAccountMetas};
use clockwork_thread_program::state::{VersionedThread, Trigger};
use clockwork_network_program::state::Worker;
use bincode::serialize;
use clockwork_utils::thread::PAYER_PUBKEY;
use log::info;
use solana_account_decoder::UiAccountEncoding;
Expand All @@ -16,12 +17,16 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{
};
use solana_program::{
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
pubkey::Pubkey, address_lookup_table_account::AddressLookupTableAccount,
};
use solana_sdk::{
account::Account, commitment_config::CommitmentConfig,
compute_budget::ComputeBudgetInstruction, signature::Keypair, signer::Signer,
transaction::Transaction,
account::Account,
commitment_config::CommitmentConfig,
compute_budget::ComputeBudgetInstruction,
message::{v0, VersionedMessage},
signature::Keypair,
signer::Signer,
transaction::{VersionedTransaction},
};

/// Max byte size of a serialized transaction.
Expand All @@ -40,7 +45,8 @@ pub async fn build_thread_exec_tx(
thread: VersionedThread,
thread_pubkey: Pubkey,
worker_id: u64,
) -> PluginResult<Option<Transaction>> {
address_lookup_tables: Vec<AddressLookupTableAccount>
) -> PluginResult<Option<VersionedTransaction>> {
// Grab the thread and relevant data.
let now = std::time::Instant::now();
let blockhash = client.get_latest_blockhash().await.unwrap();
Expand Down Expand Up @@ -73,11 +79,24 @@ pub async fn build_thread_exec_tx(
let mut successful_ixs: Vec<Instruction> = vec![];
let mut units_consumed: Option<u64> = None;
loop {
let mut sim_tx = Transaction::new_with_payer(&ixs, Some(&signatory_pubkey));
sim_tx.sign(&[payer], blockhash);
let sim_tx = match v0::Message::try_compile(
&signatory_pubkey,
&ixs,
&address_lookup_tables,
blockhash,
) {
Err(_) => Err(GeyserPluginError::Custom(format!("Failed to compile to v0 message ").into())),
Ok(message) => match VersionedTransaction::try_new(
VersionedMessage::V0(message),
&[payer]
) {
Err(_) => Err(GeyserPluginError::Custom(format!("Failed to create versioned transaction ").into())),
Ok(tx) => Ok(tx)
}
}?;

// Exit early if the transaction exceeds the size limit.
if sim_tx.message_data().len() > TRANSACTION_MESSAGE_SIZE_LIMIT {
if serialize(&sim_tx).unwrap().len() > TRANSACTION_MESSAGE_SIZE_LIMIT {
break;
}

Expand Down Expand Up @@ -198,9 +217,26 @@ pub async fn build_thread_exec_tx(
);
}

// let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey));
// tx.sign(&[payer], blockhash);

// Build and return the signed transaction.
let mut tx = Transaction::new_with_payer(&successful_ixs, Some(&signatory_pubkey));
tx.sign(&[payer], blockhash);
let tx = match v0::Message::try_compile(
&signatory_pubkey,
&ixs,
&address_lookup_tables,
blockhash,
) {
Err(_) => Err(GeyserPluginError::Custom(format!("Failed to compile to v0 message ").into())),
Ok(message) => match VersionedTransaction::try_new(
VersionedMessage::V0(message),
&[payer]
) {
Err(_) => Err(GeyserPluginError::Custom(format!("Failed to create versioned transaction ").into())),
Ok(tx) => Ok(tx)
}

}?;
info!(
"slot: {:?} thread: {:?} sim_duration: {:?} instruction_count: {:?} compute_units: {:?} tx_sig: {:?}",
slot,
Expand Down
57 changes: 57 additions & 0 deletions plugin/src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ use std::{

use anchor_lang::{prelude::Pubkey, AccountDeserialize};
use async_trait::async_trait;
use clockwork_thread_program::state::LookupTables;
use log::info;
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_client::{
client_error::{ClientError, ClientErrorKind, Result as ClientResult},
nonblocking::rpc_client::RpcClient,
};
use solana_geyser_plugin_interface::geyser_plugin_interface::Result as PluginResult;
use solana_program::address_lookup_table_account::AddressLookupTableAccount;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::runtime::Runtime;
use tx::TxExecutor;
Expand Down Expand Up @@ -137,3 +140,57 @@ impl AccountGet for RpcClient {
})
}
}

#[async_trait]
pub trait LookupTablesGet {
async fn get_lookup_tables(
&self,
pubkey: &Pubkey,
) -> ClientResult<Vec<AddressLookupTableAccount>>;
}

#[async_trait]
impl LookupTablesGet for RpcClient {
async fn get_lookup_tables(
&self,
pubkey: &Pubkey,
) -> ClientResult<Vec<AddressLookupTableAccount>> {
let lookup_account = self
.get_account_with_commitment(pubkey, self.commitment()) // returns Ok(None) if lookup account is not initialized
.await?
.value;
match lookup_account {
// return empty vec if lookup account has not been initialized
None => Ok(vec![]),

// get lookup tables in lookup accounts if account has been initialized
Some(lookup) => {
let lookup_keys = LookupTables::try_deserialize(&mut lookup.data.as_slice())
.map_err(|_| {
ClientError::from(ClientErrorKind::Custom(format!(
"Failed to deserialize account data"
)))
})
.expect("Failed to deserialize lookup data")
.lookup_tables;

let lookup_tables =
futures::future::join_all(lookup_keys.iter().map(|key| async move {
let raw_account = self.get_account(key).await?;
let address_lookup_table =
AddressLookupTable::deserialize(&raw_account.data).map_err(|_| {
ClientError::from(ClientErrorKind::Custom(format!(
"Could not deserialise Address Lookup Table"
)))
})?;
Ok(AddressLookupTableAccount {
key: *key,
addresses: address_lookup_table.addresses.to_vec(),
})
}))
.await;
lookup_tables.into_iter().collect()
}
}
}
}
34 changes: 26 additions & 8 deletions plugin/src/executors/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use async_once::AsyncOnce;
use bincode::serialize;
use clockwork_network_program::state::{Pool, Registry, Snapshot, SnapshotFrame, Worker};
use clockwork_thread_program::state::VersionedThread;
use clockwork_thread_program::state::{LookupTables, VersionedThread};
use lazy_static::lazy_static;
use log::info;
use solana_client::{
Expand All @@ -25,13 +25,13 @@ use solana_program::pubkey::Pubkey;
use solana_sdk::{
commitment_config::CommitmentConfig,
signature::{Keypair, Signature},
transaction::Transaction,
transaction::{VersionedTransaction},
};
use tokio::{runtime::Runtime, sync::RwLock};

use crate::{config::PluginConfig, pool_position::PoolPosition, utils::read_or_new_keypair};

use super::AccountGet;
use super::{AccountGet, LookupTablesGet};

/// Number of slots to wait before checking for a confirmed transaction.
static TRANSACTION_CONFIRMATION_PERIOD: u64 = 24;
Expand Down Expand Up @@ -428,7 +428,7 @@ impl TxExecutor {
observed_slot: u64,
due_slot: u64,
thread_pubkey: Pubkey,
) -> Option<(Pubkey, Transaction, u64)> {
) -> Option<(Pubkey, VersionedTransaction, u64)> {
let thread = match client.clone().get::<VersionedThread>(&thread_pubkey).await {
Err(_err) => {
self.increment_simulation_failure(thread_pubkey).await;
Expand All @@ -447,6 +447,15 @@ impl TxExecutor {
return None;
}
}
let lookup_tables_key = LookupTables::pubkey(thread.authority(), thread.pubkey());

let address_lookup_tables = match client.clone().get_lookup_tables(&lookup_tables_key).await
{
Err(_err) => {
return None;
}
Ok(address_lookup_tables) => address_lookup_tables,
};

if let Ok(tx) = crate::builders::build_thread_exec_tx(
client.clone(),
Expand All @@ -455,6 +464,7 @@ impl TxExecutor {
thread,
thread_pubkey,
self.config.worker_id,
address_lookup_tables,
)
.await
{
Expand Down Expand Up @@ -490,7 +500,7 @@ impl TxExecutor {
self: Arc<Self>,
slot: u64,
thread_pubkey: Pubkey,
tx: &Transaction,
tx: &VersionedTransaction,
) -> PluginResult<()> {
let r_transaction_history = self.transaction_history.read().await;
if let Some(metadata) = r_transaction_history.get(&thread_pubkey) {
Expand All @@ -502,7 +512,10 @@ impl TxExecutor {
Ok(())
}

async fn simulate_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
async fn simulate_tx(
self: Arc<Self>,
tx: &VersionedTransaction,
) -> PluginResult<VersionedTransaction> {
TPU_CLIENT
.get()
.await
Expand Down Expand Up @@ -531,8 +544,13 @@ impl TxExecutor {
})?
}

async fn submit_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
if !TPU_CLIENT.get().await.send_transaction(tx).await {
async fn submit_tx(
self: Arc<Self>,
tx: &VersionedTransaction,
) -> PluginResult<VersionedTransaction> {
let serialized_tx = serialize(tx).unwrap();

if !TPU_CLIENT.get().await.send_wire_transaction(serialized_tx).await {
return Err(GeyserPluginError::Custom(
"Failed to send transaction".into(),
));
Expand Down
1 change: 1 addition & 0 deletions programs/thread/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ clockwork-utils = { path = "../../utils", version = "=2.0.17" }
pyth-sdk-solana = "0.7.1"
static-pubkey = "1.0.3"
version = "3.0.0"
bincode = "1.3.3"
4 changes: 4 additions & 0 deletions programs/thread/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ pub enum ClockworkError {
/// Thrown if the user attempts to withdraw SOL that would put a thread below it's minimum rent threshold.
#[msg("Withdrawing this amount would leave the thread with less than the minimum required SOL for rent exemption")]
WithdrawalTooLarge,

/// Thrown if the size of the instruction to be added to the thread is larger than the next instruction size
#[msg("Instruction too large for thread")]
InstructionTooLarge,
}
Loading