Skip to content

Commit

Permalink
optimize locks
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed Feb 4, 2025
1 parent 487bf49 commit 1684303
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
11 changes: 8 additions & 3 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,16 @@ func (nhs *NewHeadsService) onNewBatch(head *common.BatchHeader) error {
msg = ConvertBatchHeader(head)
}

nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()
// copy the notifiers
nhs.notifiersMutex.RLock()
notifiers := make(map[rpc.ID]*rpc.Notifier)
for id, notifier := range nhs.newHeadNotifiers {
notifiers[id] = notifier
}
nhs.notifiersMutex.RUnlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
for id, notifier := range notifiers {
if nhs.stopped.Load() {
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ func (br *batchRegistry) OnL1Reorg(_ *BlockIngestionType) {

func (br *batchRegistry) OnBatchExecuted(batchHeader *common.BatchHeader, txExecResults []*core.TxExecResult) error {
defer core.LogMethodDuration(br.logger, measure.NewStopwatch(), "OnBatchExecuted", log.BatchHashKey, batchHeader.Hash())
br.callbackMutex.RLock()
defer br.callbackMutex.RUnlock()

txs, err := br.storage.FetchBatchTransactionsBySeq(context.Background(), batchHeader.SequencerOrderNo.Uint64())
if err != nil && !errors.Is(err, errutil.ErrNotFound) {
Expand All @@ -116,12 +114,17 @@ func (br *batchRegistry) OnBatchExecuted(batchHeader *common.BatchHeader, txExec
}

br.headBatchSeq.Store(batchHeader.SequencerOrderNo)
if br.batchesCallback != nil {

br.callbackMutex.RLock()
callback := br.batchesCallback
br.callbackMutex.RUnlock()

if callback != nil {
txReceipts := make([]*types.Receipt, len(txExecResults))
for i, txExecResult := range txExecResults {
txReceipts[i] = txExecResult.Receipt
}
br.batchesCallback(batch, txReceipts)
callback(batch, txReceipts)
}

br.lastExecutedBatch.Mark()
Expand Down
3 changes: 0 additions & 3 deletions go/enclave/enclave_admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,6 @@ func (e *enclaveAdminService) SubmitBatch(ctx context.Context, extBatch *common.
return err
}

e.dataInMutex.Lock()
defer e.dataInMutex.Unlock()

// if the signature is valid, then store the batch together with the converted hash
err = e.storage.StoreBatch(ctx, batch, convertedHeader.Hash())
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions go/enclave/events/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (s *SubscriptionManager) RemoveSubscription(id gethrpc.ID) {
// The assumption is that this function is called synchronously after the batch is produced
func (s *SubscriptionManager) GetSubscribedLogsForBatch(ctx context.Context, batch *core.Batch, receipts types.Receipts) (common.EncryptedSubscriptionLogs, error) {
s.subscriptionMutex.RLock()
defer s.subscriptionMutex.RUnlock()
subs := make(map[gethrpc.ID]*logSubscription)
for key, value := range s.subscriptions {
subs[key] = value
}
s.subscriptionMutex.RUnlock()

// exit early if there are no subscriptions
if len(s.subscriptions) == 0 {
Expand All @@ -101,7 +105,7 @@ func (s *SubscriptionManager) GetSubscribedLogsForBatch(ctx context.Context, bat
return nil, nil
}

for id, sub := range s.subscriptions {
for id, sub := range subs {
relevantLogsForSub, err := s.storage.FilterLogs(ctx, sub.ViewingKeyEncryptor.AccountAddress, nil, nil, &h, sub.Subscription.Filter.Addresses, sub.Subscription.Filter.Topics)
if err != nil {
return nil, err
Expand Down

0 comments on commit 1684303

Please sign in to comment.