diff --git a/services/bls_aggregation/blsagg.go b/services/bls_aggregation/blsagg.go index d3c630d6..4bf9f18a 100644 --- a/services/bls_aggregation/blsagg.go +++ b/services/bls_aggregation/blsagg.go @@ -336,12 +336,6 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( signedTaskResponseDigest, ) - err := a.verifySignature(taskIndex, signedTaskResponseDigest, operatorsAvsStateDict) - signedTaskResponseDigest.SignatureVerificationErrorC <- err - if err != nil { - continue - } - // compute the taskResponseDigest using the hash function taskResponseDigest, err := a.hashFunction(signedTaskResponseDigest.TaskResponse) if err != nil { @@ -350,8 +344,30 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( // happens.. continue } - // after verifying signature we aggregate its sig and pubkey, and update the signed stake amount + + // check if the operator has already signed for this digest digestAggregatedOperators, ok := aggregatedOperatorsDict[taskResponseDigest] + if ok { + if digestAggregatedOperators.signersOperatorIdsSet[signedTaskResponseDigest.OperatorId] { + a.logger.Info( + "Duplicate signature received", + "operatorId", fmt.Sprintf("%x", signedTaskResponseDigest.OperatorId), + "taskIndex", taskIndex, + ) + signedTaskResponseDigest.SignatureVerificationErrorC <- fmt.Errorf("duplicate signature from operator %x for task %d", signedTaskResponseDigest.OperatorId, taskIndex) + continue + } + } + + err = a.verifySignature(taskIndex, signedTaskResponseDigest, operatorsAvsStateDict) + // return the err (or nil) to the operator, and then proceed to do aggregation logic asynchronously (when no + // error) + signedTaskResponseDigest.SignatureVerificationErrorC <- err + if err != nil { + continue + } + + // after verifying signature we aggregate its sig and pubkey, and update the signed stake amount if !ok { // first operator to sign on this digest digestAggregatedOperators = aggregatedOperators{ diff --git a/services/bls_aggregation/blsagg_test.go b/services/bls_aggregation/blsagg_test.go index cb736ced..fd10338c 100644 --- a/services/bls_aggregation/blsagg_test.go +++ b/services/bls_aggregation/blsagg_test.go @@ -413,6 +413,101 @@ func TestBlsAgg(t *testing.T) { } }) + t.Run("1 quorum 2 operators operator 1 double sign", func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{100} + taskResponse := mockTaskResponse{123} + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + logger.Info("Initializing new task", "taskIndex", taskIndex) + err := blsAggServ.InitializeNewTask( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + 10*time.Second, // Longer expiry time for testing + ) + require.NoError(t, err) + + taskResponseDigest, err := hashFunction(taskResponse) + require.NoError(t, err) + + logger.Info("Processing first signature", "operatorId", testOperator1.OperatorId) + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.NoError(t, err) + + logger.Info("Processing second signature (Operator 1 double sign)", "operatorId", testOperator1.OperatorId) + blsSigOp1Dup := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1Dup, + testOperator1.OperatorId, + ) + + if err != nil { + logger.Info("Received error from second signature", "error", err) + require.Contains(t, err.Error(), "duplicate signature") + } else { + t.Fatal("Expected an error for duplicate signature, but got nil") + } + + logger.Info("Processing second signature", "operatorId", testOperator2.OperatorId) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + require.NoError(t, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: []*bls.G1Point{}, + QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1(). + Add(testOperator2.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(). + Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + + }) t.Run("1 quorum 1 operator 0 signatures - task expired", func(t *testing.T) { testOperator1 := types.TestOperator{ OperatorId: types.OperatorId{1},