Skip to content

Commit

Permalink
feat: wait function for account/processor
Browse files Browse the repository at this point in the history
  • Loading branch information
jowparks committed Sep 5, 2023
1 parent 72da3b8 commit 9fede9a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 8 deletions.
17 changes: 16 additions & 1 deletion example/src/Client/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,22 @@ export class Client {
}

public addAccount(privateKey: string) {
this.accountsManager.addAccount(privateKey);
return this.accountsManager.addAccount(privateKey);
}

public async waitForProcessorSync() {
await this.blockProcessor.waitForProcessorSync();
}

public async waitForAccountSync(publicAddress: string) {
await this.waitForProcessorSync();
console.log(
`Processor synced. Waiting for account ${publicAddress} to sync`,
);

const head = await this.blockCache.getHeadSequence();
await this.accountsManager.waitForAccountSync(publicAddress, head);
console.log(`Account ${publicAddress} synced to head ${head}`);
}

public async start() {
Expand Down
47 changes: 46 additions & 1 deletion example/src/Client/utils/AccountsManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { EventEmitter } from "events";

import {
generateKeyFromPrivateKey,
Key,
Expand All @@ -24,6 +26,7 @@ export interface DecryptedNoteValue {

interface AccountData {
key: Key;
head: number;
assets: Map<
string,
{
Expand All @@ -37,13 +40,15 @@ export class AccountsManager {
private blockCache: BlockCache;
/** publicKey => AccountData */
private accounts: Map<string, AccountData> = new Map();
private events: EventEmitter = new EventEmitter();

constructor(blockCache: BlockCache) {
this.blockCache = blockCache;
}

public addAccount(privateKey: string) {
this.accounts.set(...this._makeAccountData(privateKey));
const accountData = this._makeAccountData(privateKey);
this.accounts.set(...accountData);

this.blockCache
.createReadStream()
Expand All @@ -53,7 +58,42 @@ export class AccountsManager {
return;
}
this._processBlockForTransactions(value);
this.events.emit("accounts-updated");
});

return accountData[0];
}

public waitForAccountSync(
publicAddress: string,
sequence: number,
): Promise<void> {
return new Promise((resolve, reject) => {
const checkSequence = () => {
const accountData = this.accounts.get(publicAddress);
if (!accountData) {
this.events.removeListener("accounts-updated", checkSequence);
return reject(
new Error(`Account with public address ${publicAddress} not found`),
);
}
logThrottled(
`Waiting for sync to complete, ${accountData.head}/${sequence}`,
1000,
accountData.head,
);
if (accountData.head >= sequence) {
this.events.removeListener("accounts-updated", checkSequence);
return resolve();
}
};

// Check initially
checkSequence();

// Listen for account updates
this.events.on("accounts-updated", checkSequence);
});
}

public getPublicAddresses() {
Expand All @@ -66,6 +106,7 @@ export class AccountsManager {
key.publicAddress,
{
key,
head: 0,
assets: new Map(),
},
];
Expand All @@ -86,6 +127,10 @@ export class AccountsManager {

// @todo: Process spends
});

for (const [_, account] of this.accounts) {
account.head = parsedBlock.sequence;
}
}

private _processNote(
Expand Down
27 changes: 22 additions & 5 deletions example/src/Client/utils/BlockProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { EventEmitter } from "events";

import { ServiceError } from "@grpc/grpc-js";
import { BlockCache } from "./BlockCache";
import {
Expand All @@ -16,6 +18,7 @@ export class BlockProcessor {
private pollInterval?: NodeJS.Timer;
private isProcessingBlocks: boolean = false;
private blockCache: BlockCache;
private events: EventEmitter = new EventEmitter(); // Event emitter for block events

constructor(client: LightStreamerClient, blockCache: BlockCache) {
this.client = client;
Expand All @@ -40,13 +43,26 @@ export class BlockProcessor {
clearInterval(this.pollInterval);
}

public waitForProcessorSync(): Promise<void> {
console.log("Waiting for processor to sync");
if (!this.isProcessingBlocks) {
return Promise.resolve();
}
console.log("Processor is currently syncing. Waiting for it to finish");
return new Promise((resolve) => {
const checkBlockSequence = async () => {
this.events.removeListener("blocks-processed", checkBlockSequence);
return resolve();
};
// Listen for block updates
this.events.on("blocks-processed", checkBlockSequence);
});
}

private async _pollForNewBlocks() {
if (this.isProcessingBlocks) {
return;
}

this.isProcessingBlocks = true;

const [latestBlockError, latestBlock] = await this._getLatestBlock();

if (latestBlockError) {
Expand All @@ -65,14 +81,14 @@ export class BlockProcessor {
return;
}

this.isProcessingBlocks = true;

const batchSize = process.env["BLOCK_PROCESSING_BATCH_SIZE"]
? parseInt(process.env["BLOCK_PROCESSING_BATCH_SIZE"])
: 100;

for (let i = cachedHeadSequence; i < headSequence; i += batchSize) {
await this._processBlockRange(i, Math.min(i + batchSize, headSequence));
}

this.isProcessingBlocks = false;
}

Expand Down Expand Up @@ -112,6 +128,7 @@ export class BlockProcessor {
});

stream.on("end", () => {
this.events.emit("blocks-processed", endSequence);
res(true);
});
});
Expand Down
5 changes: 4 additions & 1 deletion example/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ async function main() {
throw new Error("SPENDING_KEY not found");
}

client.addAccount(spendingKey);
const publicAddress = client.addAccount(spendingKey);
console.log("Added account");
await client.waitForAccountSync(publicAddress);
console.log("Account synced");

await client.waitUntilClose();
}
Expand Down

0 comments on commit 9fede9a

Please sign in to comment.