diff --git a/packages/indexer-common/src/__tests__/grafting.test.ts b/packages/indexer-common/src/__tests__/grafting.test.ts index 48d60790e..baa007bb0 100644 --- a/packages/indexer-common/src/__tests__/grafting.test.ts +++ b/packages/indexer-common/src/__tests__/grafting.test.ts @@ -5,7 +5,7 @@ import { SubgraphLineageWithStatus, } from '../grafting' import { SubgraphDeploymentID } from '@graphprotocol/common-ts' -import { indexerError, IndexerErrorCode } from '../errors' +import { IndexerErrorCode } from '../errors' import { SubgraphDeploymentDecisionKind } from '../types' // Create a mock for the fetchSubgraphManifest function @@ -173,7 +173,8 @@ describe('determineSubgraphDeploymentDecisions function', () => { const expected = [ { deployment: new SubgraphDeploymentID(base1), - deploymentDecision: SubgraphDeploymentDecisionKind.DEPLOY, + kind: SubgraphDeploymentDecisionKind.DEPLOY, + expectedBlockHeight: 1, }, ] expect(decisions).toEqual(expected) @@ -199,7 +200,8 @@ describe('determineSubgraphDeploymentDecisions function', () => { const expected = [ { deployment: new SubgraphDeploymentID(base2), - deploymentDecision: SubgraphDeploymentDecisionKind.DEPLOY, + kind: SubgraphDeploymentDecisionKind.DEPLOY, + expectedBlockHeight: 20, }, ] expect(decisions).toEqual(expected) @@ -226,7 +228,8 @@ describe('determineSubgraphDeploymentDecisions function', () => { const expected = [ { deployment: new SubgraphDeploymentID(base1), - deploymentDecision: SubgraphDeploymentDecisionKind.REMOVE, + kind: SubgraphDeploymentDecisionKind.REMOVE, + expectedBlockHeight: 10, }, ] expect(decisions).toEqual(expected) @@ -269,15 +272,18 @@ describe('determineSubgraphDeploymentDecisions function', () => { const expected = [ { deployment: new SubgraphDeploymentID(base3), - deploymentDecision: SubgraphDeploymentDecisionKind.REMOVE, + kind: SubgraphDeploymentDecisionKind.REMOVE, + expectedBlockHeight: 10, }, { deployment: new SubgraphDeploymentID(base2), - deploymentDecision: SubgraphDeploymentDecisionKind.REMOVE, + kind: SubgraphDeploymentDecisionKind.REMOVE, + expectedBlockHeight: 20, }, { deployment: new SubgraphDeploymentID(base1), - deploymentDecision: SubgraphDeploymentDecisionKind.DEPLOY, + kind: SubgraphDeploymentDecisionKind.DEPLOY, + expectedBlockHeight: 30, }, ] expect(decisions).toEqual(expected) @@ -330,16 +336,19 @@ describe('determineSubgraphDeploymentDecisions function', () => { const expected = [ { deployment: new SubgraphDeploymentID(base4), - deploymentDecision: SubgraphDeploymentDecisionKind.REMOVE, + kind: SubgraphDeploymentDecisionKind.REMOVE, + expectedBlockHeight: 5, }, // Base 3 is intentionally left out of the result. { deployment: new SubgraphDeploymentID(base2), - deploymentDecision: SubgraphDeploymentDecisionKind.REMOVE, + kind: SubgraphDeploymentDecisionKind.REMOVE, + expectedBlockHeight: 20, }, { deployment: new SubgraphDeploymentID(base1), - deploymentDecision: SubgraphDeploymentDecisionKind.DEPLOY, + kind: SubgraphDeploymentDecisionKind.DEPLOY, + expectedBlockHeight: 30, }, ] expect(decisions).toEqual(expected) diff --git a/packages/indexer-common/src/__tests__/subgraph.test.ts b/packages/indexer-common/src/__tests__/subgraph.test.ts index 23653713f..4ed780ff4 100644 --- a/packages/indexer-common/src/__tests__/subgraph.test.ts +++ b/packages/indexer-common/src/__tests__/subgraph.test.ts @@ -1,10 +1,10 @@ import { DocumentNode, print } from 'graphql' import { SubgraphFreshnessChecker, - LoggerInterface, ProviderInterface, SubgraphQueryInterface, } from '../subgraphs' +import { LoggerInterface } from '../types' import { QueryResult } from '../network-subgraph' import gql from 'graphql-tag' import { mergeSelectionSets } from '../utils' diff --git a/packages/indexer-common/src/grafting.ts b/packages/indexer-common/src/grafting.ts index 6d0f9b9e5..67e763a6c 100644 --- a/packages/indexer-common/src/grafting.ts +++ b/packages/indexer-common/src/grafting.ts @@ -2,15 +2,17 @@ import { SubgraphDeploymentID } from '@graphprotocol/common-ts' import { GraphNodeInterface } from './graph-node' import { BlockPointer, + LoggerInterface, SubgraphDeploymentDecision, SubgraphDeploymentDecisionKind, SubgraphManifest, } from './types' import { indexerError, IndexerErrorCode } from './errors' +import pMap from 'p-map' // Any type that can return a SubgraphManifest when given a // SubgraphDeploymentID as input. -type SubgraphManifestResolver = ( +export type SubgraphManifestResolver = ( subgraphID: SubgraphDeploymentID, ) => Promise @@ -49,6 +51,10 @@ export interface SubgraphLineageWithStatus extends SubgraphLineage { bases: GraftSubject[] } +export interface GraftBaseDeploymentDecision extends SubgraphDeploymentDecision { + expectedBlockHeight: number +} + // Discovers all graft dependencies for a given subgraph. export async function discoverLineage( subgraphManifestResolver: SubgraphManifestResolver, @@ -114,8 +120,8 @@ export async function getIndexingStatus( export function determineSubgraphDeploymentDecisions( subgraphLineage: SubgraphLineageWithStatus, -): SubgraphDeploymentDecision[] { - const deploymentDecisions: SubgraphDeploymentDecision[] = [] +): GraftBaseDeploymentDecision[] { + const deploymentDecisions: GraftBaseDeploymentDecision[] = [] // Check lineage size before making any assumptions. if (!subgraphLineage.bases.length) { @@ -143,7 +149,8 @@ export function determineSubgraphDeploymentDecisions( // Graph Node is not aware of this subgraph deployment. We must deploy it and look no further. deploymentDecisions.push({ deployment: graft.deployment, - deploymentDecision: SubgraphDeploymentDecisionKind.DEPLOY, + kind: SubgraphDeploymentDecisionKind.DEPLOY, + expectedBlockHeight: graft.block, }) break } @@ -154,7 +161,8 @@ export function determineSubgraphDeploymentDecisions( // If so, we can stop syncing it. deploymentDecisions.push({ deployment: graft.deployment, - deploymentDecision: SubgraphDeploymentDecisionKind.REMOVE, + kind: SubgraphDeploymentDecisionKind.REMOVE, + expectedBlockHeight: graft.block, }) continue } @@ -169,3 +177,31 @@ export function determineSubgraphDeploymentDecisions( } return deploymentDecisions } + +// Queries the Graph Node to get the deployment status of each graft base in the +// subgraph lineage. +export async function queryGraftBaseStatuses( + subgraphLineage: SubgraphLineage, + graphNode: GraphNodeInterface, + parentLogger: LoggerInterface, + concurrency: number = 5, +): Promise { + const logger = parentLogger.child({ function: 'queryGraftBaseStatuses' }) + logger.debug('Attempting to resolve graft bases for target subgraph') + + // Fetch deployment details for each graft base + logger.debug('Querying Graph-Node for graft bases deployment status') + const graftBasesDeploymentStatus = await pMap( + subgraphLineage.bases, + async (graftBase: GraftBase) => await getIndexingStatus(graftBase, graphNode), + { + stopOnError: true, + concurrency, + }, + ) + + return { + target: subgraphLineage.target, + bases: graftBasesDeploymentStatus, + } +} diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index e8b895b59..946d6bb72 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -62,6 +62,8 @@ export const parseGraphQLBlockPointer = (block: any): BlockPointer | null => export interface GraphNodeInterface { indexingStatus(deployments: SubgraphDeploymentID[]): Promise + ensure(name: string, deployment: SubgraphDeploymentID): Promise + remove(deployment: SubgraphDeploymentID): Promise } export class GraphNode implements GraphNodeInterface { @@ -391,11 +393,16 @@ export class GraphNode implements GraphNodeInterface { } catch (error) { if (!(error instanceof IndexerError)) { const errorCode = IndexerErrorCode.IE020 - this.logger.error(INDEXER_ERROR_MESSAGES[errorCode], { + const unknownIndexerError = indexerError(IndexerErrorCode.IE020, error) + const payload = { name, deployment: deployment.display, - error: indexerError(errorCode, error), - }) + error: unknownIndexerError, + } + this.logger.error(INDEXER_ERROR_MESSAGES[errorCode], payload) + throw unknownIndexerError + } else { + throw error } } } diff --git a/packages/indexer-common/src/subgraph-deployment.ts b/packages/indexer-common/src/subgraph-deployment.ts new file mode 100644 index 000000000..3b03e4e13 --- /dev/null +++ b/packages/indexer-common/src/subgraph-deployment.ts @@ -0,0 +1,283 @@ +import { SubgraphDeploymentID } from '@graphprotocol/common-ts' +import { LoggerInterface, SubgraphDeploymentDecisionKind } from './types' +import { GraphNodeInterface } from './graph-node' +import { Client, gql } from '@urql/core' +import { + discoverLineage, + determineSubgraphDeploymentDecisions, + SubgraphLineage, + queryGraftBaseStatuses, + GraftBaseDeploymentDecision, +} from './grafting' +import { SubgraphIdentifierType, fetchSubgraphManifest } from './subgraphs' +import { + IndexingDecisionBasis, + IndexingRuleAttributes, + IndexingRuleIdentifier, +} from './indexer-management' +import { IndexerErrorCode, indexerError } from './errors' + +const SET_INDEXING_RULE_MUTATION = gql` + mutation setIndexingRule($rule: IndexingRuleInput!) { + setIndexingRule(rule: $rule) { + identifier + identifierType + custom + decisionBasis + protocolNetwork + } + } +` + +const GET_INDEXING_RULE_QUERY = gql` + query indexingRule($identifier: IndexingRuleIdentifier!) { + indexingRule(identifier: $identifier, merged: false) { + custom + } + } +` +const DELETE_INDEXING_RULE_MUTATION = gql` + mutation deleteIndexingRule($identifier: IndexingRuleIdentifier!) { + deleteIndexingRule(identifier: $identifier) + } +` + +// Deploys a specified subgraph and handles grafting scenarios. +export async function deploySubgraph( + subgraphName: string, + subgraphDeployment: SubgraphDeploymentID, + graphNode: GraphNodeInterface, + ipfsURL: URL, + indexerManagement: Client, + protocolNetwork: string, + parentLogger: LoggerInterface, +): Promise { + // Inspect target subgraph's grafting lineage. + const logger = parentLogger.child({ + function: 'deploySubgraph', + subgraphDeployment, + }) + logger.debug('Attempting to resolve graft bases for target subgraph.') + const subgraphManifestResolver = async (subgraphID: SubgraphDeploymentID) => + await fetchSubgraphManifest(ipfsURL, subgraphID, logger) + const subgraphLineage = await discoverLineage( + subgraphManifestResolver, + subgraphDeployment, + ) + // If there's no graft base, deploy it right away + if (!subgraphLineage.bases.length) { + logger.debug('Subgraph has no graft dependencies.') + return await graphNode.ensure(subgraphName, subgraphDeployment) + } else { + return await deployGraftedSubgraph( + subgraphLineage, + graphNode, + indexerManagement, + protocolNetwork, + logger, + ) + } +} +// Attempts to deploy the first viable base for a grafted subgraph. +// Will create a new indexing rule for the next viable graft base and remove old rules +// for sufficiently synced bases. +async function deployGraftedSubgraph( + subgraphLineage: SubgraphLineage, + graphNode: GraphNodeInterface, + indexerManagement: Client, + protocolNetwork: string, + parentLogger: LoggerInterface, +): Promise { + const logger = parentLogger.child({ + function: 'deployGraftedSubgraph', + targetSubgraph: subgraphLineage.target.ipfsHash, + graftBases: subgraphLineage.bases, + }) + logger.debug('Attempting to deploy first viable base for grafted subgraph.') + + // Fetch the deployment status for all graft bases. + const lineageDeploymentStatus = await queryGraftBaseStatuses( + subgraphLineage, + graphNode, + logger, + ) + + // Inspect if we need to deploy or remove a sufficiently synced graft base. + const deploymentDecisions = determineSubgraphDeploymentDecisions( + lineageDeploymentStatus, + ) + for (const deploymentDecision of deploymentDecisions) { + switch (deploymentDecision.kind) { + case SubgraphDeploymentDecisionKind.DEPLOY: { + // Create an offchain deployment rule for this subgraph. + await createIndexingRuleForGraftBase( + deploymentDecision, + protocolNetwork, + indexerManagement, + logger, + ) + // Deploy the graft base subgraph. + const subgraphName = `indexer-agent/${deploymentDecision.deployment.ipfsHash.slice( + -10, + )}` + logger.info(`Graft Base subgraph deployment`, { + name: subgraphName, + deployment: deploymentDecision.deployment.display, + }) + await graphNode.ensure(subgraphName, deploymentDecision.deployment) + break + } + case SubgraphDeploymentDecisionKind.REMOVE: + await deleteTemporaryIndexingRule( + deploymentDecision, + protocolNetwork, + indexerManagement, + logger, + ) + break + + default: + throw new Error('Unknown deployment decision') + } + } +} + +async function createIndexingRuleForGraftBase( + deploymentDecision: GraftBaseDeploymentDecision, + protocolNetwork: string, + indexerManagement: Client, + logger: LoggerInterface, +): Promise { + const rule: Partial = { + identifier: deploymentDecision.deployment.ipfsHash, + identifierType: SubgraphIdentifierType.DEPLOYMENT, + decisionBasis: IndexingDecisionBasis.OFFCHAIN, + custom: JSON.stringify({ + type: 'graftBase', + targetDeployment: deploymentDecision.deployment.ipfsHash, + block: deploymentDecision.expectedBlockHeight, + }), + protocolNetwork, + } + try { + const result = await indexerManagement + .mutation(SET_INDEXING_RULE_MUTATION, { rule }) + .toPromise() + if (result.error) { + throw result.error + } + logger.debug('Created temporary offchain indexing rule for graft base.', { + deploymentDecision, + }) + } catch (indexerManagementError) { + const error = indexerError(IndexerErrorCode.IE075, indexerManagementError) + logger.warn( + 'Failed to create a temporary offchain indexing rule to support a graft base deployment.', + { error, deploymentDecision }, + ) + throw error + } +} + +// Delete an IndexingRule from the database by querying its ID and checking the 'custom' +// field to ensure it is a temporary rule. +async function deleteTemporaryIndexingRule( + deploymentDecision: GraftBaseDeploymentDecision, + protocolNetwork: string, + indexerManagement: Client, + parentLogger: LoggerInterface, +): Promise { + const identifier: IndexingRuleIdentifier = { + identifier: deploymentDecision.deployment.ipfsHash, + protocolNetwork, + } + const logger = parentLogger.child({ + identifier, + deploymentDecision, + }) + + // Query indexing management client for a indexing rule matching the IPFS hash of this + // subgraph deployment. + const indexingRule = await queryIndexingRule(identifier, indexerManagement, logger) + + if (!indexingRule) { + logger.warn( + 'Failed to find the temporary offchain indexing rule that supported a graft base deployment.', + ) + return + } + + // Check if this is a temporary indexing rule. We should not remove it if there's no + // grafting information stored in its 'custom' field. + const safeToRemove = checkTemporaryIndexingRule( + indexingRule, + deploymentDecision.deployment.ipfsHash, + ) + if (!safeToRemove) { + logger.info( + 'Found indexing rule that was used to support a graft base deployment, ' + + 'but it is not safe to remove it as it might still be in use', + ) + return + } + + // Remove the indexing rule + const deleteResult = await indexerManagement + .mutation(DELETE_INDEXING_RULE_MUTATION, { identifier }) + .toPromise() + if (deleteResult.error) { + throw deleteResult.error + } +} + +type TemporaryIndexingRuleTag = Pick + +async function queryIndexingRule( + identifier: IndexingRuleIdentifier, + indexerManagement: Client, + logger: LoggerInterface, +): Promise { + try { + const result = await indexerManagement + .query(GET_INDEXING_RULE_QUERY, { + identifier, + }) + .toPromise() + if (result.error) { + throw result.error + } + return result.data.indexingRule + } catch (indexerManagementError) { + const error = indexerError(IndexerErrorCode.IE075, indexerManagementError) + logger.warn('Failed to query a temporary offchain indexing rule for its removal.', { + error, + }) + throw error + } +} + +// Returns true if we can identify a tag in this IndexingRule indicating that it is +// temporary, created for the expected graft base deployment. +function checkTemporaryIndexingRule( + rule: TemporaryIndexingRuleTag, + expectedSubgraphDeployment: string, +): boolean { + // Check if we have a string in the 'custom' field. + if (!rule.custom || typeof rule.custom !== 'string') { + return false + } + // Check if that string is a JSON. + let tag + try { + tag = JSON.parse(rule.custom) + } catch (error) { + return false + } + if (!tag || typeof tag !== 'object') { + return false + } + if (tag.type === 'graftBase' && tag.targetDeployment === expectedSubgraphDeployment) { + return true + } + return false +} diff --git a/packages/indexer-common/src/subgraphs.ts b/packages/indexer-common/src/subgraphs.ts index df842ee18..e01e207dd 100644 --- a/packages/indexer-common/src/subgraphs.ts +++ b/packages/indexer-common/src/subgraphs.ts @@ -1,7 +1,12 @@ import { base58 } from 'ethers/lib/utils' import { BigNumber, utils } from 'ethers' import { Logger, SubgraphDeploymentID } from '@graphprotocol/common-ts' -import { SubgraphDeployment, SubgraphManifest, SubgraphManifestSchema } from './types' +import { + LoggerInterface, + SubgraphDeployment, + SubgraphManifest, + SubgraphManifestSchema, +} from './types' import { INDEXING_RULE_GLOBAL, IndexingDecisionBasis, @@ -335,13 +340,6 @@ export interface ProviderInterface { } /* eslint-disable @typescript-eslint/no-explicit-any */ -export interface LoggerInterface { - trace(msg: string, o?: object, ...args: any[]): void - error(msg: string, o?: object, ...args: any[]): void - warn(msg: string, o?: object, ...args: any[]): void - child(bindings: Record): Logger -} - export interface SubgraphQueryInterface { query( query: DocumentNode, diff --git a/packages/indexer-common/src/types.ts b/packages/indexer-common/src/types.ts index d5896f05b..ff2ee5d55 100644 --- a/packages/indexer-common/src/types.ts +++ b/packages/indexer-common/src/types.ts @@ -121,14 +121,25 @@ export const SubgraphManifestSchema = z.object({ export type SubgraphManifest = z.infer +// This enum should aim to match Graph-Node indexer endpoint routes. export enum SubgraphDeploymentDecisionKind { - CREATE = 'create', + // CREATE = 'create', // We don't have any use for the CREATE variant, for now. DEPLOY = 'deploy', REMOVE = 'remove', - // Possible new members: PAUSE, DROP, NOOP } export interface SubgraphDeploymentDecision { deployment: SubgraphDeploymentID - deploymentDecision: SubgraphDeploymentDecisionKind + kind: SubgraphDeploymentDecisionKind } + +/* eslint-disable @typescript-eslint/no-explicit-any */ +export interface LoggerInterface { + trace(msg: string, o?: object, ...args: any[]): void + debug(msg: string, o?: object, ...args: any[]): void + info(msg: string, o?: object, ...args: any[]): void + warn(msg: string, o?: object, ...args: any[]): void + error(msg: string, o?: object, ...args: any[]): void + child(bindings: Record): LoggerInterface +} +/* eslint-disable @typescript-eslint/no-explicit-any */