diff --git a/example/src/Client/Client.ts b/example/src/Client/Client.ts index d1915c5..c58ec77 100644 --- a/example/src/Client/Client.ts +++ b/example/src/Client/Client.ts @@ -22,7 +22,22 @@ export class Client { } public addAccount(privateKey: string) { - this.accountsManager.addAccount(privateKey); + return this.accountsManager.addAccount(privateKey); + } + + public async waitForProcessorSync() { + await this.blockProcessor.waitForProcessorSync(); + } + + public async waitForAccountSync(publicAddress: string) { + await this.waitForProcessorSync(); + console.log( + `Processor synced. Waiting for account ${publicAddress} to sync`, + ); + + const head = await this.blockCache.getHeadSequence(); + await this.accountsManager.waitForAccountSync(publicAddress, head); + console.log(`Account ${publicAddress} synced to head ${head}`); } public async start() { diff --git a/example/src/Client/utils/AccountsManager.ts b/example/src/Client/utils/AccountsManager.ts index f6b5aeb..7f01312 100644 --- a/example/src/Client/utils/AccountsManager.ts +++ b/example/src/Client/utils/AccountsManager.ts @@ -1,3 +1,5 @@ +import { EventEmitter } from "events"; + import { generateKeyFromPrivateKey, Key, @@ -24,6 +26,7 @@ export interface DecryptedNoteValue { interface AccountData { key: Key; + head: number; assets: Map< string, { @@ -37,13 +40,15 @@ export class AccountsManager { private blockCache: BlockCache; /** publicKey => AccountData */ private accounts: Map = new Map(); + private events: EventEmitter = new EventEmitter(); constructor(blockCache: BlockCache) { this.blockCache = blockCache; } public addAccount(privateKey: string) { - this.accounts.set(...this._makeAccountData(privateKey)); + const accountData = this._makeAccountData(privateKey); + this.accounts.set(...accountData); this.blockCache .createReadStream() @@ -53,7 +58,42 @@ export class AccountsManager { return; } this._processBlockForTransactions(value); + this.events.emit("accounts-updated"); }); + + return accountData[0]; + } + + public waitForAccountSync( + publicAddress: string, + sequence: number, + ): Promise { + return new Promise((resolve, reject) => { + const checkSequence = () => { + const accountData = this.accounts.get(publicAddress); + if (!accountData) { + this.events.removeListener("accounts-updated", checkSequence); + return reject( + new Error(`Account with public address ${publicAddress} not found`), + ); + } + logThrottled( + `Waiting for sync to complete, ${accountData.head}/${sequence}`, + 1000, + accountData.head, + ); + if (accountData.head >= sequence) { + this.events.removeListener("accounts-updated", checkSequence); + return resolve(); + } + }; + + // Check initially + checkSequence(); + + // Listen for account updates + this.events.on("accounts-updated", checkSequence); + }); } public getPublicAddresses() { @@ -66,6 +106,7 @@ export class AccountsManager { key.publicAddress, { key, + head: 0, assets: new Map(), }, ]; @@ -86,6 +127,10 @@ export class AccountsManager { // @todo: Process spends }); + + for (const [_, account] of this.accounts) { + account.head = parsedBlock.sequence; + } } private _processNote( diff --git a/example/src/Client/utils/BlockProcessor.ts b/example/src/Client/utils/BlockProcessor.ts index b1acba5..f358cb7 100644 --- a/example/src/Client/utils/BlockProcessor.ts +++ b/example/src/Client/utils/BlockProcessor.ts @@ -1,3 +1,5 @@ +import { EventEmitter } from "events"; + import { ServiceError } from "@grpc/grpc-js"; import { BlockCache } from "./BlockCache"; import { @@ -16,6 +18,7 @@ export class BlockProcessor { private pollInterval?: NodeJS.Timer; private isProcessingBlocks: boolean = false; private blockCache: BlockCache; + private events: EventEmitter = new EventEmitter(); // Event emitter for block events constructor(client: LightStreamerClient, blockCache: BlockCache) { this.client = client; @@ -40,13 +43,26 @@ export class BlockProcessor { clearInterval(this.pollInterval); } + public waitForProcessorSync(): Promise { + console.log("Waiting for processor to sync"); + if (!this.isProcessingBlocks) { + return Promise.resolve(); + } + console.log("Processor is currently syncing. Waiting for it to finish"); + return new Promise((resolve) => { + const checkBlockSequence = async () => { + this.events.removeListener("blocks-processed", checkBlockSequence); + return resolve(); + }; + // Listen for block updates + this.events.on("blocks-processed", checkBlockSequence); + }); + } + private async _pollForNewBlocks() { if (this.isProcessingBlocks) { return; } - - this.isProcessingBlocks = true; - const [latestBlockError, latestBlock] = await this._getLatestBlock(); if (latestBlockError) { @@ -65,14 +81,14 @@ export class BlockProcessor { return; } + this.isProcessingBlocks = true; + const batchSize = process.env["BLOCK_PROCESSING_BATCH_SIZE"] ? parseInt(process.env["BLOCK_PROCESSING_BATCH_SIZE"]) : 100; - for (let i = cachedHeadSequence; i < headSequence; i += batchSize) { await this._processBlockRange(i, Math.min(i + batchSize, headSequence)); } - this.isProcessingBlocks = false; } @@ -112,6 +128,7 @@ export class BlockProcessor { }); stream.on("end", () => { + this.events.emit("blocks-processed", endSequence); res(true); }); }); diff --git a/example/src/index.ts b/example/src/index.ts index 59a72a8..edd5864 100644 --- a/example/src/index.ts +++ b/example/src/index.ts @@ -13,7 +13,10 @@ async function main() { throw new Error("SPENDING_KEY not found"); } - client.addAccount(spendingKey); + const publicAddress = client.addAccount(spendingKey); + console.log("Added account"); + await client.waitForAccountSync(publicAddress); + console.log("Account synced"); await client.waitUntilClose(); }