Skip to content

Commit

Permalink
make queryBlockRangea an argument to query log functions (#237)
Browse files Browse the repository at this point in the history
* make queryBlockRangea an argument to query log functions

* fix operatorsinfo and rebuild mocks

* fix bug
  • Loading branch information
samlaf authored May 9, 2024
1 parent 7e4891d commit 9c7a75a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 34 deletions.
22 changes: 16 additions & 6 deletions chainio/clients/avsregistry/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
stakeregistry "github.com/Layr-Labs/eigensdk-go/contracts/bindings/StakeRegistry"
)

// eth_getLogs is limited to a 10,000 range, so we need to iterate over the range
const QueryBlockRange = 10_000
// different node providers have different eth_getLogs range limits. 10k is an arbitrary choice that should work for most
var DefaultQueryBlockRange = big.NewInt(10_000)

type AvsRegistryReader interface {
GetQuorumCount(opts *bind.CallOpts) (uint8, error)
Expand Down Expand Up @@ -76,12 +76,14 @@ type AvsRegistryReader interface {
ctx context.Context,
startBlock *big.Int,
stopBlock *big.Int,
blockRange *big.Int,
) ([]types.OperatorAddr, []types.OperatorPubkeys, error)

QueryExistingRegisteredOperatorSockets(
ctx context.Context,
startBlock *big.Int,
stopBlock *big.Int,
blockRange *big.Int,
) (map[types.OperatorId]types.Socket, error)
}

