diff --git a/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts b/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts index b6809e0..1cbcdc7 100644 --- a/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts +++ b/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts @@ -2,6 +2,8 @@ import { EboActorsManager, EboProcessor, NotificationService, + oracleAbi, + ProphetCodec, ProtocolProvider, } from "@ebo-agent/automated-dispute"; import { BlockNumberService } from "@ebo-agent/blocknumber"; @@ -11,6 +13,7 @@ import { Account, Address, createTestClient, + getAbiItem, Hex, http, keccak256, @@ -25,7 +28,8 @@ import { import { arbitrumSepolia } from "viem/chains"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; -import type { AnvilClient, DeployContractsOutput } from "../../utils/prophet-e2e-scaffold/index.js"; +import type { DeployContractsOutput } from "../../utils/prophet-e2e-scaffold/index.js"; +import { getCurrentEpoch, setEpochLength } from "../../utils/prophet-e2e-scaffold/epochManager.js"; import { createAnvilServer, deployContracts, @@ -40,9 +44,6 @@ const E2E_TEST_TIMEOUT = 30_000; // TODO: it'd be nice to have zod here const KEYSTORE_PASSWORD = process.env.KEYSTORE_PASSWORD || ""; -// TODO: use env vars here -const FORK_URL = "https://arbitrum-sepolia.gateway.tenderly.co"; - // TODO: probably could be added as a submodule inside the e2e folder const EBO_CORE_PATH = "../../../EBO-core/"; @@ -53,6 +54,7 @@ const HORIZON_STAKING_ADDRESS = "0x3F53F9f9a5d7F36dCC869f8D2F227499c411c0cf"; // Extracted from https://thegraph.com/docs/en/network/contracts/ const EPOCH_MANAGER_ADDRESS = "0x7975475801BEf845f10Ce7784DC69aB1e0344f11"; +const GOVERNOR_ADDRESS = "0xadE6B8EB69a49B56929C1d4F4b428d791861dB6f"; // Arbitrum // const GRT_HOLDER = "0x00669A4CF01450B64E8A2A20E9b1FCB71E61eF03"; @@ -64,32 +66,32 @@ const EPOCH_MANAGER_ADDRESS = "0x7975475801BEf845f10Ce7784DC69aB1e0344f11"; // TODO: this is currently hardcoded on the contract's Deploy script, change when defined const ARBITRATOR_ADDRESS: Address = padHex("0x100", { dir: "left", size: 20 }); -// const ARBITRUM_ID = "eip155:42161"; const ARBITRUM_SEPOLIA_ID = "eip155:421614"; -const PROTOCOL_L1_CHAIN_ID = "eip155:1"; const PROTOCOL_L2_CHAIN = arbitrumSepolia; const PROTOCOL_L2_CHAIN_ID = ARBITRUM_SEPOLIA_ID; const PROTOCOL_L2_LOCAL_RPC_HOST = "127.0.0.1"; const PROTOCOL_L2_LOCAL_RPC_PORT = 8545; +const FORK_L2_URL = "https://arbitrum-sepolia.gateway.tenderly.co"; + const PROTOCOL_L2_LOCAL_URL = `http://${PROTOCOL_L2_LOCAL_RPC_HOST}:${PROTOCOL_L2_LOCAL_RPC_PORT}/1`; describe.sequential("single agent", () => { - let protocolAnvil: CreateServerReturnType; + let l2ProtocolAnvil: CreateServerReturnType; let protocolContracts: DeployContractsOutput; let accounts: { privateKey: Hex; account: Account; walletClient: WalletClient }[]; beforeEach(async () => { - protocolAnvil = await createAnvilServer( + l2ProtocolAnvil = await createAnvilServer( PROTOCOL_L2_LOCAL_RPC_HOST, PROTOCOL_L2_LOCAL_RPC_PORT, { - forkUrl: FORK_URL, + forkUrl: FORK_L2_URL, + slotsInAnEpoch: 1, blockTime: 0.1, - slotsInAnEpoch: 1, // To "finalize" blocks fast enough }, ); @@ -111,7 +113,7 @@ describe.sequential("single agent", () => { chain: PROTOCOL_L2_CHAIN, grtHolder: GRT_HOLDER, grtContractAddress: GRT_CONTRACT_ADDRESS, - grtFundAmount: parseEther("5"), + grtFundAmount: parseEther("50"), }), ]; @@ -121,7 +123,7 @@ describe.sequential("single agent", () => { grtAddress: GRT_CONTRACT_ADDRESS, horizonStakingAddress: HORIZON_STAKING_ADDRESS, chainsToAdd: [PROTOCOL_L2_CHAIN_ID], - bondAmount: parseEther("0.5"), + grtProvisionAmount: parseEther("45"), anvilClient: createTestClient({ mode: "anvil", transport: http(PROTOCOL_L2_LOCAL_URL), @@ -134,7 +136,7 @@ describe.sequential("single agent", () => { }, E2E_SCENARIO_SETUP_TIMEOUT); afterEach(async () => { - await protocolAnvil.stop(); + await l2ProtocolAnvil.stop(); }); test.skip("basic flow", { timeout: E2E_TEST_TIMEOUT }, async () => { @@ -143,7 +145,8 @@ describe.sequential("single agent", () => { const protocolProvider = new ProtocolProvider( { l1: { - chainId: PROTOCOL_L1_CHAIN_ID, + chainId: PROTOCOL_L2_CHAIN_ID, + // Using the same RPC due to Anvil's arbitrum block number bug urls: [PROTOCOL_L2_LOCAL_URL], transactionReceiptConfirmations: 1, timeout: 1_000, @@ -213,30 +216,113 @@ describe.sequential("single agent", () => { .extend(publicActions) .extend(walletActions); + // Set epoch length to a big enough epoch length as in sepolia is way too short at the moment + await setEpochLength({ + length: 100_000n, + client: anvilClient, + epochManagerAddress: EPOCH_MANAGER_ADDRESS, + governorAddress: GOVERNOR_ADDRESS, + }); + const initBlock = await anvilClient.getBlockNumber(); + const currentEpoch = await getCurrentEpoch({ + client: anvilClient, + epochManagerAddress: EPOCH_MANAGER_ADDRESS, + }); processor.start(3000); - // TODO: replace by NewEpoch event - const requestCreatedAbi = parseAbiItem( - "event RequestCreated(bytes32 indexed _requestId, uint256 indexed _epoch, string indexed _chainId)", - ); + const requestCreatedAbi = getAbiItem({ abi: oracleAbi, name: "RequestCreated" }); + + let chainRequestId: Hex; - const eventFound = await waitForEvent({ + const requestCreatedEvent = await waitForEvent({ client: anvilClient, filter: { - address: protocolContracts["EBORequestCreator"], + address: protocolContracts["Oracle"], fromBlock: initBlock, event: requestCreatedAbi, strict: true, }, matcher: (log) => { - return log.args._chainId === keccak256(toHex(PROTOCOL_L2_CHAIN_ID)); + const { requestModuleData } = log.args._request; + const { chainId } = ProphetCodec.decodeRequestRequestModuleData(requestModuleData); + + if (chainId !== ARBITRUM_SEPOLIA_ID) return false; + + chainRequestId = log.args._requestId; + + return true; + }, + pollingIntervalMs: 100, + blockTimeout: initBlock + 1000n, + }); + + expect(requestCreatedEvent).toBe(true); + + const responseProposedAbi = getAbiItem({ abi: oracleAbi, name: "ResponseProposed" }); + + const responseProposedEvent = await waitForEvent({ + client: anvilClient, + filter: { + address: protocolContracts["Oracle"], + fromBlock: initBlock, + event: responseProposedAbi, + strict: true, + }, + matcher: (log) => { + return log.args._requestId === chainRequestId; }, pollingIntervalMs: 100, blockTimeout: initBlock + 1000n, }); - expect(eventFound).toBe(true); + expect(responseProposedEvent).toBe(true); + + await anvilClient.increaseTime({ seconds: 60 * 60 * 24 * 7 * 4 }); + + const oracleRequestFinalizedAbi = getAbiItem({ + abi: oracleAbi, + name: "OracleRequestFinalized", + }); + + const [oracleRequestFinalizedEvent, newEpochEvent] = await Promise.all([ + waitForEvent({ + client: anvilClient, + filter: { + address: protocolContracts["Oracle"], + fromBlock: initBlock, + event: oracleRequestFinalizedAbi, + strict: true, + }, + matcher: (log) => { + return log.args._requestId === chainRequestId; + }, + pollingIntervalMs: 100, + blockTimeout: initBlock + 1000n, + }), + waitForEvent({ + client: anvilClient, + filter: { + address: protocolContracts["EBOFinalityModule"], + fromBlock: initBlock, + event: parseAbiItem( + "event NewEpoch(uint256 indexed _epoch, string indexed _chainId, uint256 _blockNumber)", + ), + strict: true, + }, + matcher: (log) => { + return ( + log.args._chainId === keccak256(toHex(ARBITRUM_SEPOLIA_ID)) && + log.args._epoch === currentEpoch + ); + }, + pollingIntervalMs: 100, + blockTimeout: initBlock + 1000n, + }), + ]); + + expect(oracleRequestFinalizedEvent).toBeDefined(); + expect(newEpochEvent).toBeDefined(); }); }); diff --git a/apps/agent/test/e2e/utils/prophet-e2e-scaffold/eboCore.ts b/apps/agent/test/e2e/utils/prophet-e2e-scaffold/eboCore.ts index e903076..e347a16 100644 --- a/apps/agent/test/e2e/utils/prophet-e2e-scaffold/eboCore.ts +++ b/apps/agent/test/e2e/utils/prophet-e2e-scaffold/eboCore.ts @@ -9,6 +9,7 @@ import { createTestClient, createWalletClient, encodeFunctionData, + formatEther, http, HttpTransport, parseAbi, @@ -226,8 +227,8 @@ interface SetUpProphetInput { accounts: Account[]; /** Map of deployed contracts */ deployedContracts: DeployContractsOutput; - /** Bond amount */ - bondAmount: bigint; + /** GRT amount to provision account with to be able to bond tokens throughout its operation */ + grtProvisionAmount: bigint; /** Arbitrator address to use to add chains into EBORequestCreator */ arbitratorAddress: Address; /** GRT address */ @@ -244,7 +245,13 @@ interface SetUpProphetInput { * @param input {@link SetUpProphetInput} */ export async function setUpProphet(input: SetUpProphetInput) { - const { chainsToAdd, accounts, deployedContracts, anvilClient, bondAmount } = input; + const { + chainsToAdd, + accounts, + deployedContracts, + anvilClient, + grtProvisionAmount: bondAmount, + } = input; const { arbitratorAddress, grtAddress, horizonStakingAddress } = input; await approveEboProphetModules(accounts, deployedContracts, anvilClient); @@ -328,7 +335,7 @@ async function stakeGrtWithProvision( horizonStaking: Address; horizonAccountingExtension: Address; }, - bondSize: bigint, + grtProvisionAmount: bigint, anvilClient: AnvilClient, ) { console.log("Staking GRT into Horizon..."); @@ -343,7 +350,7 @@ async function stakeGrtWithProvision( to: grt, data: encodeFunctionData({ abi: parseAbi(["function approve(address, uint256)"]), - args: [horizonStaking, bondSize * 5n], + args: [horizonStaking, grtProvisionAmount], }), }); @@ -351,14 +358,14 @@ async function stakeGrtWithProvision( hash: approveHash, }); - console.log(`Staking for ${account.address} ${bondSize}...`); + console.log(`Staking for ${account.address} ${formatEther(grtProvisionAmount)} GRT...`); const stakeHash = await anvilClient.sendTransaction({ account: account, to: horizonStaking, data: encodeFunctionData({ abi: parseAbi(["function stake(uint256)"]), - args: [bondSize], + args: [grtProvisionAmount], }), }); @@ -366,7 +373,9 @@ async function stakeGrtWithProvision( hash: stakeHash, }); - console.log(`Provisioning ${bondSize} for ${account.address}...`); + console.log( + `Provisioning ${account.address} with ${formatEther(grtProvisionAmount)} GRT...`, + ); const provisionHash = await anvilClient.sendTransaction({ account: account, @@ -376,7 +385,7 @@ async function stakeGrtWithProvision( args: [ account.address, horizonAccountingExtension, - bondSize, + grtProvisionAmount, // TODO: use contract call to get this value // https://github.com/defi-wonderland/EBO-core/blob/175bcd57c3254a90dd6fcbf53b3db3359085551f/src/contracts/HorizonAccountingExtension.sol#L38C26-L38C42 1_000_000, diff --git a/apps/agent/test/e2e/utils/prophet-e2e-scaffold/epochManager.ts b/apps/agent/test/e2e/utils/prophet-e2e-scaffold/epochManager.ts new file mode 100644 index 0000000..545c4ca --- /dev/null +++ b/apps/agent/test/e2e/utils/prophet-e2e-scaffold/epochManager.ts @@ -0,0 +1,61 @@ +import { epochManagerAbi } from "@ebo-agent/automated-dispute"; +import { Address, Chain, HttpTransport } from "viem"; + +import { AnvilClient } from "./anvil"; + +type SetEpochLengthInput = { + client: AnvilClient; + governorAddress: Address; + epochManagerAddress: Address; + length: bigint; +}; + +export const setEpochLength = async (params: SetEpochLengthInput) => { + const { client, governorAddress, epochManagerAddress, length } = params; + + client.impersonateAccount({ + address: governorAddress, + }); + + const tx = await client.writeContract({ + address: epochManagerAddress, + account: governorAddress, + abi: epochManagerAbi, + functionName: "setEpochLength", + args: [length], + }); + + await client.waitForTransactionReceipt({ hash: tx }); + + client.stopImpersonatingAccount({ + address: governorAddress, + }); +}; + +type GetEpochLengthInput = Omit; + +export const getEpochLength = async (params: GetEpochLengthInput) => { + const { client, governorAddress, epochManagerAddress } = params; + + return await client.readContract({ + address: epochManagerAddress, + account: governorAddress, + abi: epochManagerAbi, + functionName: "epochLength", + }); +}; + +type GetCurrentEpochInput = { + client: AnvilClient; + epochManagerAddress: Address; +}; + +export const getCurrentEpoch = async (params: GetCurrentEpochInput) => { + const { client, epochManagerAddress } = params; + + return await client.readContract({ + address: epochManagerAddress, + abi: epochManagerAbi, + functionName: "currentEpoch", + }); +}; diff --git a/packages/automated-dispute/src/external.ts b/packages/automated-dispute/src/external.ts index f771947..f426520 100644 --- a/packages/automated-dispute/src/external.ts +++ b/packages/automated-dispute/src/external.ts @@ -1,4 +1,5 @@ -export { EboProcessor, EboActorsManager, DiscordNotifier } from "./services/index.js"; +export { EboProcessor, EboActorsManager, DiscordNotifier, ProphetCodec } from "./services/index.js"; export type { NotificationService } from "./interfaces/index.js"; export { ProtocolProvider } from "./providers/index.js"; export type { AccountingModules } from "./types/index.js"; +export { oracleAbi, epochManagerAbi } from "./abis/index.js"; diff --git a/packages/automated-dispute/src/services/eboProcessor.ts b/packages/automated-dispute/src/services/eboProcessor.ts index aea4825..f6f105c 100644 --- a/packages/automated-dispute/src/services/eboProcessor.ts +++ b/packages/automated-dispute/src/services/eboProcessor.ts @@ -1,9 +1,13 @@ import { isNativeError } from "util/types"; import { BlockNumberService } from "@ebo-agent/blocknumber"; import { Caip2ChainId, Caip2Utils, HexUtils, ILogger, UnixTimestamp } from "@ebo-agent/shared"; -import { Block } from "viem"; +import { Block, ContractFunctionRevertedError } from "viem"; -import { PendingModulesApproval, ProcessorAlreadyStarted } from "../exceptions/index.js"; +import { + PastEventEnqueueError, + PendingModulesApproval, + ProcessorAlreadyStarted, +} from "../exceptions/index.js"; import { isRequestCreatedEvent } from "../guards.js"; import { NotificationService } from "../interfaces/index.js"; import { ProtocolProvider } from "../providers/index.js"; @@ -110,11 +114,16 @@ export class EboProcessor { const currentEpoch = await this.getCurrentEpoch(); if (!this.lastCheckedBlock) { - this.lastCheckedBlock = currentEpoch.firstBlockNumber; + // We want to emulate the previous epoch being fully checked + this.lastCheckedBlock = currentEpoch.firstBlockNumber - 1n; } const lastBlock = await this.getLastFinalizedBlock(); - const events = await this.getEvents(this.lastCheckedBlock, lastBlock.number); + + // Events will sync starting from the block after the last checked one, + // making the block interval exclusive on its lower bound: + // (last checked block, last block] + const events = await this.getEvents(this.lastCheckedBlock + 1n, lastBlock.number); const eventsByRequestId = this.groupEventsByRequest(events); const synchableRequests = this.calculateSynchableRequests([ @@ -262,7 +271,20 @@ export class EboProcessor { return; } - events.forEach((event) => actor.enqueue(event)); + events.forEach((event) => { + try { + actor.enqueue(event); + } catch (err) { + if (err instanceof PastEventEnqueueError) { + this.logger.warn( + `Dropping already enqueued event at ${event.blockNumber} block ` + + `with log index ${event.logIndex}`, + ); + } else { + throw err; + } + } + }); const lastBlockTimestamp = lastBlock.timestamp as UnixTimestamp; @@ -383,6 +405,12 @@ export class EboProcessor { return !isHandled; }); + if (!unhandledEpochChain || unhandledEpochChain.length === 0) { + this.logger.info(`No requests to create for epoch ${epoch}`); + + return; + } + this.logger.info("Creating missing requests..."); const epochChainRequests = unhandledEpochChain.map(async (chain) => { @@ -396,7 +424,16 @@ export class EboProcessor { // Request creation must be notified but it's not critical, as it will be // retried during next sync. - // TODO: warn when getting a EBORequestCreator_RequestAlreadyCreated + if (err instanceof ContractFunctionRevertedError) { + if (err.name === "EBORequestCreator_RequestAlreadyCreated") { + this.logger.info( + `Request for epoch ${epoch} and chain ${chain} already created`, + ); + + return; + } + } + this.logger.error( `Could not create a request for epoch ${epoch} and chain ${chain}.`, ); diff --git a/packages/automated-dispute/src/services/prophetCodec.ts b/packages/automated-dispute/src/services/prophetCodec.ts index 22cd1b8..69512ef 100644 --- a/packages/automated-dispute/src/services/prophetCodec.ts +++ b/packages/automated-dispute/src/services/prophetCodec.ts @@ -4,28 +4,46 @@ import { Address, decodeAbiParameters, encodeAbiParameters } from "viem"; import { Request, Response } from "../types/prophet.js"; const REQUEST_MODULE_DATA_REQUEST_ABI_FIELDS = [ - { name: "epoch", type: "uint256" }, - { name: "chainId", type: "string" }, - { name: "accountingExtension", type: "address" }, - { name: "paymentAmount", type: "uint256" }, + { + components: [ + { name: "epoch", type: "uint256" }, + { name: "chainId", type: "string" }, + { name: "accountingExtension", type: "address" }, + { name: "paymentAmount", type: "uint256" }, + ], + name: "requestModuleData", + type: "tuple", + }, ] as const; const RESPONSE_MODULE_DATA_REQUEST_ABI_FIELDS = [ - { name: "accountingExtension", type: "address" }, - { name: "bondToken", type: "address" }, - { name: "bondSize", type: "uint256" }, - { name: "deadline", type: "uint256" }, - { name: "disputeWindow", type: "uint256" }, + { + components: [ + { name: "accountingExtension", type: "address" }, + { name: "bondToken", type: "address" }, + { name: "bondSize", type: "uint256" }, + { name: "deadline", type: "uint256" }, + { name: "disputeWindow", type: "uint256" }, + ], + name: "responseModuleData", + type: "tuple", + }, ] as const; const DISPUTE_MODULE_DATA_REQUEST_ABI_FIELDS = [ - { name: "accountingExtension", type: "address" }, - { name: "bondToken", type: "address" }, - { name: "bondSize", type: "uint256" }, - { name: "maxNumberOfEscalations", type: "uint256" }, - { name: "bondEscalationDeadline", type: "uint256" }, - { name: "tyingBuffer", type: "uint256" }, - { name: "disputeWindow", type: "uint256" }, + { + components: [ + { name: "accountingExtension", type: "address" }, + { name: "bondToken", type: "address" }, + { name: "bondSize", type: "uint256" }, + { name: "maxNumberOfEscalations", type: "uint256" }, + { name: "bondEscalationDeadline", type: "uint256" }, + { name: "tyingBuffer", type: "uint256" }, + { name: "disputeWindow", type: "uint256" }, + ], + name: "disputeModuleData", + type: "tuple", + }, ] as const; const RESPONSE_RESPONSE_ABI_FIELDS = [{ name: "block", type: "uint256" }] as const; @@ -48,10 +66,10 @@ export class ProphetCodec { ); return { - epoch: decodeParameters[0], - chainId: decodeParameters[1] as Caip2ChainId, - accountingExtension: decodeParameters[2] as Address, - paymentAmount: decodeParameters[3], + epoch: decodeParameters[0].epoch, + chainId: decodeParameters[0].chainId as Caip2ChainId, + accountingExtension: decodeParameters[0].accountingExtension as Address, + paymentAmount: decodeParameters[0].paymentAmount, }; } @@ -66,12 +84,7 @@ export class ProphetCodec { static encodeRequestRequestModuleData( requestModuleData: Request["decodedData"]["requestModuleData"], ): Request["prophetData"]["requestModuleData"] { - return encodeAbiParameters(REQUEST_MODULE_DATA_REQUEST_ABI_FIELDS, [ - requestModuleData.epoch, - requestModuleData.chainId, - requestModuleData.accountingExtension, - requestModuleData.paymentAmount, - ]); + return encodeAbiParameters(REQUEST_MODULE_DATA_REQUEST_ABI_FIELDS, [requestModuleData]); } /** @@ -89,13 +102,7 @@ export class ProphetCodec { responseModuleData, ); - return { - accountingExtension: decodedParameters[0], - bondToken: decodedParameters[1], - bondSize: decodedParameters[2], - deadline: decodedParameters[3], - disputeWindow: decodedParameters[4], - }; + return decodedParameters[0]; } /** @@ -108,13 +115,7 @@ export class ProphetCodec { static encodeRequestResponseModuleData( responseModuleData: Request["decodedData"]["responseModuleData"], ): Request["prophetData"]["responseModuleData"] { - return encodeAbiParameters(RESPONSE_MODULE_DATA_REQUEST_ABI_FIELDS, [ - responseModuleData.accountingExtension, - responseModuleData.bondToken, - responseModuleData.bondSize, - responseModuleData.deadline, - responseModuleData.disputeWindow, - ]); + return encodeAbiParameters(RESPONSE_MODULE_DATA_REQUEST_ABI_FIELDS, [responseModuleData]); } /** @@ -132,15 +133,7 @@ export class ProphetCodec { disputeModuleData, ); - return { - accountingExtension: decodedParameters[0], - bondToken: decodedParameters[1], - bondSize: decodedParameters[2], - maxNumberOfEscalations: decodedParameters[3], - bondEscalationDeadline: decodedParameters[4], - tyingBuffer: decodedParameters[5], - disputeWindow: decodedParameters[6], - }; + return decodedParameters[0]; } /** @@ -153,15 +146,7 @@ export class ProphetCodec { static encodeRequestDisputeModuleData( disputeModuleData: Request["decodedData"]["disputeModuleData"], ): Request["prophetData"]["disputeModuleData"] { - return encodeAbiParameters(DISPUTE_MODULE_DATA_REQUEST_ABI_FIELDS, [ - disputeModuleData.accountingExtension, - disputeModuleData.bondToken, - disputeModuleData.bondSize, - disputeModuleData.maxNumberOfEscalations, - disputeModuleData.bondEscalationDeadline, - disputeModuleData.tyingBuffer, - disputeModuleData.disputeWindow, - ]); + return encodeAbiParameters(DISPUTE_MODULE_DATA_REQUEST_ABI_FIELDS, [disputeModuleData]); } /** diff --git a/packages/automated-dispute/tests/services/eboProcessor.spec.ts b/packages/automated-dispute/tests/services/eboProcessor.spec.ts index 198bf72..320a4d7 100644 --- a/packages/automated-dispute/tests/services/eboProcessor.spec.ts +++ b/packages/automated-dispute/tests/services/eboProcessor.spec.ts @@ -2,7 +2,11 @@ import { UnixTimestamp } from "@ebo-agent/shared"; import { Block, Hex } from "viem"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { PendingModulesApproval, ProcessorAlreadyStarted } from "../../src/exceptions/index.js"; +import { + PastEventEnqueueError, + PendingModulesApproval, + ProcessorAlreadyStarted, +} from "../../src/exceptions/index.js"; import { NotificationService } from "../../src/interfaces/notificationService.js"; import { AccountingModules, @@ -244,6 +248,61 @@ describe("EboProcessor", () => { ); }); + it("drops past events and keeps operating", async () => { + const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor( + logger, + accountingModules, + notifier, + ); + const { actor } = mocks.buildEboActor(request, logger); + + const currentEpoch = { + number: 1n, + firstBlockNumber: 1n, + startTimestamp: BigInt(Date.UTC(2024, 1, 1, 0, 0, 0, 0)) as UnixTimestamp, + }; + + const currentBlock = { + number: currentEpoch.firstBlockNumber + 10n, + } as unknown as Block; + + const requestCreatedEvent: EboEvent<"RequestCreated"> = { + name: "RequestCreated", + blockNumber: 1n, + logIndex: 1, + timestamp: BigInt(Date.UTC(2024, 1, 1, 0, 0, 0, 0)) as UnixTimestamp, + requestId: request.id, + metadata: { + requestId: request.id, + request: request.prophetData, + ipfsHash: "0x01" as Hex, + }, + }; + + vi.spyOn(protocolProvider, "getAccountingApprovedModules").mockResolvedValue( + allModulesApproved, + ); + vi.spyOn(protocolProvider, "getCurrentEpoch").mockResolvedValue(currentEpoch); + vi.spyOn(protocolProvider, "getLastFinalizedBlock").mockResolvedValue(currentBlock); + vi.spyOn(actorsManager, "createActor").mockReturnValue(actor); + vi.spyOn(actorsManager, "getActor").mockReturnValue(actor); + vi.spyOn(actor, "processEvents").mockImplementation(() => Promise.resolve()); + vi.spyOn(actor, "onLastBlockUpdated").mockImplementation(() => Promise.resolve()); + vi.spyOn(actor, "canBeTerminated").mockResolvedValue(false); + vi.spyOn(actor, "enqueue").mockImplementation(() => { + throw new PastEventEnqueueError(requestCreatedEvent, requestCreatedEvent); + }); + + const mockGetEvents = vi.spyOn(protocolProvider, "getEvents"); + mockGetEvents.mockResolvedValue([requestCreatedEvent]); + + await processor.start(msBetweenChecks); + + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("Dropping already enqueued event"), + ); + }); + it("keeps the last block checked unaltered when something fails during sync", async () => { const initialCurrentBlock = 1n; @@ -357,7 +416,7 @@ describe("EboProcessor", () => { const mockGetEvents = vi.spyOn(protocolProvider, "getEvents"); mockGetEvents.mockResolvedValue([requestCreatedEvent]); - processor["lastCheckedBlock"] = mockLastCheckedBlock; + processor["lastCheckedBlock"] = mockLastCheckedBlock - 1n; await processor.start(msBetweenChecks);