Skip to content

Commit

Permalink
feat: trying lightweight list batch update
Browse files Browse the repository at this point in the history
  • Loading branch information
juanmardefago committed May 24, 2024
1 parent 234cde4 commit 0cfa7e1
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 112 deletions.
14 changes: 4 additions & 10 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ type Indexer @entity {
"Delegators to this Indexer"
delegators: [DelegatedStake!]! @derivedFrom(field: "indexer")

"Delegator relation entities. For internal usage for performance reasons"
relations: [IndexerDelegatedStakeRelation!]! @derivedFrom(field: "indexer")

"CURRENT tokens delegated to the indexer"
delegatedTokens: BigInt!

Expand Down Expand Up @@ -1071,16 +1074,7 @@ type IndexerDelegatedStakeRelation @entity {
id: Bytes!

"Indexer entity where the delegation resides"
indexer: Indexer!

"Delegator entity owner of said delegation stake"
delegator: Delegator!

"DelegatedStake entity that represents the delegation"
stake: DelegatedStake!

"Whether the delegation is active or not. Useful to avoid updating non-active delegations without deleting the entity."
active: Boolean!
indexer: Indexer
}

type IndexerDailyData @entity {
Expand Down
4 changes: 0 additions & 4 deletions src/mappings/gns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ export function handleSubgraphPublished(event: SubgraphPublished): void {
subgraphVersion.subgraphDeployment = subgraphDeploymentID
subgraphVersion.version = versionNumber.toI32()
subgraphVersion.createdAt = event.block.timestamp.toI32()
let hexHash = changetype<Bytes>(addQm(event.params.versionMetadata))
let base58Hash = hexHash.toBase58()
subgraphVersion.metadataHash = event.params.versionMetadata
subgraphVersion.save()
}
Expand Down Expand Up @@ -637,8 +635,6 @@ export function handleSubgraphVersionUpdated(event: SubgraphVersionUpdated): voi
subgraphVersion.subgraphDeployment = subgraphDeploymentID
subgraphVersion.version = versionNumber.toI32()
subgraphVersion.createdAt = event.block.timestamp.toI32()
let hexHash = changetype<Bytes>(addQm(event.params.versionMetadata))
let base58Hash = hexHash.toBase58()
subgraphVersion.metadataHash = event.params.versionMetadata
subgraphVersion.save()
}
Expand Down
163 changes: 69 additions & 94 deletions src/mappings/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,6 @@ export function createOrLoadDelegatedStake(

if (delegatedStake == null) {
let indexerEntity = Indexer.load(indexer)!
let relationId = compoundId(
indexer,
changetype<Bytes>(Bytes.fromBigInt(indexerEntity.delegatorsCount)),
)
delegatedStake = new DelegatedStake(id)
delegatedStake.indexer = indexer
delegatedStake.delegator = delegator.id
Expand All @@ -236,15 +232,11 @@ export function createOrLoadDelegatedStake(
delegatedStake.originalDelegation = BIGDECIMAL_ZERO
delegatedStake.currentDelegation = BIGDECIMAL_ZERO
delegatedStake.createdAt = timestamp
delegatedStake.relation = relationId
delegatedStake.relation = id
delegatedStake.save()

let relation = new IndexerDelegatedStakeRelation(relationId)

relation.indexer = indexerEntity.id
relation.stake = delegatedStake.id
relation.delegator = delegator.id
relation.active = true
let relation = new IndexerDelegatedStakeRelation(id) // minimal amount of data to avoid running out of memory
relation.indexer = indexerEntity.id // only needed for field derivation and active status
relation.save()

indexerEntity.delegatorsCount = indexerEntity.delegatorsCount.plus(BIGINT_ONE)
Expand Down Expand Up @@ -623,37 +615,35 @@ export function batchUpdateDelegatorsForIndexer(indexerId: Bytes, timestamp: Big
// Loading it again here to make sure we have the latest up to date data on the entity.
let indexer = Indexer.load(indexerId)!
// pre-calculates a lot of data for all delegators that exists for a specific indexer
// using already existing links with the indexer-delegatedStake relations
for (let i = 0; i < indexer.delegatorsCount.toI32(); i++) {
let relationId = compoundId(indexer.id, changetype<Bytes>(Bytes.fromBigInt(BigInt.fromString(i.toString()))))
let relation = IndexerDelegatedStakeRelation.load(relationId)!
if (relation.active) {
let delegatedStake = DelegatedStake.load(relation.stake)!
let delegator = Delegator.load(delegatedStake.delegator)!
// Only update core entities if there's a change in the exchange rate
if (delegatedStake.latestIndexerExchangeRate != indexer.delegationExchangeRate) {
let oldUnrealizedRewards = delegatedStake.unrealizedRewards

delegatedStake.latestIndexerExchangeRate = indexer.delegationExchangeRate
delegatedStake.currentDelegation =
delegatedStake.latestIndexerExchangeRate * delegatedStake.shareAmount.toBigDecimal()
delegatedStake.unrealizedRewards = avoidNegativeRoundingError(
delegatedStake.currentDelegation - delegatedStake.originalDelegation,
)
delegatedStake.save()

let diffUnrealized = delegatedStake.unrealizedRewards - oldUnrealizedRewards

delegator.totalUnrealizedRewards = avoidNegativeRoundingError(
delegator.totalUnrealizedRewards.plus(diffUnrealized),
)
delegator.currentDelegation = delegator.currentDelegation.plus(diffUnrealized)
delegator.save()
}

getAndUpdateDelegatedStakeDailyData(delegatedStake as DelegatedStake, timestamp)
getAndUpdateDelegatorDailyData(delegator as Delegator, timestamp)
// uses lightweight relation entity to derive the full list. hopefully it doesn't run out of memory on big deleg count indexers
let relations = indexer.relations.load()

for (let i = 0; i < relations.length; i++) {
let delegatedStake = DelegatedStake.load(relations[i].id)!
let delegator = Delegator.load(delegatedStake.delegator)!
// Only update core entities if there's a change in the exchange rate
if (delegatedStake.latestIndexerExchangeRate != indexer.delegationExchangeRate) {
let oldUnrealizedRewards = delegatedStake.unrealizedRewards

delegatedStake.latestIndexerExchangeRate = indexer.delegationExchangeRate
delegatedStake.currentDelegation =
delegatedStake.latestIndexerExchangeRate.times(delegatedStake.shareAmount.toBigDecimal())
delegatedStake.unrealizedRewards = avoidNegativeRoundingError(
delegatedStake.currentDelegation.minus(delegatedStake.originalDelegation),
)
delegatedStake.save()

let diffUnrealized = delegatedStake.unrealizedRewards.minus(oldUnrealizedRewards)

delegator.totalUnrealizedRewards = avoidNegativeRoundingError(
delegator.totalUnrealizedRewards.plus(diffUnrealized),
)
delegator.currentDelegation = delegator.currentDelegation.plus(diffUnrealized)
delegator.save()
}

getAndUpdateDelegatedStakeDailyData(delegatedStake as DelegatedStake, timestamp)
getAndUpdateDelegatorDailyData(delegator as Delegator, timestamp)
}
}

Expand All @@ -663,16 +653,13 @@ export function getAndUpdateNetworkDailyData(
): GraphNetworkDailyData {
let dayNumber = timestamp.toI32() / SECONDS_PER_DAY - LAUNCH_DAY
let id = compoundId(entity.id, Bytes.fromI32(dayNumber))
let dailyData = GraphNetworkDailyData.load(id)

if (dailyData == null) {
dailyData = new GraphNetworkDailyData(id)

dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayNumber
dailyData.network = entity.id
}
// not checking for previous entity since we don't want to waste a load and data will be overwritten anyways
let dailyData = new GraphNetworkDailyData(id)
dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayNumber
dailyData.network = entity.id

dailyData.delegationRatio = entity.delegationRatio
dailyData.totalTokensStaked = entity.totalTokensStaked
Expand Down Expand Up @@ -706,18 +693,15 @@ export function getAndUpdateNetworkDailyData(
export function getAndUpdateIndexerDailyData(entity: Indexer, timestamp: BigInt): IndexerDailyData {
let dayNumber = timestamp.toI32() / SECONDS_PER_DAY - LAUNCH_DAY
let id = compoundId(entity.id, Bytes.fromI32(dayNumber))
let dailyData = IndexerDailyData.load(id)

if (dailyData == null) {
dailyData = new IndexerDailyData(id)

dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayNumber
dailyData.indexer = entity.id
dailyData.netDailyDelegatedTokens = BIGINT_ZERO
dailyData.delegatorsCount = BIGINT_ZERO
}
// not checking for previous entity since we don't want to waste a load and data will be overwritten anyways
let dailyData = new IndexerDailyData(id)
dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayNumber
dailyData.indexer = entity.id
dailyData.netDailyDelegatedTokens = BIGINT_ZERO
dailyData.delegatorsCount = BIGINT_ZERO

dailyData.stakedTokens = entity.stakedTokens
dailyData.delegatedTokens = entity.delegatedTokens
Expand Down Expand Up @@ -748,20 +732,17 @@ export function getAndUpdateDelegatedStakeDailyData(
): DelegatedStakeDailyData {
let dayNumber = timestamp.toI32() / SECONDS_PER_DAY - LAUNCH_DAY
let stakeId = compoundId(stakeEntity.id, Bytes.fromI32(dayNumber))
let stakeDailyData = DelegatedStakeDailyData.load(stakeId)

if (stakeDailyData == null) {
stakeDailyData = new DelegatedStakeDailyData(stakeId)

stakeDailyData.dayStart = BigInt.fromI32(
(timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY,
)
stakeDailyData.dayEnd = stakeDailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
stakeDailyData.dayNumber = dayNumber
stakeDailyData.stake = stakeEntity.id
stakeDailyData.delegator = stakeEntity.delegator
stakeDailyData.indexer = stakeEntity.indexer
}

// not checking for previous entity since we don't want to waste a load and data will be overwritten anyways
let stakeDailyData = new DelegatedStakeDailyData(stakeId)
stakeDailyData.dayStart = BigInt.fromI32(
(timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY,
)
stakeDailyData.dayEnd = stakeDailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
stakeDailyData.dayNumber = dayNumber
stakeDailyData.stake = stakeEntity.id
stakeDailyData.delegator = stakeEntity.delegator
stakeDailyData.indexer = stakeEntity.indexer

stakeDailyData.stakedTokens = stakeEntity.stakedTokens
stakeDailyData.lockedTokens = stakeEntity.lockedTokens
Expand All @@ -784,16 +765,13 @@ export function getAndUpdateDelegatorDailyData(
): DelegatorDailyData {
let dayNumber = timestamp.toI32() / SECONDS_PER_DAY - LAUNCH_DAY
let id = compoundId(entity.id, Bytes.fromI32(dayNumber))
let dailyData = DelegatorDailyData.load(id)

if (dailyData == null) {
dailyData = new DelegatorDailyData(id)

dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayNumber
dailyData.delegator = entity.id
}

// not checking for previous entity since we don't want to waste a load and data will be overwritten anyways
let dailyData = new DelegatorDailyData(id)
dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayNumber
dailyData.delegator = entity.id

dailyData.stakesCount = entity.stakesCount
dailyData.activeStakesCount = entity.activeStakesCount
Expand All @@ -814,16 +792,13 @@ export function getAndUpdateSubgraphDeploymentDailyData(
): SubgraphDeploymentDailyData {
let dayId = timestamp.toI32() / SECONDS_PER_DAY - LAUNCH_DAY
let id = compoundId(entity.id, Bytes.fromI32(dayId))
let dailyData = SubgraphDeploymentDailyData.load(id)

if (dailyData == null) {
dailyData = new SubgraphDeploymentDailyData(id)

dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayId
dailyData.subgraphDeployment = entity.id
}
// not checking for previous entity since we don't want to waste a load and data will be overwritten anyways
let dailyData = new SubgraphDeploymentDailyData(id)
dailyData.dayStart = BigInt.fromI32((timestamp.toI32() / SECONDS_PER_DAY) * SECONDS_PER_DAY)
dailyData.dayEnd = dailyData.dayStart.plus(BigInt.fromI32(SECONDS_PER_DAY))
dailyData.dayNumber = dayId
dailyData.subgraphDeployment = entity.id

dailyData.stakedTokens = entity.stakedTokens
dailyData.signalledTokens = entity.signalledTokens
Expand Down
2 changes: 1 addition & 1 deletion src/mappings/l1staking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export function handleDelegationTransferredToL2(event: DelegationTransferredToL2

// De-activate relation with indexer after batch update, so last datapoints are created properly
let relation = IndexerDelegatedStakeRelation.load(delegation.relation)!
relation.active = false
relation.indexer = null
relation.save()
getAndUpdateNetworkDailyData(graphNetwork as GraphNetwork, event.block.timestamp)
}
Expand Down
4 changes: 2 additions & 2 deletions src/mappings/staking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export function handleStakeDelegated(event: StakeDelegated): void {
// We wouldn't want to re-activate the relation in that case
if (!delegatedStake.shareAmount.equals(BIGINT_ZERO)) {
let relation = IndexerDelegatedStakeRelation.load(delegatedStake.relation)!
relation.active = true
relation.indexer = indexer.id
relation.save()
}

Expand Down Expand Up @@ -375,7 +375,7 @@ export function handleStakeDelegatedLocked(event: StakeDelegatedLocked): void {
// De-activate relation with indexer after batch update, so last datapoints are created properly
if (delegatedStake.shareAmount.equals(BIGINT_ZERO)) {
let relation = IndexerDelegatedStakeRelation.load(delegatedStake.relation)!
relation.active = false
relation.indexer = null
relation.save()
}
getAndUpdateNetworkDailyData(graphNetwork as GraphNetwork, event.block.timestamp)
Expand Down
2 changes: 1 addition & 1 deletion subgraph.template.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
specVersion: 1.0.0
specVersion: 1.1.0
description: Graph Network analytics subgraph ({{network}})
repository: https://github.com/graphprotocol/graph-network-analytics-subgraph
features:
Expand Down

0 comments on commit 0cfa7e1

Please sign in to comment.