diff --git a/.gitignore b/.gitignore index 712b54b9..2127aa72 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ out/ build dist cache +tmp/ # Coverage diff --git a/apps/agent/package.json b/apps/agent/package.json index 90005f25..3d4638bc 100644 --- a/apps/agent/package.json +++ b/apps/agent/package.json @@ -29,6 +29,7 @@ "devDependencies": { "execa": "9.4.0", "prool": "0.0.16", + "tsx": "4.19.1", "viem": "2.21.10", "vitest": "2.0.3" } diff --git a/apps/agent/src/index.ts b/apps/agent/src/index.ts index 7494172f..aca5dcfd 100644 --- a/apps/agent/src/index.ts +++ b/apps/agent/src/index.ts @@ -1,25 +1,35 @@ import { inspect } from "util"; import { isNativeError } from "util/types"; import { - DiscordNotifier, EboActorsManager, EboProcessor, + NotificationService, ProtocolProvider, } from "@ebo-agent/automated-dispute"; import { BlockNumberService } from "@ebo-agent/blocknumber"; -import { Logger } from "@ebo-agent/shared"; +import { Logger, stringify } from "@ebo-agent/shared"; import { config } from "./config/index.js"; const logger = Logger.getInstance(); const main = async (): Promise => { + logger.debug("Initializing agent..."); + + logger.debug("Initializing block number service..."); + logger.debug(stringify(config.blockNumberService)); + const blockNumberService = new BlockNumberService( config.blockNumberService.chainRpcUrls, config.blockNumberService.blockmetaConfig, logger, ); + logger.debug("Block number service initialized."); + + logger.debug("Initializing protocol provider..."); + logger.debug(stringify(config.protocolProvider)); + const protocolProvider = new ProtocolProvider( config.protocolProvider.rpcsConfig, config.protocolProvider.contracts, @@ -27,15 +37,24 @@ const main = async (): Promise => { blockNumberService, ); + logger.debug("Protocol provider initialized."); + + const discordConfig = { + discordBotToken: config.DISCORD_BOT_TOKEN, + discordChannelId: config.DISCORD_CHANNEL_ID, + }; + + logger.debug("Initializing notifier..."); + logger.debug(stringify(discordConfig)); + + // const notifier = await DiscordNotifier.create(discordConfig, logger); + // FIXME: during E2E DiscordNotifier is not able to start even if setting a valid token + const notifier = { notifyError: (_e, _ctx) => {} } as NotificationService; + const actorsManager = new EboActorsManager(); - const notifier = await DiscordNotifier.create( - { - discordBotToken: config.DISCORD_BOT_TOKEN, - discordChannelId: config.DISCORD_CHANNEL_ID, - }, - logger, - ); + logger.debug("Initializing EBO processor..."); + logger.debug(stringify(config.processor)); const processor = new EboProcessor( config.processor.accountingModules, @@ -46,6 +65,9 @@ const main = async (): Promise => { notifier, ); + logger.debug("EBO processor initialized."); + logger.debug("Starting processing..."); + await processor.start(config.processor.msBetweenChecks); }; diff --git a/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts b/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts index 0681efbf..e676a143 100644 --- a/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts +++ b/apps/agent/test/e2e/scenarios/01_happy_path/index.spec.ts @@ -1,11 +1,7 @@ -import { - EboActorsManager, - EboProcessor, - NotificationService, - oracleAbi, - ProphetCodec, - ProtocolProvider, -} from "@ebo-agent/automated-dispute"; +import { ChildProcessWithoutNullStreams } from "child_process"; +import fs from "fs"; +import path from "path"; +import { oracleAbi, ProphetCodec, ProtocolProvider } from "@ebo-agent/automated-dispute"; import { RequestId } from "@ebo-agent/automated-dispute/dist/types/prophet.js"; import { BlockNumberService } from "@ebo-agent/blocknumber"; import { Caip2ChainId, Logger } from "@ebo-agent/shared"; @@ -27,7 +23,7 @@ import { WalletClient, } from "viem"; import { arbitrumSepolia } from "viem/chains"; -import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test } from "vitest"; import type { DeployContractsOutput } from "../../utils/prophet-e2e-scaffold/index.js"; import { getCurrentEpoch, setEpochLength } from "../../utils/prophet-e2e-scaffold/epochManager.js"; @@ -38,6 +34,7 @@ import { setUpProphet, waitForEvent, } from "../../utils/prophet-e2e-scaffold/index.js"; +import { killAgent, spawnAgent } from "../../utils/prophet-e2e-scaffold/spawnAgent.js"; const E2E_SCENARIO_SETUP_TIMEOUT = 60_000; const E2E_TEST_TIMEOUT = 30_000; @@ -84,11 +81,24 @@ const newEventAbi = parseAbiItem( ); describe.sequential("single agent", () => { + const tmpConfigDir = path.join(__dirname, "/tmp"); + const tmpConfigFile = path.join(tmpConfigDir, "/config.yml"); + + let agent: ChildProcessWithoutNullStreams; + let l2ProtocolAnvil: CreateServerReturnType; let protocolContracts: DeployContractsOutput; let accounts: { privateKey: Hex; account: Account; walletClient: WalletClient }[]; + beforeAll(() => { + fs.mkdirSync(tmpConfigDir, { recursive: true }); + }); + + afterAll(() => { + fs.rmSync(tmpConfigDir, { recursive: true, force: true }); + }); + beforeEach(async () => { l2ProtocolAnvil = await createAnvilServer( PROTOCOL_L2_LOCAL_RPC_HOST, @@ -142,76 +152,14 @@ describe.sequential("single agent", () => { afterEach(async () => { await l2ProtocolAnvil.stop(); + + await killAgent({ + process: agent, + configPath: tmpConfigFile, + }); }); test.skip("basic flow", { timeout: E2E_TEST_TIMEOUT }, async () => { - const logger = Logger.getInstance(); - - const protocolProvider = new ProtocolProvider( - { - l1: { - chainId: PROTOCOL_L2_CHAIN_ID, - // Using the same RPC due to Anvil's arbitrum block number bug - urls: [PROTOCOL_L2_LOCAL_URL], - transactionReceiptConfirmations: 1, - timeout: 1_000, - retryInterval: 500, - }, - l2: { - chainId: PROTOCOL_L2_CHAIN_ID, - urls: [PROTOCOL_L2_LOCAL_URL], - transactionReceiptConfirmations: 1, - timeout: 1_000, - retryInterval: 500, - }, - }, - { - bondEscalationModule: protocolContracts["BondEscalationModule"], - eboRequestCreator: protocolContracts["EBORequestCreator"], - epochManager: EPOCH_MANAGER_ADDRESS, - oracle: protocolContracts["Oracle"], - horizonAccountingExtension: protocolContracts["HorizonAccountingExtension"], - }, - accounts[0].privateKey, - ); - - vi.spyOn(protocolProvider, "getAccountingApprovedModules").mockResolvedValue([ - protocolContracts["EBORequestModule"], - protocolContracts["BondedResponseModule"], - protocolContracts["BondEscalationModule"], - ]); - - const blockNumberService = new BlockNumberService( - new Map([[PROTOCOL_L2_CHAIN_ID, [PROTOCOL_L2_LOCAL_URL]]]), - { - baseUrl: new URL("http://not.needed/"), - bearerToken: "not.needed", - bearerTokenExpirationWindow: 1000, - servicePaths: { - block: "/block", - blockByTime: "/blockByTime", - }, - }, - logger, - ); - - const actorsManager = new EboActorsManager(); - - const processor = new EboProcessor( - { - requestModule: protocolContracts["EBORequestModule"], - responseModule: protocolContracts["BondedResponseModule"], - escalationModule: protocolContracts["BondEscalationModule"], - }, - protocolProvider, - blockNumberService, - actorsManager, - logger, - { - notifyError: vi.fn(), - } as unknown as NotificationService, - ); - const anvilClient = createTestClient({ mode: "anvil", account: GRT_HOLDER, @@ -235,7 +183,63 @@ describe.sequential("single agent", () => { epochManagerAddress: EPOCH_MANAGER_ADDRESS, }); - processor.start(3000); + agent = spawnAgent({ + configPath: tmpConfigFile, + config: { + protocolProvider: { + contracts: { + oracle: protocolContracts["Oracle"], + bondEscalationModule: protocolContracts["BondEscalationModule"], + eboRequestCreator: protocolContracts["EBORequestCreator"], + horizonAccountingExtension: protocolContracts["HorizonAccountingExtension"], + epochManager: EPOCH_MANAGER_ADDRESS, + }, + rpcsConfig: { + l1: { + chainId: PROTOCOL_L2_CHAIN_ID, + transactionReceiptConfirmations: 1, + timeout: 1_000, + retryInterval: 500, + }, + l2: { + chainId: PROTOCOL_L2_CHAIN_ID, + transactionReceiptConfirmations: 1, + timeout: 1_000, + retryInterval: 500, + }, + }, + }, + blockNumberService: { + blockmetaConfig: { + baseUrl: new URL("http://not.needed/"), + bearerTokenExpirationWindow: 1000, + servicePaths: { + block: "/block", + blockByTime: "/blockByTime", + }, + }, + }, + processor: { + accountingModules: { + responseModule: protocolContracts["BondedResponseModule"], + escalationModule: protocolContracts["BondEscalationModule"], + }, + msBetweenChecks: 3000, + }, + }, + env: { + PROTOCOL_PROVIDER_PRIVATE_KEY: accounts[0].privateKey, + PROTOCOL_PROVIDER_L1_RPC_URLS: [PROTOCOL_L2_LOCAL_URL], + // Using the same RPC due to Anvil's arbitrum block number bug + PROTOCOL_PROVIDER_L2_RPC_URLS: [PROTOCOL_L2_LOCAL_URL], + BLOCK_NUMBER_BLOCKMETA_TOKEN: "not.needed", + BLOCK_NUMBER_RPC_URLS_MAP: new Map([ + [PROTOCOL_L2_CHAIN_ID, [PROTOCOL_L2_LOCAL_URL]], + ]), + DISCORD_BOT_TOKEN: "", + DISCORD_CHANNEL_ID: "", + }, + }); const requestCreatedAbi = getAbiItem({ abi: oracleAbi, name: "RequestCreated" }); @@ -265,7 +269,10 @@ describe.sequential("single agent", () => { expect(requestCreatedEvent).toBeDefined(); - const responseProposedAbi = getAbiItem({ abi: oracleAbi, name: "ResponseProposed" }); + const responseProposedAbi = getAbiItem({ + abi: oracleAbi, + name: "ResponseProposed", + }); const responseProposedEvent = await waitForEvent({ client: anvilClient, @@ -352,6 +359,20 @@ describe.sequential("single agent", () => { test.skip("dispute response and propose a new one", { timeout: E2E_TEST_TIMEOUT }, async () => { const logger = Logger.getInstance(); + const blockNumberService = new BlockNumberService( + new Map([[PROTOCOL_L2_CHAIN_ID, [PROTOCOL_L2_LOCAL_URL]]]), + { + baseUrl: new URL("http://not.needed/"), + bearerToken: "not.needed", + bearerTokenExpirationWindow: 1000, + servicePaths: { + block: "/block", + blockByTime: "/blockByTime", + }, + }, + logger, + ); + const protocolProvider = new ProtocolProvider( { l1: { @@ -378,43 +399,7 @@ describe.sequential("single agent", () => { horizonAccountingExtension: protocolContracts["HorizonAccountingExtension"], }, accounts[0].privateKey, - ); - - vi.spyOn(protocolProvider, "getAccountingApprovedModules").mockResolvedValue([ - protocolContracts["EBORequestModule"], - protocolContracts["BondedResponseModule"], - protocolContracts["BondEscalationModule"], - ]); - - const blockNumberService = new BlockNumberService( - new Map([[PROTOCOL_L2_CHAIN_ID, [PROTOCOL_L2_LOCAL_URL]]]), - { - baseUrl: new URL("http://not.needed/"), - bearerToken: "not.needed", - bearerTokenExpirationWindow: 1000, - servicePaths: { - block: "/block", - blockByTime: "/blockByTime", - }, - }, - logger, - ); - - const actorsManager = new EboActorsManager(); - - const processor = new EboProcessor( - { - requestModule: protocolContracts["EBORequestModule"], - responseModule: protocolContracts["BondedResponseModule"], - escalationModule: protocolContracts["BondEscalationModule"], - }, - protocolProvider, blockNumberService, - actorsManager, - logger, - { - notifyError: vi.fn(), - } as unknown as NotificationService, ); const anvilClient = createTestClient({ @@ -476,7 +461,63 @@ describe.sequential("single agent", () => { blockTimeout: initBlock + 1000n, }); - processor.start(3000); + agent = spawnAgent({ + configPath: tmpConfigFile, + config: { + protocolProvider: { + contracts: { + oracle: protocolContracts["Oracle"], + bondEscalationModule: protocolContracts["BondEscalationModule"], + eboRequestCreator: protocolContracts["EBORequestCreator"], + horizonAccountingExtension: protocolContracts["HorizonAccountingExtension"], + epochManager: EPOCH_MANAGER_ADDRESS, + }, + rpcsConfig: { + l1: { + chainId: PROTOCOL_L2_CHAIN_ID, + transactionReceiptConfirmations: 1, + timeout: 1_000, + retryInterval: 500, + }, + l2: { + chainId: PROTOCOL_L2_CHAIN_ID, + transactionReceiptConfirmations: 1, + timeout: 1_000, + retryInterval: 500, + }, + }, + }, + blockNumberService: { + blockmetaConfig: { + baseUrl: new URL("http://not.needed/"), + bearerTokenExpirationWindow: 1000, + servicePaths: { + block: "/block", + blockByTime: "/blockByTime", + }, + }, + }, + processor: { + accountingModules: { + responseModule: protocolContracts["BondedResponseModule"], + escalationModule: protocolContracts["BondEscalationModule"], + }, + msBetweenChecks: 3000, + }, + }, + env: { + PROTOCOL_PROVIDER_PRIVATE_KEY: accounts[0].privateKey, + PROTOCOL_PROVIDER_L1_RPC_URLS: [PROTOCOL_L2_LOCAL_URL], + // Using the same RPC due to Anvil's arbitrum block number bug + PROTOCOL_PROVIDER_L2_RPC_URLS: [PROTOCOL_L2_LOCAL_URL], + BLOCK_NUMBER_BLOCKMETA_TOKEN: "not.needed", + BLOCK_NUMBER_RPC_URLS_MAP: new Map([ + [PROTOCOL_L2_CHAIN_ID, [PROTOCOL_L2_LOCAL_URL]], + ]), + DISCORD_BOT_TOKEN: "", + DISCORD_CHANNEL_ID: "", + }, + }); const badResponseDisputedEvent = await waitForEvent({ client: anvilClient, diff --git a/apps/agent/test/e2e/utils/prophet-e2e-scaffold/spawnAgent.ts b/apps/agent/test/e2e/utils/prophet-e2e-scaffold/spawnAgent.ts new file mode 100644 index 00000000..11878b6a --- /dev/null +++ b/apps/agent/test/e2e/utils/prophet-e2e-scaffold/spawnAgent.ts @@ -0,0 +1,71 @@ +import { ChildProcessWithoutNullStreams, spawn } from "child_process"; +import fs from "fs"; +import yaml from "yaml"; + +import { eboAgentConfigSchema, envSchema } from "../../../../src/config/schemas.js"; + +type AgentConfig = Zod.infer; +type AgentEnv = Omit, "EBO_AGENT_CONFIG_FILE_PATH">; + +export const spawnAgent = (params: { config: AgentConfig; configPath: string; env: AgentEnv }) => { + const { config, configPath, env } = params; + + generateConfigFile(configPath, config); + + console.log("Spawning node"); + + const blockNumberUrls = JSON.stringify( + Object.fromEntries(env.BLOCK_NUMBER_RPC_URLS_MAP.entries()), + ); + + const agentProcess = spawn("pnpm", ["start"], { + env: { + ...process.env, + ...env, + PROTOCOL_PROVIDER_L1_RPC_URLS: env.PROTOCOL_PROVIDER_L1_RPC_URLS.toString(), + PROTOCOL_PROVIDER_L2_RPC_URLS: env.PROTOCOL_PROVIDER_L2_RPC_URLS.toString(), + BLOCK_NUMBER_RPC_URLS_MAP: blockNumberUrls, + EBO_AGENT_CONFIG_FILE_PATH: configPath, + }, + }); + + agentProcess.on("error", (data) => { + console.error(data); + }); + + agentProcess.stdout.on("data", (data) => { + console.log(`data: ${data}`); + }); + + agentProcess.stderr.on("data", (data) => { + console.log(`data: ${data}`); + }); + + return agentProcess; +}; + +export const killAgent = (params: { + process: ChildProcessWithoutNullStreams; + configPath: string; +}) => { + const { process, configPath } = params; + + if (process) { + // Need to clear process channel to actually kill it + process.stdin.end(); + process.stdout.destroy(); + process.stderr.destroy(); + + process.kill("SIGKILL"); + } + + fs.rmSync(configPath); + + console.log("Agent killed"); +}; + +const generateConfigFile = (path: string, config: AgentConfig) => { + const content = yaml.stringify(config); + + fs.writeFileSync(path, content, "utf-8"); +}; diff --git a/packages/automated-dispute/src/services/discordNotifier.ts b/packages/automated-dispute/src/services/discordNotifier.ts index 44286271..d274e882 100644 --- a/packages/automated-dispute/src/services/discordNotifier.ts +++ b/packages/automated-dispute/src/services/discordNotifier.ts @@ -41,14 +41,21 @@ export class DiscordNotifier implements NotificationService { try { await client.login(config.discordBotToken); - await new Promise((resolve) => { + + await new Promise((resolve, reject) => { client.once("ready", () => { logger.info("Discord bot is ready"); + resolve(); }); + + client.once("error", (error: Error) => { + reject(error); + }); }); } catch (error) { - logger.error(`FFailed to initialize Discord notifier: ${error}`); + logger.error(`Failed to initialize Discord notifier: ${error}`); + throw error; } diff --git a/packages/automated-dispute/src/services/eboActor.ts b/packages/automated-dispute/src/services/eboActor.ts index db2c4d36..3016c696 100644 --- a/packages/automated-dispute/src/services/eboActor.ts +++ b/packages/automated-dispute/src/services/eboActor.ts @@ -311,7 +311,13 @@ export class EboActor { const request = this.getActorRequest(); const disputes: Dispute[] = this.getActiveDisputes(); - const settledDisputes = disputes.map(async (dispute) => { + const settleableDisputes = disputes.filter((dispute) => + this.canBeSettled(request, dispute, atTimestamp), + ); + + this.logger.info(`Settling ${settleableDisputes.length} disputes...`); + + for (const dispute of settleableDisputes) { const responseId = dispute.prophetData.responseId; const response = this.registry.getResponse(responseId); @@ -324,13 +330,8 @@ export class EboActor { throw new DisputeWithoutResponse(dispute); } - if (this.canBeSettled(request, dispute, atTimestamp)) { - await this.settleDispute(request, response, dispute); - } - }); - - // Any of the disputes not being handled correctly should make the actor fail - await Promise.all(settledDisputes); + await this.settleDispute(request, response, dispute); + } } /** diff --git a/packages/automated-dispute/tests/services/eboActor/onLastBlockupdated.spec.ts b/packages/automated-dispute/tests/services/eboActor/onLastBlockupdated.spec.ts index 22130775..70eeaad3 100644 --- a/packages/automated-dispute/tests/services/eboActor/onLastBlockupdated.spec.ts +++ b/packages/automated-dispute/tests/services/eboActor/onLastBlockupdated.spec.ts @@ -138,6 +138,7 @@ describe("EboActor", () => { // Skipping finalize flow with this mock vi.spyOn(registry, "getResponses").mockReturnValue([]); vi.spyOn(registry, "getDisputes").mockReturnValue([dispute]); + actor["canBeSettled"] = vi.fn().mockReturnValue(true); const newBlockNumber = disputeDeadline + 1n; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ec19c872..c1834a7c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -99,6 +99,9 @@ importers: prool: specifier: 0.0.16 version: 0.0.16 + tsx: + specifier: 4.19.1 + version: 4.19.1 vitest: specifier: 2.0.3 version: 2.0.3(@types/node@20.14.12)