Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle errors during sync #38

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 73 additions & 18 deletions packages/automated-dispute/src/services/eboProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isNativeError } from "util/types";
import { BlockNumberService } from "@ebo-agent/blocknumber";
import { Address, ILogger } from "@ebo-agent/shared";

Expand Down Expand Up @@ -52,42 +53,96 @@ export class EboProcessor {
// and trigger a request creation if there's no actor handling an <epoch, chain> request.
// This process should somehow check if there's already a request created for the epoch
// and chain that has no agent assigned and create it if that's the case.
try {
if (!this.lastCheckedBlock) {
this.lastCheckedBlock = await this.getEpochStartBlock();
}

if (!this.lastCheckedBlock) {
this.lastCheckedBlock = await this.getEpochStartBlock();
}
const lastBlock = await this.getLastFinalizedBlock();
const events = await this.getEvents(this.lastCheckedBlock, lastBlock);

const lastBlock = await this.protocolProvider.getLastFinalizedBlock();
const events = await this.protocolProvider.getEvents(this.lastCheckedBlock, lastBlock);
const eventsByRequestId = this.groupEventsByRequest(events);
const eventsByRequestId = this.groupEventsByRequest(events);
const synchableRequests = this.calculateSynchableRequests([
...eventsByRequestId.keys(),
]);

const synchableRequests = this.calculateSynchableRequests([...eventsByRequestId.keys()]);
const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => {
try {
const events = eventsByRequestId.get(requestId) ?? [];
this.logger.info(
`Reading events for the following requests:\n${synchableRequests.join(", ")}`,
);

await this.syncRequest(requestId, events, lastBlock);
} catch (err) {
this.onActorError(requestId, err as Error);
}
});
const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => {
try {
const events = eventsByRequestId.get(requestId) ?? [];

await this.syncRequest(requestId, events, lastBlock);
} catch (err) {
this.onActorError(requestId, err as Error);
}
});

await Promise.all(synchedRequests);

await Promise.all(synchedRequests);
this.logger.info(`Consumed events up to block ${lastBlock}.`);

this.lastCheckedBlock = lastBlock;
this.lastCheckedBlock = lastBlock;
} catch (err) {
if (isNativeError(err)) {
this.logger.error(`Sync failed: ` + `${err.message}\n\n` + `${err.stack}`);
} else {
this.logger.error(`Sync failed: ${err}`);
}

// TODO: notify
}
}

/**
* Fetches the first block of the current epoch.
*
* @returns the first block of the current epoch
*/
private async getEpochStartBlock() {
private async getEpochStartBlock(): Promise<bigint> {
this.logger.info("Fetching current epoch start block...");

const { currentEpochBlockNumber } = await this.protocolProvider.getCurrentEpoch();

this.logger.info(`Current epoch start block ${currentEpochBlockNumber} fetched.`);

return currentEpochBlockNumber;
}

/**
* Fetches the last finalized block on the protocol chain.
*
* @returns the last finalized block
*/
private async getLastFinalizedBlock(): Promise<bigint> {
this.logger.info("Fetching last finalized block...");

const lastBlock = await this.protocolProvider.getLastFinalizedBlock();

this.logger.info(`Last finalized block ${lastBlock} fetched.`);

return lastBlock;
}

/**
* Fetches the events to process during the sync.
*
* @param fromBlock block number lower bound for event search
* @param toBlock block number upper bound for event search
* @returns an array of events
*/
private async getEvents(fromBlock: bigint, toBlock: bigint): Promise<EboEvent<EboEventName>[]> {
this.logger.info(`Fetching events between blocks ${fromBlock} and ${toBlock}...`);

const events = await this.protocolProvider.getEvents(fromBlock, toBlock);

this.logger.info(`${events.length} events fetched.`);

return events;
}

/**
* Group events by its normalized request ID.
* .
Expand Down
52 changes: 51 additions & 1 deletion packages/automated-dispute/tests/services/eboProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,57 @@ describe("EboProcessor", () => {
);
});

it("fetches events since last block checked after first events fetch", async () => {
it("keeps the last block checked unaltered when something fails during sync", async () => {
const initialCurrentBlock = 1n;

const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor(logger);
const { actor } = mocks.buildEboActor(request, logger);

const currentEpoch = {
currentEpoch: 1n,
currentEpochBlockNumber: 1n,
currentEpochTimestamp: BigInt(Date.UTC(2024, 1, 1, 0, 0, 0, 0)),
};

const mockProtocolProviderGetEvents = vi
.spyOn(protocolProvider, "getEvents")
.mockImplementationOnce(() => {
// Simulate failure during first synch
throw new Error();
})
.mockResolvedValueOnce([]);

vi.spyOn(protocolProvider, "getLastFinalizedBlock")
.mockResolvedValueOnce(initialCurrentBlock + 10n)
.mockResolvedValueOnce(initialCurrentBlock + 20n);

vi.spyOn(protocolProvider, "getCurrentEpoch").mockResolvedValue(currentEpoch);
vi.spyOn(actorsManager, "createActor").mockReturnValue(actor);
vi.spyOn(actor, "processEvents").mockImplementation(() => Promise.resolve());
vi.spyOn(actor, "onLastBlockUpdated").mockImplementation(() => Promise.resolve());
vi.spyOn(actor, "canBeTerminated").mockResolvedValue(false);

await processor.start(msBetweenChecks);

expect(mockProtocolProviderGetEvents).toHaveBeenNthCalledWith(
1,
currentEpoch.currentEpochBlockNumber,
initialCurrentBlock + 10n,
);

expect(mockProtocolProviderGetEvents).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(expect.stringMatching("Sync failed"));

await vi.advanceTimersByTimeAsync(msBetweenChecks);

expect(mockProtocolProviderGetEvents).toHaveBeenNthCalledWith(
2,
currentEpoch.currentEpochBlockNumber,
initialCurrentBlock + 20n,
);
});

it("fetches non-consumed events if event fetching fails", async () => {
const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor(logger);
const { actor } = mocks.buildEboActor(request, logger);

Expand Down