Skip to content

Commit

Permalink
Sync miner reward. (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
Argeric authored Apr 7, 2024
1 parent e864c46 commit 1266496
Show file tree
Hide file tree
Showing 12 changed files with 1,101 additions and 105 deletions.
2 changes: 2 additions & 0 deletions cmd/data_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ var migrationModels = []interface{}{
&store.Submit{},
&store.AddressSubmit{},
&store.SubmitStat{},
&store.Reward{},
&store.AddressReward{},
}

func MustInitDataContext() DataContext {
Expand Down
7 changes: 7 additions & 0 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ charge:
# Token decimals
decimals: 18

# Reward contract info configurations
reward:
# Reward contract address
address: 0x7a6bb8a34cc7a418350d3cf2409c580ba9ef2dbf
# Distribute reward event signature
rewardEventSignature: 0x83617a1b0f847971f005bd162dde513cfe93df96e6293c3bbb5fe9c40629dd4c

# MySQL database configurations
store:
mysql:
Expand Down
801 changes: 801 additions & 0 deletions contract/OnePoolReward.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion contract/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ import (
)

var (
flowFilterer *contract.FlowFilterer
flowFilterer *contract.FlowFilterer
rewardFilterer *OnePoolRewardFilterer
)

func init() {
flowFilterer, _ = contract.NewFlowFilterer(common.HexToAddress(""), nil)
rewardFilterer, _ = NewOnePoolRewardFilterer(common.HexToAddress(""), nil)
}

func DummyFlowFilterer() *contract.FlowFilterer {
return flowFilterer
}

func DummyRewardFilterer() *OnePoolRewardFilterer {
return rewardFilterer
}

func TokenInfo(w3c *web3go.Client, address string) (string, string, uint8, error) {
ethClient, _ := w3c.ToClientForContract()
token, err := NewErc20Token(common.HexToAddress(address), ethClient)
Expand Down
44 changes: 41 additions & 3 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type MysqlStore struct {
*SubmitStore
*SubmitStatStore
*AddressSubmitStore
*RewardStore
*AddressRewardStore
}

func MustNewStore(db *gorm.DB) *MysqlStore {
Expand All @@ -31,10 +33,12 @@ func MustNewStore(db *gorm.DB) *MysqlStore {
SubmitStore: newSubmitStore(db),
SubmitStatStore: newSubmitStatStore(db),
AddressSubmitStore: newAddressSubmitStore(db),
RewardStore: newRewardStore(db),
AddressRewardStore: newAddressRewardStore(db),
}
}

func (ms *MysqlStore) Push(block *Block, submits []*Submit) error {
func (ms *MysqlStore) Push(block *Block, submits []*Submit, rewards []*Reward) error {
addressSubmits := make([]AddressSubmit, 0)
if len(submits) > 0 {
for _, submit := range submits {
Expand All @@ -53,6 +57,21 @@ func (ms *MysqlStore) Push(block *Block, submits []*Submit) error {
}
}

addressRewards := make([]AddressReward, 0)
if len(rewards) > 0 {
for _, reward := range rewards {
addressReward := AddressReward{
MinerID: reward.MinerID,
PricingIndex: reward.PricingIndex,
Amount: reward.Amount,
BlockNumber: reward.BlockNumber,
BlockTime: reward.BlockTime,
TxHash: reward.TxHash,
}
addressRewards = append(addressRewards, addressReward)
}
}

return ms.Store.DB.Transaction(func(dbTx *gorm.DB) error {
// save blocks
if err := ms.BlockStore.Add(dbTx, block); err != nil {
Expand All @@ -69,6 +88,16 @@ func (ms *MysqlStore) Push(block *Block, submits []*Submit) error {
}
}

// save distribute rewards
if len(rewards) > 0 {
if err := ms.RewardStore.Add(dbTx, rewards); err != nil {
return errors.WithMessage(err, "failed to save rewards")
}
if err := ms.AddressRewardStore.Add(dbTx, addressRewards); err != nil {
return errors.WithMessage(err, "failed to save address rewards")
}
}

return nil
})
}
Expand All @@ -84,10 +113,19 @@ func (ms *MysqlStore) Pop(block uint64) error {

return ms.Store.DB.Transaction(func(dbTx *gorm.DB) error {
if err := ms.BlockStore.Pop(dbTx, block); err != nil {
return errors.WithMessage(err, "failed to remove block")
return errors.WithMessage(err, "failed to remove blocks")
}
if err := ms.SubmitStore.Pop(dbTx, block); err != nil {
return errors.WithMessage(err, "failed to remove flow submits")
return errors.WithMessage(err, "failed to remove submits")
}
if err := ms.AddressSubmitStore.Pop(dbTx, block); err != nil {
return errors.WithMessage(err, "failed to remove address submits")
}
if err := ms.RewardStore.Pop(dbTx, block); err != nil {
return errors.WithMessage(err, "failed to remove rewards")
}
if err := ms.AddressRewardStore.Pop(dbTx, block); err != nil {
return errors.WithMessage(err, "failed to remove address rewards")
}
return nil
})
Expand Down
40 changes: 40 additions & 0 deletions store/store_address_reward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package store

import (
"time"

"github.com/Conflux-Chain/go-conflux-util/store/mysql"
"github.com/shopspring/decimal"
"gorm.io/gorm"
)

type AddressReward struct {
MinerID uint64 `gorm:"primaryKey;autoIncrement:false"`
PricingIndex uint64 `gorm:"primaryKey;autoIncrement:false"`
Amount decimal.Decimal `gorm:"type:decimal(65);not null"`
BlockNumber uint64 `gorm:"not null;index:idx_bn"`
BlockTime time.Time `gorm:"not null"`
TxHash string `gorm:"size:66;not null"`
}

func (AddressReward) TableName() string {
return "address_rewards"
}

type AddressRewardStore struct {
*mysql.Store
}

func newAddressRewardStore(db *gorm.DB) *AddressRewardStore {
return &AddressRewardStore{
Store: mysql.NewStore(db),
}
}

func (ars *AddressRewardStore) Add(dbTx *gorm.DB, addressRewards []AddressReward) error {
return dbTx.CreateInBatches(addressRewards, batchSizeInsert).Error
}

func (ars *AddressRewardStore) Pop(dbTx *gorm.DB, block uint64) error {
return dbTx.Where("block_number >= ?", block).Delete(&AddressReward{}).Error
}
10 changes: 7 additions & 3 deletions store/store_address_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

type AddressSubmit struct {
SenderID uint64 `gorm:"primary_key;autoIncrement:false"`
SubmissionIndex uint64 `gorm:"primary_key;autoIncrement:false"`
SenderID uint64 `gorm:"primaryKey;autoIncrement:false"`
SubmissionIndex uint64 `gorm:"primaryKey;autoIncrement:false"`
RootHash string `gorm:"size:66;index:idx_root"`
Length uint64 `gorm:"not null"`

BlockNumber uint64 `gorm:"not null"`
BlockNumber uint64 `gorm:"not null;index:idx_bn"`
BlockTime time.Time `gorm:"not null"`
TxHash string `gorm:"size:66;not null"`

Expand Down Expand Up @@ -42,6 +42,10 @@ func (ass *AddressSubmitStore) Add(dbTx *gorm.DB, addressSubmits []AddressSubmit
return dbTx.CreateInBatches(addressSubmits, batchSizeInsert).Error
}

func (ass *AddressSubmitStore) Pop(dbTx *gorm.DB, block uint64) error {
return dbTx.Where("block_number >= ?", block).Delete(&AddressSubmit{}).Error
}

func (ass *AddressSubmitStore) UpdateByPrimaryKey(dbTx *gorm.DB, s *AddressSubmit) error {
db := ass.DB
if dbTx != nil {
Expand Down
61 changes: 61 additions & 0 deletions store/store_reward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package store

import (
"time"

nhContract "github.com/0glabs/0g-storage-scan/contract"
"github.com/Conflux-Chain/go-conflux-util/store/mysql"
"github.com/openweb3/web3go/types"
"github.com/shopspring/decimal"
"gorm.io/gorm"
)

type Reward struct {
PricingIndex uint64 `gorm:"primaryKey;autoIncrement:false"`
Miner string `gorm:"-"`
MinerID uint64 `gorm:"not null"`
Amount decimal.Decimal `gorm:"type:decimal(65);not null"`
BlockNumber uint64 `gorm:"not null;index:idx_bn"`
BlockTime time.Time `gorm:"not null"`
TxHash string `gorm:"size:66;not null"`
}

func NewReward(blockTime time.Time, log types.Log, filter *nhContract.OnePoolRewardFilterer) (*Reward, error) {
distributeReward, err := filter.ParseDistributeReward(*log.ToEthLog())
if err != nil {
return nil, err
}

reward := &Reward{
PricingIndex: distributeReward.PricingIndex.Uint64(),
Miner: distributeReward.Beneficiary.String(),
Amount: decimal.NewFromBigInt(distributeReward.Amount, 0),
BlockNumber: log.BlockNumber,
BlockTime: blockTime,
TxHash: log.TxHash.String(),
}

return reward, nil
}

func (Reward) TableName() string {
return "rewards"
}

type RewardStore struct {
*mysql.Store
}

func newRewardStore(db *gorm.DB) *RewardStore {
return &RewardStore{
Store: mysql.NewStore(db),
}
}

func (rs *RewardStore) Add(dbTx *gorm.DB, rewards []*Reward) error {
return dbTx.CreateInBatches(rewards, batchSizeInsert).Error
}

func (rs *RewardStore) Pop(dbTx *gorm.DB, block uint64) error {
return dbTx.Where("block_number >= ?", block).Delete(&Reward{}).Error
}
4 changes: 2 additions & 2 deletions store/store_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Submit struct {
SenderID uint64 `gorm:"not null"`
Length uint64 `gorm:"not null"`

BlockNumber uint64 `gorm:"not null"`
BlockNumber uint64 `gorm:"not null;index:idx_bn"`
BlockTime time.Time `gorm:"not null"`
TxHash string `gorm:"size:66;not null"`

Expand Down Expand Up @@ -153,7 +153,7 @@ func (ss *SubmitStore) List(rootHash *string, idDesc bool, skip, limit int) (int

func (ss *SubmitStore) BatchGetNotFinalized(batch int) ([]Submit, error) {
submits := new([]Submit)
if err := ss.DB.Raw("select submission_index, sender_id from submits where status < ? limit ?",
if err := ss.DB.Raw("select submission_index, sender_id, total_seg_num from submits where status < ? limit ?",
Uploaded, batch).Scan(submits).Error; err != nil {
return nil, err
}
Expand Down
18 changes: 16 additions & 2 deletions sync/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func getEthDataByReceipts(w3c *web3go.Client, blockNumber uint64) (*EthData, err
return &EthData{Number: blockNumber, Block: block, Receipts: txReceipts}, nil
}

func getEthDataByLogs(w3c *web3go.Client, blockNumber uint64, flowAddr common.Address, flowSubmitSig common.Hash) (*EthData, error) {
func getEthDataByLogs(w3c *web3go.Client, blockNumber uint64, addresses []common.Address, topics [][]common.Hash) (*EthData, error) {
// get block
block, err := w3c.Eth.BlockByNumber(types.BlockNumber(blockNumber), true)
if err != nil {
Expand All @@ -86,7 +86,8 @@ func getEthDataByLogs(w3c *web3go.Client, blockNumber uint64, flowAddr common.Ad
}

// batch get logs
logArray, err := batchGetFlowSubmits(w3c, blockNumber, blockNumber, flowAddr, flowSubmitSig)
//logArray, err := batchGetFlowSubmits(w3c, blockNumber, blockNumber, flowAddr, flowSubmitSig)
logArray, err := batchGetLogs(w3c, blockNumber, blockNumber, addresses, topics)
if err != nil {
return nil, errors.WithMessagef(err, "failed to get flow submits in batch at block %v", blockNumber)
}
Expand Down Expand Up @@ -134,6 +135,19 @@ func batchGetFlowSubmits(w3c *web3go.Client, blockFrom, blockTo uint64, flowAddr
return w3c.Eth.Logs(logFilter)
}

func batchGetLogs(w3c *web3go.Client, blockFrom, blockTo uint64, addresses []common.Address,
topics [][]common.Hash) ([]types.Log, error) {
bnFrom := types.NewBlockNumber(int64(blockFrom))
bnTo := types.NewBlockNumber(int64(blockTo))
logFilter := types.FilterQuery{
FromBlock: &bnFrom,
ToBlock: &bnTo,
Addresses: addresses,
Topics: topics,
}
return w3c.Eth.Logs(logFilter)
}

func batchGetBlockTimes(ctx context.Context, w3c *web3go.Client, blkNums []types.BlockNumber,
batchSize uint64) (map[uint64]uint64, error) {
if len(blkNums) == 0 {
Expand Down
Loading

0 comments on commit 1266496

Please sign in to comment.