Skip to content

Commit

Permalink
feat(stream): add method for forced sequential execution
Browse files Browse the repository at this point in the history
  • Loading branch information
joeymeere committed Nov 13, 2024
1 parent 5476a60 commit c390d81
Showing 1 changed file with 139 additions and 0 deletions.
139 changes: 139 additions & 0 deletions packages/stream/solana/StreamClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,145 @@ export class SolanaStreamClient extends BaseStreamClient {
return { txs: signatures, metadatas, metadataToRecipient, errors };
}

/**
* Creates multiple stream/vesting contracts, and send all transactions sequentially.
* All fees are paid by sender (escrow metadata account rent, escrow token account rent, recipient's associated token account rent, Streamflow's service fee).
*/
public async createMultipleSequential(
data: ICreateMultipleStreamData,
extParams: ICreateStreamSolanaExt,
): Promise<IMultiTransactionResult> {
const { recipients, ...streamParams } = data;

const { sender, metadataPubKeys: metadataPubKeysExt, isNative, computePrice, computeLimit } = extParams;

const metadatas: string[] = [];
const metadataToRecipient: MetadataRecipientHashMap = {};
const errors: ICreateMultiError[] = [];
const signatures: string[] = [];
const batch: BatchItem[] = [];
const instructionsBatch: {
ixs: TransactionInstruction[];
metadata: Keypair | undefined;
recipient: string;
}[] = [];
const metadataPubKeys = metadataPubKeysExt || [];

const partnerPublicKey = data.partner ? new PublicKey(data.partner) : WITHDRAWOR_PUBLIC_KEY;
const mintPublicKey = new PublicKey(data.tokenId);

if (recipients.length === 0) {
throw new Error("Recipients array is empty!");
}

if (!sender.publicKey) {
throw new Error("Sender's PublicKey is not available, check passed wallet adapter!");
}

for (let i = 0; i < recipients.length; i++) {
const recipientData = recipients[i];
const createStreamData = { ...streamParams, ...recipientData };
const createStreamExtParams = {
sender,
metadataPubKeys: metadataPubKeys[i] ? [metadataPubKeys[i]] : undefined,
computePrice,
computeLimit,
};

const { ixs, metadata, metadataPubKey } = await this.prepareCreateInstructions(
createStreamData,
createStreamExtParams,
);

metadataToRecipient[metadataPubKey.toBase58()] = recipientData;

metadatas.push(metadataPubKey.toBase58());
instructionsBatch.push({
ixs,
metadata,
recipient: recipientData.recipient,
});
}

const { value: hash, context } = await this.connection.getLatestBlockhashAndContext();

for (const { ixs, metadata, recipient } of instructionsBatch) {
const messageV0 = new TransactionMessage({
payerKey: sender.publicKey,
recentBlockhash: hash.blockhash,
instructions: ixs,
}).compileToV0Message();
const tx = new VersionedTransaction(messageV0);
if (metadata) {
tx.sign([metadata]);
}
batch.push({ tx, recipient });
}

const prepareInstructions = await this.getCreateATAInstructions(
[STREAMFLOW_TREASURY_PUBLIC_KEY, partnerPublicKey],
mintPublicKey,
sender,
true,
);

if (isNative) {
const totalDepositedAmount = recipients.reduce((acc, recipient) => recipient.amount.add(acc), new BN(0));
const nativeInstructions = await prepareWrappedAccount(this.connection, sender.publicKey, totalDepositedAmount);
prepareInstructions.push(...nativeInstructions);
}

if (prepareInstructions.length > 0) {
const messageV0 = new TransactionMessage({
payerKey: sender.publicKey,
recentBlockhash: hash.blockhash,
instructions: prepareInstructions,
}).compileToV0Message();
const tx = new VersionedTransaction(messageV0);

batch.push({
tx,
recipient: sender.publicKey.toBase58(),
});
}

const signedBatch: BatchItem[] = await signAllTransactionWithRecipients(sender, batch);

if (prepareInstructions.length > 0) {
const prepareTx = signedBatch.pop();
await sendAndConfirmStreamRawTransaction(
this.connection,
prepareTx!,
{ hash, context },
{ sendThrottler: this.sendThrottler },
);
}

const responses: PromiseSettledResult<string>[] = [];

for (const batchTx of signedBatch) {
responses.push(
...(await Promise.allSettled([
executeTransaction(this.connection, batchTx.tx, { hash, context }, { sendThrottler: this.sendThrottler }),
])),
);
}

responses.forEach((item, index) => {
if (item.status === "fulfilled") {
signatures.push(item.value);
} else {
errors.push({
recipient: signedBatch[index].recipient,
error: item.reason,
contractErrorCode: this.extractErrorCode(item.reason) || undefined,
});
}
});

return { txs: signatures, metadatas, metadataToRecipient, errors };
}

/**
* Attempts withdrawing from the specified stream.
*/
Expand Down

0 comments on commit c390d81

Please sign in to comment.