Skip to content

Commit

Permalink
Merge branch 'dev' into datatype_conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
pablodeymo committed Sep 24, 2024
2 parents 253f620 + 6f241c3 commit 74770d4
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 11 deletions.
14 changes: 12 additions & 2 deletions chainio/clients/avsregistry/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,12 @@ func (r *ChainReader) QueryExistingRegisteredOperatorPubKeys(

operatorAddresses := make([]types.OperatorAddr, 0)
operatorPubkeys := make([]types.OperatorPubkeys, 0)
for i := startBlock; i.Cmp(stopBlock) <= 0; i.Add(i, blockRange) {
// QueryExistingRegisteredOperatorPubKeys and QueryExistingRegisteredOperatorSockets
// both run in parallel and they read and mutate the same variable startBlock,
// so we clone it to prevent the race condition.
// TODO: we might want to eventually change the function signature to pass a uint,
// but that would be a breaking change
for i := new(big.Int).Set(startBlock); i.Cmp(stopBlock) <= 0; i.Add(i, blockRange) {
// Subtract 1 since FilterQuery is inclusive
toBlock := big.NewInt(0).Add(i, big.NewInt(0).Sub(blockRange, big.NewInt(1)))
if toBlock.Cmp(stopBlock) > 0 {
Expand Down Expand Up @@ -521,7 +526,12 @@ func (r *ChainReader) QueryExistingRegisteredOperatorSockets(
}

operatorIdToSocketMap := make(map[types.OperatorId]types.Socket)
for i := startBlock; i.Cmp(stopBlock) <= 0; i.Add(i, blockRange) {
// QueryExistingRegisteredOperatorPubKeys and QueryExistingRegisteredOperatorSockets
// both run in parallel and they read and mutate the same variable startBlock,
// so we clone it to prevent the race condition.
// TODO: we might want to eventually change the function signature to pass a uint,
// but that would be a breaking change
for i := new(big.Int).Set(startBlock); i.Cmp(stopBlock) <= 0; i.Add(i, blockRange) {
// Subtract 1 since FilterQuery is inclusive
toBlock := big.NewInt(0).Add(i, big.NewInt(0).Sub(blockRange, big.NewInt(1)))
if toBlock.Cmp(stopBlock) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions chainio/clients/eth/instrumented_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (iec *InstrumentedClient) TransactionByHash(
if err != nil {
return nil, false, err
}
rpcRequestDuration := time.Since(start)
rpcRequestDuration := time.Since(start).Seconds()
// we only observe the duration of successful calls (even though this is not well defined in the spec)
iec.rpcCallsCollector.ObserveRPCRequestDurationSeconds(
float64(rpcRequestDuration),
Expand Down Expand Up @@ -455,7 +455,7 @@ func instrumentFunction[T any](
if err != nil {
return value, err
}
rpcRequestDuration := time.Since(start)
rpcRequestDuration := time.Since(start).Seconds()
// we only observe the duration of successful calls (even though this is not well defined in the spec)
iec.rpcCallsCollector.ObserveRPCRequestDurationSeconds(
float64(rpcRequestDuration),
Expand Down
30 changes: 23 additions & 7 deletions services/bls_aggregation/blsagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,6 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
signedTaskResponseDigest,
)

err := a.verifySignature(taskIndex, signedTaskResponseDigest, operatorsAvsStateDict)
signedTaskResponseDigest.SignatureVerificationErrorC <- err
if err != nil {
continue
}

// compute the taskResponseDigest using the hash function
taskResponseDigest, err := a.hashFunction(signedTaskResponseDigest.TaskResponse)
if err != nil {
Expand All @@ -350,8 +344,30 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
// happens..
continue
}
// after verifying signature we aggregate its sig and pubkey, and update the signed stake amount

// check if the operator has already signed for this digest
digestAggregatedOperators, ok := aggregatedOperatorsDict[taskResponseDigest]
if ok {
if digestAggregatedOperators.signersOperatorIdsSet[signedTaskResponseDigest.OperatorId] {
a.logger.Info(
"Duplicate signature received",
"operatorId", fmt.Sprintf("%x", signedTaskResponseDigest.OperatorId),
"taskIndex", taskIndex,
)
signedTaskResponseDigest.SignatureVerificationErrorC <- fmt.Errorf("duplicate signature from operator %x for task %d", signedTaskResponseDigest.OperatorId, taskIndex)
continue
}
}

err = a.verifySignature(taskIndex, signedTaskResponseDigest, operatorsAvsStateDict)
// return the err (or nil) to the operator, and then proceed to do aggregation logic asynchronously (when no
// error)
signedTaskResponseDigest.SignatureVerificationErrorC <- err
if err != nil {
continue
}

// after verifying signature we aggregate its sig and pubkey, and update the signed stake amount
if !ok {
// first operator to sign on this digest
digestAggregatedOperators = aggregatedOperators{
Expand Down
95 changes: 95 additions & 0 deletions services/bls_aggregation/blsagg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,101 @@ func TestBlsAgg(t *testing.T) {
}
})

t.Run("1 quorum 2 operators operator 1 double sign", func(t *testing.T) {
testOperator1 := types.TestOperator{
OperatorId: types.OperatorId{1},
StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)},
BlsKeypair: newBlsKeyPairPanics("0x1"),
}
testOperator2 := types.TestOperator{
OperatorId: types.OperatorId{2},
StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)},
BlsKeypair: newBlsKeyPairPanics("0x2"),
}

blockNum := uint32(1)
taskIndex := types.TaskIndex(0)
quorumNumbers := types.QuorumNums{0}
quorumThresholdPercentages := []types.QuorumThresholdPercentage{100}
taskResponse := mockTaskResponse{123}

fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService(
blockNum,
[]types.TestOperator{testOperator1, testOperator2},
)
logger := testutils.GetTestLogger()
blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger)

logger.Info("Initializing new task", "taskIndex", taskIndex)
err := blsAggServ.InitializeNewTask(
taskIndex,
blockNum,
quorumNumbers,
quorumThresholdPercentages,
10*time.Second, // Longer expiry time for testing
)
require.NoError(t, err)

taskResponseDigest, err := hashFunction(taskResponse)
require.NoError(t, err)

logger.Info("Processing first signature", "operatorId", testOperator1.OperatorId)
blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest)
err = blsAggServ.ProcessNewSignature(
context.Background(),
taskIndex,
taskResponse,
blsSigOp1,
testOperator1.OperatorId,
)
require.NoError(t, err)

logger.Info("Processing second signature (Operator 1 double sign)", "operatorId", testOperator1.OperatorId)
blsSigOp1Dup := testOperator1.BlsKeypair.SignMessage(taskResponseDigest)
err = blsAggServ.ProcessNewSignature(
context.Background(),
taskIndex,
taskResponse,
blsSigOp1Dup,
testOperator1.OperatorId,
)

if err != nil {
logger.Info("Received error from second signature", "error", err)
require.Contains(t, err.Error(), "duplicate signature")
} else {
t.Fatal("Expected an error for duplicate signature, but got nil")
}

logger.Info("Processing second signature", "operatorId", testOperator2.OperatorId)
blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest)
err = blsAggServ.ProcessNewSignature(
context.Background(),
taskIndex,
taskResponse,
blsSigOp2,
testOperator2.OperatorId,
)
require.NoError(t, err)

wantAggregationServiceResponse := BlsAggregationServiceResponse{
Err: nil,
TaskIndex: taskIndex,
TaskResponse: taskResponse,
TaskResponseDigest: taskResponseDigest,
NonSignersPubkeysG1: []*bls.G1Point{},
QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1().
Add(testOperator2.BlsKeypair.GetPubKeyG1()),
},
SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2().
Add(testOperator2.BlsKeypair.GetPubKeyG2()),
SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest).
Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)),
}
gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC
require.EqualValues(t, wantAggregationServiceResponse, gotAggregationServiceResponse)

})
t.Run("1 quorum 1 operator 0 signatures - task expired", func(t *testing.T) {
testOperator1 := types.TestOperator{
OperatorId: types.OperatorId{1},
Expand Down

0 comments on commit 74770d4

Please sign in to comment.