Skip to content

Commit

Permalink
common, agent: logging improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tilacog committed Nov 9, 2023
1 parent 03e8b19 commit 7631f2f
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 39 deletions.
17 changes: 13 additions & 4 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
networkIsL2,
networkIsL1,
DeploymentManagementMode,
deploySubgraph,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
Expand Down Expand Up @@ -195,6 +196,7 @@ export class Agent {
offchainSubgraphs: SubgraphDeploymentID[]
autoMigrationSupport: boolean
deploymentManagement: DeploymentManagementMode
ipfsURL: URL

constructor(
logger: Logger,
Expand All @@ -206,6 +208,7 @@ export class Agent {
offchainSubgraphs: SubgraphDeploymentID[],
autoMigrationSupport: boolean,
deploymentManagement: DeploymentManagementMode,
ipfsURL: URL,
) {
this.logger = logger.child({ component: 'Agent' })
this.metrics = metrics
Expand All @@ -215,6 +218,7 @@ export class Agent {
this.offchainSubgraphs = offchainSubgraphs
this.autoMigrationSupport = !!autoMigrationSupport
this.deploymentManagement = deploymentManagement
this.ipfsURL = ipfsURL
}

async start(): Promise<Agent> {
Expand Down Expand Up @@ -986,10 +990,15 @@ export class Agent {
deployment: deployment.display,
})

// Ensure the deployment is deployed to the indexer
// Note: we're not waiting here, as sometimes indexing a subgraph
// will block if the IPFS files cannot be retrieved
await this.graphNode.ensure(name, deployment)
await deploySubgraph(
name,
deployment,
this.graphNode,
this.ipfsURL,
this.indexerManagement,
'eip155:5',
this.logger,
)
}),
)

Expand Down
13 changes: 1 addition & 12 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,6 @@ export const start = {
default: 100,
group: 'Query Fees',
})
.option('ipfs-endpoint', {
description: `Endpoint to an ipfs node to quickly query subgraph manifest data`,
type: 'string',
default: 'https://ipfs.network.thegraph.com',
group: 'Indexer Infrastructure',
})
.option('auto-graft-resolver-limit', {
description: `Maximum depth of grafting dependency to automatically resolve`,
type: 'number',
default: 0,
group: 'Indexer Infrastructure',
})
.option('inject-dai', {
description:
'Inject the GRT to DAI/USDC conversion rate into cost model variables',
Expand Down Expand Up @@ -648,6 +636,7 @@ export async function run(
argv.offchainSubgraphs.map((s: string) => new SubgraphDeploymentID(s)),
argv.enableAutoMigrationSupport,
argv.deploymentManagement,
argv.ipfsEndpoint,
)
await agent.start()
}
Expand Down
20 changes: 11 additions & 9 deletions packages/indexer-common/src/__tests__/grafting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import {
} from '../grafting'
import { SubgraphDeploymentID } from '@graphprotocol/common-ts'
import { IndexerErrorCode } from '../errors'
import { SubgraphDeploymentDecisionKind } from '../types'
import { LoggerInterface, SubgraphDeploymentDecisionKind } from '../types'

// Create a mock for the fetchSubgraphManifest function
const fakeSubgraphManifestResolver = jest.fn()

const fakeLogger = jest.fn() as unknown as LoggerInterface

// Fake IPFS Hashes:
const target = 'QmWaVSK24D1m53Ej2PaddWcb1HZKAV4bjiKkrUwtP3HrZX'
const base1 = 'QmWaVSK24D1m53Ej2PaddWcb1HZKAV4bjiKkrUwtP3HrYj'
Expand Down Expand Up @@ -93,7 +95,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {

let threwError = false
try {
determineSubgraphDeploymentDecisions(subgraphLineage)
determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
} catch (err) {
expect(err.code).toStrictEqual(IndexerErrorCode.IE075)
expect(err.cause).toStrictEqual(
Expand Down Expand Up @@ -122,7 +124,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {
},
],
}
expect(determineSubgraphDeploymentDecisions(subgraphLineage)).toEqual([])
expect(determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)).toEqual([])
})

