Skip to content

Commit

Permalink
refactor: allocations eventual, allocation signer, sync interval
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 7, 2023
1 parent 2374fce commit 72dee1b
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 151 deletions.
11 changes: 10 additions & 1 deletion packages/indexer-common/src/allocations/keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ const deriveKeyPair = (
}
}

export const allocationSigner = (wallet: Wallet, allocation: Allocation): string => {
// Returns the private key of allocation signer
export const allocationSignerPrivateKey = (
wallet: Wallet,
allocation: Allocation,
): string => {
const hdNode = utils.HDNode.fromMnemonic(wallet.mnemonic.phrase)

// The allocation was either created at the epoch it intended to or one
Expand All @@ -38,6 +42,11 @@ export const allocationSigner = (wallet: Wallet, allocation: Allocation): string
)
}

// Returns allocation signer wallet
export const allocationSigner = (wallet: Wallet, allocation: Allocation): Signer => {
return new Wallet(allocationSignerPrivateKey(wallet, allocation))
}

/**
* Derive an allocation ID that is specific to the current epoch,
* the deployment ID, the indexer's private key AND that doesn't
Expand Down
152 changes: 150 additions & 2 deletions packages/indexer-common/src/allocations/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { Address } from '@graphprotocol/common-ts'
import { BigNumber } from 'ethers'
import { SubgraphDeployment } from '@graphprotocol/indexer-common'
import {
indexerError,
IndexerErrorCode,
NetworkSubgraph,
SubgraphDeployment,
parseGraphQLAllocation,
} from '@graphprotocol/indexer-common'
import gql from 'graphql-tag'

import { Logger, Eventual, timer, Address } from '@graphprotocol/common-ts'

export interface Allocation {
id: Address
Expand All @@ -26,3 +34,143 @@ export enum AllocationStatus {
FINALIZED = 'Finalized',
CLAIMED = 'Claimed',
}

export interface MonitorEligibleAllocationsOptions {
indexer: Address
logger: Logger
networkSubgraph: NetworkSubgraph
interval: number
protocolNetwork: string
}

export const monitorEligibleAllocations = ({
indexer,
logger: parentLogger,
networkSubgraph,
protocolNetwork,
interval,
}: MonitorEligibleAllocationsOptions): Eventual<Allocation[]> => {
const logger = parentLogger.child({ component: 'AllocationMonitor' })

const refreshAllocations = async (
currentAllocations: Allocation[],
): Promise<Allocation[]> => {
logger.debug('Refresh eligible allocations', {
protocolNetwork,
})

try {
const currentEpochResult = await networkSubgraph.query(gql`
query {
graphNetwork(id: "1") {
currentEpoch
}
}
`)
if (currentEpochResult.error) {
throw currentEpochResult.error
}

if (
!currentEpochResult.data ||
!currentEpochResult.data.graphNetwork ||
!currentEpochResult.data.graphNetwork.currentEpoch
) {
throw new Error(`Failed to fetch current epoch from network subgraph`)
}

const currentEpoch = currentEpochResult.data.graphNetwork.currentEpoch

const result = await networkSubgraph.query(
gql`
query allocations($indexer: String!, $closedAtEpochThreshold: Int!) {
indexer(id: $indexer) {
activeAllocations: totalAllocations(
where: { status: Active }
orderDirection: desc
first: 1000
) {
id
indexer {
id
}
allocatedTokens
createdAtBlockHash
createdAtEpoch
closedAtEpoch
subgraphDeployment {
id
stakedTokens
signalledTokens
queryFeesAmount
}
}
recentlyClosedAllocations: totalAllocations(
where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold }
orderDirection: desc
first: 1000
) {
id
indexer {
id
}
allocatedTokens
createdAtBlockHash
createdAtEpoch
closedAtEpoch
subgraphDeployment {
id
stakedTokens
signalledTokens
queryFeesAmount
}
}
}
}
`,
{
indexer: indexer.toLowerCase(),
closedAtEpochThreshold: currentEpoch - 1, // allocation can be closed within the last epoch or later
},
)

if (result.error) {
throw result.error
}

if (!result.data) {
throw new Error(`No data / indexer not found on chain`)
}

if (!result.data.indexer) {
throw new Error(`Indexer not found on chain`)
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return [
...result.data.indexer.activeAllocations,
...result.data.indexer.recentlyClosedAllocations,
].map((x) => parseGraphQLAllocation(x, protocolNetwork))
} catch (err) {
logger.warn(`Failed to query indexer allocations, keeping existing`, {
allocations: currentAllocations.map((allocation) => allocation.id),
err: indexerError(IndexerErrorCode.IE010, err),
})
return currentAllocations
}
}

const allocations = timer(interval).reduce(refreshAllocations, [])

allocations.pipe((allocations) => {
logger.info(`Eligible allocations`, {
allocations: allocations.map((allocation) => ({
allocation: allocation.id,
deployment: allocation.subgraphDeployment.id.display,
closedAtEpoch: allocation.closedAtEpoch,
})),
})
})

return allocations
}
1 change: 1 addition & 0 deletions packages/indexer-common/src/network-specification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export const NetworkSpecification = z
transactionMonitoring: TransactionMonitoring,
subgraphs: ProtocolSubgraphs,
networkProvider: NetworkProvider,
allocationSyncInterval: positiveNumber().default(120000),
dai: Dai,
})
.strict()
Expand Down
151 changes: 4 additions & 147 deletions packages/indexer-service/src/allocations.ts
Original file line number Diff line number Diff line change
@@ -1,160 +1,17 @@
import gql from 'graphql-tag'
import pMap from 'p-map'
import { Wallet } from 'ethers'
import { NativeAttestationSigner } from '@graphprotocol/indexer-native'

import { Logger, Eventual, timer, Address } from '@graphprotocol/common-ts'
import { Logger, Eventual } from '@graphprotocol/common-ts'

import { LRUCache } from '@thi.ng/cache'
import {
Allocation,
allocationSigner,
allocationSignerPrivateKey,
indexerError,
IndexerErrorCode,
NetworkSubgraph,
parseGraphQLAllocation,
} from '@graphprotocol/indexer-common'

export interface MonitorEligibleAllocationsOptions {
indexer: Address
logger: Logger
networkSubgraph: NetworkSubgraph
interval: number
protocolNetwork: string
}

export const monitorEligibleAllocations = ({
indexer,
logger: parentLogger,
networkSubgraph,
protocolNetwork,
interval,
}: MonitorEligibleAllocationsOptions): Eventual<Allocation[]> => {
const logger = parentLogger.child({ component: 'AllocationMonitor' })

const refreshAllocations = async (
currentAllocations: Allocation[],
): Promise<Allocation[]> => {
logger.debug('Refresh eligible allocations', {
protocolNetwork,
})

try {
const currentEpochResult = await networkSubgraph.query(gql`
query {
graphNetwork(id: "1") {
currentEpoch
}
}
`)
if (currentEpochResult.error) {
throw currentEpochResult.error
}

if (
!currentEpochResult.data ||
!currentEpochResult.data.graphNetwork ||
!currentEpochResult.data.graphNetwork.currentEpoch
) {
throw new Error(`Failed to fetch current epoch from network subgraph`)
}

const currentEpoch = currentEpochResult.data.graphNetwork.currentEpoch

const result = await networkSubgraph.query(
gql`
query allocations($indexer: String!, $closedAtEpochThreshold: Int!) {
indexer(id: $indexer) {
activeAllocations: totalAllocations(
where: { status: Active }
orderDirection: desc
first: 1000
) {
id
indexer {
id
}
allocatedTokens
createdAtBlockHash
createdAtEpoch
closedAtEpoch
subgraphDeployment {
id
stakedTokens
signalledTokens
queryFeesAmount
}
}
recentlyClosedAllocations: totalAllocations(
where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold }
orderDirection: desc
first: 1000
) {
id
indexer {
id
}
allocatedTokens
createdAtBlockHash
createdAtEpoch
closedAtEpoch
subgraphDeployment {
id
stakedTokens
signalledTokens
queryFeesAmount
}
}
}
}
`,
{
indexer: indexer.toLowerCase(),
closedAtEpochThreshold: currentEpoch - 1, // allocation can be closed within the last epoch or later
},
)

if (result.error) {
throw result.error
}

if (!result.data) {
throw new Error(`No data / indexer not found on chain`)
}

if (!result.data.indexer) {
throw new Error(`Indexer not found on chain`)
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return [
...result.data.indexer.activeAllocations,
...result.data.indexer.recentlyClosedAllocations,
].map(x => parseGraphQLAllocation(x, protocolNetwork))
} catch (err) {
logger.warn(`Failed to query indexer allocations, keeping existing`, {
allocations: currentAllocations.map(allocation => allocation.id),
err: indexerError(IndexerErrorCode.IE010, err),
})
return currentAllocations
}
}

const allocations = timer(interval).reduce(refreshAllocations, [])

allocations.pipe(allocations => {
logger.info(`Eligible allocations`, {
allocations: allocations.map(allocation => ({
allocation: allocation.id,
deployment: allocation.subgraphDeployment.id.display,
closedAtEpoch: allocation.closedAtEpoch,
})),
})
})

return allocations
}

export interface EnsureAttestationSignersOptions {
logger: Logger
allocations: Eventual<Allocation[]>
Expand Down Expand Up @@ -195,11 +52,11 @@ export const ensureAttestationSigners = ({
})

// Derive an epoch and subgraph specific private key
const signer = allocationSigner(wallet, allocation)
const signerPK = allocationSignerPrivateKey(wallet, allocation)
const nativeSigner = new NativeAttestationSigner(
chainId,
disputeManagerAddress,
signer,
signerPK,
allocation.subgraphDeployment.id.bytes32,
)

Expand Down
3 changes: 2 additions & 1 deletion packages/indexer-service/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
indexerError,
IndexerErrorCode,
GraphNode,
monitorEligibleAllocations,
Network,
NetworkSubgraph,
registerIndexerErrorMetrics,
Expand All @@ -31,7 +32,7 @@ import {

import { createServer } from '../server'
import { QueryProcessor } from '../queries'
import { ensureAttestationSigners, monitorEligibleAllocations } from '../allocations'
import { ensureAttestationSigners } from '../allocations'
import { AllocationReceiptManager } from '../query-fees'
import pRetry from 'p-retry'

Expand Down

0 comments on commit 72dee1b

Please sign in to comment.