From 6ec28a8e1ccecb10ae1d93636a9496cf4a045e1a Mon Sep 17 00:00:00 2001 From: ChenYe Date: Wed, 19 Jun 2024 10:13:45 +0800 Subject: [PATCH] fix: race condition on blsagg --- services/bls_aggregation/blsagg.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/services/bls_aggregation/blsagg.go b/services/bls_aggregation/blsagg.go index 394bf993..fab356f9 100644 --- a/services/bls_aggregation/blsagg.go +++ b/services/bls_aggregation/blsagg.go @@ -172,13 +172,18 @@ func (a *BlsAggregatorService) InitializeNewTask( timeToExpiry time.Duration, ) error { a.logger.Debug("AggregatorService initializing new task", "taskIndex", taskIndex, "taskCreatedBlock", taskCreatedBlock, "quorumNumbers", quorumNumbers, "quorumThresholdPercentages", quorumThresholdPercentages, "timeToExpiry", timeToExpiry) - if _, taskExists := a.signedTaskRespsCs[taskIndex]; taskExists { - return TaskAlreadyInitializedErrorFn(taskIndex) - } - signedTaskRespsC := make(chan types.SignedTaskResponseDigest) + a.taskChansMutex.Lock() - a.signedTaskRespsCs[taskIndex] = signedTaskRespsC + signedTaskRespsC, taskExists := a.signedTaskRespsCs[taskIndex] + if !taskExists { + signedTaskRespsC = make(chan types.SignedTaskResponseDigest) + a.signedTaskRespsCs[taskIndex] = signedTaskRespsC + } a.taskChansMutex.Unlock() + if taskExists { + return TaskAlreadyInitializedErrorFn(taskIndex) + } + go a.singleTaskAggregatorGoroutineFunc(taskIndex, taskCreatedBlock, quorumNumbers, quorumThresholdPercentages, timeToExpiry, signedTaskRespsC) return nil }