Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SlotViz Struct #2835

Open
wants to merge 1 commit into
base: NOBIDS/refractor_rewards_exporter
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions exporter/slot_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package exporter

import (
"bytes"
"context"
"database/sql"
"encoding/gob"
"eth2-exporter/db"
"eth2-exporter/rpc"
"eth2-exporter/services"
Expand All @@ -18,11 +20,12 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/go-redis/redis/v8"
eth_rewards "github.com/gobitfly/eth-rewards"
"github.com/gobitfly/eth-rewards/beacon"
)

func RunSlotExporter(firstRun bool) error {
func RunSlotExporter(firstRun bool, redisClient *redis.Client) error {

var err error
var clClient rpc.Client
Expand Down Expand Up @@ -64,7 +67,7 @@ func RunSlotExporter(firstRun bool) error {
if len(dbSlots) > 0 {
if dbSlots[0] != 0 {
logger.Infof("exporting genesis slot as it is missing in the database")
err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx)
err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx, redisClient)
if err != nil {
return fmt.Errorf("error exporting slot %v: %w", 0, err)
}
Expand All @@ -84,7 +87,7 @@ func RunSlotExporter(firstRun bool) error {
if previousSlot != currentSlot-1 {
logger.Infof("slots between %v and %v are missing, exporting them", previousSlot, currentSlot)
for slot := previousSlot + 1; slot <= currentSlot-1; slot++ {
err := ExportSlot(clClient, slot, false, tx)
err := ExportSlot(clClient, slot, false, tx, redisClient)

if err != nil {
return fmt.Errorf("error exporting slot %v: %w", slot, err)
Expand All @@ -102,7 +105,7 @@ func RunSlotExporter(firstRun bool) error {
if err != nil {
if err == sql.ErrNoRows {
logger.Infof("db is empty, export genesis slot")
err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx)
err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx, redisClient)
if err != nil {
return fmt.Errorf("error exporting slot %v: %w", 0, err)
}
Expand All @@ -116,7 +119,7 @@ func RunSlotExporter(firstRun bool) error {
if lastDbSlot != head.HeadSlot {
slotsExported := 0
for slot := lastDbSlot + 1; slot <= head.HeadSlot; slot++ { // export any new slots
err := ExportSlot(clClient, slot, utils.EpochOfSlot(slot) == head.HeadEpoch, tx)
err := ExportSlot(clClient, slot, utils.EpochOfSlot(slot) == head.HeadEpoch, tx, redisClient)
if err != nil {
return fmt.Errorf("error exporting slot %v: %w", slot, err)
}
Expand Down Expand Up @@ -181,7 +184,7 @@ func RunSlotExporter(firstRun bool) error {
if err != nil {
return fmt.Errorf("error setting block %v as finalized (orphaned): %w", dbSlot.Slot, err)
}
err = ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx)
err = ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx, redisClient)
if err != nil {
return fmt.Errorf("error exporting slot %v: %w", dbSlot.Slot, err)
}
Expand Down Expand Up @@ -211,7 +214,7 @@ func RunSlotExporter(firstRun bool) error {
} else { // check if a late slot has been proposed in the meantime
if len(dbSlot.BlockRoot) < 32 && header != nil { // we have no slot in the db, but the node has a slot, export it
logger.Infof("exporting new slot %v", dbSlot.Slot)
err := ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx)
err := ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx, redisClient)
if err != nil {
return fmt.Errorf("error exporting slot %v: %w", dbSlot.Slot, err)
}
Expand Down Expand Up @@ -289,7 +292,7 @@ func updateEpochStatusAndValidatorQueue(clClient rpc.Client, epoch uint64, tx *s
return nil
}

func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) error {
func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx, redisClient *redis.Client) error {

isFirstSlotOfEpoch := slot%utils.Config.Chain.ClConfig.SlotsPerEpoch == 0
epoch := slot / utils.Config.Chain.ClConfig.SlotsPerEpoch
Expand Down Expand Up @@ -461,6 +464,55 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e
return fmt.Errorf("error exporting proposal to bigtable for slot %v: %w", block.Slot, err)
}

// save the block to redis if it was produced during the last 60 minutes
if time.Since(utils.SlotToTime(block.Slot)) < time.Hour {
var serializedBlockData bytes.Buffer
enc := gob.NewEncoder(&serializedBlockData)

// TODO: replace with: RedisCachedBlockSlotViz
redisCachedBlock := &types.RedisCachedBlock{
Proposer: block.Proposer,
BlockRoot: block.BlockRoot,
Slot: block.Slot,
ParentRoot: block.ParentRoot,
StateRoot: block.StateRoot,
Signature: block.Signature,
RandaoReveal: block.RandaoReveal,
Graffiti: block.Graffiti,
Eth1Data: block.Eth1Data,
BodyRoot: block.BodyRoot,
ProposerSlashings: block.ProposerSlashings,
AttesterSlashings: block.AttesterSlashings,
Attestations: block.Attestations,
Deposits: block.Deposits,
VoluntaryExits: block.VoluntaryExits,
SyncAggregate: block.SyncAggregate,
SignedBLSToExecutionChange: block.SignedBLSToExecutionChange,
AttestationDuties: block.AttestationDuties,
SyncDuties: block.SyncDuties,
Finalized: block.Finalized,
EpochAssignments: block.EpochAssignments,
}
err := enc.Encode(redisCachedBlock)
if err != nil {
return fmt.Errorf("error serializing block to gob for slot %v: %w", block.Slot, err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

key := fmt.Sprintf("%d:%s:%d", utils.Config.Chain.ClConfig.DepositChainID, "block", block.Slot)

expirationTime := utils.EpochToTime(epoch + 7) // keep it for at least 7 epochs in the cache
expirationDuration := time.Until(expirationTime)
logger.Infof("writing block to redis with a TTL of %v", expirationDuration)
err = redisClient.Set(ctx, key, serializedBlockData.Bytes(), expirationDuration).Err()
if err != nil {
return fmt.Errorf("error writing block to redis for slot %v: %w", block.Slot, err)
}
logger.Infof("writing block to redis completed")
}

// save the block data to the db
err = db.SaveBlock(block, false, tx)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions types/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,60 @@ type Block struct {
EpochAssignments *EpochAssignments
Validators []*Validator
}
type RedisCachedBlock struct {
Proposer uint64
BlockRoot []byte
Slot uint64
ParentRoot []byte
StateRoot []byte
Signature []byte
RandaoReveal []byte
Graffiti []byte
Eth1Data *Eth1Data
BodyRoot []byte
ProposerSlashings []*ProposerSlashing
AttesterSlashings []*AttesterSlashing
Attestations []*Attestation
Deposits []*Deposit
VoluntaryExits []*VoluntaryExit
SyncAggregate *SyncAggregate // warning: sync aggregate may be nil, for phase0 blocks
SignedBLSToExecutionChange []*SignedBLSToExecutionChange
AttestationDuties map[ValidatorIndex][]Slot
SyncDuties map[ValidatorIndex]bool
Finalized bool
EpochAssignments *EpochAssignments
}

type ValidatorIndex32 uint32

type RedisCachedBlockSlotViz struct {
Proposer ValidatorIndex32
ProposerCLReward int64 // negative in case missed
ProposerELReward uint64
Slot uint64 // Slot/32 = epoch; to check for finality, slotviz needs epoch data as well (with sync committee participants usw) to work
AttestationSlashings map[ValidatorIndex32][]RedisCachedBlockSlashing // slasher index -> RedisCachedBlockSlashing
ProposerSlashings map[ValidatorIndex32][]RedisCachedBlockSlashing
AttestationDuties map[ValidatorIndex32][]RedisCachedBlockAttestations
SyncDuties map[ValidatorIndex32]RedisCachedBlockSync
}

type RedisCachedBlockSlashing struct {
Slashed ValidatorIndex32
Slot Slot
Earnings uint64 // slasher perspective
Penalty uint64 // slashed perspective
}

type RedisCachedBlockAttestations struct {
ForSlot Slot
SourceEarnings int64 // negative in case missed
TargetEarnings int64
HeadEarnings int64
}

type RedisCachedBlockSync struct {
Earnings int64
}

type SignedBLSToExecutionChange struct {
Message BLSToExecutionChange
Expand Down
Loading