diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 49e6fb1b74eb..a379c23ee4aa 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -7,7 +7,7 @@ import ( "math/big" "os" "reflect" - "sync" + "runtime/internal/atomic" "time" "github.com/scroll-tech/da-codec/encoding" @@ -57,15 +57,13 @@ type RollupSyncService struct { cancel context.CancelFunc client *L1Client db ethdb.Database - latestProcessedBlock uint64 + latestProcessedBlock atomic.Uint64 scrollChainABI *abi.ABI l1CommitBatchEventSignature common.Hash l1RevertBatchEventSignature common.Hash l1FinalizeBatchEventSignature common.Hash bc *core.BlockChain stack *node.Node - - stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates } func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) { @@ -109,7 +107,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig cancel: cancel, client: client, db: db, - latestProcessedBlock: latestProcessedBlock, scrollChainABI: scrollChainABI, l1CommitBatchEventSignature: scrollChainABI.Events["CommitBatch"].ID, l1RevertBatchEventSignature: scrollChainABI.Events["RevertBatch"].ID, @@ -118,6 +115,8 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig stack: stack, } + service.latestProcessedBlock.Store(latestProcessedBlock) + return &service, nil } @@ -126,7 +125,7 @@ func (s *RollupSyncService) Start() { return } - log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock) + log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock.Load()) go func() { syncTicker := time.NewTicker(defaultSyncInterval) @@ -142,7 +141,7 @@ func (s *RollupSyncService) Start() { case <-syncTicker.C: s.fetchRollupEvents() case <-logTicker.C: - log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock) + log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock.Load()) } } }() @@ -166,29 +165,22 @@ func (s *RollupSyncService) ResetToHeight(height uint64) { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - - rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height) - s.latestProcessedBlock = height + s.latestProcessedBlock.Store(height) log.Info("Reset sync service", "height", height) } func (s *RollupSyncService) fetchRollupEvents() { - s.stateMu.Lock() - defer s.stateMu.Unlock() - latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) return } - log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock, "latest confirmed", latestConfirmed) + log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed) // query in batches - for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -210,7 +202,7 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - s.latestProcessedBlock = to + s.latestProcessedBlock.Store(to) } } diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 89f6b8a36865..05efe802e05a 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "reflect" - "sync" + "sync/atomic" "time" "github.com/scroll-tech/go-ethereum/core" @@ -49,10 +49,8 @@ type SyncService struct { db ethdb.Database msgCountFeed event.Feed pollInterval time.Duration - latestProcessedBlock uint64 + latestProcessedBlock atomic.Uint64 scope event.SubscriptionScope - - stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates } func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) { @@ -82,14 +80,15 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node ctx, cancel := context.WithCancel(ctx) service := SyncService{ - ctx: ctx, - cancel: cancel, - client: client, - db: db, - pollInterval: DefaultPollInterval, - latestProcessedBlock: latestProcessedBlock, + ctx: ctx, + cancel: cancel, + client: client, + db: db, + pollInterval: DefaultPollInterval, } + service.latestProcessedBlock.Store(latestProcessedBlock) + return &service, nil } @@ -99,14 +98,14 @@ func (s *SyncService) Start() { } // wait for initial sync before starting node - log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock) + log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock.Load()) // block node startup during initial sync and print some helpful logs latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) - if err == nil && latestConfirmed > s.latestProcessedBlock+1000 { + if err == nil && latestConfirmed > s.latestProcessedBlock.Load()+1000 { log.Warn("Running initial sync of L1 messages before starting l2geth, this might take a while...") s.fetchMessages() - log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock) + log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock.Load()) } go func() { @@ -148,11 +147,7 @@ func (s *SyncService) ResetToHeight(height uint64) { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - - rawdb.WriteSyncedL1BlockNumber(s.db, height) - s.latestProcessedBlock = height + s.latestProcessedBlock.Store(height) log.Info("Reset sync service", "height", height) } @@ -164,16 +159,13 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve } func (s *SyncService) fetchMessages() { - s.stateMu.Lock() - defer s.stateMu.Unlock() - latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) if err != nil { log.Warn("Failed to get latest confirmed block number", "err", err) return } - log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed) + log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock.Load(), "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -203,7 +195,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - s.latestProcessedBlock = lastBlock + s.latestProcessedBlock.Store(lastBlock) } // ticker for logging progress @@ -211,7 +203,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -220,8 +212,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(s.latestProcessedBlock) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(s.latestProcessedBlock.Load()) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock.Load(), "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: }