diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts index e1561577f..500694b30 100644 --- a/packages/indexer-common/src/allocations/tap-collector.ts +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -21,6 +21,7 @@ import { allocationSigner, tapAllocationIdProof, parseGraphQLAllocation, + sequentialTimerMap, } from '..' import { BigNumber } from 'ethers' import pReduce from 'p-reduce' @@ -183,8 +184,12 @@ export class TapCollector { } private getPendingRAVs(): Eventual { - return this.allocations.throttle(RAV_CHECK_INTERVAL_MS).tryMap( - async (allocations) => { + return sequentialTimerMap( + { + logger: this.logger, + milliseconds: RAV_CHECK_INTERVAL_MS, + }, + async () => { let ravs = await this.pendingRAVs() if (ravs.length === 0) { this.logger.info(`No pending RAVs to process`) @@ -193,9 +198,10 @@ export class TapCollector { if (ravs.length > 0) { ravs = await this.filterAndUpdateRavs(ravs) } - this.logger.debug(`matching allocations for pending ravs`, { - allocationCount: allocations.length, - ravCount: ravs.length, + const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs) + this.logger.info(`Retrieved allocations for pending RAVs`, { + ravs: ravs.length, + allocations: allocations.length, }) return ravs .map((rav) => { diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index 3eb56a44b..95a9e668c 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -31,6 +31,8 @@ export class ActionManager { declare models: IndexerManagementModels declare allocationManagers: NetworkMapped + executeBatchActionsPromise: Promise | undefined + static async create( multiNetworks: MultiNetworks, logger: Logger, @@ -228,7 +230,19 @@ export class ActionManager { return updatedActions } + // a promise guard to ensure that only one batch of actions is executed at a time async executeApprovedActions(network: Network): Promise { + if (this.executeBatchActionsPromise) { + this.logger.warn('Previous batch action execution is still in progress') + return this.executeBatchActionsPromise + } + this.executeBatchActionsPromise = this.executeApprovedActionsInner(network) + const updatedActions = await this.executeBatchActionsPromise + this.executeBatchActionsPromise = undefined + return updatedActions + } + + async executeApprovedActionsInner(network: Network): Promise { let updatedActions: Action[] = [] const protocolNetwork = network.specification.networkIdentifier const logger = this.logger.child({ @@ -236,11 +250,12 @@ export class ActionManager { protocolNetwork, }) - logger.trace('Begin database transaction for executing approved actions') + logger.debug('Begin database transaction for executing approved actions') // eslint-disable-next-line @typescript-eslint/no-non-null-assertion await this.models.Action.sequelize!.transaction( { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, async (transaction) => { + const transactionOpenTime = Date.now() let approvedActions try { // Execute already approved actions in the order of type and priority. @@ -276,6 +291,11 @@ export class ActionManager { return [] } try { + logger.debug('Executing batch action', { + approvedActions, + startTimeMs: Date.now() - transactionOpenTime, + }) + // This will return all results if successful, if failed it will return the failed actions const allocationManager = this.allocationManagers[network.specification.networkIdentifier] @@ -283,15 +303,21 @@ export class ActionManager { logger.debug('Completed batch action execution', { results, + endTimeMs: Date.now() - transactionOpenTime, }) updatedActions = await this.updateActionStatuses(results, transaction) + + logger.debug('Updated action statuses', { + updatedActions, + updatedTimeMs: Date.now() - transactionOpenTime, + }) } catch (error) { logger.error(`Failed to execute batch tx on staking contract: ${error}`) throw indexerError(IndexerErrorCode.IE072, error) } }, ) - logger.trace('End database transaction for executing approved actions') + logger.debug('End database transaction for executing approved actions') return updatedActions } diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 797c54476..f556224b3 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -116,7 +116,7 @@ export class AllocationManager { return await this.confirmTransactions(result, actions) } - async executeTransactions(actions: Action[]): Promise { + private async executeTransactions(actions: Action[]): Promise { const logger = this.logger.child({ function: 'executeTransactions' }) logger.trace('Begin executing transactions', { actions }) if (actions.length < 1) { diff --git a/packages/indexer-common/src/sequential-timer.ts b/packages/indexer-common/src/sequential-timer.ts index 9a50d4835..6d43a94af 100644 --- a/packages/indexer-common/src/sequential-timer.ts +++ b/packages/indexer-common/src/sequential-timer.ts @@ -20,7 +20,6 @@ export interface TimerTaskContext { function logWorkTime( workStarted: number, logger: Logger, - loopTime: number, caller: string | undefined, milliseconds: number, ) { @@ -28,9 +27,10 @@ function logWorkTime( const workTime = Date.now() - workStarted if (workTime > milliseconds + workTimeWarningThreshold) { logger.warn( - 'timer work took longer than the sequential timer was configured for (>5s)', + `timer work took ${ + (workTime - milliseconds) / 1000 + }s longer than expected, next execution in ${milliseconds / 1000}s`, { - loopTime, workTime, milliseconds, caller, @@ -57,7 +57,6 @@ export function sequentialTimerReduce( // obtain the calling method name from the call stack const stack = new Error().stack const caller = stack?.split('\n')[2].trim() - let lastWorkStarted = Date.now() let acc: U = initial let previousT: T | undefined @@ -74,26 +73,23 @@ export function sequentialTimerReduce( function work() { const workStarted = Date.now() const promiseOrT = reducer(acc, workStarted) - const loopTime = workStarted - lastWorkStarted - - lastWorkStarted = workStarted if (isPromiseLike(promiseOrT)) { promiseOrT.then( function onfulfilled(value) { outputReduce(value) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, function onrejected(err) { console.error(err) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, ) } else { outputReduce(promiseOrT) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) } } // initial call @@ -118,8 +114,6 @@ export function sequentialTimerMap( // obtain the calling method name from the call stack const stack = new Error().stack const caller = stack?.split('\n')[2].trim() - let lastWorkStarted = Date.now() - const output = mutable() let latestU: U | undefined @@ -135,27 +129,25 @@ export function sequentialTimerMap( function work() { const workStarted = Date.now() const promiseOrU = mapper(workStarted) - const loopTime = workStarted - lastWorkStarted - lastWorkStarted = workStarted if (isPromiseLike(promiseOrU)) { promiseOrU.then( function onfulfilled(value) { checkMappedValue(value) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, function onrejected(err) { options?.onError(err) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, ) } else { // resolved value checkMappedValue(promiseOrU) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) } }