Skip to content

Commit

Permalink
Add chain id in logging
Browse files Browse the repository at this point in the history
  • Loading branch information
davidcauchi committed Jan 14, 2025
1 parent 11a42ba commit f77a055
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions lib/blockchain/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ func (e *EthereumClient) InitializeHeaderSubscription() error {

// startPollingHeaders starts a polling loop to fetch new headers at regular intervals
func (e *EthereumClient) startHeaderPolling() error {
e.l.Info().Msg("Starting Polling Headers")
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Msg("Starting Polling Headers")
// pollInterval := time.Second * 30
chainPollingRegistry.Lock()
pollInterval := chainPollingRegistry.m[e.GetChainID().Int64()].pollInterval
Expand All @@ -1338,17 +1338,17 @@ func (e *EthereumClient) startHeaderPolling() error {
initCtx, initCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
defer initCancel()

e.l.Info().Msg("Attempting to fetch latest header during initialization")
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Msg("Attempting to fetch latest header during initialization")
latestHeader, err := e.HeaderByNumber(initCtx, nil) // Fetch the latest header
if err != nil {
e.l.Error().Err(err).Msg("Failed to fetch the latest header during initialization")
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Err(err).Msg("Failed to fetch the latest header during initialization")
return fmt.Errorf("failed to fetch the latest header during initialization: %w", err)
}
e.l.Info().Str("HeaderHash", latestHeader.Hash.String()).Msg("Successfully fetched latest header during initialization")
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Str("HeaderHash", latestHeader.Hash.String()).Msg("Successfully fetched latest header during initialization")

go func() {
defer e.subscriptionWg.Done()
e.l.Info().Msg("Polling for headers goroutine started")
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Msg("Polling for headers goroutine started")
// Initialize lastHeaderNumber dynamically
lastHeaderNumber := latestHeader.Number.Uint64() - 1
for {
Expand All @@ -1358,28 +1358,28 @@ func (e *EthereumClient) startHeaderPolling() error {
e.sharedPoller.mu.Lock()
now := time.Now()
if now.Sub(e.sharedPoller.lastPolled) < e.sharedPoller.pollInterval || e.sharedPoller.fetching {
e.l.Debug().Msg("Not my time to poll, skipping")
e.l.Debug().Int64("Chain Id", e.GetChainID().Int64()).Msg("Not my time to poll, skipping")
e.sharedPoller.mu.Unlock()
continue
}
e.sharedPoller.lastPolled = now
e.sharedPoller.fetching = true
e.sharedPoller.mu.Unlock()

e.l.Debug().Msg("Polling Headers")
e.l.Debug().Int64("Chain Id", e.GetChainID().Int64()).Msg("Polling Headers")

// Create a new context with timeout for each HeaderByNumber call
pollCtx, pollCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
latestHeader, err := e.HeaderByNumber(pollCtx, nil) // nil gets the latest header
pollCancel()

if err != nil {
e.l.Error().Err(err).Msg("Error fetching latest header during polling")
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Err(err).Msg("Error fetching latest header during polling")
continue
}

if latestHeader.Number.Uint64() > lastHeaderNumber {
e.l.Info().Uint64("LatestHeaderNumber", latestHeader.Number.Uint64()).Msg("New headers detected")
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Uint64("LatestHeaderNumber", latestHeader.Number.Uint64()).Msg("New headers detected")
// Process headers from (lastHeaderNumber + 1) to latestHeader.Number
// We may need to add a rate limiter, if we run into issues.
for blockNum := lastHeaderNumber + 1; blockNum <= latestHeader.Number.Uint64(); blockNum++ {
Expand All @@ -1388,24 +1388,24 @@ func (e *EthereumClient) startHeaderPolling() error {
header, err := e.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum)))
blockCancel()
if err != nil {
e.l.Error().Err(err).Uint64("BlockNumber", blockNum).Msg("Error fetching header during range processing")
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Err(err).Uint64("BlockNumber", blockNum).Msg("Error fetching header during range processing")
continue
}

lastHeaderNumber = header.Number.Uint64()

if err := e.receiveHeader(header); err != nil {
e.l.Error().Err(err).Uint64("BlockNumber", blockNum).Msg("Error processing header")
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Err(err).Uint64("BlockNumber", blockNum).Msg("Error processing header")
continue
}
e.l.Debug().Uint64("BlockNumber", blockNum).Msg("Processing header")
e.l.Debug().Int64("Chain Id", e.GetChainID().Int64()).Uint64("BlockNumber", blockNum).Msg("Processing header")
}
}
e.sharedPoller.mu.Lock()
e.sharedPoller.fetching = false
e.sharedPoller.mu.Unlock()
case <-e.doneChan:
e.l.Debug().Str("Network", e.NetworkConfig.Name).Msg("Polling loop cancelled")
e.l.Debug().Int64("Chain Id", e.GetChainID().Int64()).Str("Network", e.NetworkConfig.Name).Msg("Polling loop cancelled")
ticker.Stop()
e.Client.Close()
return
Expand Down

0 comments on commit f77a055

Please sign in to comment.