Skip to content

Commit

Permalink
Add BLS Aggregation window waiting after quorum (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasArrachea authored Nov 15, 2024
1 parent e050883 commit 6c859ec
Show file tree
Hide file tree
Showing 2 changed files with 578 additions and 60 deletions.
221 changes: 161 additions & 60 deletions services/bls_aggregation/blsagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ type aggregatedOperators struct {
// BlsAggregationService is the interface provided to avs aggregator code for doing bls aggregation
// Currently its only implementation is the BlsAggregatorService, so see the comment there for more details
type BlsAggregationService interface {
// InitializeNewTask should be called whenever a new task is created. ProcessNewSignature will return an error
// if the task it is trying to process has not been initialized yet.
// quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which
// InitializeNewTask creates a new task goroutine meant to process new signed task responses for that task
// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to
// it. The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered
// complete, which
// happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers
// whose stake
// in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in that quorum
// whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in
// that quorum
InitializeNewTask(
taskIndex types.TaskIndex,
taskCreatedBlock uint32,
Expand All @@ -95,6 +96,27 @@ type BlsAggregationService interface {
timeToExpiry time.Duration,
) error

// InitializeNewTaskWithWindow creates a new task goroutine meant to process new signed task responses for that task
// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to
// it. The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered
// complete, which
// happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers
// whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in
// that quorum.
// Once the quorum is reached, the task is still open for a window of `windowDuration` time to receive more
// signatures,
// before sending the aggregation response through the aggregatedResponsesC channel.
// If the task expiration is reached before the window finishes, the task response will still be sent to the
// aggregatedResponsesC channel.
InitializeNewTaskWithWindow(
taskIndex types.TaskIndex,
taskCreatedBlock uint32,
quorumNumbers types.QuorumNums,
quorumThresholdPercentages types.QuorumThresholdPercentages,
timeToExpiry time.Duration,
windowDuration time.Duration,
) error

// ProcessNewSignature processes a new signature over a taskResponseDigest for a particular taskIndex by a
// particular operator It verifies that the signature is correct and returns an error if it is not, and then
// aggregates the signature and stake of
Expand Down Expand Up @@ -185,17 +207,43 @@ func (a *BlsAggregatorService) GetResponseChannel() <-chan BlsAggregationService
}

// InitializeNewTask creates a new task goroutine meant to process new signed task responses for that task
// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it
// quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which
// happens
// when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers whose stake
// in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in that quorum
// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it.
// The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which
// happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers
// whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in
// that quorum
func (a *BlsAggregatorService) InitializeNewTask(
taskIndex types.TaskIndex,
taskCreatedBlock uint32,
quorumNumbers types.QuorumNums,
quorumThresholdPercentages types.QuorumThresholdPercentages,
timeToExpiry time.Duration,
) error {
return a.InitializeNewTaskWithWindow(
taskIndex,
taskCreatedBlock,
quorumNumbers,
quorumThresholdPercentages,
timeToExpiry,
0,
)
}

// InitializeNewTaskWithWindow creates a new task goroutine meant to process new signed task responses for that task
// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it.
// The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which
// happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers
// whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in
// that quorum.
// Once the quorum is reached, the task is still open for a window of `windowDuration` time to receive more signatures,
// before sending the aggregation response through the aggregatedResponsesC channel.
func (a *BlsAggregatorService) InitializeNewTaskWithWindow(
taskIndex types.TaskIndex,
taskCreatedBlock uint32,
quorumNumbers types.QuorumNums,
quorumThresholdPercentages types.QuorumThresholdPercentages,
timeToExpiry time.Duration,
windowDuration time.Duration,
) error {
a.logger.Debug(
"AggregatorService initializing new task",
Expand Down Expand Up @@ -225,6 +273,7 @@ func (a *BlsAggregatorService) InitializeNewTask(
quorumNumbers,
quorumThresholdPercentages,
timeToExpiry,
windowDuration,
signedTaskRespsC,
)
return nil
Expand Down Expand Up @@ -271,6 +320,7 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
quorumNumbers types.QuorumNums,
quorumThresholdPercentages []types.QuorumThresholdPercentage,
timeToExpiry time.Duration,
windowDuration time.Duration,
signedTaskRespsC <-chan types.SignedTaskResponseDigest,
) {
a.logger.Debug("AggregatorService goroutine processing new task",
Expand Down Expand Up @@ -342,6 +392,13 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
taskExpiredTimer := time.NewTimer(timeToExpiry)

aggregatedOperatorsDict := map[types.TaskResponseDigest]aggregatedOperators{}
// The windowTimer is initialized to be longer than the taskExpiredTimer as it will
// be overwritten once the stake threshold is met
windowTimer := time.NewTimer(timeToExpiry + 1*time.Second)
openWindow := false
var lastSignedTaskResponseDigest types.SignedTaskResponseDigest
var lastDigestAggregatedOperators aggregatedOperators
var lastTaskResponseDigest types.TaskResponseDigest
for {
select {
case signedTaskResponseDigest := <-signedTaskRespsC:
Expand Down Expand Up @@ -415,11 +472,17 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
digestAggregatedOperators.signersTotalStakePerQuorum[quorumNum].Add(digestAggregatedOperators.signersTotalStakePerQuorum[quorumNum], stake)
}
}

// update the buffer variables to be used when the window timer fires
lastDigestAggregatedOperators = digestAggregatedOperators
lastTaskResponseDigest = taskResponseDigest
lastSignedTaskResponseDigest = signedTaskResponseDigest

// update the aggregatedOperatorsDict. Note that we need to assign the whole struct value at once,
// because of https://github.com/golang/go/issues/3117
aggregatedOperatorsDict[taskResponseDigest] = digestAggregatedOperators

if checkIfStakeThresholdsMet(
if !openWindow && checkIfStakeThresholdsMet(
a.logger,
digestAggregatedOperators.signersTotalStakePerQuorum,
totalStakePerQuorum,
Expand All @@ -429,67 +492,105 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
"taskIndex", taskIndex,
"taskResponseDigest", taskResponseDigest)

nonSignersOperatorIds := []types.OperatorId{}
for operatorId := range operatorsAvsStateDict {
if _, operatorSigned := digestAggregatedOperators.signersOperatorIdsSet[operatorId]; !operatorSigned {
nonSignersOperatorIds = append(nonSignersOperatorIds, operatorId)
}
}

// the contract requires a sorted nonSignersOperatorIds
sort.SliceStable(nonSignersOperatorIds, func(i, j int) bool {
iOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[i][:])
jOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[j][:])
return iOprInt.Cmp(jOprInt) == -1
})

nonSignersG1Pubkeys := []*bls.G1Point{}
for _, operatorId := range nonSignersOperatorIds {
operator := operatorsAvsStateDict[operatorId]
nonSignersG1Pubkeys = append(nonSignersG1Pubkeys, operator.OperatorInfo.Pubkeys.G1Pubkey)
}

indices, err := a.avsRegistryService.GetCheckSignaturesIndices(
&bind.CallOpts{},
openWindow = true
windowTimer = time.NewTimer(windowDuration)
a.logger.Debug("Window timer started")
}
case <-taskExpiredTimer.C:
if openWindow {
a.sendAggregatedResponse(
operatorsAvsStateDict,
taskIndex,
taskCreatedBlock,
lastSignedTaskResponseDigest,
lastDigestAggregatedOperators,
quorumNumbers,
nonSignersOperatorIds,
lastTaskResponseDigest,
quorumApksG1,
)
if err != nil {
a.aggregatedResponsesC <- BlsAggregationServiceResponse{
Err: utils.WrapError(errors.New("Failed to get check signatures indices"), err),
TaskIndex: taskIndex,
}
return
}

blsAggregationServiceResponse := BlsAggregationServiceResponse{
Err: nil,
TaskIndex: taskIndex,
TaskResponse: signedTaskResponseDigest.TaskResponse,
TaskResponseDigest: taskResponseDigest,
NonSignersPubkeysG1: nonSignersG1Pubkeys,
QuorumApksG1: quorumApksG1,
SignersApkG2: digestAggregatedOperators.signersApkG2,
SignersAggSigG1: digestAggregatedOperators.signersAggSigG1,
NonSignerQuorumBitmapIndices: indices.NonSignerQuorumBitmapIndices,
QuorumApkIndices: indices.QuorumApkIndices,
TotalStakeIndices: indices.TotalStakeIndices,
NonSignerStakeIndices: indices.NonSignerStakeIndices,
}
a.aggregatedResponsesC <- blsAggregationServiceResponse
taskExpiredTimer.Stop()
return
}
case <-taskExpiredTimer.C:

a.aggregatedResponsesC <- BlsAggregationServiceResponse{
Err: TaskExpiredErrorFn(taskIndex),
TaskIndex: taskIndex,
}
return
case <-windowTimer.C:
a.logger.Debug("Window timer expired")
a.sendAggregatedResponse(
operatorsAvsStateDict,
taskIndex,
taskCreatedBlock,
lastSignedTaskResponseDigest,
lastDigestAggregatedOperators,
quorumNumbers,
lastTaskResponseDigest,
quorumApksG1,
)
return
}
}
}

func (a *BlsAggregatorService) sendAggregatedResponse(
operatorsAvsStateDict map[types.OperatorId]types.OperatorAvsState,
taskIndex types.TaskIndex,
taskCreatedBlock uint32,
signedTaskResponseDigest types.SignedTaskResponseDigest,
digestAggregatedOperators aggregatedOperators,
quorumNumbers types.QuorumNums,
taskResponseDigest types.TaskResponseDigest,
quorumApksG1 []*bls.G1Point,
) {
nonSignersOperatorIds := []types.OperatorId{}
for operatorId := range operatorsAvsStateDict {
if _, operatorSigned := digestAggregatedOperators.signersOperatorIdsSet[operatorId]; !operatorSigned {
nonSignersOperatorIds = append(nonSignersOperatorIds, operatorId)
}
}

// the contract requires a sorted nonSignersOperatorIds
sort.SliceStable(nonSignersOperatorIds, func(i, j int) bool {
iOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[i][:])
jOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[j][:])
return iOprInt.Cmp(jOprInt) == -1
})

nonSignersG1Pubkeys := []*bls.G1Point{}
for _, operatorId := range nonSignersOperatorIds {
operator := operatorsAvsStateDict[operatorId]
nonSignersG1Pubkeys = append(nonSignersG1Pubkeys, operator.OperatorInfo.Pubkeys.G1Pubkey)
}

indices, err := a.avsRegistryService.GetCheckSignaturesIndices(
&bind.CallOpts{},
taskCreatedBlock,
quorumNumbers,
nonSignersOperatorIds,
)
if err != nil {
a.aggregatedResponsesC <- BlsAggregationServiceResponse{
Err: utils.WrapError(errors.New("Failed to get check signatures indices"), err),
TaskIndex: taskIndex,
}
return
}

blsAggregationServiceResponse := BlsAggregationServiceResponse{
Err: nil,
TaskIndex: taskIndex,
TaskResponse: signedTaskResponseDigest.TaskResponse,
TaskResponseDigest: taskResponseDigest,
NonSignersPubkeysG1: nonSignersG1Pubkeys,
QuorumApksG1: quorumApksG1,
SignersApkG2: digestAggregatedOperators.signersApkG2,
SignersAggSigG1: digestAggregatedOperators.signersAggSigG1,
NonSignerQuorumBitmapIndices: indices.NonSignerQuorumBitmapIndices,
QuorumApkIndices: indices.QuorumApkIndices,
TotalStakeIndices: indices.TotalStakeIndices,
NonSignerStakeIndices: indices.NonSignerStakeIndices,
}
a.aggregatedResponsesC <- blsAggregationServiceResponse
}

// closeTaskGoroutine is run when the goroutine processing taskIndex's task responses ends (for whatever reason)
Expand Down
Loading

0 comments on commit 6c859ec

Please sign in to comment.