Skip to content

Commit

Permalink
ethmonitor: update IsStreamingEnabled() check to also test explicit o…
Browse files Browse the repository at this point in the history
…ptions.StreamingDisabled
  • Loading branch information
pkieltyka committed Oct 30, 2024
1 parent 10cea80 commit 0bbb842
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ func (m *Monitor) Provider() ethrpc.Interface {
return m.provider
}

func (m *Monitor) IsStreamingEnabled() bool {
return !m.options.StreamingDisabled && m.provider.IsStreamingEnabled()
}

func (m *Monitor) listenNewHead() <-chan uint64 {
ch := make(chan uint64)

Expand Down Expand Up @@ -339,7 +343,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
// the monitor directly too.

// listen for new heads either via streaming or polling
if !m.options.StreamingDisabled && m.provider.IsStreamingEnabled() && streamingErrCount < m.options.StreamingErrNumToSwitchToPolling {
if m.IsStreamingEnabled() && streamingErrCount < m.options.StreamingErrNumToSwitchToPolling {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
m.log.Info("ethmonitor: starting stream head listener")
Expand Down Expand Up @@ -387,7 +391,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter)
for {
// if streaming is enabled, we'll retry streaming
if m.provider.IsStreamingEnabled() {
if m.IsStreamingEnabled() {
select {
case <-retryStreamingTimer.C:
// retry streaming
Expand Down Expand Up @@ -759,7 +763,7 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, []byte, boo
nextBlockPayload, err := m.fetchRawBlockByNumber(ctx, m.nextBlockNumber)
if errors.Is(err, ethereum.NotFound) {
miss = true
if m.provider.IsStreamingEnabled() {
if m.IsStreamingEnabled() {
// in streaming mode, we'll use a shorter time to pause before we refetch
time.Sleep(200 * time.Millisecond)
} else {
Expand Down

0 comments on commit 0bbb842

Please sign in to comment.