Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue: action-batch #1070

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
28 changes: 24 additions & 4 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,32 @@ export class Agent {
},
)

// Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
{ logger, milliseconds: requestIntervalLarge },
async () => {
const deployments = await this.multiNetworks.map(
async ({ network }) => {
if (
this.deploymentManagement === DeploymentManagementMode.AUTO ||
network.networkMonitor.poiDisputeMonitoringEnabled()
) {
logger.trace('Fetching active deployments')
const assignments =
await this.graphNode.subgraphDeploymentsAssignments(
SubgraphStatus.ACTIVE,
)
return assignments.map(assignment => assignment.id)
} else {
logger.info(
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and POI tracking is disabled",
)
return []
}
},
)
return deployments.values
},
{
onError: error =>
Expand Down
9 changes: 5 additions & 4 deletions packages/indexer-common/src/allocations/tap-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export class TapCollector {
networkSubgraph,
}: TapCollectorOptions): TapCollector {
const collector = new TapCollector()
collector.logger = logger.child({ component: 'AllocationReceiptCollector' })
collector.logger = logger.child({ component: 'TapCollector' })
collector.metrics = registerReceiptMetrics(
metrics,
networkSpecification.networkIdentifier,
Expand Down Expand Up @@ -199,9 +199,10 @@ export class TapCollector {
ravs = await this.filterAndUpdateRavs(ravs)
}
const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs)
this.logger.info(
`Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`,
)
this.logger.info(`Retrieved allocations for pending RAVs`, {
ravs: ravs.length,
allocations: allocations.length,
})
return ravs
.map((rav) => {
const signedRav = rav.getSignedRAV()
Expand Down
154 changes: 144 additions & 10 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,12 @@ export class GraphNode {
this.logger.info(`Check if indexing status API is available`)
await pRetry(
async () => {
const deployments = await this.subgraphDeployments()
this.logger.info(`Successfully connected to indexing status API`, {
currentDeployments: deployments.map((deployment) => deployment.display),
})
if (await this.statusEndpointConnected()) {
this.logger.info(`Successfully connected to indexing status API`, {})
} else {
this.logger.error(`Failed to connect to indexing status API`)
throw new Error('Indexing status API not available')
}
},
{
retries: 10,
Expand Down Expand Up @@ -142,16 +144,127 @@ export class GraphNode {
return new URL(deploymentIpfsHash, this.queryBaseURL).toString()
}

public async subgraphDeployments(): Promise<SubgraphDeploymentID[]> {
return (await this.subgraphDeploymentsAssignments(SubgraphStatus.ACTIVE)).map(
(details) => details.id,
)
// Simple query to make sure the status endpoint is connected
public async statusEndpointConnected(): Promise<boolean> {
try {
const result = await this.status
.query(
gql`
query {
__typename
}
`,
undefined,
)
.toPromise()

if (result.error) {
throw result.error
}

return !!result.data
} catch (error) {
this.logger.error(`Failed to query status endpoint`, { error })
return false
}
}

public async subgraphDeploymentAssignmentsByDeploymentID(
subgraphStatus: SubgraphStatus,
deploymentIDs: string[],
): Promise<SubgraphDeploymentAssignment[]> {
try {
const nodeOnlyResult = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
}
}
`,
{ subgraphs: deploymentIDs },
)
.toPromise()

if (nodeOnlyResult.error) {
throw nodeOnlyResult.error
}

const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses
.filter(
(result: { node: string | null }) =>
result.node !== null && result.node !== undefined,
)
.map((result: { subgraphDeployment: string }) => result.subgraphDeployment)

const result = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
paused
}
}
`,
{ subgraphs: withAssignments },
)
.toPromise()

if (result.error) {
throw result.error
}

if (!result.data.indexingStatuses || result.data.length === 0) {
this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, {
data: result.data,
})
return []
}

type QueryResult = {
subgraphDeployment: string
node: string | undefined
paused: boolean | undefined
}

const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return (
status.paused === false ||
(status.paused === undefined && status.node !== 'removed')
)
} else if (subgraphStatus === SubgraphStatus.PAUSED) {
return status.node === 'removed' || status.paused === true
} else if (subgraphStatus === SubgraphStatus.ALL) {
return true
}
})
.map((status: QueryResult) => {
return {
id: new SubgraphDeploymentID(status.subgraphDeployment),
node: status.node,
paused: status.paused ?? status.node === 'removed',
}
})

return results
} catch (error) {
const err = indexerError(IndexerErrorCode.IE018, error)
this.logger.error(`Failed to query indexing status API`, { err })
throw err
}
}