Expand Down Expand Up @@ -368,6 +370,7 @@ func (r *AvsRegistryChainReader) QueryExistingRegisteredOperatorPubKeys(
ctx context.Context,
startBlock *big.Int,
stopBlock *big.Int,
blockRange *big.Int,
) ([]types.OperatorAddr, []types.OperatorPubkeys, error) {
blsApkRegistryAbi, err := apkreg.ContractBLSApkRegistryMetaData.GetAbi()
if err != nil {
Expand All @@ -384,12 +387,15 @@ func (r *AvsRegistryChainReader) QueryExistingRegisteredOperatorPubKeys(
}
stopBlock = big.NewInt(int64(curBlockNum))
}
if blockRange == nil {
blockRange = DefaultQueryBlockRange
}

operatorAddresses := make([]types.OperatorAddr, 0)
operatorPubkeys := make([]types.OperatorPubkeys, 0)
for i := startBlock; i.Cmp(stopBlock) <= 0; i.Add(i, big.NewInt(QueryBlockRange)) {
for i := startBlock; i.Cmp(stopBlock) <= 0; i.Add(i, blockRange) {
// Subtract 1 since FilterQuery is inclusive
toBlock := big.NewInt(0).Add(i, big.NewInt(QueryBlockRange-1))
toBlock := big.NewInt(0).Add(i, big.NewInt(0).Sub(blockRange, big.NewInt(1)))
if toBlock.Cmp(stopBlock) > 0 {
toBlock = stopBlock
}
Expand Down Expand Up @@ -458,6 +464,7 @@ func (r *AvsRegistryChainReader) QueryExistingRegisteredOperatorSockets(
ctx context.Context,
startBlock *big.Int,
stopBlock *big.Int,
blockRange *big.Int,
) (map[types.OperatorId]types.Socket, error) {

if startBlock == nil {
Expand All @@ -470,11 +477,14 @@ func (r *AvsRegistryChainReader) QueryExistingRegisteredOperatorSockets(
}
stopBlock = big.NewInt(int64(curBlockNum))
}
if blockRange == nil {
blockRange = DefaultQueryBlockRange
}

operatorIdToSocketMap := make(map[types.OperatorId]types.Socket)
for i := startBlock; i.Cmp(stopBlock) <= 0; i.Add(i, big.NewInt(QueryBlockRange)) {
for i := startBlock; i.Cmp(stopBlock) <= 0; i.Add(i, blockRange) {
// Subtract 1 since FilterQuery is inclusive
toBlock := big.NewInt(0).Add(i, big.NewInt(QueryBlockRange-1))
toBlock := big.NewInt(0).Add(i, big.NewInt(0).Sub(blockRange, big.NewInt(1)))
if toBlock.Cmp(stopBlock) > 0 {
toBlock = stopBlock
}
Expand Down
16 changes: 8 additions & 8 deletions chainio/mocks/avsRegistryContractsReader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 22 additions & 13 deletions services/operatorsinfo/operatorsinfo_inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package operatorsinfo

import (
"context"
"math/big"
"sync"

"github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
Expand All @@ -11,6 +12,8 @@ import (
"github.com/ethereum/go-ethereum/common"
)

var defaultLogFilterQueryBlockRange = big.NewInt(10_000)

// OperatorsInfoServiceInMemory is a stateful goroutine (see https://gobyexample.com/stateful-goroutines)
// implementation of OperatorsInfoService that listen for the NewPubkeyRegistration and OperatorSocketUpdate events using a websocket connection
// to an eth client and stores the pubkeys/sockets in memory. Another possible implementation is using a mutex
Expand All @@ -23,10 +26,11 @@ import (
// better than silently failing, since it will be easier to debug. Naturally, this means that this aggregator using this service needs
// to be replicated and load-balanced, so that when it fails traffic can be switched to the other aggregator.
type OperatorsInfoServiceInMemory struct {
avsRegistrySubscriber avsregistry.AvsRegistrySubscriber
avsRegistryReader avsregistry.AvsRegistryReader
logger logging.Logger
queryC chan<- query
logFilterQueryBlockRange *big.Int
avsRegistrySubscriber avsregistry.AvsRegistrySubscriber
avsRegistryReader avsregistry.AvsRegistryReader
logger logging.Logger
queryC chan<- query
// queried via the queryC channel, so don't need mutex to access
pubkeyDict map[common.Address]types.OperatorPubkeys
operatorAddrToId map[common.Address]types.OperatorId
Expand Down Expand Up @@ -55,17 +59,22 @@ func NewOperatorsInfoServiceInMemory(
ctx context.Context,
avsRegistrySubscriber avsregistry.AvsRegistrySubscriber,
avsRegistryReader avsregistry.AvsRegistryReader,
logFilterQueryBlockRange *big.Int,
logger logging.Logger,
) *OperatorsInfoServiceInMemory {
queryC := make(chan query)
if logFilterQueryBlockRange == nil {
logFilterQueryBlockRange = defaultLogFilterQueryBlockRange
}
pkcs := &OperatorsInfoServiceInMemory{
avsRegistrySubscriber: avsRegistrySubscriber,
avsRegistryReader: avsRegistryReader,
logger: logger,
queryC: queryC,
pubkeyDict: make(map[common.Address]types.OperatorPubkeys),
operatorAddrToId: make(map[common.Address]types.OperatorId),
socketDict: make(map[types.OperatorId]types.Socket),
avsRegistrySubscriber: avsRegistrySubscriber,
avsRegistryReader: avsRegistryReader,
logFilterQueryBlockRange: logFilterQueryBlockRange,
logger: logger,
queryC: queryC,
pubkeyDict: make(map[common.Address]types.OperatorPubkeys),
operatorAddrToId: make(map[common.Address]types.OperatorId),
socketDict: make(map[types.OperatorId]types.Socket),
}
// We use this waitgroup to wait on the initialization of the inmemory pubkey dict,
// which requires querying the past events of the pubkey registration contract
Expand Down Expand Up @@ -154,14 +163,14 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Con
func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context) {
// Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that we will receive again in the websocket,
// since we will just overwrite the pubkey dict with the same values.
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil)
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil, ops.logFilterQueryBlockRange)
if err != nil {
ops.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
}
ops.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorPubkeysServiceInMemory")

socketsMap, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, nil, nil)
socketsMap, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, nil, nil, ops.logFilterQueryBlockRange)
if err != nil {
ops.logger.Error("Fatal error querying existing registered operator sockets", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
Expand Down
14 changes: 7 additions & 7 deletions services/operatorsinfo/operatorsinfo_inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func TestGetOperatorInfo(t *testing.T) {
errC := make(chan error)
mockSubscription.EXPECT().Err().AnyTimes().Return(errC)
mockAvsRegistrySubscriber.EXPECT().SubscribeToNewPubkeyRegistrations().Return(nil, mockSubscription, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorPubKeys(gomock.Any(), nil, nil).Return(nil, nil, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorPubKeys(gomock.Any(), nil, nil, defaultLogFilterQueryBlockRange).Return(nil, nil, nil)
mockAvsRegistrySubscriber.EXPECT().SubscribeToOperatorSocketUpdates().Return(nil, mockSubscription, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorSockets(gomock.Any(), nil, nil).Return(nil, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorSockets(gomock.Any(), nil, nil, defaultLogFilterQueryBlockRange).Return(nil, nil)
},
queryOperatorAddr: testOperator1.operatorAddr,
wantOperatorFound: false,
Expand All @@ -73,10 +73,10 @@ func TestGetOperatorInfo(t *testing.T) {
errC := make(chan error)
mockSubscription.EXPECT().Err().AnyTimes().Return(errC)
mockAvsRegistrySubscriber.EXPECT().SubscribeToNewPubkeyRegistrations().Return(nil, mockSubscription, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorPubKeys(gomock.Any(), nil, nil).
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorPubKeys(gomock.Any(), nil, nil, defaultLogFilterQueryBlockRange).
Return([]common.Address{testOperator1.operatorAddr}, []types.OperatorPubkeys{testOperator1.operatorInfo.Pubkeys}, nil)
mockAvsRegistrySubscriber.EXPECT().SubscribeToOperatorSocketUpdates().Return(nil, mockSubscription, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorSockets(gomock.Any(), nil, nil).
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorSockets(gomock.Any(), nil, nil, defaultLogFilterQueryBlockRange).
Return(map[types.OperatorId]types.Socket{
types.OperatorIdFromG1Pubkey(testOperator1.operatorInfo.Pubkeys.G1Pubkey): testOperator1.operatorInfo.Socket,
}, nil)
Expand Down Expand Up @@ -106,10 +106,10 @@ func TestGetOperatorInfo(t *testing.T) {
operatorSocketUpdateEventC <- operatorSocketUpdateEvent
mockSubscription.EXPECT().Err().AnyTimes().Return(errC)
mockAvsRegistrySubscriber.EXPECT().SubscribeToNewPubkeyRegistrations().Return(pubkeyRegistrationEventC, mockSubscription, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorPubKeys(gomock.Any(), nil, nil).
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorPubKeys(gomock.Any(), nil, nil, defaultLogFilterQueryBlockRange).
Return([]common.Address{}, []types.OperatorPubkeys{}, nil)
mockAvsRegistrySubscriber.EXPECT().SubscribeToOperatorSocketUpdates().Return(operatorSocketUpdateEventC, mockSubscription, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorSockets(gomock.Any(), nil, nil).Return(nil, nil)
mockAvsReader.EXPECT().QueryExistingRegisteredOperatorSockets(gomock.Any(), nil, nil, defaultLogFilterQueryBlockRange).Return(nil, nil)
},
queryOperatorAddr: testOperator1.operatorAddr,
wantOperatorFound: true,
Expand All @@ -129,7 +129,7 @@ func TestGetOperatorInfo(t *testing.T) {
tt.mocksInitializationFunc(mockAvsRegistrySubscriber, mockAvsReader, mockSubscription)
}
// Create a new instance of the operatorpubkeys service
service := NewOperatorsInfoServiceInMemory(context.Background(), mockAvsRegistrySubscriber, mockAvsReader, logger)
service := NewOperatorsInfoServiceInMemory(context.Background(), mockAvsRegistrySubscriber, mockAvsReader, nil, logger)
time.Sleep(2 * time.Second) // need to give it time to process the subscription events.. not sure if there's a better way to do this.

// Call the GetOperatorPubkeys method with the test operator address
Expand Down

0 comments on commit 9c7a75a

Please sign in to comment.