Skip to content

Commit

Permalink
common: make monitorQueue sequential, promise-guard concurrent execut…
Browse files Browse the repository at this point in the history
…eApprovedActions, add logging
  • Loading branch information
dwerner committed Dec 27, 2024
1 parent e37c35b commit 27ab3e0
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 31 deletions.
16 changes: 11 additions & 5 deletions packages/indexer-common/src/allocations/tap-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
allocationSigner,
tapAllocationIdProof,
parseGraphQLAllocation,
sequentialTimerMap,
} from '..'
import { BigNumber } from 'ethers'
import pReduce from 'p-reduce'
Expand Down Expand Up @@ -183,8 +184,12 @@ export class TapCollector {
}

private getPendingRAVs(): Eventual<RavWithAllocation[]> {
return this.allocations.throttle(RAV_CHECK_INTERVAL_MS).tryMap(
async (allocations) => {
return sequentialTimerMap(
{
logger: this.logger,
milliseconds: RAV_CHECK_INTERVAL_MS,
},
async () => {
let ravs = await this.pendingRAVs()
if (ravs.length === 0) {
this.logger.info(`No pending RAVs to process`)
Expand All @@ -193,9 +198,10 @@ export class TapCollector {
if (ravs.length > 0) {
ravs = await this.filterAndUpdateRavs(ravs)
}
this.logger.debug(`matching allocations for pending ravs`, {
allocationCount: allocations.length,
ravCount: ravs.length,
const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs)
this.logger.info(`Retrieved allocations for pending RAVs`, {
ravs: ravs.length,
allocations: allocations.length,
})
return ravs
.map((rav) => {
Expand Down
30 changes: 28 additions & 2 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export class ActionManager {
declare models: IndexerManagementModels
declare allocationManagers: NetworkMapped<AllocationManager>

executeBatchActionsPromise: Promise<Action[]> | undefined

static async create(
multiNetworks: MultiNetworks<Network>,
logger: Logger,
Expand Down Expand Up @@ -228,19 +230,32 @@ export class ActionManager {
return updatedActions
}

// a promise guard to ensure that only one batch of actions is executed at a time
async executeApprovedActions(network: Network): Promise<Action[]> {
if (this.executeBatchActionsPromise) {
this.logger.warn('Previous batch action execution is still in progress')
return this.executeBatchActionsPromise
}
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
const updatedActions = await this.executeBatchActionsPromise
this.executeBatchActionsPromise = undefined
return updatedActions
}

async executeApprovedActionsInner(network: Network): Promise<Action[]> {
let updatedActions: Action[] = []
const protocolNetwork = network.specification.networkIdentifier
const logger = this.logger.child({
function: 'executeApprovedActions',
protocolNetwork,
})

logger.trace('Begin database transaction for executing approved actions')
logger.debug('Begin database transaction for executing approved actions')
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
const transactionOpenTime = Date.now()
let approvedActions
try {
// Execute already approved actions in the order of type and priority.
Expand Down Expand Up @@ -276,22 +291,33 @@ export class ActionManager {
return []
}
try {
logger.debug('Executing batch action', {
approvedActions,
startTimeMs: Date.now() - transactionOpenTime,
})

// This will return all results if successful, if failed it will return the failed actions
const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]
const results = await allocationManager.executeBatch(approvedActions)

logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - transactionOpenTime,
})
updatedActions = await this.updateActionStatuses(results, transaction)

logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - transactionOpenTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}
},
)
logger.trace('End database transaction for executing approved actions')
logger.debug('End database transaction for executing approved actions')
return updatedActions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class AllocationManager {
return await this.confirmTransactions(result, actions)
}

async executeTransactions(actions: Action[]): Promise<TransactionResult> {
private async executeTransactions(actions: Action[]): Promise<TransactionResult> {
const logger = this.logger.child({ function: 'executeTransactions' })
logger.trace('Begin executing transactions', { actions })
if (actions.length < 1) {
Expand Down
38 changes: 15 additions & 23 deletions packages/indexer-common/src/sequential-timer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ export interface TimerTaskContext {
function logWorkTime(
workStarted: number,
logger: Logger,
loopTime: number,
caller: string | undefined,
milliseconds: number,
) {
const workTimeWarningThreshold = 5000
const workTime = Date.now() - workStarted
if (workTime > milliseconds + workTimeWarningThreshold) {
logger.warn(
'timer work took longer than the sequential timer was configured for (>5s)',
`timer work took ${
(workTime - milliseconds) / 1000
}s longer than expected, next execution in ${milliseconds / 1000}s`,
{
loopTime,
workTime,
milliseconds,
caller,
Expand All @@ -57,7 +57,6 @@ export function sequentialTimerReduce<T, U>(
// obtain the calling method name from the call stack
const stack = new Error().stack
const caller = stack?.split('\n')[2].trim()
let lastWorkStarted = Date.now()

let acc: U = initial
let previousT: T | undefined
Expand All @@ -74,26 +73,23 @@ export function sequentialTimerReduce<T, U>(
function work() {
const workStarted = Date.now()
const promiseOrT = reducer(acc, workStarted)
const loopTime = workStarted - lastWorkStarted

lastWorkStarted = workStarted
if (isPromiseLike(promiseOrT)) {
promiseOrT.then(
function onfulfilled(value) {
outputReduce(value)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
function onrejected(err) {
console.error(err)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
)
} else {
outputReduce(promiseOrT)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
}
}
// initial call
Expand All @@ -118,8 +114,6 @@ export function sequentialTimerMap<U>(
// obtain the calling method name from the call stack
const stack = new Error().stack
const caller = stack?.split('\n')[2].trim()
let lastWorkStarted = Date.now()

const output = mutable<U>()

let latestU: U | undefined
Expand All @@ -135,27 +129,25 @@ export function sequentialTimerMap<U>(
function work() {
const workStarted = Date.now()
const promiseOrU = mapper(workStarted)
const loopTime = workStarted - lastWorkStarted
lastWorkStarted = workStarted

if (isPromiseLike(promiseOrU)) {
promiseOrU.then(
function onfulfilled(value) {
checkMappedValue(value)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
function onrejected(err) {
options?.onError(err)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
)
} else {
// resolved value
checkMappedValue(promiseOrU)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
}
}

Expand Down

0 comments on commit 27ab3e0

Please sign in to comment.