public async subgraphDeploymentsAssignments(
subgraphStatus: SubgraphStatus,
): Promise<SubgraphDeploymentAssignment[]> {
try {
const startTimeMs = Date.now()
this.logger.debug('Fetch subgraph deployment assignments')

// FIXME: remove this initial check for just node when graph-node releases
Expand All @@ -170,6 +283,13 @@ export class GraphNode {
)
.toPromise()

const deploymentCount = nodeOnlyResult.data?.indexingStatuses?.length ?? 0
this.logger.debug(
`Fetch subgraph deployment assignments (1/2, node only) took ${
Date.now() - startTimeMs
}ms for ${deploymentCount} deployments`,
)

if (nodeOnlyResult.error) {
throw nodeOnlyResult.error
}
Expand Down Expand Up @@ -214,7 +334,13 @@ export class GraphNode {
paused: boolean | undefined
}

return result.data.indexingStatuses
const deploymentCount2 = result.data?.indexingStatuses?.length ?? 0
this.logger.debug(
`Fetch subgraph deployment assignments (2/2, with paused) took ${
Date.now() - startTimeMs
}ms and returned ${deploymentCount}/${deploymentCount2} deployments`,
)
const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return (
Expand All @@ -234,6 +360,12 @@ export class GraphNode {
paused: status.paused ?? status.node === 'removed',
}
})
this.logger.debug(
`Fetching mapped subgraph deployment ${results.length} assignments took ${
Date.now() - startTimeMs
}ms`,
)
return results
} catch (error) {
const err = indexerError(IndexerErrorCode.IE018, error)
this.logger.error(`Failed to query indexing status API`, { err })
Expand Down Expand Up @@ -452,7 +584,9 @@ export class GraphNode {
try {
const deploymentAssignments =
currentAssignments ??
(await this.subgraphDeploymentsAssignments(SubgraphStatus.ALL))
(await this.subgraphDeploymentAssignmentsByDeploymentID(SubgraphStatus.ALL, [
deployment.ipfsHash,
]))
const matchingAssignment = deploymentAssignments.find(
(deploymentAssignment) => deploymentAssignment.id.ipfsHash == deployment.ipfsHash,
)
Expand Down
35 changes: 33 additions & 2 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export class ActionManager {
declare models: IndexerManagementModels
declare allocationManagers: NetworkMapped<AllocationManager>

executeBatchActionsPromise: Promise<Action[]> | undefined

static async create(
multiNetworks: MultiNetworks<Network>,
logger: Logger,
Expand Down Expand Up @@ -228,19 +230,37 @@ export class ActionManager {
return updatedActions
}

// a promise guard to ensure that only one batch of actions is executed at a time
async executeApprovedActions(network: Network): Promise<Action[]> {
if (this.executeBatchActionsPromise) {
this.logger.warn('Previous batch action execution is still in progress')
return this.executeBatchActionsPromise
}

let updatedActions: Action[] = []
try {
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
updatedActions = await this.executeBatchActionsPromise
} finally {
this.executeBatchActionsPromise = undefined
}
return updatedActions
}

async executeApprovedActionsInner(network: Network): Promise<Action[]> {
let updatedActions: Action[] = []
const protocolNetwork = network.specification.networkIdentifier
const logger = this.logger.child({
function: 'executeApprovedActions',
protocolNetwork,
})

logger.trace('Begin database transaction for executing approved actions')
logger.debug('Begin database transaction for executing approved actions')
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
const transactionOpenTime = Date.now()
let approvedActions
try {
// Execute already approved actions in the order of type and priority.
Expand Down Expand Up @@ -276,22 +296,33 @@ export class ActionManager {
return []
}
try {
logger.debug('Executing batch action', {
approvedActions,
startTimeMs: Date.now() - transactionOpenTime,
})

// This will return all results if successful, if failed it will return the failed actions
const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]
const results = await allocationManager.executeBatch(approvedActions)

logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - transactionOpenTime,
})
updatedActions = await this.updateActionStatuses(results, transaction)

logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - transactionOpenTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}
},
)
logger.trace('End database transaction for executing approved actions')
logger.debug('End database transaction for executing approved actions')
return updatedActions
}

Expand Down
Loading
Loading