Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: improve snapshot generation performance #251

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .testdataVersion
Original file line number Diff line number Diff line change
@@ -1 +1 @@
da8f00be49d1447f22934629d9d6819e3d763af9
d67ef5d895bdc0ccd6a006a1683ac3e58f820ad0
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error {
duration integer,
block_number bigint not null,
block_time timestamp without time zone not null,
block_date date not null,
block_date text not null,
reward_type varchar not null
)`,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package _202502180836_snapshotUniqueConstraints

import (
"database/sql"
"github.com/Layr-Labs/sidecar/internal/config"
"github.com/pkg/errors"
"gorm.io/gorm"
)

type Migration struct {
}

func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error {
queries := []string{
`alter table staker_shares add constraint uniq_staker_shares unique (staker, strategy, transaction_hash, log_index, block_number)`,
`create table if not exists staker_share_snapshots (
staker varchar,
strategy varchar,
shares numeric,
snapshot date
)`,
`alter table staker_share_snapshots add constraint uniq_staker_share_snapshots unique (staker, strategy, snapshot)`,

// re-add indexes that likely got nuked due to dropping and re-creating the snapshot table
`create index if not exists idx_staker_share_snapshots_staker_strategy_snapshot on staker_share_snapshots (staker, strategy, snapshot)`,
`create index if not exists idx_staker_share_snapshots_strategy_snapshot on staker_share_snapshots (strategy, snapshot)`,

`alter table staker_delegation_snapshots add constraint uniq_staker_delegation_snapshots unique (staker, operator, snapshot)`,
`create index if not exists idx_staker_delegation_snapshots_operator_snapshot on staker_delegation_snapshots (operator, snapshot)`,

`alter table operator_share_snapshots add constraint uniq_operator_share_snapshots unique (operator, strategy, snapshot)`,

`alter table operator_shares add constraint uniq_operator_shares unique (operator, strategy, transaction_hash, log_index, block_number)`,

`alter table operator_pi_split_snapshots add constraint uniq_operator_pi_split_snapshots unique (operator, split, snapshot)`,

`create table if not exists operator_directed_rewards(
avs varchar,
reward_hash varchar,
token varchar,
operator varchar,
operator_index integer,
amount numeric,
strategy varchar,
strategy_index integer,
multiplier numeric(78),
start_timestamp timestamp(6),
end_timestamp timestamp(6),
duration bigint,
block_number bigint,
block_time timestamp(6),
block_date text
);`,
`alter table operator_directed_rewards add constraint uniq_operator_directed_rewards unique (avs, reward_hash, strategy_index, operator_index)`,

`alter table operator_avs_strategy_snapshots add constraint uniq_operator_avs_strategy_snapshots unique (operator, avs, strategy, snapshot)`,

`alter table operator_avs_registration_snapshots add constraint uniq_operator_avs_registration_snapshots unique (operator, avs, snapshot)`,

`alter table default_operator_split_snapshots add constraint uniq_default_operator_split_snapshots unique (snapshot)`,

`alter table combined_rewards add constraint uniq_combined_rewards unique (avs, reward_hash, strategy_index)`,

`alter table operator_avs_split_snapshots add constraint uniq_operator_avs_split_snapshots unique (operator, avs, snapshot)`,
}

for _, query := range queries {
res := grm.Exec(query)
if res.Error != nil {
return errors.Wrapf(res.Error, "failed to execute query: %s", query)
}
}
return nil
}

func (m *Migration) GetName() string {
return "202502180836_snapshotUniqueConstraints"
}
2 changes: 2 additions & 0 deletions pkg/postgres/migrations/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
_202501241111_addIndexesForRpcFunctions "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202501241111_addIndexesForRpcFunctions"
_202502100846_goldTableRewardHashIndex "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202502100846_goldTableRewardHashIndex"
_202502180836_snapshotUniqueConstraints "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202502180836_snapshotUniqueConstraints"
"time"

"github.com/Layr-Labs/sidecar/internal/config"
Expand Down Expand Up @@ -138,6 +139,7 @@ func (m *Migrator) MigrateAll() error {
&_202501071401_defaultOperatorSplitSnapshots.Migration{},
&_202501241111_addIndexesForRpcFunctions.Migration{},
&_202502100846_goldTableRewardHashIndex.Migration{},
&_202502180836_snapshotUniqueConstraints.Migration{},
}

for _, migration := range migrations {
Expand Down
94 changes: 51 additions & 43 deletions pkg/rewards/combinedRewards.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,69 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

const rewardsCombinedQuery = `
with combined_rewards as (
select
rs.avs,
rs.reward_hash,
rs.token,
rs.amount,
rs.strategy,
rs.strategy_index,
rs.multiplier,
rs.start_timestamp,
rs.end_timestamp,
rs.duration,
rs.block_number,
b.block_time::timestamp(6),
to_char(b.block_time, 'YYYY-MM-DD') AS block_date,
rs.reward_type
from reward_submissions as rs
left join blocks as b on (b.number = rs.block_number)
-- pipeline bronze table uses this to filter the correct records
where b.block_time < TIMESTAMP '{{.cutoffDate}}'
)
insert into combined_rewards (avs, reward_hash, token, amount, start_timestamp, duration, end_timestamp, strategy, multiplier, strategy_index, block_number, block_time, block_date, reward_type)
with _combined_rewards as (
select
avs,
reward_hash,
token,
amount,
start_timestamp,
duration,
end_timestamp,
strategy,
multiplier,
strategy_index,
block_number,
block_time,
block_date,
reward_type
from combined_rewards
rs.avs,
rs.reward_hash,
rs.token,
rs.amount,
rs.strategy,
rs.strategy_index,
rs.multiplier,
rs.start_timestamp,
rs.end_timestamp,
rs.duration,
rs.block_number,
b.block_time::timestamp(6),
to_char(b.block_time, 'YYYY-MM-DD') AS block_date,
rs.reward_type
from reward_submissions as rs
left join blocks as b on (b.number = rs.block_number)
-- pipeline bronze table uses this to filter the correct records
where b.block_time < TIMESTAMP '{{.cutoffDate}}'
)
select
avs,
reward_hash,
token,
amount,
start_timestamp,
duration,
end_timestamp,
strategy,
multiplier,
strategy_index,
block_number,
block_time,
block_date,
reward_type
from _combined_rewards
on conflict on constraint uniq_combined_rewards do nothing;
`

func (r *RewardsCalculator) GenerateAndInsertCombinedRewards(snapshotDate string) error {
tableName := "combined_rewards"

query, err := rewardsUtils.RenderQueryTemplate(rewardsCombinedQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
if err != nil {
r.logger.Sugar().Errorw("Failed to render rewards combined query", "error", err)
r.logger.Sugar().Errorw("Failed to render rewards combined query",
zap.Error(err),
)
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate combined rewards", "error", err)
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate combined rewards",
zap.String("snapshotDate", snapshotDate),
zap.Error(res.Error),
)
return err
}
return nil
Expand Down
27 changes: 18 additions & 9 deletions pkg/rewards/defaultOperatorSplitSnapshots.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

const defaultOperatorSplitSnapshotQuery = `
insert into default_operator_split_snapshots (split, snapshot)
WITH default_operator_splits_with_block_info as (
select
dos.new_default_operator_split_bips as split,
Expand Down Expand Up @@ -56,24 +60,29 @@ final_results as (
CROSS JOIN
generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS d
)
select * from final_results
select
split,
snapshot
from final_results
on conflict on constraint uniq_default_operator_split_snapshots do nothing;
`

func (r *RewardsCalculator) GenerateAndInsertDefaultOperatorSplitSnapshots(snapshotDate string) error {
tableName := "default_operator_split_snapshots"

query, err := rewardsUtils.RenderQueryTemplate(defaultOperatorSplitSnapshotQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
if err != nil {
r.logger.Sugar().Errorw("Failed to render query template", "error", err)
r.logger.Sugar().Errorw("Failed to render query template", zap.Error(err))
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate default_operator_split_snapshots", "error", err)
return err
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate default_operator_split_snapshots",
zap.String("snapshotDate", snapshotDate),
zap.Error(res.Error),
)
return res.Error
}
return nil
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/rewards/operatorAvsRegistrationSnapshots.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

// Operator AVS Registration Windows: Ranges at which an operator has registered for an AVS
// 0. Ranked: Rank the operator state changes by block_time and log_index since sqlite lacks LEAD/LAG functions
Expand All @@ -18,6 +21,7 @@ import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
// Entry Exit
// Since exits (deregistrations) are rounded down, we must only look at the day 2 snapshot on a pipeline run on day 3.
const operatorAvsRegistrationSnapshotsQuery = `
insert into operator_avs_registration_snapshots (operator, avs, snapshot)
WITH state_changes as (
select
aosc.*,
Expand Down Expand Up @@ -100,23 +104,25 @@ SELECT
d AS snapshot
FROM cleaned_records
CROSS JOIN generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS d
on conflict on constraint uniq_operator_avs_registration_snapshots do nothing;
`

func (r *RewardsCalculator) GenerateAndInsertOperatorAvsRegistrationSnapshots(snapshotDate string) error {
tableName := "operator_avs_registration_snapshots"

query, err := rewardsUtils.RenderQueryTemplate(operatorAvsRegistrationSnapshotsQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
if err != nil {
r.logger.Sugar().Errorw("Failed to render operator AVS registration snapshots query", "error", err)
r.logger.Sugar().Errorw("Failed to render operator AVS registration snapshots query", zap.Error(err))
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_registration_snapshots", "error", err)
return err
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_registration_snapshots",
zap.String("snapshotDate", snapshotDate),
zap.Error(res.Error),
)
return res.Error
}
return nil
}
Expand Down
25 changes: 18 additions & 7 deletions pkg/rewards/operatorAvsSplitSnapshots.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

const operatorAvsSplitSnapshotQuery = `
insert into operator_avs_split_snapshots (operator, avs, split, snapshot)
WITH operator_avs_splits_with_block_info as (
select
oas.operator,
Expand Down Expand Up @@ -62,12 +66,16 @@ final_results as (
CROSS JOIN
generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS d
)
select * from final_results
select
operator,
avs,
split,
snapshot
from final_results
on conflict on constraint uniq_operator_avs_split_snapshots do nothing
`

func (r *RewardsCalculator) GenerateAndInsertOperatorAvsSplitSnapshots(snapshotDate string) error {
tableName := "operator_avs_split_snapshots"

query, err := rewardsUtils.RenderQueryTemplate(operatorAvsSplitSnapshotQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
Expand All @@ -76,9 +84,12 @@ func (r *RewardsCalculator) GenerateAndInsertOperatorAvsSplitSnapshots(snapshotD
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_split_snapshots", "error", err)
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_split_snapshots",
zap.String("snapshotDate", snapshotDate),
zap.Error(err),
)
return err
}
return nil
Expand Down
Loading
Loading