From 554605673c54771199bf1a2c28cc566e4785a493 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 26 Dec 2024 13:18:52 -0800 Subject: [PATCH 1/8] common: tweak timer logging --- packages/indexer-common/src/sequential-timer.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/indexer-common/src/sequential-timer.ts b/packages/indexer-common/src/sequential-timer.ts index 701cf36fa..9a50d4835 100644 --- a/packages/indexer-common/src/sequential-timer.ts +++ b/packages/indexer-common/src/sequential-timer.ts @@ -24,15 +24,16 @@ function logWorkTime( caller: string | undefined, milliseconds: number, ) { - const workTimeWarningThreshold = 1000 + const workTimeWarningThreshold = 5000 const workTime = Date.now() - workStarted - logger.debug(`timer loop took ${loopTime}ms workTime ${workTime} caller(${caller})`) - if (loopTime > milliseconds + workTimeWarningThreshold) { + if (workTime > milliseconds + workTimeWarningThreshold) { logger.warn( - 'timer work took longer than the sequential timer was configured for (+1s)', + 'timer work took longer than the sequential timer was configured for (>5s)', { loopTime, + workTime, milliseconds, + caller, }, ) } From c7b5a5c8e5e0e85fa6f7cd0d11fd3b96086034e5 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 26 Dec 2024 14:47:39 -0800 Subject: [PATCH 2/8] common: make scalar optional and turned off when tap is enabled --- .../src/indexer-management/allocations.ts | 31 ++++++++++------- .../resolvers/allocations.ts | 26 +++++++++------ packages/indexer-common/src/network.ts | 33 ++++++++++++------- 3 files changed, 57 insertions(+), 33 deletions(-) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index d892d14e1..797c54476 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -484,8 +484,9 @@ export class AllocationManager { epoch: createAllocationEventLogs.epoch.toString(), }) + // TODO: deprecated // Remember allocation - await this.network.receiptCollector.rememberAllocations(actionID, [ + await this.network.receiptCollector?.rememberAllocations(actionID, [ createAllocationEventLogs.allocationID, ]) @@ -638,12 +639,16 @@ export class AllocationManager { logger.info('Identifying receipts worth collecting', { allocation: closeAllocationEventLogs.allocationID, }) + let isCollectingQueryFees = false const allocation = await this.network.networkMonitor.allocation(allocationID) - // Collect query fees for this allocation - const isCollectingQueryFees = await this.network.receiptCollector.collectReceipts( - actionID, - allocation, - ) + if (this.network.receiptCollector) { + // TODO: deprecated + // Collect query fees for this allocation + isCollectingQueryFees = await this.network.receiptCollector.collectReceipts( + actionID, + allocation, + ) + } // Upsert a rule so the agent keeps the deployment synced but doesn't allocate to it logger.debug( @@ -925,11 +930,15 @@ export class AllocationManager { try { allocation = await this.network.networkMonitor.allocation(allocationID) // Collect query fees for this allocation - isCollectingQueryFees = await this.network.receiptCollector.collectReceipts( - actionID, - allocation, - ) - logger.debug('Finished receipt collection') + + // TODO: deprecated + if (this.network.receiptCollector) { + isCollectingQueryFees = await this.network.receiptCollector.collectReceipts( + actionID, + allocation, + ) + logger.debug('Finished receipt collection') + } } catch (err) { logger.error('Failed to collect receipts', { err, diff --git a/packages/indexer-common/src/indexer-management/resolvers/allocations.ts b/packages/indexer-common/src/indexer-management/resolvers/allocations.ts index 385c9a424..8a6b58a39 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/allocations.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/allocations.ts @@ -711,11 +711,12 @@ export default { allocation: closeAllocationEventLogs.allocationID, }) + // TODO: deprecated // Collect query fees for this allocation - const isCollectingQueryFees = await receiptCollector.collectReceipts( - 0, - allocationData, - ) + let isCollectingQueryFees = false + if (receiptCollector) { + isCollectingQueryFees = await receiptCollector.collectReceipts(0, allocationData) + } logger.debug( `Updating indexing rules, so indexer-agent keeps the deployment synced but doesn't reallocate to it`, @@ -1016,11 +1017,12 @@ export default { allocation: closeAllocationEventLogs.allocationID, }) - // Collect query fees for this allocation - const isCollectingQueryFees = await receiptCollector.collectReceipts( - 0, - allocationData, - ) + // TODO: deprecated + let isCollectingQueryFees = false + if (receiptCollector) { + // Collect query fees for this allocation + isCollectingQueryFees = await receiptCollector.collectReceipts(0, allocationData) + } logger.debug( `Updating indexing rules, so indexer-agent will now manage the active allocation`, @@ -1061,6 +1063,7 @@ export default { } }, + // TODO: deprecated submitCollectReceiptsJob: async ( { allocation, @@ -1092,7 +1095,10 @@ export default { }) // Collect query fees for this allocation - const collecting = await receiptCollector.collectReceipts(0, allocationData) + let collecting = false + if (receiptCollector) { + collecting = await receiptCollector.collectReceipts(0, allocationData) + } logger.info(`Submitted allocation receipt collection job for execution`, { allocationID: allocation, diff --git a/packages/indexer-common/src/network.ts b/packages/indexer-common/src/network.ts index d955c494d..13b5d9ff8 100644 --- a/packages/indexer-common/src/network.ts +++ b/packages/indexer-common/src/network.ts @@ -47,7 +47,10 @@ export class Network { networkProvider: providers.StaticJsonRpcProvider transactionManager: TransactionManager networkMonitor: NetworkMonitor - receiptCollector: AllocationReceiptCollector + + // TODO: deprecated + receiptCollector: AllocationReceiptCollector | undefined + tapCollector: TapCollector | undefined specification: spec.NetworkSpecification paused: Eventual @@ -61,7 +64,7 @@ export class Network { networkProvider: providers.StaticJsonRpcProvider, transactionManager: TransactionManager, networkMonitor: NetworkMonitor, - receiptCollector: AllocationReceiptCollector, + receiptCollector: AllocationReceiptCollector | undefined, tapCollector: TapCollector | undefined, specification: spec.NetworkSpecification, paused: Eventual, @@ -272,16 +275,22 @@ export class Network { // -------------------------------------------------------------------------------- // * Allocation Receipt Collector // -------------------------------------------------------------------------------- - const scalarCollector = await AllocationReceiptCollector.create({ - logger, - metrics, - transactionManager: transactionManager, - models: queryFeeModels, - allocationExchange: contracts.allocationExchange, - allocations, - networkSpecification: specification, - networkSubgraph, - }) + let scalarCollector: AllocationReceiptCollector | undefined = undefined + if (!(tapContracts && tapSubgraph)) { + logger.warn( + "deprecated scalar voucher collector is enabled - you probably don't want this", + ) + scalarCollector = await AllocationReceiptCollector.create({ + logger, + metrics, + transactionManager: transactionManager, + models: queryFeeModels, + allocationExchange: contracts.allocationExchange, + allocations, + networkSpecification: specification, + networkSubgraph, + }) + } // -------------------------------------------------------------------------------- // * TAP Collector From 77a412b5f7ec04d3ff9698f0758925ca3b0c5bcc Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 26 Dec 2024 15:24:15 -0800 Subject: [PATCH 3/8] common: fix RAV component log name --- packages/indexer-common/src/allocations/tap-collector.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts index 00b8a930a..90c4f7ca6 100644 --- a/packages/indexer-common/src/allocations/tap-collector.ts +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -127,7 +127,7 @@ export class TapCollector { networkSubgraph, }: TapCollectorOptions): TapCollector { const collector = new TapCollector() - collector.logger = logger.child({ component: 'AllocationReceiptCollector' }) + collector.logger = logger.child({ component: 'TapCollector' }) collector.metrics = registerReceiptMetrics( metrics, networkSpecification.networkIdentifier, From e37c35bd1441993aa8de469442c59b8b68f5d6ae Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 26 Dec 2024 16:38:35 -0800 Subject: [PATCH 4/8] common: drive getPendingRAVs from allocations eventual instead of re-querying, disable jsonify of allocations --- .../src/allocations/tap-collector.ts | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts index 90c4f7ca6..e1561577f 100644 --- a/packages/indexer-common/src/allocations/tap-collector.ts +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -21,7 +21,6 @@ import { allocationSigner, tapAllocationIdProof, parseGraphQLAllocation, - sequentialTimerMap, } from '..' import { BigNumber } from 'ethers' import pReduce from 'p-reduce' @@ -184,12 +183,8 @@ export class TapCollector { } private getPendingRAVs(): Eventual { - return sequentialTimerMap( - { - logger: this.logger, - milliseconds: RAV_CHECK_INTERVAL_MS, - }, - async () => { + return this.allocations.throttle(RAV_CHECK_INTERVAL_MS).tryMap( + async (allocations) => { let ravs = await this.pendingRAVs() if (ravs.length === 0) { this.logger.info(`No pending RAVs to process`) @@ -198,10 +193,10 @@ export class TapCollector { if (ravs.length > 0) { ravs = await this.filterAndUpdateRavs(ravs) } - const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs) - this.logger.info( - `Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`, - ) + this.logger.debug(`matching allocations for pending ravs`, { + allocationCount: allocations.length, + ravCount: ravs.length, + }) return ravs .map((rav) => { const signedRav = rav.getSignedRAV() From 27ab3e000eac2eca0a07925120ee1ce45db3178b Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Fri, 27 Dec 2024 10:35:11 -0800 Subject: [PATCH 5/8] common: make monitorQueue sequential, promise-guard concurrent executeApprovedActions, add logging --- .../src/allocations/tap-collector.ts | 16 +++++--- .../src/indexer-management/actions.ts | 30 ++++++++++++++- .../src/indexer-management/allocations.ts | 2 +- .../indexer-common/src/sequential-timer.ts | 38 ++++++++----------- 4 files changed, 55 insertions(+), 31 deletions(-) diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts index e1561577f..500694b30 100644 --- a/packages/indexer-common/src/allocations/tap-collector.ts +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -21,6 +21,7 @@ import { allocationSigner, tapAllocationIdProof, parseGraphQLAllocation, + sequentialTimerMap, } from '..' import { BigNumber } from 'ethers' import pReduce from 'p-reduce' @@ -183,8 +184,12 @@ export class TapCollector { } private getPendingRAVs(): Eventual { - return this.allocations.throttle(RAV_CHECK_INTERVAL_MS).tryMap( - async (allocations) => { + return sequentialTimerMap( + { + logger: this.logger, + milliseconds: RAV_CHECK_INTERVAL_MS, + }, + async () => { let ravs = await this.pendingRAVs() if (ravs.length === 0) { this.logger.info(`No pending RAVs to process`) @@ -193,9 +198,10 @@ export class TapCollector { if (ravs.length > 0) { ravs = await this.filterAndUpdateRavs(ravs) } - this.logger.debug(`matching allocations for pending ravs`, { - allocationCount: allocations.length, - ravCount: ravs.length, + const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs) + this.logger.info(`Retrieved allocations for pending RAVs`, { + ravs: ravs.length, + allocations: allocations.length, }) return ravs .map((rav) => { diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index 3eb56a44b..95a9e668c 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -31,6 +31,8 @@ export class ActionManager { declare models: IndexerManagementModels declare allocationManagers: NetworkMapped + executeBatchActionsPromise: Promise | undefined + static async create( multiNetworks: MultiNetworks, logger: Logger, @@ -228,7 +230,19 @@ export class ActionManager { return updatedActions } + // a promise guard to ensure that only one batch of actions is executed at a time async executeApprovedActions(network: Network): Promise { + if (this.executeBatchActionsPromise) { + this.logger.warn('Previous batch action execution is still in progress') + return this.executeBatchActionsPromise + } + this.executeBatchActionsPromise = this.executeApprovedActionsInner(network) + const updatedActions = await this.executeBatchActionsPromise + this.executeBatchActionsPromise = undefined + return updatedActions + } + + async executeApprovedActionsInner(network: Network): Promise { let updatedActions: Action[] = [] const protocolNetwork = network.specification.networkIdentifier const logger = this.logger.child({ @@ -236,11 +250,12 @@ export class ActionManager { protocolNetwork, }) - logger.trace('Begin database transaction for executing approved actions') + logger.debug('Begin database transaction for executing approved actions') // eslint-disable-next-line @typescript-eslint/no-non-null-assertion await this.models.Action.sequelize!.transaction( { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, async (transaction) => { + const transactionOpenTime = Date.now() let approvedActions try { // Execute already approved actions in the order of type and priority. @@ -276,6 +291,11 @@ export class ActionManager { return [] } try { + logger.debug('Executing batch action', { + approvedActions, + startTimeMs: Date.now() - transactionOpenTime, + }) + // This will return all results if successful, if failed it will return the failed actions const allocationManager = this.allocationManagers[network.specification.networkIdentifier] @@ -283,15 +303,21 @@ export class ActionManager { logger.debug('Completed batch action execution', { results, + endTimeMs: Date.now() - transactionOpenTime, }) updatedActions = await this.updateActionStatuses(results, transaction) + + logger.debug('Updated action statuses', { + updatedActions, + updatedTimeMs: Date.now() - transactionOpenTime, + }) } catch (error) { logger.error(`Failed to execute batch tx on staking contract: ${error}`) throw indexerError(IndexerErrorCode.IE072, error) } }, ) - logger.trace('End database transaction for executing approved actions') + logger.debug('End database transaction for executing approved actions') return updatedActions } diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 797c54476..f556224b3 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -116,7 +116,7 @@ export class AllocationManager { return await this.confirmTransactions(result, actions) } - async executeTransactions(actions: Action[]): Promise { + private async executeTransactions(actions: Action[]): Promise { const logger = this.logger.child({ function: 'executeTransactions' }) logger.trace('Begin executing transactions', { actions }) if (actions.length < 1) { diff --git a/packages/indexer-common/src/sequential-timer.ts b/packages/indexer-common/src/sequential-timer.ts index 9a50d4835..6d43a94af 100644 --- a/packages/indexer-common/src/sequential-timer.ts +++ b/packages/indexer-common/src/sequential-timer.ts @@ -20,7 +20,6 @@ export interface TimerTaskContext { function logWorkTime( workStarted: number, logger: Logger, - loopTime: number, caller: string | undefined, milliseconds: number, ) { @@ -28,9 +27,10 @@ function logWorkTime( const workTime = Date.now() - workStarted if (workTime > milliseconds + workTimeWarningThreshold) { logger.warn( - 'timer work took longer than the sequential timer was configured for (>5s)', + `timer work took ${ + (workTime - milliseconds) / 1000 + }s longer than expected, next execution in ${milliseconds / 1000}s`, { - loopTime, workTime, milliseconds, caller, @@ -57,7 +57,6 @@ export function sequentialTimerReduce( // obtain the calling method name from the call stack const stack = new Error().stack const caller = stack?.split('\n')[2].trim() - let lastWorkStarted = Date.now() let acc: U = initial let previousT: T | undefined @@ -74,26 +73,23 @@ export function sequentialTimerReduce( function work() { const workStarted = Date.now() const promiseOrT = reducer(acc, workStarted) - const loopTime = workStarted - lastWorkStarted - - lastWorkStarted = workStarted if (isPromiseLike(promiseOrT)) { promiseOrT.then( function onfulfilled(value) { outputReduce(value) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, function onrejected(err) { console.error(err) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, ) } else { outputReduce(promiseOrT) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) } } // initial call @@ -118,8 +114,6 @@ export function sequentialTimerMap( // obtain the calling method name from the call stack const stack = new Error().stack const caller = stack?.split('\n')[2].trim() - let lastWorkStarted = Date.now() - const output = mutable() let latestU: U | undefined @@ -135,27 +129,25 @@ export function sequentialTimerMap( function work() { const workStarted = Date.now() const promiseOrU = mapper(workStarted) - const loopTime = workStarted - lastWorkStarted - lastWorkStarted = workStarted if (isPromiseLike(promiseOrU)) { promiseOrU.then( function onfulfilled(value) { checkMappedValue(value) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, function onrejected(err) { options?.onError(err) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, ) } else { // resolved value checkMappedValue(promiseOrU) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) } } From a8037f2f456203abd775db8bde30d39e00b90e6c Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Fri, 27 Dec 2024 12:03:25 -0800 Subject: [PATCH 6/8] common: limit graph-node status query to actions' deploymentIDs --- packages/indexer-common/src/graph-node.ts | 113 +++++++++++++++++- .../src/indexer-management/actions.ts | 11 +- .../src/indexer-management/allocations.ts | 8 +- 3 files changed, 122 insertions(+), 10 deletions(-) diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index 9e47d6616..7f2c57971 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -12,6 +12,7 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types' import pRetry, { Options } from 'p-retry' import axios, { AxiosInstance } from 'axios' import fetch from 'isomorphic-fetch' +import { Action } from './indexer-management' interface indexNode { id: string @@ -103,9 +104,15 @@ export class GraphNode { await pRetry( async () => { const deployments = await this.subgraphDeployments() - this.logger.info(`Successfully connected to indexing status API`, { - currentDeployments: deployments.map((deployment) => deployment.display), - }) + if (deployments.length < 100) { + this.logger.info(`Successfully connected to indexing status API`, { + currentDeployments: deployments.map((deployment) => deployment.display), + }) + } else { + this.logger.info(`Successfully connected to indexing status API`, { + currentDeploymentCount: deployments.length, + }) + } }, { retries: 10, @@ -148,10 +155,98 @@ export class GraphNode { ) } + public async subgraphDeploymentAssignmentsForAllocateActions( + subgraphStatus: SubgraphStatus, + actions: Action[], + ): Promise { + const deploymentIDs = actions.map((action) => action.deploymentID) + + const nodeOnlyResult = await this.status + .query( + gql` + query indexingStatuses($subgraphs: [String!]!) { + indexingStatuses(subgraphs: $subgraphs) { + subgraphDeployment: subgraph + node + } + } + `, + { subgraphs: deploymentIDs }, + ) + .toPromise() + + if (nodeOnlyResult.error) { + throw nodeOnlyResult.error + } + + const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses + .filter( + (result: { node: string | null }) => + result.node !== null && result.node !== undefined, + ) + .map((result: { subgraphDeployment: string }) => result.subgraphDeployment) + + const result = await this.status + .query( + gql` + query indexingStatuses($subgraphs: [String!]!) { + indexingStatuses(subgraphs: $subgraphs) { + subgraphDeployment: subgraph + node + paused + } + } + `, + { subgraphs: withAssignments }, + ) + .toPromise() + + if (result.error) { + throw result.error + } + + if (!result.data.indexingStatuses || result.data.length === 0) { + this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, { + data: result.data, + }) + return [] + } + + type QueryResult = { + subgraphDeployment: string + node: string | undefined + paused: boolean | undefined + } + + const results = result.data.indexingStatuses + .filter((status: QueryResult) => { + if (subgraphStatus === SubgraphStatus.ACTIVE) { + return ( + status.paused === false || + (status.paused === undefined && status.node !== 'removed') + ) + } else if (subgraphStatus === SubgraphStatus.PAUSED) { + return status.node === 'removed' || status.paused === true + } else if (subgraphStatus === SubgraphStatus.ALL) { + return true + } + }) + .map((status: QueryResult) => { + return { + id: new SubgraphDeploymentID(status.subgraphDeployment), + node: status.node, + paused: status.paused ?? status.node === 'removed', + } + }) + + return results + } + public async subgraphDeploymentsAssignments( subgraphStatus: SubgraphStatus, ): Promise { try { + const startTimeMs = Date.now() this.logger.debug('Fetch subgraph deployment assignments') // FIXME: remove this initial check for just node when graph-node releases @@ -170,6 +265,10 @@ export class GraphNode { ) .toPromise() + this.logger.debug( + `Fetch subgraph deployment assignments took ${Date.now() - startTimeMs}ms`, + ) + if (nodeOnlyResult.error) { throw nodeOnlyResult.error } @@ -214,7 +313,7 @@ export class GraphNode { paused: boolean | undefined } - return result.data.indexingStatuses + const results = result.data.indexingStatuses .filter((status: QueryResult) => { if (subgraphStatus === SubgraphStatus.ACTIVE) { return ( @@ -234,6 +333,12 @@ export class GraphNode { paused: status.paused ?? status.node === 'removed', } }) + this.logger.debug( + `Fetching mapped subgraph deployment ${results.length} assignments took ${ + Date.now() - startTimeMs + }ms`, + ) + return results } catch (error) { const err = indexerError(IndexerErrorCode.IE018, error) this.logger.error(`Failed to query indexing status API`, { err }) diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index 95a9e668c..8f806cce5 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -236,9 +236,14 @@ export class ActionManager { this.logger.warn('Previous batch action execution is still in progress') return this.executeBatchActionsPromise } - this.executeBatchActionsPromise = this.executeApprovedActionsInner(network) - const updatedActions = await this.executeBatchActionsPromise - this.executeBatchActionsPromise = undefined + + let updatedActions: Action[] = [] + try { + this.executeBatchActionsPromise = this.executeApprovedActionsInner(network) + updatedActions = await this.executeBatchActionsPromise + } finally { + this.executeBatchActionsPromise = undefined + } return updatedActions } diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index f556224b3..c30192152 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -322,9 +322,11 @@ export class AllocationManager { logger.info('Ensure subgraph deployments are deployed before we allocate to them', { allocateActions, }) - const currentAssignments = await this.graphNode.subgraphDeploymentsAssignments( - SubgraphStatus.ALL, - ) + const currentAssignments = + await this.graphNode.subgraphDeploymentAssignmentsForAllocateActions( + SubgraphStatus.ALL, + actions, + ) await pMap( allocateActions, async (action: Action) => From 9617638a03622ef18acf23dcd212c94df24994aa Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Fri, 27 Dec 2024 13:59:55 -0800 Subject: [PATCH 7/8] common: skip querying active deployments if in MANUAL or poiDisputeMonitoring is disabled --- packages/indexer-agent/src/agent.ts | 28 ++- packages/indexer-common/src/graph-node.ts | 207 ++++++++++-------- .../src/indexer-management/allocations.ts | 4 +- .../src/indexer-management/monitor.ts | 4 + 4 files changed, 148 insertions(+), 95 deletions(-) diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 15166f345..c32812a90 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -324,12 +324,32 @@ export class Agent { }, ) + // Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled const activeDeployments: Eventual = sequentialTimerMap( - { logger, milliseconds: requestIntervalSmall }, - () => { - logger.trace('Fetching active deployments') - return this.graphNode.subgraphDeployments() + { logger, milliseconds: requestIntervalLarge }, + async () => { + const deployments = await this.multiNetworks.map( + async ({ network }) => { + if ( + this.deploymentManagement === DeploymentManagementMode.AUTO || + network.networkMonitor.poiDisputeMonitoringEnabled() + ) { + logger.trace('Fetching active deployments') + const assignments = + await this.graphNode.subgraphDeploymentsAssignments( + SubgraphStatus.ACTIVE, + ) + return assignments.map(assignment => assignment.id) + } else { + logger.info( + "Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and POI tracking is disabled", + ) + return [] + } + }, + ) + return deployments.values }, { onError: error => diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index 7f2c57971..494247e44 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -12,7 +12,6 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types' import pRetry, { Options } from 'p-retry' import axios, { AxiosInstance } from 'axios' import fetch from 'isomorphic-fetch' -import { Action } from './indexer-management' interface indexNode { id: string @@ -103,15 +102,11 @@ export class GraphNode { this.logger.info(`Check if indexing status API is available`) await pRetry( async () => { - const deployments = await this.subgraphDeployments() - if (deployments.length < 100) { - this.logger.info(`Successfully connected to indexing status API`, { - currentDeployments: deployments.map((deployment) => deployment.display), - }) + if (await this.statusEndpointConnected()) { + this.logger.info(`Successfully connected to indexing status API`, {}) } else { - this.logger.info(`Successfully connected to indexing status API`, { - currentDeploymentCount: deployments.length, - }) + this.logger.error(`Failed to connect to indexing status API`) + throw new Error('Indexing status API not available') } }, { @@ -149,97 +144,120 @@ export class GraphNode { return new URL(deploymentIpfsHash, this.queryBaseURL).toString() } - public async subgraphDeployments(): Promise { - return (await this.subgraphDeploymentsAssignments(SubgraphStatus.ACTIVE)).map( - (details) => details.id, - ) + // Simple query to make sure the status endpoint is connected + public async statusEndpointConnected(): Promise { + try { + const result = await this.status + .query( + gql` + query { + __typename + } + `, + undefined, + ) + .toPromise() + + if (result.error) { + throw result.error + } + + return !!result.data + } catch (error) { + this.logger.error(`Failed to query status endpoint`, { error }) + return false + } } - public async subgraphDeploymentAssignmentsForAllocateActions( + public async subgraphDeploymentAssignmentsByDeploymentID( subgraphStatus: SubgraphStatus, - actions: Action[], + deploymentIDs: string[], ): Promise { - const deploymentIDs = actions.map((action) => action.deploymentID) - - const nodeOnlyResult = await this.status - .query( - gql` - query indexingStatuses($subgraphs: [String!]!) { - indexingStatuses(subgraphs: $subgraphs) { - subgraphDeployment: subgraph - node + try { + const nodeOnlyResult = await this.status + .query( + gql` + query indexingStatuses($subgraphs: [String!]!) { + indexingStatuses(subgraphs: $subgraphs) { + subgraphDeployment: subgraph + node + } } - } - `, - { subgraphs: deploymentIDs }, - ) - .toPromise() + `, + { subgraphs: deploymentIDs }, + ) + .toPromise() - if (nodeOnlyResult.error) { - throw nodeOnlyResult.error - } + if (nodeOnlyResult.error) { + throw nodeOnlyResult.error + } - const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses - .filter( - (result: { node: string | null }) => - result.node !== null && result.node !== undefined, - ) - .map((result: { subgraphDeployment: string }) => result.subgraphDeployment) - - const result = await this.status - .query( - gql` - query indexingStatuses($subgraphs: [String!]!) { - indexingStatuses(subgraphs: $subgraphs) { - subgraphDeployment: subgraph - node - paused + const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses + .filter( + (result: { node: string | null }) => + result.node !== null && result.node !== undefined, + ) + .map((result: { subgraphDeployment: string }) => result.subgraphDeployment) + + const result = await this.status + .query( + gql` + query indexingStatuses($subgraphs: [String!]!) { + indexingStatuses(subgraphs: $subgraphs) { + subgraphDeployment: subgraph + node + paused + } } - } - `, - { subgraphs: withAssignments }, - ) - .toPromise() + `, + { subgraphs: withAssignments }, + ) + .toPromise() - if (result.error) { - throw result.error - } + if (result.error) { + throw result.error + } - if (!result.data.indexingStatuses || result.data.length === 0) { - this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, { - data: result.data, - }) - return [] - } + if (!result.data.indexingStatuses || result.data.length === 0) { + this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, { + data: result.data, + }) + return [] + } - type QueryResult = { - subgraphDeployment: string - node: string | undefined - paused: boolean | undefined - } + type QueryResult = { + subgraphDeployment: string + node: string | undefined + paused: boolean | undefined + } - const results = result.data.indexingStatuses - .filter((status: QueryResult) => { - if (subgraphStatus === SubgraphStatus.ACTIVE) { - return ( - status.paused === false || - (status.paused === undefined && status.node !== 'removed') - ) - } else if (subgraphStatus === SubgraphStatus.PAUSED) { - return status.node === 'removed' || status.paused === true - } else if (subgraphStatus === SubgraphStatus.ALL) { - return true - } - }) - .map((status: QueryResult) => { - return { - id: new SubgraphDeploymentID(status.subgraphDeployment), - node: status.node, - paused: status.paused ?? status.node === 'removed', - } - }) + const results = result.data.indexingStatuses + .filter((status: QueryResult) => { + if (subgraphStatus === SubgraphStatus.ACTIVE) { + return ( + status.paused === false || + (status.paused === undefined && status.node !== 'removed') + ) + } else if (subgraphStatus === SubgraphStatus.PAUSED) { + return status.node === 'removed' || status.paused === true + } else if (subgraphStatus === SubgraphStatus.ALL) { + return true + } + }) + .map((status: QueryResult) => { + return { + id: new SubgraphDeploymentID(status.subgraphDeployment), + node: status.node, + paused: status.paused ?? status.node === 'removed', + } + }) - return results + return results + } catch (error) { + const err = indexerError(IndexerErrorCode.IE018, error) + this.logger.error(`Failed to query indexing status API`, { err }) + throw err + } } public async subgraphDeploymentsAssignments( @@ -265,8 +283,11 @@ export class GraphNode { ) .toPromise() + const deploymentCount = nodeOnlyResult.data?.indexingStatuses?.length ?? 0 this.logger.debug( - `Fetch subgraph deployment assignments took ${Date.now() - startTimeMs}ms`, + `Fetch subgraph deployment assignments (1/2, node only) took ${ + Date.now() - startTimeMs + }ms for ${deploymentCount} deployments`, ) if (nodeOnlyResult.error) { @@ -313,6 +334,12 @@ export class GraphNode { paused: boolean | undefined } + const deploymentCount2 = result.data?.indexingStatuses?.length ?? 0 + this.logger.debug( + `Fetch subgraph deployment assignments (2/2, with paused) took ${ + Date.now() - startTimeMs + }ms and returned ${deploymentCount}/${deploymentCount2} deployments`, + ) const results = result.data.indexingStatuses .filter((status: QueryResult) => { if (subgraphStatus === SubgraphStatus.ACTIVE) { @@ -557,7 +584,9 @@ export class GraphNode { try { const deploymentAssignments = currentAssignments ?? - (await this.subgraphDeploymentsAssignments(SubgraphStatus.ALL)) + (await this.subgraphDeploymentAssignmentsByDeploymentID(SubgraphStatus.ALL, [ + deployment.ipfsHash, + ])) const matchingAssignment = deploymentAssignments.find( (deploymentAssignment) => deploymentAssignment.id.ipfsHash == deployment.ipfsHash, ) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index c30192152..a26f82cb5 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -323,9 +323,9 @@ export class AllocationManager { allocateActions, }) const currentAssignments = - await this.graphNode.subgraphDeploymentAssignmentsForAllocateActions( + await this.graphNode.subgraphDeploymentAssignmentsByDeploymentID( SubgraphStatus.ALL, - actions, + actions.map((action) => action.deploymentID!), ) await pMap( allocateActions, diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index 3e78ab3ce..56a8b13cd 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -51,6 +51,10 @@ export class NetworkMonitor { private epochSubgraph: EpochSubgraph, ) {} + poiDisputeMonitoringEnabled(): boolean { + return this.indexerOptions.poiDisputeMonitoring + } + async currentEpochNumber(): Promise { return (await this.contracts.epochManager.currentEpoch()).toNumber() } From af220ff8a5157c1869e8a1716d05e77a12034e7a Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Fri, 27 Dec 2024 14:28:20 -0800 Subject: [PATCH 8/8] common: limit indexer status query by subgraph ids, add time logging for allocations query --- packages/indexer-common/src/indexer-management/allocations.ts | 4 +++- packages/indexer-common/src/indexer-management/monitor.ts | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index a26f82cb5..035aa0932 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -251,7 +251,9 @@ export class AllocationManager { 2, ), currentEpoch, - indexingStatuses: await this.graphNode.indexingStatus([]), + indexingStatuses: await this.graphNode.indexingStatus( + actions.map((action) => new SubgraphDeploymentID(action.deploymentID!)), + ), } return await pMap( actions, diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index 56a8b13cd..249a414f3 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -132,6 +132,7 @@ export class NetworkMonitor { } async allocations(status: AllocationStatus): Promise { + const startTimeMs = Date.now() try { this.logger.debug(`Fetch ${status} allocations`) let dataRemaining = true @@ -202,6 +203,9 @@ export class NetworkMonitor { ) } + this.logger.debug( + `Finished fetching ${status} allocations in ${Date.now() - startTimeMs}ms`, + ) return allocations } catch (error) { const err = indexerError(IndexerErrorCode.IE010, error)