test('should throw an error if an unsynced base is unhealthy', () => {
Expand All @@ -146,7 +148,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {

let threwError = false
try {
determineSubgraphDeploymentDecisions(subgraphLineage)
determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
} catch (err) {
expect(err.code).toStrictEqual(IndexerErrorCode.IE075)
expect(err.cause).toStrictEqual({
Expand All @@ -169,7 +171,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {
},
],
}
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage)
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
const expected = [
{
deployment: new SubgraphDeploymentID(base1),
Expand All @@ -196,7 +198,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {
},
],
}
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage)
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
const expected = [
{
deployment: new SubgraphDeploymentID(base2),
Expand Down Expand Up @@ -224,7 +226,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {
},
],
}
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage)
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
const expected = [
{
deployment: new SubgraphDeploymentID(base1),
Expand Down Expand Up @@ -268,7 +270,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {
},
],
}
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage)
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
const expected = [
{
deployment: new SubgraphDeploymentID(base3),
Expand Down Expand Up @@ -332,7 +334,7 @@ describe('determineSubgraphDeploymentDecisions function', () => {
},
],
}
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage)
const decisions = determineSubgraphDeploymentDecisions(subgraphLineage, fakeLogger)
const expected = [
{
deployment: new SubgraphDeploymentID(base4),
Expand Down
43 changes: 40 additions & 3 deletions packages/indexer-common/src/grafting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,15 @@ export async function getIndexingStatus(

export function determineSubgraphDeploymentDecisions(
subgraphLineage: SubgraphLineageWithStatus,
parentLogger: LoggerInterface,
): GraftBaseDeploymentDecision[] {
const deploymentDecisions: GraftBaseDeploymentDecision[] = []

const logger = parentLogger.child({
function: 'determineSubgraphDeploymentDecisions',
subgraphLineage: formatLineage(subgraphLineage),
})

// Check lineage size before making any assumptions.
if (!subgraphLineage.bases.length) {
throw indexerError(
Expand All @@ -143,10 +149,17 @@ export function determineSubgraphDeploymentDecisions(
for (let i = lastIndex; i >= 0; i--) {
const graft = subgraphLineage.bases[i]
const desiredBlockHeight = graft.block
const traceLogger = logger.child({
graft: formatDeploymentContainer(graft),
earliestValidBaseIndex,
graftIndex: i,
})
traceLogger.trace('Inspecting graft base deployment status')
if (!graft.indexingStatus || !graft.indexingStatus.latestBlock) {
// Ignore undeployed graft bases beyond the earliest valid one.
if (i <= earliestValidBaseIndex) {
// Graph Node is not aware of this subgraph deployment. We must deploy it and look no further.
traceLogger.trace('Deploy candidate found')
deploymentDecisions.push({
deployment: graft.deployment,
kind: SubgraphDeploymentDecisionKind.DEPLOY,
Expand All @@ -157,7 +170,9 @@ export function determineSubgraphDeploymentDecisions(
} else {
// Deployment exists.
// Is it sufficiently synced?
traceLogger.trace('Deployment exists')
if (graft.indexingStatus.latestBlock.number >= desiredBlockHeight) {
traceLogger.trace('Deployment is sufficiently synced')
// If so, we can stop syncing it.
deploymentDecisions.push({
deployment: graft.deployment,
Expand Down Expand Up @@ -187,9 +202,6 @@ export async function queryGraftBaseStatuses(
concurrency: number = 5,
): Promise<SubgraphLineageWithStatus> {
const logger = parentLogger.child({ function: 'queryGraftBaseStatuses' })
logger.debug('Attempting to resolve graft bases for target subgraph')

// Fetch deployment details for each graft base
logger.debug('Querying Graph-Node for graft bases deployment status')
const graftBasesDeploymentStatus = await pMap(
subgraphLineage.bases,
Expand All @@ -199,9 +211,34 @@ export async function queryGraftBaseStatuses(
concurrency,
},
)
logger.trace('Got graft bases deployment statuses from Graph-Node', {
graftBasesDeploymentStatus: formatDeploymentContainers(graftBasesDeploymentStatus),
})

return {
target: subgraphLineage.target,
bases: graftBasesDeploymentStatus,
}
}

// Logging utilities

export function formatDeploymentContainer(base: { deployment: SubgraphDeploymentID }) {
return {
...base,
deployment: base.deployment.display,
}
}

export function formatDeploymentContainers(
bases: { deployment: SubgraphDeploymentID }[],
): any {
return bases.map(formatDeploymentContainer)
}

export function formatLineage(lineage: SubgraphLineageWithStatus): any {
return {
target: lineage.target.display,
bases: formatDeploymentContainers(lineage.bases),
}
}
9 changes: 5 additions & 4 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export class GraphNode implements GraphNodeInterface {

async indexNodes(): Promise<indexNode[]> {
try {
this.logger.trace(`Querying indexing statuses`)
this.logger.trace(`Querying index nodes`)
const result = await this.status
.query(gql`
{
Expand All @@ -205,7 +205,7 @@ export class GraphNode implements GraphNodeInterface {
throw result.error
}

this.logger.trace(`Queried indexing statuses`, {
this.logger.trace(`Queried index nodes`, {
data: result.data,
})

Expand Down Expand Up @@ -461,7 +461,6 @@ export class GraphNode implements GraphNodeInterface {
} catch (error) {
throw indexerError(IndexerErrorCode.IE018, error)
}

return (
result.data.indexingStatuses
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -473,7 +472,7 @@ export class GraphNode implements GraphNodeInterface {
}

try {
return await pRetry(queryIndexingStatuses, {
const indexingStatuses = await pRetry(queryIndexingStatuses, {
retries: 5,
maxTimeout: 10000,
onFailedAttempt: (err) => {
Expand All @@ -485,6 +484,8 @@ export class GraphNode implements GraphNodeInterface {
})
},
} as pRetry.Options)
this.logger.trace('Queried indexing statuses', indexingStatuses)
return indexingStatuses
} catch (error) {
const err = indexerError(IndexerErrorCode.IE018, error)
this.logger.error(`Failed to query indexing status API`, {
Expand Down
18 changes: 12 additions & 6 deletions packages/indexer-common/src/subgraph-deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
SubgraphLineage,
queryGraftBaseStatuses,
GraftBaseDeploymentDecision,
formatGraftBases,
formatGraftBase,
} from './grafting'
import { SubgraphIdentifierType, fetchSubgraphManifest } from './subgraphs'
import {
Expand Down Expand Up @@ -55,9 +57,9 @@ export async function deploySubgraph(
// Inspect target subgraph's grafting lineage.
const logger = parentLogger.child({
function: 'deploySubgraph',
subgraphDeployment,
subgraphDeployment: subgraphDeployment.display,
})
logger.debug('Attempting to resolve graft bases for target subgraph.')
logger.debug('Resolving graft bases for target subgraph deployment.')
const subgraphManifestResolver = async (subgraphID: SubgraphDeploymentID) =>
await fetchSubgraphManifest(ipfsURL, subgraphID, logger)
const subgraphLineage = await discoverLineage(
Expand Down Expand Up @@ -90,10 +92,13 @@ async function deployGraftedSubgraph(
): Promise<void> {
const logger = parentLogger.child({
function: 'deployGraftedSubgraph',
targetSubgraph: subgraphLineage.target.ipfsHash,
graftBases: subgraphLineage.bases,
targetSubgraph: subgraphLineage.target.display,
graftBases: formatGraftBases(subgraphLineage.bases),
})
logger.debug('Attempting to deploy first viable base for grafted subgraph.')
logger.debug(
'Target subgraph deployment has graft bases. ' +
'Deploying first viable base for grafted subgraph.',
)

// Fetch the deployment status for all graft bases.
const lineageDeploymentStatus = await queryGraftBaseStatuses(
Expand All @@ -105,6 +110,7 @@ async function deployGraftedSubgraph(
// Inspect if we need to deploy or remove a sufficiently synced graft base.
const deploymentDecisions = determineSubgraphDeploymentDecisions(
lineageDeploymentStatus,
logger,
)
for (const deploymentDecision of deploymentDecisions) {
switch (deploymentDecision.kind) {
Expand Down Expand Up @@ -193,7 +199,7 @@ async function deleteTemporaryIndexingRule(
}
const logger = parentLogger.child({
identifier,
deploymentDecision,
deploymentDecision: formatGraftBase(deploymentDecision),
})

// Query indexing management client for a indexing rule matching the IPFS hash of this
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer-common/src/subgraphs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ export async function fetchSubgraphManifest(

const logger = parentLogger.child({
function: 'fetchSubgraphManifest',
subgraphDeployment: SubgraphDeploymentID,
targetDeployment: targetDeployment.display,
subgraphManifestSearchURL,
})

Expand Down

0 comments on commit 7631f2f

Please sign in to comment.