From 03f33e13f80c228cfc2e4db7aded7a116ccdc631 Mon Sep 17 00:00:00 2001 From: Madhur Shrimal Date: Tue, 6 Aug 2024 12:19:29 -0800 Subject: [PATCH 1/5] chore: deprecate earning receiver address --- Makefile | 3 +- chainio/clients/elcontracts/writer.go | 10 +- internal/fakes/eth_client.go | 9 +- metrics/collectors/economic/economic.go | 31 +- .../collectors/rpc_calls/rpc_calls_test.go | 6 +- metrics/eigenmetrics.go | 11 +- metrics/eigenmetrics_example_test.go | 12 +- metrics/eigenmetrics_test.go | 3 +- metrics/metrics.go | 3 +- nodeapi/nodeapi_example_test.go | 7 +- nodeapi/nodeapi_test.go | 27 +- services/avsregistry/avsregistry.go | 28 +- .../avsregistry/avsregistry_chaincaller.go | 47 +- .../avsregistry_chaincaller_test.go | 50 +- services/avsregistry/avsregistry_fake.go | 19 +- services/bls_aggregation/blsagg.go | 141 ++- services/bls_aggregation/blsagg_test.go | 914 ++++++++++++------ services/operatorsinfo/operatorsinfo.go | 8 +- .../operatorsinfo/operatorsinfo_inmemory.go | 190 +++- .../operatorsinfo_inmemory_test.go | 33 +- .../operatorsinfo/operatorsinfo_subgraph.go | 19 +- types/operator.go | 5 - types/operator_test.go | 20 - 23 files changed, 1154 insertions(+), 442 deletions(-) diff --git a/Makefile b/Makefile index 438a94f0..24416d0a 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,8 @@ GO_LINES_IGNORED_DIRS=contracts GO_PACKAGES=./chainio/... ./crypto/... ./logging/... \ ./types/... ./utils/... ./signer/... ./cmd/... \ - ./signerv2/... + ./signerv2/... ./aws/... ./internal/... ./metrics/... \ + ./nodeapi/... ./cmd/... ./services/... GO_FOLDERS=$(shell echo ${GO_PACKAGES} | sed -e "s/\.\///g" | sed -e "s/\/\.\.\.//g") help: @grep -E '^[a-zA-Z0-9_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' diff --git a/chainio/clients/elcontracts/writer.go b/chainio/clients/elcontracts/writer.go index fa294aeb..b8b9724d 100644 --- a/chainio/clients/elcontracts/writer.go +++ b/chainio/clients/elcontracts/writer.go @@ -162,7 +162,10 @@ func (w *ChainWriter) RegisterAsOperator(ctx context.Context, operator types.Ope w.logger.Infof("registering operator %s to EigenLayer", operator.Address) opDetails := delegationmanager.IDelegationManagerOperatorDetails{ - DeprecatedEarningsReceiver: gethcommon.HexToAddress(operator.EarningsReceiverAddress), + // Earning receiver has been deprecated but just to make sure we have something in contract + // We just force it to be operator address + // Any reward related setup is via RewardsCoordinator contract + DeprecatedEarningsReceiver: gethcommon.HexToAddress(operator.Address), StakerOptOutWindowBlocks: operator.StakerOptOutWindowBlocks, DelegationApprover: gethcommon.HexToAddress(operator.DelegationApproverAddress), } @@ -194,7 +197,10 @@ func (w *ChainWriter) UpdateOperatorDetails( w.logger.Infof("updating operator details of operator %s to EigenLayer", operator.Address) opDetails := delegationmanager.IDelegationManagerOperatorDetails{ - DeprecatedEarningsReceiver: gethcommon.HexToAddress(operator.EarningsReceiverAddress), + // Earning receiver has been deprecated but just to make sure we have something in contract + // We just force it to be operator address + // Any reward related setup is via RewardsCoordinator contract + DeprecatedEarningsReceiver: gethcommon.HexToAddress(operator.Address), DelegationApprover: gethcommon.HexToAddress(operator.DelegationApproverAddress), StakerOptOutWindowBlocks: operator.StakerOptOutWindowBlocks, } diff --git a/internal/fakes/eth_client.go b/internal/fakes/eth_client.go index 7c167443..3ce6804d 100644 --- a/internal/fakes/eth_client.go +++ b/internal/fakes/eth_client.go @@ -3,10 +3,11 @@ package fakes import ( "context" "errors" + "math/big" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "math/big" ) const ( @@ -90,6 +91,10 @@ func (f *EthClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]t return []types.Log{}, nil } -func (f *EthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { +func (f *EthClient) SubscribeFilterLogs( + ctx context.Context, + q ethereum.FilterQuery, + ch chan<- types.Log, +) (ethereum.Subscription, error) { return nil, nil } diff --git a/metrics/collectors/economic/economic.go b/metrics/collectors/economic/economic.go index 5ab8dec5..4dfb0f7f 100644 --- a/metrics/collectors/economic/economic.go +++ b/metrics/collectors/economic/economic.go @@ -44,7 +44,8 @@ type Collector struct { operatorId types.OperatorId quorumNames map[types.QuorumNum]string // metrics - // TODO(samlaf): I feel like eigenlayer-core metrics like slashingStatus and delegatedShares, which are not avs specific, + // TODO(samlaf): I feel like eigenlayer-core metrics like slashingStatus and delegatedShares, which are not avs + // specific, // should not be here, and should instead be collected by some eigenlayer-cli daemon or something, since // otherwise every avs will be exporting these same metrics for no reason. @@ -85,8 +86,9 @@ func NewCollector( avsRegistryReader: avsRegistryReader, logger: logger, operatorAddr: operatorAddr, - // we don't fetch operatorId here because operator might not yet be registered (and hence not have an operatorId) - // we cache operatorId dynamically in the collect function() inside, which allows constructing collector before registering operator + // we don't fetch operatorId here because operator might not yet be registered (and hence not have an + // operatorId) we cache operatorId dynamically in the collect function() inside, which allows constructing + // collector before registering operator operatorId: [32]byte{}, quorumNames: quorumNames, slashingStatus: prometheus.NewDesc( @@ -145,10 +147,12 @@ func (ec *Collector) initOperatorId() error { // constant metrics with the results func (ec *Collector) Collect(ch chan<- prometheus.Metric) { // collect slashingStatus metric - // TODO(samlaf): note that this call is not avs specific, so every avs will have the same value if the operator has been slashed + // TODO(samlaf): note that this call is not avs specific, so every avs will have the same value if the operator has + // been slashed // if we want instead to only output 1 if the operator has been slashed for a specific avs, we have 2 choices: // 1. keep this collector format but query the OperatorFrozen event from a subgraph - // 2. subscribe to the event and keep a local state of whether the operator has been slashed, exporting it via normal prometheus instrumentation + // 2. subscribe to the event and keep a local state of whether the operator has been slashed, exporting it via + // normal prometheus instrumentation operatorIsFrozen, err := ec.elReader.OperatorIsFrozen(nil, ec.operatorAddr) if err != nil { ec.logger.Error("Failed to get slashing incurred", "err", err) @@ -163,7 +167,11 @@ func (ec *Collector) Collect(ch chan<- prometheus.Metric) { // collect registeredStake metric err = ec.initOperatorId() if err != nil { - ec.logger.Warn("Failed to fetch and cache operator id. Skipping collection of registeredStake metric.", "err", err) + ec.logger.Warn( + "Failed to fetch and cache operator id. Skipping collection of registeredStake metric.", + "err", + err, + ) } else { // probably should start using the avsregistry service instead of avsRegistryReader so that we can // swap out backend for a subgraph eventually @@ -189,14 +197,17 @@ func (ec *Collector) Collect(ch chan<- prometheus.Metric) { // // We'll emit all 3 units in case this is needed for whatever reason // // might want to change this behavior if this is emitting too many metrics // sharesWeiFloat, _ := sharesWei.Float64() - // // TODO(samlaf): add the token name.. probably need to have a hardcoded dict per env (mainnet, goerli, etc)? Is it really that important..? - // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesWeiFloat, strategyAddr.String(), "wei", "token") + // // TODO(samlaf): add the token name.. probably need to have a hardcoded dict per env (mainnet, goerli, etc)? Is + // it really that important..? ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, + // sharesWeiFloat, strategyAddr.String(), "wei", "token") // sharesGweiFloat, _ := sharesWei.Div(sharesWei, big.NewInt(1e9)).Float64() - // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesGweiFloat, strategyAddr.String(), "gwei", "token") + // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesGweiFloat, + // strategyAddr.String(), "gwei", "token") // sharesEtherFloat, _ := sharesWei.Div(sharesWei, big.NewInt(1e18)).Float64() - // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesEtherFloat, strategyAddr.String(), "ether", "token") + // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesEtherFloat, + // strategyAddr.String(), "ether", "token") // } // } } diff --git a/metrics/collectors/rpc_calls/rpc_calls_test.go b/metrics/collectors/rpc_calls/rpc_calls_test.go index 97e84bc1..6f4d69ca 100644 --- a/metrics/collectors/rpc_calls/rpc_calls_test.go +++ b/metrics/collectors/rpc_calls/rpc_calls_test.go @@ -18,5 +18,9 @@ func TestRpcCallsCollector(t *testing.T) { // assert.Equal(t, 1.0, testutil.ToFloat64(suite.metrics.rpcRequestDurationSeconds)) rpcCallsCollector.AddRPCRequestTotal("testmethod", "testclient/testversion") - assert.Equal(t, 1.0, testutil.ToFloat64(rpcCallsCollector.rpcRequestTotal.WithLabelValues("testmethod", "testclient/testversion"))) + assert.Equal( + t, + 1.0, + testutil.ToFloat64(rpcCallsCollector.rpcRequestTotal.WithLabelValues("testmethod", "testclient/testversion")), + ) } diff --git a/metrics/eigenmetrics.go b/metrics/eigenmetrics.go index b5790153..5fc76350 100644 --- a/metrics/eigenmetrics.go +++ b/metrics/eigenmetrics.go @@ -1,4 +1,5 @@ -// Package metrics implements the avs node prometheus metrics spec: https://docs.eigenlayer.xyz/eigenlayer/avs-guides/spec/metrics/metrics-prom-spec +// Package metrics implements the avs node prometheus metrics spec: +// https://docs.eigenlayer.xyz/eigenlayer/avs-guides/spec/metrics/metrics-prom-spec package metrics import ( @@ -28,7 +29,8 @@ var _ Metrics = (*EigenMetrics)(nil) // Follows the structure from https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-A_Basic_Example // TODO(samlaf): I think each avs runs in a separate docker bridge network. -// In order for prometheus to scrape the metrics does the address need to be 0.0.0.0:port to accept connections from other networks? +// In order for prometheus to scrape the metrics does the address need to be 0.0.0.0:port to accept connections from +// other networks? func NewEigenMetrics(avsName, ipPortAddress string, reg prometheus.Registerer, logger logging.Logger) *EigenMetrics { metrics := &EigenMetrics{ @@ -61,8 +63,9 @@ func (m *EigenMetrics) initMetrics() { // Performance score starts as 100, and goes down if node doesn't perform well m.performanceScore.Set(100) - // TODO(samlaf): should we initialize the feeEarnedTotal? This would require the user to pass in a list of tokens for which to initialize the metric - // same for rpcRequestDurationSeconds and rpcRequestTotal... we could initialize them to be 0 on every json-rpc... but is that really necessary? + // TODO(samlaf): should we initialize the feeEarnedTotal? This would require the user to pass in a list of tokens + // for which to initialize the metric same for rpcRequestDurationSeconds and rpcRequestTotal... we could initialize + // them to be 0 on every json-rpc... but is that really necessary? } // AddEigenFeeEarnedTotal adds the fee earned to the total fee earned metric diff --git a/metrics/eigenmetrics_example_test.go b/metrics/eigenmetrics_example_test.go index 18181764..bf55ab83 100644 --- a/metrics/eigenmetrics_example_test.go +++ b/metrics/eigenmetrics_example_test.go @@ -55,9 +55,17 @@ func ExampleEigenMetrics() { 0: "ethQuorum", 1: "someOtherTokenQuorum", } - // We must register the economic metrics separately because they are exported metrics (from jsonrpc or subgraph calls) + // We must register the economic metrics separately because they are exported metrics (from jsonrpc or subgraph + // calls) // and not instrumented metrics: see https://prometheus.io/docs/instrumenting/writing_clientlibs/#overall-structure - economicMetricsCollector := economic.NewCollector(clients.ElChainReader, clients.AvsRegistryChainReader, "exampleAvs", logger, operatorEcdsaAddr, quorumNames) + economicMetricsCollector := economic.NewCollector( + clients.ElChainReader, + clients.AvsRegistryChainReader, + "exampleAvs", + logger, + operatorEcdsaAddr, + quorumNames, + ) reg.MustRegister(economicMetricsCollector) rpcCallsCollector := rpccalls.NewCollector("exampleAvs", reg) diff --git a/metrics/eigenmetrics_test.go b/metrics/eigenmetrics_test.go index ab1acbfe..b836923a 100644 --- a/metrics/eigenmetrics_test.go +++ b/metrics/eigenmetrics_test.go @@ -53,7 +53,8 @@ func (suite *MetricsTestSuite) TestEigenMetricsServerIntegration() { assert.NoError(suite.T(), err) // We only check for "eigen_performance_score" since it's the only metric that doesn't have a label - // the other metrics have labels (they are vectors) so they don't appear in the output unless we use them once at least + // the other metrics have labels (they are vectors) so they don't appear in the output unless we use them once at + // least assert.Contains(suite.T(), string(body), "eigen_fees_earned_total") assert.Contains(suite.T(), string(body), "eigen_performance_score") } diff --git a/metrics/metrics.go b/metrics/metrics.go index 1f3bcb14..41a0d5a6 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -7,7 +7,8 @@ import ( ) // Metrics is the interface for the EigenMetrics server -// it only wraps 2 of the 6 methods required by the spec (https://docs.eigenlayer.xyz/eigenlayer/avs-guides/spec/metrics/metrics-prom-spec) +// it only wraps 2 of the 6 methods required by the spec +// (https://docs.eigenlayer.xyz/eigenlayer/avs-guides/spec/metrics/metrics-prom-spec) // the others are implemented by the economics and rpc_calls collectors, // and need to be registered with the metrics server prometheus registry type Metrics interface { diff --git a/nodeapi/nodeapi_example_test.go b/nodeapi/nodeapi_example_test.go index 050ed69d..7c9e7844 100644 --- a/nodeapi/nodeapi_example_test.go +++ b/nodeapi/nodeapi_example_test.go @@ -14,7 +14,12 @@ func ExampleNodeApi() { nodeApi := nodeapi.NewNodeApi("testAvs", "v0.0.1", "localhost:8080", logger) // register a service with the nodeApi. This could be a db, a cache, a queue, etc. // see https://docs.eigenlayer.xyz/eigenlayer/avs-guides/spec/api/#get-eigennodeservices - nodeApi.RegisterNewService("testServiceId", "testServiceName", "testServiceDescription", nodeapi.ServiceStatusInitializing) + nodeApi.RegisterNewService( + "testServiceId", + "testServiceName", + "testServiceDescription", + nodeapi.ServiceStatusInitializing, + ) // this starts the nodeApi server in a goroutine, so no need to wrap it in a go func nodeApi.Start() diff --git a/nodeapi/nodeapi_test.go b/nodeapi/nodeapi_test.go index dadb8176..fd0dcb68 100644 --- a/nodeapi/nodeapi_test.go +++ b/nodeapi/nodeapi_test.go @@ -39,7 +39,11 @@ func TestNodeHandler(t *testing.T) { assert.NoError(t, err) assert.Equal(t, http.StatusOK, res.StatusCode) - assert.Equal(t, "{\"node_name\":\"testAvs\",\"node_version\":\"v0.0.1\",\"spec_version\":\"v0.0.1\"}\n", string(data)) + assert.Equal( + t, + "{\"node_name\":\"testAvs\",\"node_version\":\"v0.0.1\",\"spec_version\":\"v0.0.1\"}\n", + string(data), + ) } func TestHealthHandler(t *testing.T) { @@ -107,7 +111,12 @@ func TestServicesHandler(t *testing.T) { "one service": { nodeApi: func() *NodeApi { nodeApi := NewNodeApi("testAvs", "v0.0.1", "localhost:8080", logger) - nodeApi.RegisterNewService("testServiceId", "testServiceName", "testServiceDescription", ServiceStatusUp) + nodeApi.RegisterNewService( + "testServiceId", + "testServiceName", + "testServiceDescription", + ServiceStatusUp, + ) return nodeApi }(), wantStatusCode: http.StatusOK, @@ -116,8 +125,18 @@ func TestServicesHandler(t *testing.T) { "two services": { nodeApi: func() *NodeApi { nodeApi := NewNodeApi("testAvs", "v0.0.1", "localhost:8080", logger) - nodeApi.RegisterNewService("testServiceId", "testServiceName", "testServiceDescription", ServiceStatusUp) - nodeApi.RegisterNewService("testServiceId2", "testServiceName2", "testServiceDescription2", ServiceStatusDown) + nodeApi.RegisterNewService( + "testServiceId", + "testServiceName", + "testServiceDescription", + ServiceStatusUp, + ) + nodeApi.RegisterNewService( + "testServiceId2", + "testServiceName2", + "testServiceDescription2", + ServiceStatusDown, + ) return nodeApi }(), wantStatusCode: http.StatusOK, diff --git a/services/avsregistry/avsregistry.go b/services/avsregistry/avsregistry.go index f77951bf..e731b37b 100644 --- a/services/avsregistry/avsregistry.go +++ b/services/avsregistry/avsregistry.go @@ -8,16 +8,32 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" ) -// AvsRegistryServicemService is a service that indexes the Avs Registry contracts and provides a way to query for operator state +// AvsRegistryServicemService is a service that indexes the Avs Registry contracts and provides a way to query for +// operator state // at certain blocks, including operatorIds, pubkeys, and staking status in each quorum. type AvsRegistryService interface { // GetOperatorsAvsState returns the state of an avs wrt to a list of quorums at a certain block. // The state includes the operatorId, pubkey, and staking amount in each quorum. - GetOperatorsAvsStateAtBlock(ctx context.Context, quorumNumbers types.QuorumNums, blockNumber types.BlockNum) (map[types.OperatorId]types.OperatorAvsState, error) + GetOperatorsAvsStateAtBlock( + ctx context.Context, + quorumNumbers types.QuorumNums, + blockNumber types.BlockNum, + ) (map[types.OperatorId]types.OperatorAvsState, error) // GetQuorumsAvsStateAtBlock returns the aggregated data for a list of quorums at a certain block. // The aggregated data includes the aggregated pubkey and total stake in each quorum. - // This information is derivable from the Operators Avs State (returned from GetOperatorsAvsStateAtBlock), but this function is provided for convenience. - GetQuorumsAvsStateAtBlock(ctx context.Context, quorumNumbers types.QuorumNums, blockNumber types.BlockNum) (map[types.QuorumNum]types.QuorumAvsState, error) - // GetCheckSignaturesIndices returns the registry indices of the nonsigner operators specified by nonSignerOperatorIds who were registered at referenceBlockNumber. - GetCheckSignaturesIndices(opts *bind.CallOpts, referenceBlockNumber types.BlockNum, quorumNumbers types.QuorumNums, nonSignerOperatorIds []types.OperatorId) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) + // This information is derivable from the Operators Avs State (returned from GetOperatorsAvsStateAtBlock), but this + // function is provided for convenience. + GetQuorumsAvsStateAtBlock( + ctx context.Context, + quorumNumbers types.QuorumNums, + blockNumber types.BlockNum, + ) (map[types.QuorumNum]types.QuorumAvsState, error) + // GetCheckSignaturesIndices returns the registry indices of the nonsigner operators specified by + // nonSignerOperatorIds who were registered at referenceBlockNumber. + GetCheckSignaturesIndices( + opts *bind.CallOpts, + referenceBlockNumber types.BlockNum, + quorumNumbers types.QuorumNums, + nonSignerOperatorIds []types.OperatorId, + ) (opstateretriever.OperatorStateRetrieverCheckSignaturesIndices, error) } diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index 7b3601e3..5b2da475 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -3,9 +3,10 @@ package avsregistry import ( "context" "fmt" + "math/big" + opstateretriever "github.com/Layr-Labs/eigensdk-go/contracts/bindings/OperatorStateRetriever" "github.com/ethereum/go-ethereum/common" - "math/big" "github.com/Layr-Labs/eigensdk-go/crypto/bls" "github.com/Layr-Labs/eigensdk-go/logging" @@ -45,7 +46,11 @@ type AvsRegistryServiceChainCaller struct { var _ AvsRegistryService = (*AvsRegistryServiceChainCaller)(nil) -func NewAvsRegistryServiceChainCaller(reader avsRegistryReader, operatorInfoService opinfoservice.OperatorsInfoService, logger logging.Logger) *AvsRegistryServiceChainCaller { +func NewAvsRegistryServiceChainCaller( + reader avsRegistryReader, + operatorInfoService opinfoservice.OperatorsInfoService, + logger logging.Logger, +) *AvsRegistryServiceChainCaller { return &AvsRegistryServiceChainCaller{ avsRegistryReader: reader, operatorInfoService: operatorInfoService, @@ -53,16 +58,29 @@ func NewAvsRegistryServiceChainCaller(reader avsRegistryReader, operatorInfoServ } } -func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock(ctx context.Context, quorumNumbers types.QuorumNums, blockNumber types.BlockNum) (map[types.OperatorId]types.OperatorAvsState, error) { +func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock( + ctx context.Context, + quorumNumbers types.QuorumNums, + blockNumber types.BlockNum, +) (map[types.OperatorId]types.OperatorAvsState, error) { operatorsAvsState := make(map[types.OperatorId]types.OperatorAvsState) - // Get operator state for each quorum by querying BLSOperatorStateRetriever (this call is why this service implementation is called ChainCaller) - operatorsStakesInQuorums, err := ar.avsRegistryReader.GetOperatorsStakeInQuorumsAtBlock(&bind.CallOpts{Context: ctx}, quorumNumbers, blockNumber) + // Get operator state for each quorum by querying BLSOperatorStateRetriever (this call is why this service + // implementation is called ChainCaller) + operatorsStakesInQuorums, err := ar.avsRegistryReader.GetOperatorsStakeInQuorumsAtBlock( + &bind.CallOpts{Context: ctx}, + quorumNumbers, + blockNumber, + ) if err != nil { return nil, utils.WrapError("Failed to get operator state", err) } numquorums := len(quorumNumbers) if len(operatorsStakesInQuorums) != numquorums { - ar.logger.Fatal("Number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation.", "service", "AvsRegistryServiceChainCaller") + ar.logger.Fatal( + "Number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation.", + "service", + "AvsRegistryServiceChainCaller", + ) } for quorumIdx, quorumNum := range quorumNumbers { @@ -90,7 +108,11 @@ func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock(ctx context return operatorsAvsState, nil } -func (ar *AvsRegistryServiceChainCaller) GetQuorumsAvsStateAtBlock(ctx context.Context, quorumNumbers types.QuorumNums, blockNumber types.BlockNum) (map[types.QuorumNum]types.QuorumAvsState, error) { +func (ar *AvsRegistryServiceChainCaller) GetQuorumsAvsStateAtBlock( + ctx context.Context, + quorumNumbers types.QuorumNums, + blockNumber types.BlockNum, +) (map[types.QuorumNum]types.QuorumAvsState, error) { operatorsAvsState, err := ar.GetOperatorsAvsStateAtBlock(ctx, quorumNumbers, blockNumber) if err != nil { return nil, utils.WrapError("Failed to get quorum state", err) @@ -116,14 +138,21 @@ func (ar *AvsRegistryServiceChainCaller) GetQuorumsAvsStateAtBlock(ctx context.C return quorumsAvsState, nil } -func (ar *AvsRegistryServiceChainCaller) getOperatorInfo(ctx context.Context, operatorId types.OperatorId) (types.OperatorInfo, error) { +func (ar *AvsRegistryServiceChainCaller) getOperatorInfo( + ctx context.Context, + operatorId types.OperatorId, +) (types.OperatorInfo, error) { operatorAddr, err := ar.avsRegistryReader.GetOperatorFromId(&bind.CallOpts{Context: ctx}, operatorId) if err != nil { return types.OperatorInfo{}, utils.WrapError("Failed to get operator address from pubkey hash", err) } info, ok := ar.operatorInfoService.GetOperatorInfo(ctx, operatorAddr) if !ok { - return types.OperatorInfo{}, fmt.Errorf("Failed to get operator info from operatorInfoService (operatorAddr: %v, operatorId: %v)", operatorAddr, operatorId) + return types.OperatorInfo{}, fmt.Errorf( + "Failed to get operator info from operatorInfoService (operatorAddr: %v, operatorId: %v)", + operatorAddr, + operatorId, + ) } return info, nil } diff --git a/services/avsregistry/avsregistry_chaincaller_test.go b/services/avsregistry/avsregistry_chaincaller_test.go index 5da31fd2..485f4d5b 100644 --- a/services/avsregistry/avsregistry_chaincaller_test.go +++ b/services/avsregistry/avsregistry_chaincaller_test.go @@ -26,7 +26,10 @@ func newFakeOperatorInfoService(operatorInfo types.OperatorInfo) *fakeOperatorIn } } -func (f *fakeOperatorInfoService) GetOperatorInfo(ctx context.Context, operator common.Address) (operatorInfo types.OperatorInfo, operatorFound bool) { +func (f *fakeOperatorInfoService) GetOperatorInfo( + ctx context.Context, + operator common.Address, +) (operatorInfo types.OperatorInfo, operatorFound bool) { return f.operatorInfo, true } @@ -38,7 +41,10 @@ func TestAvsRegistryServiceChainCaller_getOperatorPubkeys(t *testing.T) { OperatorInfo: types.OperatorInfo{ Pubkeys: types.OperatorPubkeys{ G1Pubkey: bls.NewG1Point(big.NewInt(1), big.NewInt(1)), - G2Pubkey: bls.NewG2Point([2]*big.Int{big.NewInt(1), big.NewInt(1)}, [2]*big.Int{big.NewInt(1), big.NewInt(1)}), + G2Pubkey: bls.NewG2Point( + [2]*big.Int{big.NewInt(1), big.NewInt(1)}, + [2]*big.Int{big.NewInt(1), big.NewInt(1)}, + ), }, Socket: "localhost:8080", }, @@ -76,7 +82,11 @@ func TestAvsRegistryServiceChainCaller_getOperatorPubkeys(t *testing.T) { t.Fatalf("GetOperatorPubkeys returned wrong error. Got: %v, want: %v.", gotErr, tt.wantErr) } if tt.wantErr == nil && !reflect.DeepEqual(tt.wantOperatorInfo, gotOperatorInfo) { - t.Fatalf("GetOperatorPubkeys returned wrong operator pubkeys. Got: %v, want: %v.", gotOperatorInfo, tt.wantOperatorInfo) + t.Fatalf( + "GetOperatorPubkeys returned wrong operator pubkeys. Got: %v, want: %v.", + gotOperatorInfo, + tt.wantOperatorInfo, + ) } }) } @@ -90,7 +100,10 @@ func TestAvsRegistryServiceChainCaller_GetOperatorsAvsState(t *testing.T) { OperatorInfo: types.OperatorInfo{ Pubkeys: types.OperatorPubkeys{ G1Pubkey: bls.NewG1Point(big.NewInt(1), big.NewInt(1)), - G2Pubkey: bls.NewG2Point([2]*big.Int{big.NewInt(1), big.NewInt(1)}, [2]*big.Int{big.NewInt(1), big.NewInt(1)}), + G2Pubkey: bls.NewG2Point( + [2]*big.Int{big.NewInt(1), big.NewInt(1)}, + [2]*big.Int{big.NewInt(1), big.NewInt(1)}, + ), }, Socket: "localhost:8080", }, @@ -131,12 +144,20 @@ func TestAvsRegistryServiceChainCaller_GetOperatorsAvsState(t *testing.T) { service := NewAvsRegistryServiceChainCaller(mockAvsRegistryReader, mockOperatorsInfoService, logger) // Call the GetOperatorPubkeys method with the test operator address - gotOperatorsAvsStateDict, gotErr := service.GetOperatorsAvsStateAtBlock(context.Background(), tt.queryQuorumNumbers, tt.queryBlockNum) + gotOperatorsAvsStateDict, gotErr := service.GetOperatorsAvsStateAtBlock( + context.Background(), + tt.queryQuorumNumbers, + tt.queryBlockNum, + ) if !errors.Is(gotErr, tt.wantErr) { t.Fatalf("GetOperatorsAvsState returned wrong error. Got: %v, want: %v.", gotErr, tt.wantErr) } if tt.wantErr == nil && !reflect.DeepEqual(tt.wantOperatorsAvsStateDict, gotOperatorsAvsStateDict) { - t.Fatalf("GetOperatorsAvsState returned wrong operatorsAvsStateDict. Got: %v, want: %v.", gotOperatorsAvsStateDict, tt.wantOperatorsAvsStateDict) + t.Fatalf( + "GetOperatorsAvsState returned wrong operatorsAvsStateDict. Got: %v, want: %v.", + gotOperatorsAvsStateDict, + tt.wantOperatorsAvsStateDict, + ) } }) } @@ -150,7 +171,10 @@ func TestAvsRegistryServiceChainCaller_GetQuorumsAvsState(t *testing.T) { OperatorInfo: types.OperatorInfo{ Pubkeys: types.OperatorPubkeys{ G1Pubkey: bls.NewG1Point(big.NewInt(1), big.NewInt(1)), - G2Pubkey: bls.NewG2Point([2]*big.Int{big.NewInt(1), big.NewInt(1)}, [2]*big.Int{big.NewInt(1), big.NewInt(1)}), + G2Pubkey: bls.NewG2Point( + [2]*big.Int{big.NewInt(1), big.NewInt(1)}, + [2]*big.Int{big.NewInt(1), big.NewInt(1)}, + ), }, Socket: "localhost:8080", }, @@ -191,12 +215,20 @@ func TestAvsRegistryServiceChainCaller_GetQuorumsAvsState(t *testing.T) { service := NewAvsRegistryServiceChainCaller(mockAvsRegistryReader, mockOperatorsInfoService, logger) // Call the GetOperatorPubkeys method with the test operator address - aggG1PubkeyPerQuorum, gotErr := service.GetQuorumsAvsStateAtBlock(context.Background(), tt.queryQuorumNumbers, tt.queryBlockNum) + aggG1PubkeyPerQuorum, gotErr := service.GetQuorumsAvsStateAtBlock( + context.Background(), + tt.queryQuorumNumbers, + tt.queryBlockNum, + ) if !errors.Is(gotErr, tt.wantErr) { t.Fatalf("GetOperatorsAvsState returned wrong error. Got: %v, want: %v.", gotErr, tt.wantErr) } if tt.wantErr == nil && !reflect.DeepEqual(tt.wantQuorumsAvsStateDict, aggG1PubkeyPerQuorum) { - t.Fatalf("GetOperatorsAvsState returned wrong aggG1PubkeyPerQuorum. Got: %v, want: %v.", aggG1PubkeyPerQuorum, tt.wantQuorumsAvsStateDict) + t.Fatalf( + "GetOperatorsAvsState returned wrong aggG1PubkeyPerQuorum. Got: %v, want: %v.", + aggG1PubkeyPerQuorum, + tt.wantQuorumsAvsStateDict, + ) } }) } diff --git a/services/avsregistry/avsregistry_fake.go b/services/avsregistry/avsregistry_fake.go index c913bd45..d5cf8e59 100644 --- a/services/avsregistry/avsregistry_fake.go +++ b/services/avsregistry/avsregistry_fake.go @@ -25,8 +25,11 @@ func NewFakeAvsRegistryService(blockNum types.BlockNum, operators []types.TestOp fakeAvsRegistryService.operators[blockNum][operator.OperatorId] = types.OperatorAvsState{ OperatorId: operator.OperatorId, OperatorInfo: types.OperatorInfo{ - Pubkeys: types.OperatorPubkeys{G1Pubkey: operator.BlsKeypair.GetPubKeyG1(), G2Pubkey: operator.BlsKeypair.GetPubKeyG2()}, - Socket: "localhost:8080", + Pubkeys: types.OperatorPubkeys{ + G1Pubkey: operator.BlsKeypair.GetPubKeyG1(), + G2Pubkey: operator.BlsKeypair.GetPubKeyG2(), + }, + Socket: "localhost:8080", }, StakePerQuorum: operator.StakePerQuorum, BlockNumber: blockNum, @@ -37,7 +40,11 @@ func NewFakeAvsRegistryService(blockNum types.BlockNum, operators []types.TestOp var _ AvsRegistryService = (*FakeAvsRegistryService)(nil) -func (f *FakeAvsRegistryService) GetOperatorsAvsStateAtBlock(ctx context.Context, quorumNumbers types.QuorumNums, blockNumber types.BlockNum) (map[types.OperatorId]types.OperatorAvsState, error) { +func (f *FakeAvsRegistryService) GetOperatorsAvsStateAtBlock( + ctx context.Context, + quorumNumbers types.QuorumNums, + blockNumber types.BlockNum, +) (map[types.OperatorId]types.OperatorAvsState, error) { operatorsAvsState, ok := f.operators[blockNumber] if !ok { return nil, errors.New("block number not found") @@ -45,7 +52,11 @@ func (f *FakeAvsRegistryService) GetOperatorsAvsStateAtBlock(ctx context.Context return operatorsAvsState, nil } -func (f *FakeAvsRegistryService) GetQuorumsAvsStateAtBlock(ctx context.Context, quorumNumbers types.QuorumNums, blockNumber types.BlockNum) (map[types.QuorumNum]types.QuorumAvsState, error) { +func (f *FakeAvsRegistryService) GetQuorumsAvsStateAtBlock( + ctx context.Context, + quorumNumbers types.QuorumNums, + blockNumber types.BlockNum, +) (map[types.QuorumNum]types.QuorumAvsState, error) { operatorsAvsState, ok := f.operators[blockNumber] if !ok { return nil, errors.New("block number not found") diff --git a/services/bls_aggregation/blsagg.go b/services/bls_aggregation/blsagg.go index bdab6691..db6a1850 100644 --- a/services/bls_aggregation/blsagg.go +++ b/services/bls_aggregation/blsagg.go @@ -18,7 +18,8 @@ import ( ) var ( - // TODO: refactor these errors to use a custom struct with taskIndex field instead of wrapping taskIndex in the error string directly. + // TODO: refactor these errors to use a custom struct with taskIndex field instead of wrapping taskIndex in the + // error string directly. // see https://go.dev/blog/go1.13-errors TaskInitializationErrorFn = func(err error, taskIndex types.TaskIndex) error { return fmt.Errorf("Failed to initialize task %d: %w", taskIndex, err) @@ -82,8 +83,9 @@ type aggregatedOperators struct { type BlsAggregationService interface { // InitializeNewTask should be called whenever a new task is created. ProcessNewSignature will return an error // if the task it is trying to process has not been initialized yet. - // quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which happens - // when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers whose stake + // quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which + // happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers + // whose stake // in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in that quorum InitializeNewTask( taskIndex types.TaskIndex, @@ -93,11 +95,13 @@ type BlsAggregationService interface { timeToExpiry time.Duration, ) error - // ProcessNewSignature processes a new signature over a taskResponseDigest for a particular taskIndex by a particular operator - // It verifies that the signature is correct and returns an error if it is not, and then aggregates the signature and stake of + // ProcessNewSignature processes a new signature over a taskResponseDigest for a particular taskIndex by a + // particular operator It verifies that the signature is correct and returns an error if it is not, and then + // aggregates the signature and stake of // the operator with all other signatures for the same taskIndex and taskResponseDigest pair. - // Note: This function currently only verifies signatures over the taskResponseDigest directly, so avs code needs to verify that the digest - // passed to ProcessNewSignature is indeed the digest of a valid taskResponse (that is, BlsAggregationService does not verify semantic integrity of the taskResponses) + // Note: This function currently only verifies signatures over the taskResponseDigest directly, so avs code needs to + // verify that the digest passed to ProcessNewSignature is indeed the digest of a valid taskResponse (that is, + // BlsAggregationService does not verify semantic integrity of the taskResponses) ProcessNewSignature( ctx context.Context, taskIndex types.TaskIndex, @@ -144,7 +148,11 @@ type BlsAggregatorService struct { var _ BlsAggregationService = (*BlsAggregatorService)(nil) -func NewBlsAggregatorService(avsRegistryService avsregistry.AvsRegistryService, hashFunction types.TaskResponseHashFunction, logger logging.Logger) *BlsAggregatorService { +func NewBlsAggregatorService( + avsRegistryService avsregistry.AvsRegistryService, + hashFunction types.TaskResponseHashFunction, + logger logging.Logger, +) *BlsAggregatorService { return &BlsAggregatorService{ aggregatedResponsesC: make(chan BlsAggregationServiceResponse), signedTaskRespsCs: make(map[types.TaskIndex]chan types.SignedTaskResponseDigest), @@ -161,7 +169,8 @@ func (a *BlsAggregatorService) GetResponseChannel() <-chan BlsAggregationService // InitializeNewTask creates a new task goroutine meant to process new signed task responses for that task // (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it -// quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which happens +// quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which +// happens // when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers whose stake // in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in that quorum func (a *BlsAggregatorService) InitializeNewTask( @@ -171,7 +180,19 @@ func (a *BlsAggregatorService) InitializeNewTask( quorumThresholdPercentages types.QuorumThresholdPercentages, timeToExpiry time.Duration, ) error { - a.logger.Debug("AggregatorService initializing new task", "taskIndex", taskIndex, "taskCreatedBlock", taskCreatedBlock, "quorumNumbers", quorumNumbers, "quorumThresholdPercentages", quorumThresholdPercentages, "timeToExpiry", timeToExpiry) + a.logger.Debug( + "AggregatorService initializing new task", + "taskIndex", + taskIndex, + "taskCreatedBlock", + taskCreatedBlock, + "quorumNumbers", + quorumNumbers, + "quorumThresholdPercentages", + quorumThresholdPercentages, + "timeToExpiry", + timeToExpiry, + ) a.taskChansMutex.Lock() defer a.taskChansMutex.Unlock() @@ -181,7 +202,14 @@ func (a *BlsAggregatorService) InitializeNewTask( signedTaskRespsC := make(chan types.SignedTaskResponseDigest) a.signedTaskRespsCs[taskIndex] = signedTaskRespsC - go a.singleTaskAggregatorGoroutineFunc(taskIndex, taskCreatedBlock, quorumNumbers, quorumThresholdPercentages, timeToExpiry, signedTaskRespsC) + go a.singleTaskAggregatorGoroutineFunc( + taskIndex, + taskCreatedBlock, + quorumNumbers, + quorumThresholdPercentages, + timeToExpiry, + signedTaskRespsC, + ) return nil } @@ -233,18 +261,38 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( for i, quorumNumber := range quorumNumbers { quorumThresholdPercentagesMap[quorumNumber] = quorumThresholdPercentages[i] } - operatorsAvsStateDict, err := a.avsRegistryService.GetOperatorsAvsStateAtBlock(context.Background(), quorumNumbers, taskCreatedBlock) + operatorsAvsStateDict, err := a.avsRegistryService.GetOperatorsAvsStateAtBlock( + context.Background(), + quorumNumbers, + taskCreatedBlock, + ) if err != nil { - a.logger.Error("Task goroutine failed to get operators state from avs registry", "taskIndex", taskIndex, "err", err) + a.logger.Error( + "Task goroutine failed to get operators state from avs registry", + "taskIndex", + taskIndex, + "err", + err, + ) a.aggregatedResponsesC <- BlsAggregationServiceResponse{ Err: TaskInitializationErrorFn(fmt.Errorf("AggregatorService failed to get operators state from avs registry at blockNum %d: %w", taskCreatedBlock, err), taskIndex), TaskIndex: taskIndex, } return } - quorumsAvsStakeDict, err := a.avsRegistryService.GetQuorumsAvsStateAtBlock(context.Background(), quorumNumbers, taskCreatedBlock) + quorumsAvsStakeDict, err := a.avsRegistryService.GetQuorumsAvsStateAtBlock( + context.Background(), + quorumNumbers, + taskCreatedBlock, + ) if err != nil { - a.logger.Error("Task goroutine failed to get quorums state from avs registry", "taskIndex", taskIndex, "err", err) + a.logger.Error( + "Task goroutine failed to get quorums state from avs registry", + "taskIndex", + taskIndex, + "err", + err, + ) a.aggregatedResponsesC <- BlsAggregationServiceResponse{ Err: TaskInitializationErrorFn(fmt.Errorf("Aggregator failed to get quorums state from avs registry: %w", err), taskIndex), TaskIndex: taskIndex, @@ -268,7 +316,13 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( for { select { case signedTaskResponseDigest := <-signedTaskRespsC: - a.logger.Debug("Task goroutine received new signed task response digest", "taskIndex", taskIndex, "signedTaskResponseDigest", signedTaskResponseDigest) + a.logger.Debug( + "Task goroutine received new signed task response digest", + "taskIndex", + taskIndex, + "signedTaskResponseDigest", + signedTaskResponseDigest, + ) err := a.verifySignature(taskIndex, signedTaskResponseDigest, operatorsAvsStateDict) signedTaskResponseDigest.SignatureVerificationErrorC <- err @@ -280,7 +334,8 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( taskResponseDigest, err := a.hashFunction(signedTaskResponseDigest.TaskResponse) if err != nil { // this error should never happen, because we've already hashed the taskResponse in verifySignature, - // but keeping here in case the verifySignature implementation ever changes or some catastrophic bug happens.. + // but keeping here in case the verifySignature implementation ever changes or some catastrophic bug + // happens.. continue } // after verifying signature we aggregate its sig and pubkey, and update the signed stake amount @@ -288,11 +343,15 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( if !ok { // first operator to sign on this digest digestAggregatedOperators = aggregatedOperators{ - // we've already verified that the operator is part of the task's quorum, so we don't need checks here - signersApkG2: bls.NewZeroG2Point().Add(operatorsAvsStateDict[signedTaskResponseDigest.OperatorId].OperatorInfo.Pubkeys.G2Pubkey), - signersAggSigG1: signedTaskResponseDigest.BlsSignature, - signersOperatorIdsSet: map[types.OperatorId]bool{signedTaskResponseDigest.OperatorId: true}, - signersTotalStakePerQuorum: cloneStakePerQuorumMap(operatorsAvsStateDict[signedTaskResponseDigest.OperatorId].StakePerQuorum), + // we've already verified that the operator is part of the task's quorum, so we don't need checks + // here + signersApkG2: bls.NewZeroG2Point(). + Add(operatorsAvsStateDict[signedTaskResponseDigest.OperatorId].OperatorInfo.Pubkeys.G2Pubkey), + signersAggSigG1: signedTaskResponseDigest.BlsSignature, + signersOperatorIdsSet: map[types.OperatorId]bool{signedTaskResponseDigest.OperatorId: true}, + signersTotalStakePerQuorum: cloneStakePerQuorumMap( + operatorsAvsStateDict[signedTaskResponseDigest.OperatorId].StakePerQuorum, + ), } } else { digestAggregatedOperators.signersAggSigG1.Add(signedTaskResponseDigest.BlsSignature) @@ -311,7 +370,12 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( // because of https://github.com/golang/go/issues/3117 aggregatedOperatorsDict[taskResponseDigest] = digestAggregatedOperators - if checkIfStakeThresholdsMet(a.logger, digestAggregatedOperators.signersTotalStakePerQuorum, totalStakePerQuorum, quorumThresholdPercentagesMap) { + if checkIfStakeThresholdsMet( + a.logger, + digestAggregatedOperators.signersTotalStakePerQuorum, + totalStakePerQuorum, + quorumThresholdPercentagesMap, + ) { nonSignersOperatorIds := []types.OperatorId{} for operatorId := range operatorsAvsStateDict { if _, operatorSigned := digestAggregatedOperators.signersOperatorIdsSet[operatorId]; !operatorSigned { @@ -332,7 +396,12 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( nonSignersG1Pubkeys = append(nonSignersG1Pubkeys, operator.OperatorInfo.Pubkeys.G1Pubkey) } - indices, err := a.avsRegistryService.GetCheckSignaturesIndices(&bind.CallOpts{}, taskCreatedBlock, quorumNumbers, nonSignersOperatorIds) + indices, err := a.avsRegistryService.GetCheckSignaturesIndices( + &bind.CallOpts{}, + taskCreatedBlock, + quorumNumbers, + nonSignersOperatorIds, + ) if err != nil { a.aggregatedResponsesC <- BlsAggregationServiceResponse{ Err: utils.WrapError(errors.New("Failed to get check signatures indices"), err), @@ -400,15 +469,33 @@ func (a *BlsAggregatorService) verifySignature( taskResponseDigest, err := a.hashFunction(signedTaskResponseDigest.TaskResponse) if err != nil { - a.logger.Error("Failed to hash task response, skipping.", "taskIndex", taskIndex, "signedTaskResponseDigest", signedTaskResponseDigest, "err", err) + a.logger.Error( + "Failed to hash task response, skipping.", + "taskIndex", + taskIndex, + "signedTaskResponseDigest", + signedTaskResponseDigest, + "err", + err, + ) return HashFunctionError(err) } // verify that the msg actually came from the correct operator operatorG2Pubkey := operatorsAvsStateDict[signedTaskResponseDigest.OperatorId].OperatorInfo.Pubkeys.G2Pubkey if operatorG2Pubkey == nil { - a.logger.Error("Operator G2 pubkey not found", "operatorId", signedTaskResponseDigest.OperatorId, "taskId", taskIndex) - return fmt.Errorf("taskId %d: Operator G2 pubkey not found (operatorId: %x)", taskIndex, signedTaskResponseDigest.OperatorId) + a.logger.Error( + "Operator G2 pubkey not found", + "operatorId", + signedTaskResponseDigest.OperatorId, + "taskId", + taskIndex, + ) + return fmt.Errorf( + "taskId %d: Operator G2 pubkey not found (operatorId: %x)", + taskIndex, + signedTaskResponseDigest.OperatorId, + ) } a.logger.Debug("Verifying signed task response digest signature", "operatorG2Pubkey", operatorG2Pubkey, diff --git a/services/bls_aggregation/blsagg_test.go b/services/bls_aggregation/blsagg_test.go index 96bfa951..cfdc79ae 100644 --- a/services/bls_aggregation/blsagg_test.go +++ b/services/bls_aggregation/blsagg_test.go @@ -4,13 +4,14 @@ import ( "context" "crypto/sha256" "encoding/json" - "github.com/ethereum/go-ethereum/ethclient" "log/slog" "math/big" "os" "testing" "time" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/Layr-Labs/eigensdk-go/chainio/clients" "github.com/Layr-Labs/eigensdk-go/chainio/utils" "github.com/Layr-Labs/eigensdk-go/crypto/bls" @@ -80,9 +81,21 @@ func TestBlsAgg(t *testing.T) { logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSig, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSig, + testOperator1.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ Err: nil, @@ -124,20 +137,47 @@ func TestBlsAgg(t *testing.T) { taskResponseDigest, err := hashFunction(taskResponse) require.Nil(t, err) - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2, testOperator3}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) require.Nil(t, err) blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp2, testOperator2.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) require.Nil(t, err) blsSigOp3 := testOperator3.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp3, testOperator3.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp3, + testOperator3.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ @@ -181,17 +221,38 @@ func TestBlsAgg(t *testing.T) { taskResponseDigest, err := hashFunction(taskResponse) require.Nil(t, err) - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) require.Nil(t, err) blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp2, testOperator2.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ @@ -201,11 +262,16 @@ func TestBlsAgg(t *testing.T) { TaskResponseDigest: taskResponseDigest, NonSignersPubkeysG1: []*bls.G1Point{}, QuorumApksG1: []*bls.G1Point{ - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator2.BlsKeypair.GetPubKeyG1()), - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator2.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), }, - SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest).Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), } gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) @@ -226,7 +292,10 @@ func TestBlsAgg(t *testing.T) { quorumThresholdPercentages := []types.QuorumThresholdPercentage{100, 100} blockNum := uint32(1) - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) @@ -235,26 +304,62 @@ func TestBlsAgg(t *testing.T) { task1Response := mockTaskResponse{123} task1ResponseDigest, err := hashFunction(task1Response) require.Nil(t, err) - err = blsAggServ.InitializeNewTask(task1Index, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + task1Index, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) task2Index := types.TaskIndex(2) task2Response := mockTaskResponse{234} task2ResponseDigest, err := hashFunction(task2Response) require.Nil(t, err) - err = blsAggServ.InitializeNewTask(task2Index, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + task2Index, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) blsSigTask1Op1 := testOperator1.BlsKeypair.SignMessage(task1ResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), task1Index, task1Response, blsSigTask1Op1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + task1Index, + task1Response, + blsSigTask1Op1, + testOperator1.OperatorId, + ) require.Nil(t, err) blsSigTask2Op1 := testOperator1.BlsKeypair.SignMessage(task2ResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), task2Index, task2Response, blsSigTask2Op1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + task2Index, + task2Response, + blsSigTask2Op1, + testOperator1.OperatorId, + ) require.Nil(t, err) blsSigTask1Op2 := testOperator2.BlsKeypair.SignMessage(task1ResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), task1Index, task1Response, blsSigTask1Op2, testOperator2.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + task1Index, + task1Response, + blsSigTask1Op2, + testOperator2.OperatorId, + ) require.Nil(t, err) blsSigTask2Op2 := testOperator2.BlsKeypair.SignMessage(task2ResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), task2Index, task2Response, blsSigTask2Op2, testOperator2.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + task2Index, + task2Response, + blsSigTask2Op2, + testOperator2.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponseTask1 := BlsAggregationServiceResponse{ @@ -264,11 +369,17 @@ func TestBlsAgg(t *testing.T) { TaskResponseDigest: task1ResponseDigest, NonSignersPubkeysG1: []*bls.G1Point{}, QuorumApksG1: []*bls.G1Point{ - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator2.BlsKeypair.GetPubKeyG1()), - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator2.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), }, - SignersApkG2: bls.NewZeroG2Point().Add(testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2())), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(task1ResponseDigest).Add(testOperator2.BlsKeypair.SignMessage(task1ResponseDigest)), + SignersApkG2: bls.NewZeroG2Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2())), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(task1ResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(task1ResponseDigest)), } wantAggregationServiceResponseTask2 := BlsAggregationServiceResponse{ Err: nil, @@ -277,11 +388,16 @@ func TestBlsAgg(t *testing.T) { TaskResponseDigest: task2ResponseDigest, NonSignersPubkeysG1: []*bls.G1Point{}, QuorumApksG1: []*bls.G1Point{ - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator2.BlsKeypair.GetPubKeyG1()), - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator2.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), }, - SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(task2ResponseDigest).Add(testOperator2.BlsKeypair.SignMessage(task2ResponseDigest)), + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(task2ResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(task2ResponseDigest)), } // we don't know which of task1 or task2 responses will be received first @@ -312,7 +428,13 @@ func TestBlsAgg(t *testing.T) { logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err := blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err := blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ Err: TaskExpiredErrorFn(taskIndex), @@ -341,13 +463,28 @@ func TestBlsAgg(t *testing.T) { blsSig := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) blockNum := uint32(1) - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSig, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSig, + testOperator1.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ Err: nil, @@ -355,9 +492,11 @@ func TestBlsAgg(t *testing.T) { TaskResponse: taskResponse, TaskResponseDigest: taskResponseDigest, NonSignersPubkeysG1: []*bls.G1Point{testOperator2.BlsKeypair.GetPubKeyG1()}, - QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1().Add(testOperator2.BlsKeypair.GetPubKeyG1())}, - SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest), + QuorumApksG1: []*bls.G1Point{ + testOperator1.BlsKeypair.GetPubKeyG1().Add(testOperator2.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest), } gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) @@ -383,13 +522,28 @@ func TestBlsAgg(t *testing.T) { require.Nil(t, err) blsSig := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSig, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSig, + testOperator1.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ Err: TaskExpiredErrorFn(taskIndex), @@ -419,17 +573,38 @@ func TestBlsAgg(t *testing.T) { require.Nil(t, err) blockNum := uint32(1) - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) require.Nil(t, err) blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp2, testOperator2.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ @@ -442,117 +617,171 @@ func TestBlsAgg(t *testing.T) { bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()), bls.NewZeroG1Point().Add(testOperator2.BlsKeypair.GetPubKeyG1()), }, - SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest).Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), - } - gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC - require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) - }) - - t.Run("2 quorums 3 operators which just stake one quorum; 2 correct signature quorumThreshold 50% - verified", func(t *testing.T) { - testOperator1 := types.TestOperator{ - OperatorId: types.OperatorId{1}, - // Note the quorums is {0, 1}, but operator id 1 just stake 0. - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, - BlsKeypair: newBlsKeyPairPanics("0x1"), - } - testOperator2 := types.TestOperator{ - OperatorId: types.OperatorId{2}, - // Note the quorums is {0, 1}, but operator id 2 just stake 1. - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x2"), - } - testOperator3 := types.TestOperator{ - OperatorId: types.OperatorId{3}, - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x3"), - } - taskIndex := types.TaskIndex(0) - quorumNumbers := types.QuorumNums{0, 1} - quorumThresholdPercentages := []types.QuorumThresholdPercentage{50, 50} - taskResponse := mockTaskResponse{123} - taskResponseDigest, err := hashFunction(taskResponse) - require.Nil(t, err) - blockNum := uint32(1) - - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2, testOperator3}) - logger := testutils.GetTestLogger() - blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) - require.Nil(t, err) - blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) - require.Nil(t, err) - blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp2, testOperator2.OperatorId) - require.Nil(t, err) - - wantAggregationServiceResponse := BlsAggregationServiceResponse{ - Err: nil, - TaskIndex: taskIndex, - TaskResponse: taskResponse, - TaskResponseDigest: taskResponseDigest, - NonSignersPubkeysG1: []*bls.G1Point{ - testOperator3.BlsKeypair.GetPubKeyG1(), - }, - QuorumApksG1: []*bls.G1Point{ - bls.NewZeroG1Point().Add(testOperator1.BlsKeypair.GetPubKeyG1()).Add(testOperator3.BlsKeypair.GetPubKeyG1()), - bls.NewZeroG1Point().Add(testOperator2.BlsKeypair.GetPubKeyG1()).Add(testOperator3.BlsKeypair.GetPubKeyG1()), - }, - SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest).Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), } gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) }) - t.Run("2 quorums 3 operators which just stake one quorum; 2 correct signature quorumThreshold 60% - task expired", func(t *testing.T) { - testOperator1 := types.TestOperator{ - OperatorId: types.OperatorId{1}, - // Note the quorums is {0, 1}, but operator id 1 just stake 0. - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, - BlsKeypair: newBlsKeyPairPanics("0x1"), - } - testOperator2 := types.TestOperator{ - OperatorId: types.OperatorId{2}, - // Note the quorums is {0, 1}, but operator id 2 just stake 1. - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x2"), - } - testOperator3 := types.TestOperator{ - OperatorId: types.OperatorId{3}, - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x3"), - } - taskIndex := types.TaskIndex(0) - quorumNumbers := types.QuorumNums{0, 1} - quorumThresholdPercentages := []types.QuorumThresholdPercentage{60, 60} - taskResponse := mockTaskResponse{123} - taskResponseDigest, err := hashFunction(taskResponse) - require.Nil(t, err) - blockNum := uint32(1) - - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2, testOperator3}) - logger := testutils.GetTestLogger() - blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) - require.Nil(t, err) - blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) - require.Nil(t, err) - blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp2, testOperator2.OperatorId) - require.Nil(t, err) - - wantAggregationServiceResponse := BlsAggregationServiceResponse{ - Err: TaskExpiredErrorFn(taskIndex), - } - gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC - require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) - require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) - }) + t.Run( + "2 quorums 3 operators which just stake one quorum; 2 correct signature quorumThreshold 50% - verified", + func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + // Note the quorums is {0, 1}, but operator id 1 just stake 0. + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + // Note the quorums is {0, 1}, but operator id 2 just stake 1. + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + testOperator3 := types.TestOperator{ + OperatorId: types.OperatorId{3}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x3"), + } + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0, 1} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{50, 50} + taskResponse := mockTaskResponse{123} + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + blockNum := uint32(1) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) + require.Nil(t, err) + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + require.Nil(t, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: []*bls.G1Point{ + testOperator3.BlsKeypair.GetPubKeyG1(), + }, + QuorumApksG1: []*bls.G1Point{ + bls.NewZeroG1Point(). + Add(testOperator1.BlsKeypair.GetPubKeyG1()). + Add(testOperator3.BlsKeypair.GetPubKeyG1()), + bls.NewZeroG1Point(). + Add(testOperator2.BlsKeypair.GetPubKeyG1()). + Add(testOperator3.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + }, + ) + + t.Run( + "2 quorums 3 operators which just stake one quorum; 2 correct signature quorumThreshold 60% - task expired", + func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + // Note the quorums is {0, 1}, but operator id 1 just stake 0. + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + // Note the quorums is {0, 1}, but operator id 2 just stake 1. + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + testOperator3 := types.TestOperator{ + OperatorId: types.OperatorId{3}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x3"), + } + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0, 1} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{60, 60} + taskResponse := mockTaskResponse{123} + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + blockNum := uint32(1) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) + require.Nil(t, err) + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + require.Nil(t, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: TaskExpiredErrorFn(taskIndex), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + }, + ) t.Run("2 quorums 1 operators which just stake one quorum; 1 signatures - task expired", func(t *testing.T) { testOperator1 := types.TestOperator{ @@ -573,10 +802,22 @@ func TestBlsAgg(t *testing.T) { logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ @@ -587,43 +828,61 @@ func TestBlsAgg(t *testing.T) { require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) }) - t.Run("2 quorums 2 operators, 1 operator which just stake one quorum; 1 signatures - task expired", func(t *testing.T) { - testOperator1 := types.TestOperator{ - OperatorId: types.OperatorId{1}, - // Note the quorums is {0, 1}, but operator id 1 just stake 0. - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, - BlsKeypair: newBlsKeyPairPanics("0x1"), - } - testOperator2 := types.TestOperator{ - OperatorId: types.OperatorId{2}, - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x2"), - } - taskIndex := types.TaskIndex(0) - quorumNumbers := types.QuorumNums{0, 1} - quorumThresholdPercentages := []types.QuorumThresholdPercentage{100, 100} - taskResponse := mockTaskResponse{123} - taskResponseDigest, err := hashFunction(taskResponse) - require.Nil(t, err) - blockNum := uint32(1) - - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) - logger := testutils.GetTestLogger() - blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) - require.Nil(t, err) - blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSigOp1, testOperator1.OperatorId) - require.Nil(t, err) - - wantAggregationServiceResponse := BlsAggregationServiceResponse{ - Err: TaskExpiredErrorFn(taskIndex), - } - gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC - require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) - require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) - }) + t.Run( + "2 quorums 2 operators, 1 operator which just stake one quorum; 1 signatures - task expired", + func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + // Note the quorums is {0, 1}, but operator id 1 just stake 0. + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0, 1} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{100, 100} + taskResponse := mockTaskResponse{123} + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + blockNum := uint32(1) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) + require.Nil(t, err) + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: TaskExpiredErrorFn(taskIndex), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + }, + ) t.Run("send signature of task that isn't initialized - task not found error", func(t *testing.T) { testOperator1 := types.TestOperator{ @@ -642,65 +901,90 @@ func TestBlsAgg(t *testing.T) { logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSig, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSig, + testOperator1.OperatorId, + ) require.Equal(t, TaskNotFoundErrorFn(taskIndex), err) }) // this is an edge case as typically we would send new tasks and listen for task responses in a for select loop // but this test makes sure the context deadline exceeded can get us out of a deadlock - t.Run("send new signedTaskDigest before listen on responseChan - context timeout cancels the request to prevent deadlock", func(t *testing.T) { - testOperator1 := types.TestOperator{ - OperatorId: types.OperatorId{1}, - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x1"), - } - testOperator2 := types.TestOperator{ - OperatorId: types.OperatorId{2}, - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x2"), - } - blockNum := uint32(1) - taskIndex := types.TaskIndex(0) - quorumNumbers := types.QuorumNums{0} - quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} - - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1}) - logger := testutils.GetTestLogger() - blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - - err := blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) - require.Nil(t, err) - taskResponse1 := mockTaskResponse{1} - taskResponseDigest1, err := hashFunction(taskResponse1) - require.Nil(t, err) - blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest1) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse1, blsSigOp1, testOperator1.OperatorId) - require.Nil(t, err) - - taskResponse2 := mockTaskResponse{2} - taskResponseDigest2, err := hashFunction(taskResponse2) - require.Nil(t, err) - blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest2) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - err = blsAggServ.ProcessNewSignature(ctx, taskIndex, taskResponse2, blsSigOp2, testOperator2.OperatorId) - // this should timeout because the task goroutine is blocked on the response channel (since we only listen for it below) - require.Equal(t, context.DeadlineExceeded, err) - - wantAggregationServiceResponse := BlsAggregationServiceResponse{ - Err: nil, - TaskIndex: taskIndex, - TaskResponse: taskResponse1, - TaskResponseDigest: taskResponseDigest1, - NonSignersPubkeysG1: []*bls.G1Point{}, - QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1()}, - SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(), - SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest1), - } - gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC - require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) - require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) - }) + t.Run( + "send new signedTaskDigest before listen on responseChan - context timeout cancels the request to prevent deadlock", + func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err := blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) + require.Nil(t, err) + taskResponse1 := mockTaskResponse{1} + taskResponseDigest1, err := hashFunction(taskResponse1) + require.Nil(t, err) + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest1) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse1, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + + taskResponse2 := mockTaskResponse{2} + taskResponseDigest2, err := hashFunction(taskResponse2) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest2) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err = blsAggServ.ProcessNewSignature(ctx, taskIndex, taskResponse2, blsSigOp2, testOperator2.OperatorId) + // this should timeout because the task goroutine is blocked on the response channel (since we only listen + // for it below) + require.Equal(t, context.DeadlineExceeded, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse1, + TaskResponseDigest: taskResponseDigest1, + NonSignersPubkeysG1: []*bls.G1Point{}, + QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1()}, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest1), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + }, + ) t.Run("1 quorum 2 operator 2 signatures on 2 different msgs - task expired", func(t *testing.T) { testOperator1 := types.TestOperator{ @@ -718,23 +1002,44 @@ func TestBlsAgg(t *testing.T) { quorumNumbers := types.QuorumNums{0} quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1, testOperator2}) + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) logger := testutils.GetTestLogger() blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - err := blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err := blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) taskResponse1 := mockTaskResponse{1} taskResponseDigest1, err := hashFunction(taskResponse1) require.Nil(t, err) blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest1) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse1, blsSigOp1, testOperator1.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse1, + blsSigOp1, + testOperator1.OperatorId, + ) require.Nil(t, err) taskResponse2 := mockTaskResponse{2} taskResponseDigest2, err := hashFunction(taskResponse2) require.Nil(t, err) blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest2) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse2, blsSigOp2, testOperator2.OperatorId) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse2, + blsSigOp2, + testOperator2.OperatorId, + ) require.Nil(t, err) wantAggregationServiceResponse := BlsAggregationServiceResponse{ Err: TaskExpiredErrorFn(taskIndex), @@ -744,32 +1049,50 @@ func TestBlsAgg(t *testing.T) { require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) }) - t.Run("1 quorum 1 operator 1 invalid signature (TaskResponseDigest does not match TaskResponse)", func(t *testing.T) { - testOperator1 := types.TestOperator{ - OperatorId: types.OperatorId{1}, - StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, - BlsKeypair: newBlsKeyPairPanics("0x1"), - } - blockNum := uint32(1) - taskIndex := types.TaskIndex(0) - quorumNumbers := types.QuorumNums{0} - quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} - taskResponse := mockTaskResponse{123} // Initialize with appropriate data - - taskResponseDigest, err := wrongHashFunction(taskResponse) - require.Nil(t, err) - - blsSig := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) - - fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(blockNum, []types.TestOperator{testOperator1}) - logger := testutils.GetTestLogger() - blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) - - err = blsAggServ.InitializeNewTask(taskIndex, blockNum, quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) - require.Nil(t, err) - err = blsAggServ.ProcessNewSignature(context.Background(), taskIndex, taskResponse, blsSig, testOperator1.OperatorId) - require.EqualError(t, err, "Signature verification failed. Incorrect Signature.") - }) + t.Run( + "1 quorum 1 operator 1 invalid signature (TaskResponseDigest does not match TaskResponse)", + func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} + taskResponse := mockTaskResponse{123} // Initialize with appropriate data + + taskResponseDigest, err := wrongHashFunction(taskResponse) + require.Nil(t, err) + + blsSig := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err = blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) + require.Nil(t, err) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSig, + testOperator1.OperatorId, + ) + require.EqualError(t, err, "Signature verification failed. Incorrect Signature.") + }, + ) } func TestIntegrationBlsAgg(t *testing.T) { @@ -822,8 +1145,18 @@ func TestIntegrationBlsAgg(t *testing.T) { require.NoError(t, err) // create aggregation service - operatorsInfoService := operatorsinfo.NewOperatorsInfoServiceInMemory(context.TODO(), avsClients.AvsRegistryChainSubscriber, avsClients.AvsRegistryChainReader, nil, logger) - avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsClients.AvsRegistryChainReader, operatorsInfoService, logger) + operatorsInfoService := operatorsinfo.NewOperatorsInfoServiceInMemory( + context.TODO(), + avsClients.AvsRegistryChainSubscriber, + avsClients.AvsRegistryChainReader, + nil, + logger, + ) + avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller( + avsClients.AvsRegistryChainReader, + operatorsInfoService, + logger, + ) blsAggServ := NewBlsAggregatorService(avsRegistryService, hashFunction, logger) // register operator @@ -835,14 +1168,21 @@ func TestIntegrationBlsAgg(t *testing.T) { curBlockNum, err := ethHttpClient.BlockNumber(context.Background()) require.NoError(t, err) referenceBlockNumber := uint32(curBlockNum) - // need to advance chain by 1 block because of the check in signatureChecker where RBN must be < current block number + // need to advance chain by 1 block because of the check in signatureChecker where RBN must be < current block + // number testutils.AdvanceChainByNBlocksExecInContainer(context.TODO(), 1, anvilC) taskIndex := types.TaskIndex(0) taskResponse := mockTaskResponse{123} // Initialize with appropriate data quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} // initialize the task - err = blsAggServ.InitializeNewTask(taskIndex, uint32(referenceBlockNumber), quorumNumbers, quorumThresholdPercentages, tasksTimeToExpiry) + err = blsAggServ.InitializeNewTask( + taskIndex, + uint32(referenceBlockNumber), + quorumNumbers, + quorumThresholdPercentages, + tasksTimeToExpiry, + ) require.Nil(t, err) // compute the signature and send it to the aggregation service @@ -854,7 +1194,13 @@ func TestIntegrationBlsAgg(t *testing.T) { // wait for the response from the aggregation service and check the signature blsAggServiceResp := <-blsAggServ.aggregatedResponsesC - _, _, err = avsServiceManager.CheckSignatures(&bind.CallOpts{}, taskResponseDigest, quorumNumbers.UnderlyingType(), uint32(referenceBlockNumber), blsAggServiceResp.toNonSignerStakesAndSignature()) + _, _, err = avsServiceManager.CheckSignatures( + &bind.CallOpts{}, + taskResponseDigest, + quorumNumbers.UnderlyingType(), + uint32(referenceBlockNumber), + blsAggServiceResp.toNonSignerStakesAndSignature(), + ) require.NoError(t, err) }) @@ -881,10 +1227,12 @@ func (blsAggServiceResp *BlsAggregationServiceResponse) toNonSignerStakesAndSign quorumApks = append(quorumApks, avssm.BN254G1Point(utils.ConvertToBN254G1Point(quorumApk))) } nonSignerStakesAndSignature := avssm.IBLSSignatureCheckerNonSignerStakesAndSignature{ - NonSignerPubkeys: nonSignerPubkeys, - QuorumApks: quorumApks, - ApkG2: avssm.BN254G2Point(utils.ConvertToBN254G2Point(blsAggServiceResp.SignersApkG2)), - Sigma: avssm.BN254G1Point(utils.ConvertToBN254G1Point(blsAggServiceResp.SignersAggSigG1.G1Point)), + NonSignerPubkeys: nonSignerPubkeys, + QuorumApks: quorumApks, + ApkG2: avssm.BN254G2Point(utils.ConvertToBN254G2Point(blsAggServiceResp.SignersApkG2)), + Sigma: avssm.BN254G1Point( + utils.ConvertToBN254G1Point(blsAggServiceResp.SignersAggSigG1.G1Point), + ), NonSignerQuorumBitmapIndices: blsAggServiceResp.NonSignerQuorumBitmapIndices, QuorumApkIndices: blsAggServiceResp.QuorumApkIndices, TotalStakeIndices: blsAggServiceResp.TotalStakeIndices, diff --git a/services/operatorsinfo/operatorsinfo.go b/services/operatorsinfo/operatorsinfo.go index 945aaece..c9a71c6b 100644 --- a/services/operatorsinfo/operatorsinfo.go +++ b/services/operatorsinfo/operatorsinfo.go @@ -7,13 +7,15 @@ import ( "github.com/ethereum/go-ethereum/common" ) -// OperatorsInfoService is a service that indexes the BLSApkRegistry contract and provides a way to query for operator pubkeys. +// OperatorsInfoService is a service that indexes the BLSApkRegistry contract and provides a way to query for operator +// pubkeys. // Currently BLSApkRegistry only stores the hash of the G1 and G2 pubkeys, so this service needs to listen to the // event NewPubkeyRegistration(address indexed operator, BN254.G1Point pubkeyG1, BN254.G2Point pubkeyG2) // and store the actual pubkeys, so that AVS aggregators can get the pubkeys of the operators registered with their AVS. // -// TODO: having this service as a separate service (instead of merged with avsregistry service) is a vestige of the past when -// we had a separate blsPublicKeyCompendium shared contract between all AVSs. We should eventually merge this back into avsregistry. +// TODO: having this service as a separate service (instead of merged with avsregistry service) is a vestige of the past +// when we had a separate blsPublicKeyCompendium shared contract between all AVSs. We should eventually merge this back +// into avsregistry. type OperatorsInfoService interface { // GetOperatorInfo returns the info of the operator with the given address. // it returns operatorFound=false if the operator is not found. diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index 9e289680..b5b43deb 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -3,11 +3,12 @@ package operatorsinfo import ( "context" "errors" + "math/big" + "sync" + blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" "github.com/ethereum/go-ethereum/event" - "math/big" - "sync" "github.com/Layr-Labs/eigensdk-go/crypto/bls" "github.com/Layr-Labs/eigensdk-go/logging" @@ -40,15 +41,18 @@ type avsRegistrySubscriber interface { } // OperatorsInfoServiceInMemory is a stateful goroutine (see https://gobyexample.com/stateful-goroutines) -// implementation of OperatorsInfoService that listen for the NewPubkeyRegistration and OperatorSocketUpdate events using a websocket connection +// implementation of OperatorsInfoService that listen for the NewPubkeyRegistration and OperatorSocketUpdate events +// using a websocket connection // to an eth client and stores the pubkeys/sockets in memory. Another possible implementation is using a mutex // (https://gobyexample.com/mutexes) instead. We can switch to that if we ever find a good reason to. // // Warning: this service should probably not be used in production. Haven't done a thorough analysis of all the clients -// but there is still an open PR about an issue with ws subscription on geth: https://github.com/ethereum/go-ethereum/issues/23845 +// but there is still an open PR about an issue with ws subscription on geth: +// https://github.com/ethereum/go-ethereum/issues/23845 // Another reason to note for infra/devops engineer who would put this into production, is that this service crashes on // websocket connection errors or when failing to query past events. The philosophy here is that hard crashing is -// better than silently failing, since it will be easier to debug. Naturally, this means that this aggregator using this service needs +// better than silently failing, since it will be easier to debug. Naturally, this means that this aggregator using this +// service needs // to be replicated and load-balanced, so that when it fails traffic can be switched to the other aggregator. type OperatorsInfoServiceInMemory struct { logFilterQueryBlockRange *big.Int @@ -67,7 +71,8 @@ type query struct { respC chan<- resp } type resp struct { - // TODO: possible for socket to be empty string if haven't received the event yet... would be a crazy race condition though. + // TODO: possible for socket to be empty string if haven't received the event yet... would be a crazy race condition + // though. operatorInfo types.OperatorInfo // false if operators were not present in the pubkey dict operatorExists bool @@ -77,9 +82,11 @@ var _ OperatorsInfoService = (*OperatorsInfoServiceInMemory)(nil) // NewOperatorsInfoServiceInMemory constructs a OperatorsInfoServiceInMemory and starts it in a goroutine. // It takes a context as argument because the "backfilling" of the database is done inside this constructor, -// so we wait for all past NewPubkeyRegistration/OperatorSocketUpdate events to be queried and the db to be filled before returning the service. +// so we wait for all past NewPubkeyRegistration/OperatorSocketUpdate events to be queried and the db to be filled +// before returning the service. // The constructor is thus following a RAII-like pattern, of initializing the serving during construction. -// Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving properly. +// Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving +// properly. func NewOperatorsInfoServiceInMemory( ctx context.Context, avsRegistrySubscriber avsRegistrySubscriber, @@ -110,70 +117,149 @@ func NewOperatorsInfoServiceInMemory( return pkcs } -func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Context, queryC <-chan query, wg *sync.WaitGroup) { +func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( + ctx context.Context, + queryC <-chan query, + wg *sync.WaitGroup, +) { go func() { // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs - ops.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorPubkeysServiceInMemory") + ops.logger.Debug( + "Subscribing to new pubkey registration events on blsApkRegistry contract", + "service", + "OperatorPubkeysServiceInMemory", + ) newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() if err != nil { - ops.logger.Error("Fatal error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Fatal error opening websocket subscription for new pubkey registrations", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) // see the warning above the struct definition to understand why we panic here panic(err) } newSocketRegistrationC, newSocketRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() if err != nil { - ops.logger.Error("Fatal error opening websocket subscription for new socket registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Fatal error opening websocket subscription for new socket registrations", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) panic(err) } err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx) if err != nil { - ops.logger.Error("Fatal error querying past registered operator events and filling db", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Fatal error querying past registered operator events and filling db", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) panic(err) } - // The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry + // The constructor can return after we have backfilled the db by querying the events of operators that have + // registered with the blsApkRegistry // before the block at which we started the ws subscription above wg.Done() for { select { case <-ctx.Done(): - // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is shutting down and we want graceful exit + // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is + // shutting down and we want graceful exit ops.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") return case err := <-newPubkeyRegistrationSub.Err(): - ops.logger.Error("Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) newPubkeyRegistrationSub.Unsubscribe() newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() if err != nil { - ops.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Error opening websocket subscription for new pubkey registrations", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) // see the warning above the struct definition to understand why we panic here panic(err) } case err := <-newSocketRegistrationSub.Err(): - ops.logger.Error("Error in websocket subscription for new socket registration events. Attempting to reconnect...", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Error in websocket subscription for new socket registration events. Attempting to reconnect...", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) newSocketRegistrationSub.Unsubscribe() newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() if err != nil { - ops.logger.Error("Error opening websocket subscription for new socket registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Error( + "Error opening websocket subscription for new socket registrations", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) panic(err) } case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: operatorAddr := newPubkeyRegistrationEvent.Operator ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{ - G1Pubkey: bls.NewG1Point(newPubkeyRegistrationEvent.PubkeyG1.X, newPubkeyRegistrationEvent.PubkeyG1.Y), - G2Pubkey: bls.NewG2Point(newPubkeyRegistrationEvent.PubkeyG2.X, newPubkeyRegistrationEvent.PubkeyG2.Y), + G1Pubkey: bls.NewG1Point( + newPubkeyRegistrationEvent.PubkeyG1.X, + newPubkeyRegistrationEvent.PubkeyG1.Y, + ), + G2Pubkey: bls.NewG2Point( + newPubkeyRegistrationEvent.PubkeyG2.X, + newPubkeyRegistrationEvent.PubkeyG2.Y, + ), } operatorId := types.OperatorIdFromContractG1Pubkey(newPubkeyRegistrationEvent.PubkeyG1) ops.operatorAddrToId[operatorAddr] = operatorId - ops.logger.Debug("Added operator pubkeys to pubkey dict from new pubkey registration event", - "service", "OperatorPubkeysServiceInMemory", "block", newPubkeyRegistrationEvent.Raw.BlockNumber, - "operatorAddr", operatorAddr, "operatorId", operatorId, - "G1pubkey", ops.pubkeyDict[operatorAddr].G1Pubkey, "G2pubkey", ops.pubkeyDict[operatorAddr].G2Pubkey, + ops.logger.Debug( + "Added operator pubkeys to pubkey dict from new pubkey registration event", + "service", + "OperatorPubkeysServiceInMemory", + "block", + newPubkeyRegistrationEvent.Raw.BlockNumber, + "operatorAddr", + operatorAddr, + "operatorId", + operatorId, + "G1pubkey", + ops.pubkeyDict[operatorAddr].G1Pubkey, + "G2pubkey", + ops.pubkeyDict[operatorAddr].G2Pubkey, ) case newSocketRegistrationEvent := <-newSocketRegistrationC: - ops.logger.Debug("Received new socket registration event", "service", "OperatorPubkeysServiceInMemory", "operatorId", types.OperatorId(newSocketRegistrationEvent.OperatorId), "socket", newSocketRegistrationEvent.Socket) - ops.updateSocketMapping(newSocketRegistrationEvent.OperatorId, types.Socket(newSocketRegistrationEvent.Socket)) + ops.logger.Debug( + "Received new socket registration event", + "service", + "OperatorPubkeysServiceInMemory", + "operatorId", + types.OperatorId(newSocketRegistrationEvent.OperatorId), + "socket", + newSocketRegistrationEvent.Socket, + ) + ops.updateSocketMapping( + newSocketRegistrationEvent.OperatorId, + types.Socket(newSocketRegistrationEvent.Socket), + ) // Receive a query from GetOperatorPubkeys case query := <-queryC: pubkeys, ok := ops.pubkeyDict[query.operatorAddr] @@ -190,23 +276,35 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Con } func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context) error { - // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that we will receive again in the websocket, + // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that + // we will receive again in the websocket, // since we will just overwrite the pubkey dict with the same values. wg := sync.WaitGroup{} var alreadyRegisteredOperatorAddrs []common.Address var alreadyRegisteredOperatorPubkeys []types.OperatorPubkeys var pubkeysErr error - // we make both Queries in parallel because they take time and we don't want to wait for one to finish before starting the other + // we make both Queries in parallel because they take time and we don't want to wait for one to finish before + // starting the other wg.Add(2) go func() { - alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, pubkeysErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil, ops.logFilterQueryBlockRange) + alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, pubkeysErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys( + ctx, + nil, + nil, + ops.logFilterQueryBlockRange, + ) wg.Done() }() var socketsMap map[types.OperatorId]types.Socket var socketsErr error go func() { - socketsMap, socketsErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, nil, nil, ops.logFilterQueryBlockRange) + socketsMap, socketsErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets( + ctx, + nil, + nil, + ops.logFilterQueryBlockRange, + ) wg.Done() }() @@ -217,12 +315,29 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil if socketsErr != nil { return utils.WrapError(errors.New("error querying existing registered operator sockets"), socketsErr) } - ops.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "alreadyRegisteredOperatorPubkeys", alreadyRegisteredOperatorPubkeys, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Debug( + "List of queried operator registration events in blsApkRegistry", + "alreadyRegisteredOperatorAddr", + alreadyRegisteredOperatorAddrs, + "alreadyRegisteredOperatorPubkeys", + alreadyRegisteredOperatorPubkeys, + "service", + "OperatorPubkeysServiceInMemory", + ) for operatorId, socket := range socketsMap { - // we print each socket info on a separate line because slog for some reason doesn't pass map keys via their LogValue() function, - // so operatorId (of custom type Bytes32) prints as a byte array instead of its hex representation from LogValue() + // we print each socket info on a separate line because slog for some reason doesn't pass map keys via their + // LogValue() function, so operatorId (of custom type Bytes32) prints as a byte array instead of its hex + // representation from LogValue() // passing the Bytes32 directly to an slog log statements does call LogValue() and prints the hex representation - ops.logger.Debug("operator socket returned from registration events query", "operatorId", operatorId, "socket", socket, "service", "OperatorPubkeysServiceInMemory") + ops.logger.Debug( + "operator socket returned from registration events query", + "operatorId", + operatorId, + "socket", + socket, + "service", + "OperatorPubkeysServiceInMemory", + ) } // Fill the pubkeydict db with the operators and pubkeys found @@ -237,7 +352,10 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil } // TODO(samlaf): we might want to also add an async version of this method that returns a channel of operator pubkeys? -func (ops *OperatorsInfoServiceInMemory) GetOperatorInfo(ctx context.Context, operator common.Address) (operatorPubkeys types.OperatorInfo, operatorFound bool) { +func (ops *OperatorsInfoServiceInMemory) GetOperatorInfo( + ctx context.Context, + operator common.Address, +) (operatorPubkeys types.OperatorInfo, operatorFound bool) { respC := make(chan resp) ops.queryC <- query{operator, respC} select { diff --git a/services/operatorsinfo/operatorsinfo_inmemory_test.go b/services/operatorsinfo/operatorsinfo_inmemory_test.go index 200766a0..c3e473a0 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory_test.go +++ b/services/operatorsinfo/operatorsinfo_inmemory_test.go @@ -3,7 +3,6 @@ package operatorsinfo import ( "context" - "github.com/ethereum/go-ethereum/event" "log/slog" "math/big" "os" @@ -11,6 +10,8 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/event" + apkregistrybindings "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" @@ -139,20 +140,40 @@ func TestGetOperatorInfo(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Create mocks mockSubscription := newFakeEventSubscription(tt.eventErrC) - mockAvsRegistrySubscriber := newFakeAVSRegistrySubscriber(mockSubscription, tt.pubkeyRegistrationEventC, tt.operatorSocketUpdateEventC) + mockAvsRegistrySubscriber := newFakeAVSRegistrySubscriber( + mockSubscription, + tt.pubkeyRegistrationEventC, + tt.operatorSocketUpdateEventC, + ) mockAvsReader := fakes.NewFakeAVSRegistryReader(tt.operator, nil) // Create a new instance of the operatorpubkeys service - service := NewOperatorsInfoServiceInMemory(context.Background(), mockAvsRegistrySubscriber, mockAvsReader, nil, logger) - time.Sleep(2 * time.Second) // need to give it time to process the subscription events.. not sure if there's a better way to do this. + service := NewOperatorsInfoServiceInMemory( + context.Background(), + mockAvsRegistrySubscriber, + mockAvsReader, + nil, + logger, + ) + time.Sleep( + 2 * time.Second, + ) // need to give it time to process the subscription events.. not sure if there's a better way to do this. // Call the GetOperatorPubkeys method with the test operator address gotOperatorsInfo, gotOperatorFound := service.GetOperatorInfo(context.Background(), tt.queryOperatorAddr) if tt.wantOperatorFound != gotOperatorFound { - t.Fatalf("GetOperatorPubkeys returned wrong ok. Got: %v, want: %v.", gotOperatorFound, tt.wantOperatorFound) + t.Fatalf( + "GetOperatorPubkeys returned wrong ok. Got: %v, want: %v.", + gotOperatorFound, + tt.wantOperatorFound, + ) } if tt.wantOperatorFound == true && !reflect.DeepEqual(tt.wantOperatorInfo, gotOperatorsInfo) { - t.Fatalf("GetOperatorPubkeys returned wrong operator pubkeys. Got: %v, want: %v.", gotOperatorsInfo, tt.wantOperatorInfo) + t.Fatalf( + "GetOperatorPubkeys returned wrong operator pubkeys. Got: %v, want: %v.", + gotOperatorsInfo, + tt.wantOperatorInfo, + ) } }) } diff --git a/services/operatorsinfo/operatorsinfo_subgraph.go b/services/operatorsinfo/operatorsinfo_subgraph.go index 4a6346a0..9411b3aa 100644 --- a/services/operatorsinfo/operatorsinfo_subgraph.go +++ b/services/operatorsinfo/operatorsinfo_subgraph.go @@ -36,7 +36,8 @@ type ( SocketUpdates []SocketUpdates `graphql:"socketUpdates(first: 1, orderBy: blockNumber, orderDirection: desc)"` } IndexedOperatorInfo struct { - // PubKeyG1 and PubKeyG2 are the public keys of the operator, which are retrieved from the EigenDAPubKeyCompendium smart contract + // PubKeyG1 and PubKeyG2 are the public keys of the operator, which are retrieved from the + // EigenDAPubKeyCompendium smart contract PubkeyG1 *G1Point PubkeyG2 *G2Point // Socket is the socket address of the operator, in the form "host:port" @@ -57,9 +58,11 @@ var _ OperatorsInfoService = (*OperatorsInfoServiceSubgraph)(nil) // NewOperatorsInfoServiceSubgraph constructs a OperatorsInfoServiceSubgraph and starts it in a goroutine. // It takes a context as argument because the "backfilling" of the database is done inside this constructor, -// so we wait for all past NewPubkeyRegistration/OperatorSocketUpdate events to be queried and the db to be filled before returning the service. +// so we wait for all past NewPubkeyRegistration/OperatorSocketUpdate events to be queried and the db to be filled +// before returning the service. // The constructor is thus following a RAII-like pattern, of initializing the serving during construction. -// Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving properly. +// Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving +// properly. func NewOperatorsInfoServiceSubgraph( ctx context.Context, client GraphQLQuerier, @@ -73,7 +76,10 @@ func NewOperatorsInfoServiceSubgraph( } // TODO(samlaf): we might want to also add an async version of this method that returns a channel of operator pubkeys? -func (ops *OperatorsInfoServiceSubgraph) GetOperatorInfo(ctx context.Context, operator common.Address) (operatorPubkeys types.OperatorInfo, operatorFound bool) { +func (ops *OperatorsInfoServiceSubgraph) GetOperatorInfo( + ctx context.Context, + operator common.Address, +) (operatorPubkeys types.OperatorInfo, operatorFound bool) { operatorInfo, err := ops.getIndexedOperatorInfoByOperatorId(ctx, operator) if err != nil { return types.OperatorInfo{}, false @@ -81,7 +87,10 @@ func (ops *OperatorsInfoServiceSubgraph) GetOperatorInfo(ctx context.Context, op return *operatorInfo, true } -func (ops *OperatorsInfoServiceSubgraph) getIndexedOperatorInfoByOperatorId(ctx context.Context, operator common.Address) (*types.OperatorInfo, error) { +func (ops *OperatorsInfoServiceSubgraph) getIndexedOperatorInfoByOperatorId( + ctx context.Context, + operator common.Address, +) (*types.OperatorInfo, error) { var ( query QueryOperatorByAddressGql variables = map[string]any{ diff --git a/types/operator.go b/types/operator.go index 9db61d45..174ac502 100644 --- a/types/operator.go +++ b/types/operator.go @@ -25,7 +25,6 @@ type Operator struct { Address string `yaml:"address" json:"address"` // https://github.com/Layr-Labs/eigenlayer-contracts/blob/delegation-redesign/src/contracts/interfaces/IDelegationManager.sol#L18 - EarningsReceiverAddress string `yaml:"earnings_receiver_address" json:"earnings_receiver_address"` DelegationApproverAddress string `yaml:"delegation_approver_address" json:"delegation_approver_address"` StakerOptOutWindowBlocks uint32 `yaml:"staker_opt_out_window_blocks" json:"staker_opt_out_window_blocks"` @@ -38,10 +37,6 @@ func (o Operator) Validate() error { return ErrInvalidOperatorAddress } - if !utils.IsValidEthereumAddress(o.EarningsReceiverAddress) { - return ErrInvalidEarningsReceiverAddress - } - if o.DelegationApproverAddress != ZeroAddress && !utils.IsValidEthereumAddress(o.DelegationApproverAddress) { return ErrInvalidDelegationApproverAddress } diff --git a/types/operator_test.go b/types/operator_test.go index 3dbde0d7..45604354 100644 --- a/types/operator_test.go +++ b/types/operator_test.go @@ -23,7 +23,6 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "https://madhur-test-public.s3.us-east-2.amazonaws.com/metadata.json", }, @@ -34,7 +33,6 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: ZeroAddress, - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "https://madhur-test-public.s3.us-east-2.amazonaws.com/metadata.json", }, @@ -45,7 +43,6 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "", }, @@ -57,7 +54,6 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "http://localhost:8080/metadata.json", }, @@ -69,7 +65,6 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "http://127.0.0.1:8080/metadata.json", }, @@ -81,7 +76,6 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "https://example.com/metadata.json", }, @@ -96,31 +90,17 @@ func TestOperatorValidate(t *testing.T) { operator: Operator{ Address: "0xa", DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "https://example.com/metadata.json", }, wantErr: true, expectedErr: ErrInvalidOperatorAddress, }, - { - name: "failed operator validation - wrong earning receivers address address", - operator: Operator{ - Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - DelegationApproverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", - EarningsReceiverAddress: "0xasdf", - StakerOptOutWindowBlocks: 100, - MetadataUrl: "https://example.com/metadata.json", - }, - wantErr: true, - expectedErr: ErrInvalidEarningsReceiverAddress, - }, { name: "failed operator validation - wrong DelegationApproverAddress address", operator: Operator{ Address: "0xd5e099c71b797516c10ed0f0d895f429c2781142", DelegationApproverAddress: "0x12", - EarningsReceiverAddress: "0xd5e099c71b797516c10ed0f0d895f429c2781142", StakerOptOutWindowBlocks: 100, MetadataUrl: "https://example.com/metadata.json", }, From bef4c3c2e8d83270da7742534e770b1a52f0c5c8 Mon Sep 17 00:00:00 2001 From: Madhur Shrimal Date: Tue, 6 Aug 2024 12:26:49 -0800 Subject: [PATCH 2/5] fix test --- chainio/clients/elcontracts/reader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/chainio/clients/elcontracts/reader.go b/chainio/clients/elcontracts/reader.go index f3c8a3ef..fd3a0b57 100644 --- a/chainio/clients/elcontracts/reader.go +++ b/chainio/clients/elcontracts/reader.go @@ -143,7 +143,6 @@ func (r *ChainReader) GetOperatorDetails(opts *bind.CallOpts, operator types.Ope return types.Operator{ Address: operator.Address, - EarningsReceiverAddress: operatorDetails.DeprecatedEarningsReceiver.Hex(), StakerOptOutWindowBlocks: operatorDetails.StakerOptOutWindowBlocks, DelegationApproverAddress: operatorDetails.DelegationApprover.Hex(), }, nil From edc9a8ff5dc989959b26a3efc9a48112082813eb Mon Sep 17 00:00:00 2001 From: Madhur Shrimal Date: Tue, 6 Aug 2024 12:31:32 -0800 Subject: [PATCH 3/5] fmt --- metrics/collectors/economic/economic.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/metrics/collectors/economic/economic.go b/metrics/collectors/economic/economic.go index 4dfb0f7f..7403ece8 100644 --- a/metrics/collectors/economic/economic.go +++ b/metrics/collectors/economic/economic.go @@ -198,16 +198,17 @@ func (ec *Collector) Collect(ch chan<- prometheus.Metric) { // // might want to change this behavior if this is emitting too many metrics // sharesWeiFloat, _ := sharesWei.Float64() // // TODO(samlaf): add the token name.. probably need to have a hardcoded dict per env (mainnet, goerli, etc)? Is - // it really that important..? ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, - // sharesWeiFloat, strategyAddr.String(), "wei", "token") + // it really that important..? + // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, + // sharesWeiFloat, strategyAddr.String(), "wei", "token") // sharesGweiFloat, _ := sharesWei.Div(sharesWei, big.NewInt(1e9)).Float64() // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesGweiFloat, - // strategyAddr.String(), "gwei", "token") + // strategyAddr.String(), "gwei", "token") // sharesEtherFloat, _ := sharesWei.Div(sharesWei, big.NewInt(1e18)).Float64() // ch <- prometheus.MustNewConstMetric(ec.delegatedShares, prometheus.GaugeValue, sharesEtherFloat, - // strategyAddr.String(), "ether", "token") + // strategyAddr.String(), "ether", "token") // } // } } From 59e06bd2200cbde8af3915d3cfed764003925863 Mon Sep 17 00:00:00 2001 From: Madhur Shrimal Date: Tue, 6 Aug 2024 12:36:12 -0800 Subject: [PATCH 4/5] testutils --- Makefile | 2 +- testutils/anvil.go | 24 ++++++++++++++++++------ testutils/localstack.go | 15 +++++++++++---- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 24416d0a..0073e5e6 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ GO_LINES_IGNORED_DIRS=contracts GO_PACKAGES=./chainio/... ./crypto/... ./logging/... \ ./types/... ./utils/... ./signer/... ./cmd/... \ ./signerv2/... ./aws/... ./internal/... ./metrics/... \ - ./nodeapi/... ./cmd/... ./services/... + ./nodeapi/... ./cmd/... ./services/... ./testutils/... GO_FOLDERS=$(shell echo ${GO_PACKAGES} | sed -e "s/\.\///g" | sed -e "s/\/\.\.\.//g") help: @grep -E '^[a-zA-Z0-9_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' diff --git a/testutils/anvil.go b/testutils/anvil.go index c4da31ca..ebaf0a9c 100644 --- a/testutils/anvil.go +++ b/testutils/anvil.go @@ -3,12 +3,13 @@ package testutils import ( "context" "fmt" - "github.com/ethereum/go-ethereum/ethclient" "log" "os/exec" "path/filepath" "runtime" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/testcontainers/testcontainers-go" @@ -69,8 +70,12 @@ func GetContractAddressesFromContractRegistry(ethHttpUrl string) (mockAvsContrac panic(err) } // The ContractsRegistry contract should always be deployed at this address on anvil - // it's the address of the contract created at nonce 0 by the first anvil account (0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266) - contractsRegistry, err := contractreg.NewContractContractsRegistry(common.HexToAddress("0x5FbDB2315678afecb367f032d93F642f64180aa3"), ethHttpClient) + // it's the address of the contract created at nonce 0 by the first anvil account + // (0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266) + contractsRegistry, err := contractreg.NewContractContractsRegistry( + common.HexToAddress("0x5FbDB2315678afecb367f032d93F642f64180aa3"), + ethHttpClient, + ) if err != nil { panic(err) } @@ -89,7 +94,10 @@ func GetContractAddressesFromContractRegistry(ethHttpUrl string) (mockAvsContrac if mockAvsRegistryCoordinatorAddr == (common.Address{}) { panic("mockAvsRegistryCoordinatorAddr is empty") } - mockAvsOperatorStateRetrieverAddr, err := contractsRegistry.Contracts(&bind.CallOpts{}, "mockAvsOperatorStateRetriever") + mockAvsOperatorStateRetrieverAddr, err := contractsRegistry.Contracts( + &bind.CallOpts{}, + "mockAvsOperatorStateRetriever", + ) if err != nil { panic(err) } @@ -133,9 +141,13 @@ func AdvanceChainByNBlocks(n int, anvilEndpoint string) { } } -// Prefer this function over AdvanceChainByNBlocks b/c it doesn't require cast to be installed on the host machine, whereas this one doesn't. +// Prefer this function over AdvanceChainByNBlocks b/c it doesn't require cast to be installed on the host machine, +// whereas this one doesn't. func AdvanceChainByNBlocksExecInContainer(ctx context.Context, n int, anvilC testcontainers.Container) { - c, _, err := anvilC.Exec(ctx, []string{"cast", "rpc", "anvil_mine", fmt.Sprintf("%d", n), "--rpc-url", "http://localhost:8545"}) + c, _, err := anvilC.Exec( + ctx, + []string{"cast", "rpc", "anvil_mine", fmt.Sprintf("%d", n), "--rpc-url", "http://localhost:8545"}, + ) if err != nil { panic(err) } diff --git a/testutils/localstack.go b/testutils/localstack.go index 41e31049..bf3722df 100644 --- a/testutils/localstack.go +++ b/testutils/localstack.go @@ -16,9 +16,11 @@ const LocalStackPort = "4566" func StartLocalstackContainer(name string) (testcontainers.Container, error) { fmt.Println("Starting Localstack container") req := testcontainers.ContainerRequest{ - Image: "localstack/localstack:latest", - Name: fmt.Sprintf("localstack-test-%s", name), - Env: map[string]string{"LOCALSTACK_HOST": fmt.Sprintf("localhost.localstack.cloud:%s", LocalStackPort)}, + Image: "localstack/localstack:latest", + Name: fmt.Sprintf("localstack-test-%s", name), + Env: map[string]string{ + "LOCALSTACK_HOST": fmt.Sprintf("localhost.localstack.cloud:%s", LocalStackPort), + }, ExposedPorts: []string{LocalStackPort}, WaitingFor: wait.ForLog("Ready."), AutoRemove: true, @@ -30,7 +32,12 @@ func StartLocalstackContainer(name string) (testcontainers.Container, error) { } func NewKMSClient(localStackPort string) (*kms.Client, error) { - cfg, err := aws.GetAWSConfig("localstack", "localstack", "us-east-1", fmt.Sprintf("http://127.0.0.1:%s", localStackPort)) + cfg, err := aws.GetAWSConfig( + "localstack", + "localstack", + "us-east-1", + fmt.Sprintf("http://127.0.0.1:%s", localStackPort), + ) if err != nil { return nil, fmt.Errorf("failed to load AWS config: %w", err) } From 3a5f418fe1bb0ec640e531d17d1d88dbc19aa966 Mon Sep 17 00:00:00 2001 From: Madhur Shrimal Date: Tue, 6 Aug 2024 13:19:22 -0800 Subject: [PATCH 5/5] fix comment --- chainio/clients/elcontracts/writer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/chainio/clients/elcontracts/writer.go b/chainio/clients/elcontracts/writer.go index b8b9724d..9658be62 100644 --- a/chainio/clients/elcontracts/writer.go +++ b/chainio/clients/elcontracts/writer.go @@ -162,8 +162,7 @@ func (w *ChainWriter) RegisterAsOperator(ctx context.Context, operator types.Ope w.logger.Infof("registering operator %s to EigenLayer", operator.Address) opDetails := delegationmanager.IDelegationManagerOperatorDetails{ - // Earning receiver has been deprecated but just to make sure we have something in contract - // We just force it to be operator address + // Earning receiver has been deprecated, so we just use the operator address as a dummy value // Any reward related setup is via RewardsCoordinator contract DeprecatedEarningsReceiver: gethcommon.HexToAddress(operator.Address), StakerOptOutWindowBlocks: operator.StakerOptOutWindowBlocks, @@ -197,8 +196,7 @@ func (w *ChainWriter) UpdateOperatorDetails( w.logger.Infof("updating operator details of operator %s to EigenLayer", operator.Address) opDetails := delegationmanager.IDelegationManagerOperatorDetails{ - // Earning receiver has been deprecated but just to make sure we have something in contract - // We just force it to be operator address + // Earning receiver has been deprecated, so we just use the operator address as a dummy value // Any reward related setup is via RewardsCoordinator contract DeprecatedEarningsReceiver: gethcommon.HexToAddress(operator.Address), DelegationApprover: gethcommon.HexToAddress(operator.DelegationApproverAddress),