From 3da977033efa836f88977cfd34a27607a7952bb2 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 17 Apr 2024 18:12:23 -0400 Subject: [PATCH 01/19] Create polling transformer --- common/client/polling_transformer.go | 98 +++++++++++++++++++++++ common/client/polling_transformer_test.go | 64 +++++++++++++++ 2 files changed, 162 insertions(+) create mode 100644 common/client/polling_transformer.go create mode 100644 common/client/polling_transformer_test.go diff --git a/common/client/polling_transformer.go b/common/client/polling_transformer.go new file mode 100644 index 00000000000..ed0261a852f --- /dev/null +++ b/common/client/polling_transformer.go @@ -0,0 +1,98 @@ +package client + +import ( + "context" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// PollingTransformer is a component that polls a function at a given interval +// and delivers the result to subscribers +type PollingTransformer[HEAD Head] struct { + interval time.Duration + pollFunc func() (HEAD, error) + + logger logger.Logger + + subscribers []chan<- HEAD + + isPolling bool + stopCh services.StopChan + wg sync.WaitGroup +} + +func NewPollingTransformer[HEAD Head](pollInterval time.Duration, pollFunc func() (HEAD, error), logger logger.Logger) *PollingTransformer[HEAD] { + return &PollingTransformer[HEAD]{ + interval: pollInterval, + pollFunc: pollFunc, + logger: logger, + isPolling: false, + } +} + +// Subscribe adds a Subscriber to the polling transformer +func (pt *PollingTransformer[HEAD]) Subscribe(sub chan<- HEAD) { + pt.subscribers = append(pt.subscribers, sub) +} + +// Unsubscribe removes a Subscriber from the polling transformer +func (pt *PollingTransformer[HEAD]) Unsubscribe(sub chan<- HEAD) { + for i, s := range pt.subscribers { + if s == sub { + close(s) + pt.subscribers = append(pt.subscribers[:i], pt.subscribers[i+1:]...) + return + } + } +} + +// StartPolling starts the polling loop and delivers the polled value to subscribers +func (pt *PollingTransformer[HEAD]) StartPolling() { + pt.stopCh = make(chan struct{}) + pt.wg.Add(1) + go pt.pollingLoop(pt.stopCh.NewCtx()) + pt.isPolling = true +} + +// pollingLoop polls the pollFunc at the given interval and delivers the result to subscribers +func (pt *PollingTransformer[HEAD]) pollingLoop(ctx context.Context, cancel context.CancelFunc) { + defer pt.wg.Done() + defer cancel() + + pollT := time.NewTicker(pt.interval) + defer pollT.Stop() + + for { + select { + case <-ctx.Done(): + for _, subscriber := range pt.subscribers { + close(subscriber) + } + return + case <-pollT.C: + head, err := pt.pollFunc() + if err != nil { + // TODO: handle error + } + pt.logger.Debugw("PollingTransformer: polled value", "head", head) + for _, subscriber := range pt.subscribers { + select { + case subscriber <- head: + // Successfully sent head + default: + // Subscriber's channel is closed + pt.Unsubscribe(subscriber) + } + } + } + } +} + +// StopPolling stops the polling loop +func (pt *PollingTransformer[HEAD]) StopPolling() { + close(pt.stopCh) + pt.wg.Wait() +} diff --git a/common/client/polling_transformer_test.go b/common/client/polling_transformer_test.go new file mode 100644 index 00000000000..f285f6f6abe --- /dev/null +++ b/common/client/polling_transformer_test.go @@ -0,0 +1,64 @@ +package client + +import ( + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type TestHead struct { + blockNumber int64 +} + +var _ Head = &TestHead{} + +func (th *TestHead) BlockNumber() int64 { + return th.blockNumber +} + +func (th *TestHead) BlockDifficulty() *big.Int { + return nil +} + +func (th *TestHead) IsValid() bool { + return true +} + +func Test_Polling_Transformer(t *testing.T) { + t.Parallel() + + // Mock polling function that returns a new value every time it's called + var lastBlockNumber int64 + pollFunc := func() (Head, error) { + lastBlockNumber++ + return &TestHead{lastBlockNumber}, nil + } + + pt := NewPollingTransformer(time.Millisecond, pollFunc, logger.Test(t)) + pt.StartPolling() + defer pt.StopPolling() + + // Create a subscriber channel + subscriber := make(chan Head) + pt.Subscribe(subscriber) + defer pt.Unsubscribe(subscriber) + + // Create a goroutine to receive updates from the subscriber + pollCount := 0 + pollMax := 50 + go func() { + for i := 0; i < pollMax; i++ { + value := <-subscriber + pollCount++ + require.Equal(t, int64(pollCount), value.BlockNumber()) + } + }() + + // Wait for a short duration to allow for some polling iterations + time.Sleep(100 * time.Millisecond) + require.Equal(t, pollMax, pollCount) +} From 8800cd15981b53e33a57f975adc070085e68abc7 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 22 Apr 2024 12:56:30 -0400 Subject: [PATCH 02/19] Update poller --- common/client/poller.go | 87 ++++++++++++++++++++ common/client/poller_test.go | 46 +++++++++++ common/client/polling_transformer.go | 98 ----------------------- common/client/polling_transformer_test.go | 64 --------------- 4 files changed, 133 insertions(+), 162 deletions(-) create mode 100644 common/client/poller.go create mode 100644 common/client/poller_test.go delete mode 100644 common/client/polling_transformer.go delete mode 100644 common/client/polling_transformer_test.go diff --git a/common/client/poller.go b/common/client/poller.go new file mode 100644 index 00000000000..f25a16d98c3 --- /dev/null +++ b/common/client/poller.go @@ -0,0 +1,87 @@ +package client + +import ( + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + + "github.com/smartcontractkit/chainlink/v2/common/types" +) + +// Poller is a component that polls a function at a given interval +// and delivers the result to a channel. It is used to poll for new heads +// and implements the Subscription interface. +type Poller[ + HEAD Head, +] struct { + services.StateMachine + pollingInterval time.Duration + pollingFunc func() (HEAD, error) + logger logger.Logger + + channel chan<- HEAD + errCh chan error + + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewPoller creates a new Poller instance +func NewPoller[ + HEAD Head, +](pollingInterval time.Duration, pollingFunc func() (HEAD, error), channel chan<- HEAD, logger logger.Logger) Poller[HEAD] { + return Poller[HEAD]{ + pollingInterval: pollingInterval, + pollingFunc: pollingFunc, + logger: logger, + channel: channel, + stopCh: make(chan struct{}), + } +} + +var _ types.Subscription = &Poller[Head]{} + +// Subscribe starts the polling process +func (p *Poller[HEAD]) Subscribe() error { + return p.StartOnce("Poller", func() error { + p.wg.Add(1) + go p.pollingLoop() + return nil + }) +} + +// Unsubscribe cancels the sending of events to the data channel +func (p *Poller[HEAD]) Unsubscribe() { + _ = p.StopOnce("Poller", func() error { + close(p.stopCh) + p.wg.Wait() + return nil + }) + close(p.errCh) +} + +func (p *Poller[HEAD]) Err() <-chan error { + return p.errCh +} + +func (p *Poller[HEAD]) pollingLoop() { + ticker := time.NewTicker(p.pollingInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + result, err := p.pollingFunc() + if err != nil { + p.logger.Error("error occurred when calling polling function:", err) + continue + } + p.channel <- result + case <-p.stopCh: + p.wg.Done() + return + } + } +} diff --git a/common/client/poller_test.go b/common/client/poller_test.go new file mode 100644 index 00000000000..c9df02b0faa --- /dev/null +++ b/common/client/poller_test.go @@ -0,0 +1,46 @@ +package client + +import ( + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func Test_Poller(t *testing.T) { + // Mock polling function that returns a new value every time it's called + var pollNumber int + pollFunc := func() (Head, error) { + pollNumber++ + h := head{ + BlockNumber: int64(pollNumber), + BlockDifficulty: big.NewInt(int64(pollNumber)), + } + return h.ToMockHead(t), nil + } + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + + // Create poller and subscribe to receive data + poller := NewPoller[Head](time.Millisecond, pollFunc, channel, logger.Test(t)) + require.NoError(t, poller.Subscribe()) + + // Create goroutine to receive updates from the subscriber + pollCount := 0 + pollMax := 50 + go func() { + for ; pollCount < pollMax; pollCount++ { + h := <-channel + assert.Equal(t, int64(pollNumber), h.BlockNumber()) + } + }() + + // Wait for a short duration to allow for some polling iterations + time.Sleep(100 * time.Millisecond) + require.Equal(t, pollMax, pollCount) +} diff --git a/common/client/polling_transformer.go b/common/client/polling_transformer.go deleted file mode 100644 index ed0261a852f..00000000000 --- a/common/client/polling_transformer.go +++ /dev/null @@ -1,98 +0,0 @@ -package client - -import ( - "context" - "sync" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/services" -) - -// PollingTransformer is a component that polls a function at a given interval -// and delivers the result to subscribers -type PollingTransformer[HEAD Head] struct { - interval time.Duration - pollFunc func() (HEAD, error) - - logger logger.Logger - - subscribers []chan<- HEAD - - isPolling bool - stopCh services.StopChan - wg sync.WaitGroup -} - -func NewPollingTransformer[HEAD Head](pollInterval time.Duration, pollFunc func() (HEAD, error), logger logger.Logger) *PollingTransformer[HEAD] { - return &PollingTransformer[HEAD]{ - interval: pollInterval, - pollFunc: pollFunc, - logger: logger, - isPolling: false, - } -} - -// Subscribe adds a Subscriber to the polling transformer -func (pt *PollingTransformer[HEAD]) Subscribe(sub chan<- HEAD) { - pt.subscribers = append(pt.subscribers, sub) -} - -// Unsubscribe removes a Subscriber from the polling transformer -func (pt *PollingTransformer[HEAD]) Unsubscribe(sub chan<- HEAD) { - for i, s := range pt.subscribers { - if s == sub { - close(s) - pt.subscribers = append(pt.subscribers[:i], pt.subscribers[i+1:]...) - return - } - } -} - -// StartPolling starts the polling loop and delivers the polled value to subscribers -func (pt *PollingTransformer[HEAD]) StartPolling() { - pt.stopCh = make(chan struct{}) - pt.wg.Add(1) - go pt.pollingLoop(pt.stopCh.NewCtx()) - pt.isPolling = true -} - -// pollingLoop polls the pollFunc at the given interval and delivers the result to subscribers -func (pt *PollingTransformer[HEAD]) pollingLoop(ctx context.Context, cancel context.CancelFunc) { - defer pt.wg.Done() - defer cancel() - - pollT := time.NewTicker(pt.interval) - defer pollT.Stop() - - for { - select { - case <-ctx.Done(): - for _, subscriber := range pt.subscribers { - close(subscriber) - } - return - case <-pollT.C: - head, err := pt.pollFunc() - if err != nil { - // TODO: handle error - } - pt.logger.Debugw("PollingTransformer: polled value", "head", head) - for _, subscriber := range pt.subscribers { - select { - case subscriber <- head: - // Successfully sent head - default: - // Subscriber's channel is closed - pt.Unsubscribe(subscriber) - } - } - } - } -} - -// StopPolling stops the polling loop -func (pt *PollingTransformer[HEAD]) StopPolling() { - close(pt.stopCh) - pt.wg.Wait() -} diff --git a/common/client/polling_transformer_test.go b/common/client/polling_transformer_test.go deleted file mode 100644 index f285f6f6abe..00000000000 --- a/common/client/polling_transformer_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package client - -import ( - "math/big" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" -) - -type TestHead struct { - blockNumber int64 -} - -var _ Head = &TestHead{} - -func (th *TestHead) BlockNumber() int64 { - return th.blockNumber -} - -func (th *TestHead) BlockDifficulty() *big.Int { - return nil -} - -func (th *TestHead) IsValid() bool { - return true -} - -func Test_Polling_Transformer(t *testing.T) { - t.Parallel() - - // Mock polling function that returns a new value every time it's called - var lastBlockNumber int64 - pollFunc := func() (Head, error) { - lastBlockNumber++ - return &TestHead{lastBlockNumber}, nil - } - - pt := NewPollingTransformer(time.Millisecond, pollFunc, logger.Test(t)) - pt.StartPolling() - defer pt.StopPolling() - - // Create a subscriber channel - subscriber := make(chan Head) - pt.Subscribe(subscriber) - defer pt.Unsubscribe(subscriber) - - // Create a goroutine to receive updates from the subscriber - pollCount := 0 - pollMax := 50 - go func() { - for i := 0; i < pollMax; i++ { - value := <-subscriber - pollCount++ - require.Equal(t, int64(pollCount), value.BlockNumber()) - } - }() - - // Wait for a short duration to allow for some polling iterations - time.Sleep(100 * time.Millisecond) - require.Equal(t, pollMax, pollCount) -} From 004765a6aed9676213508db73c28e4641bf77c38 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 22 Apr 2024 13:18:37 -0400 Subject: [PATCH 03/19] Rename to HeadPoller --- common/client/{poller.go => head_poller.go} | 31 +++++++++++-------- .../{poller_test.go => head_poller_test.go} | 7 ++--- 2 files changed, 21 insertions(+), 17 deletions(-) rename common/client/{poller.go => head_poller.go} (63%) rename common/client/{poller_test.go => head_poller_test.go} (87%) diff --git a/common/client/poller.go b/common/client/head_poller.go similarity index 63% rename from common/client/poller.go rename to common/client/head_poller.go index f25a16d98c3..93dc42de011 100644 --- a/common/client/poller.go +++ b/common/client/head_poller.go @@ -10,10 +10,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" ) -// Poller is a component that polls a function at a given interval +// HeadPoller is a component that polls a function at a given interval // and delivers the result to a channel. It is used to poll for new heads // and implements the Subscription interface. -type Poller[ +type HeadPoller[ HEAD Head, ] struct { services.StateMachine @@ -28,11 +28,15 @@ type Poller[ wg sync.WaitGroup } -// NewPoller creates a new Poller instance -func NewPoller[ +// TODO: write an error to the Err()<- chan if the parent context is canceled/closed? +// TODO: Do we want to add ctx to the NewHeadPoller constructor? +// TODO: Should we start the polling loop right away or wait for Subsribe? + +// NewHeadPoller creates a new HeadPoller instance +func NewHeadPoller[ HEAD Head, -](pollingInterval time.Duration, pollingFunc func() (HEAD, error), channel chan<- HEAD, logger logger.Logger) Poller[HEAD] { - return Poller[HEAD]{ +](pollingInterval time.Duration, pollingFunc func() (HEAD, error), channel chan<- HEAD, logger logger.Logger) HeadPoller[HEAD] { + return HeadPoller[HEAD]{ pollingInterval: pollingInterval, pollingFunc: pollingFunc, logger: logger, @@ -41,11 +45,11 @@ func NewPoller[ } } -var _ types.Subscription = &Poller[Head]{} +var _ types.Subscription = &HeadPoller[Head]{} // Subscribe starts the polling process -func (p *Poller[HEAD]) Subscribe() error { - return p.StartOnce("Poller", func() error { +func (p *HeadPoller[HEAD]) Subscribe() error { + return p.StartOnce("HeadPoller", func() error { p.wg.Add(1) go p.pollingLoop() return nil @@ -53,8 +57,8 @@ func (p *Poller[HEAD]) Subscribe() error { } // Unsubscribe cancels the sending of events to the data channel -func (p *Poller[HEAD]) Unsubscribe() { - _ = p.StopOnce("Poller", func() error { +func (p *HeadPoller[HEAD]) Unsubscribe() { + _ = p.StopOnce("HeadPoller", func() error { close(p.stopCh) p.wg.Wait() return nil @@ -62,11 +66,11 @@ func (p *Poller[HEAD]) Unsubscribe() { close(p.errCh) } -func (p *Poller[HEAD]) Err() <-chan error { +func (p *HeadPoller[HEAD]) Err() <-chan error { return p.errCh } -func (p *Poller[HEAD]) pollingLoop() { +func (p *HeadPoller[HEAD]) pollingLoop() { ticker := time.NewTicker(p.pollingInterval) defer ticker.Stop() @@ -76,6 +80,7 @@ func (p *Poller[HEAD]) pollingLoop() { result, err := p.pollingFunc() if err != nil { p.logger.Error("error occurred when calling polling function:", err) + p.errCh <- err continue } p.channel <- result diff --git a/common/client/poller_test.go b/common/client/head_poller_test.go similarity index 87% rename from common/client/poller_test.go rename to common/client/head_poller_test.go index c9df02b0faa..c6f32454e70 100644 --- a/common/client/poller_test.go +++ b/common/client/head_poller_test.go @@ -5,10 +5,9 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func Test_Poller(t *testing.T) { @@ -27,10 +26,10 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, channel, logger.Test(t)) + poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel, logger.Test(t)) require.NoError(t, poller.Subscribe()) - // Create goroutine to receive updates from the subscriber + // Create goroutine to receive updates from the poller pollCount := 0 pollMax := 50 go func() { From 8914d1b6a6b6a9f94e27c4104742af309d45fe68 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 22 Apr 2024 13:38:32 -0400 Subject: [PATCH 04/19] lint --- common/client/head_poller_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/client/head_poller_test.go b/common/client/head_poller_test.go index c6f32454e70..a9b021a8816 100644 --- a/common/client/head_poller_test.go +++ b/common/client/head_poller_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" - "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func Test_Poller(t *testing.T) { From 1ec7b77aaa5ddeff94f8e74f0fabfc996ced5e34 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 23 Apr 2024 09:23:09 -0400 Subject: [PATCH 05/19] update poller --- common/client/head_poller.go | 6 ++++-- common/client/head_poller_test.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/common/client/head_poller.go b/common/client/head_poller.go index 93dc42de011..5dbf8fce511 100644 --- a/common/client/head_poller.go +++ b/common/client/head_poller.go @@ -61,9 +61,9 @@ func (p *HeadPoller[HEAD]) Unsubscribe() { _ = p.StopOnce("HeadPoller", func() error { close(p.stopCh) p.wg.Wait() + close(p.errCh) return nil }) - close(p.errCh) } func (p *HeadPoller[HEAD]) Err() <-chan error { @@ -71,6 +71,8 @@ func (p *HeadPoller[HEAD]) Err() <-chan error { } func (p *HeadPoller[HEAD]) pollingLoop() { + defer p.wg.Done() + ticker := time.NewTicker(p.pollingInterval) defer ticker.Stop() @@ -83,9 +85,9 @@ func (p *HeadPoller[HEAD]) pollingLoop() { p.errCh <- err continue } + // TODO: Fix this so we don't block the polling loop if we want to exit! p.channel <- result case <-p.stopCh: - p.wg.Done() return } } diff --git a/common/client/head_poller_test.go b/common/client/head_poller_test.go index a9b021a8816..c9f0164b9df 100644 --- a/common/client/head_poller_test.go +++ b/common/client/head_poller_test.go @@ -43,4 +43,5 @@ func Test_Poller(t *testing.T) { // Wait for a short duration to allow for some polling iterations time.Sleep(100 * time.Millisecond) require.Equal(t, pollMax, pollCount) + poller.Unsubscribe() } From 0dfb9b5a8220c7074134723afd67d9a5ee7d8e8f Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 23 Apr 2024 12:00:18 -0400 Subject: [PATCH 06/19] Update head poller --- common/client/head_poller.go | 30 ++++++++++++++++++++++-------- common/client/head_poller_test.go | 10 ++++++++-- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/common/client/head_poller.go b/common/client/head_poller.go index 5dbf8fce511..db1d726f10c 100644 --- a/common/client/head_poller.go +++ b/common/client/head_poller.go @@ -1,6 +1,7 @@ package client import ( + "context" "sync" "time" @@ -41,17 +42,17 @@ func NewHeadPoller[ pollingFunc: pollingFunc, logger: logger, channel: channel, + errCh: make(chan error), stopCh: make(chan struct{}), } } var _ types.Subscription = &HeadPoller[Head]{} -// Subscribe starts the polling process -func (p *HeadPoller[HEAD]) Subscribe() error { +func (p *HeadPoller[HEAD]) Start(ctx context.Context) error { return p.StartOnce("HeadPoller", func() error { p.wg.Add(1) - go p.pollingLoop() + go p.pollingLoop(ctx) return nil }) } @@ -70,7 +71,7 @@ func (p *HeadPoller[HEAD]) Err() <-chan error { return p.errCh } -func (p *HeadPoller[HEAD]) pollingLoop() { +func (p *HeadPoller[HEAD]) pollingLoop(ctx context.Context) { defer p.wg.Done() ticker := time.NewTicker(p.pollingInterval) @@ -78,6 +79,11 @@ func (p *HeadPoller[HEAD]) pollingLoop() { for { select { + case <-ctx.Done(): + p.errCh <- ctx.Err() + return + case <-p.stopCh: + return case <-ticker.C: result, err := p.pollingFunc() if err != nil { @@ -85,10 +91,18 @@ func (p *HeadPoller[HEAD]) pollingLoop() { p.errCh <- err continue } - // TODO: Fix this so we don't block the polling loop if we want to exit! - p.channel <- result - case <-p.stopCh: - return + + // TODO: If channel is full, should we drop the message? + // TODO: Or maybe stop polling until the channel has room? + sendResult: + for { + select { + case p.channel <- result: + break sendResult + case <-p.stopCh: + return + } + } } } } diff --git a/common/client/head_poller_test.go b/common/client/head_poller_test.go index c9f0164b9df..6bbf553c114 100644 --- a/common/client/head_poller_test.go +++ b/common/client/head_poller_test.go @@ -1,6 +1,7 @@ package client import ( + "context" "math/big" "testing" "time" @@ -28,7 +29,10 @@ func Test_Poller(t *testing.T) { // Create poller and subscribe to receive data poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel, logger.Test(t)) - require.NoError(t, poller.Subscribe()) + ctx := context.Background() + + require.NoError(t, poller.Start(ctx)) + defer poller.Unsubscribe() // Create goroutine to receive updates from the poller pollCount := 0 @@ -43,5 +47,7 @@ func Test_Poller(t *testing.T) { // Wait for a short duration to allow for some polling iterations time.Sleep(100 * time.Millisecond) require.Equal(t, pollMax, pollCount) - poller.Unsubscribe() } + +// TODO: Test error in pollingFunc +// TODO: Test context cancellation From 21268f998a64624c58e01bccea6d25d6230c2c99 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 23 Apr 2024 14:47:17 -0400 Subject: [PATCH 07/19] Update poller --- common/client/head_poller.go | 43 ++++++++++--------------------- common/client/head_poller_test.go | 41 +++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 37 deletions(-) diff --git a/common/client/head_poller.go b/common/client/head_poller.go index db1d726f10c..fef7fdd91b3 100644 --- a/common/client/head_poller.go +++ b/common/client/head_poller.go @@ -1,11 +1,9 @@ package client import ( - "context" "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -20,27 +18,20 @@ type HeadPoller[ services.StateMachine pollingInterval time.Duration pollingFunc func() (HEAD, error) - logger logger.Logger - - channel chan<- HEAD - errCh chan error + channel chan<- HEAD + errCh chan error stopCh chan struct{} wg sync.WaitGroup } -// TODO: write an error to the Err()<- chan if the parent context is canceled/closed? -// TODO: Do we want to add ctx to the NewHeadPoller constructor? -// TODO: Should we start the polling loop right away or wait for Subsribe? - // NewHeadPoller creates a new HeadPoller instance func NewHeadPoller[ HEAD Head, -](pollingInterval time.Duration, pollingFunc func() (HEAD, error), channel chan<- HEAD, logger logger.Logger) HeadPoller[HEAD] { +](pollingInterval time.Duration, pollingFunc func() (HEAD, error), channel chan<- HEAD) HeadPoller[HEAD] { return HeadPoller[HEAD]{ pollingInterval: pollingInterval, pollingFunc: pollingFunc, - logger: logger, channel: channel, errCh: make(chan error), stopCh: make(chan struct{}), @@ -49,10 +40,10 @@ func NewHeadPoller[ var _ types.Subscription = &HeadPoller[Head]{} -func (p *HeadPoller[HEAD]) Start(ctx context.Context) error { +func (p *HeadPoller[HEAD]) Start() error { return p.StartOnce("HeadPoller", func() error { p.wg.Add(1) - go p.pollingLoop(ctx) + go p.pollingLoop() return nil }) } @@ -71,7 +62,7 @@ func (p *HeadPoller[HEAD]) Err() <-chan error { return p.errCh } -func (p *HeadPoller[HEAD]) pollingLoop(ctx context.Context) { +func (p *HeadPoller[HEAD]) pollingLoop() { defer p.wg.Done() ticker := time.NewTicker(p.pollingInterval) @@ -79,30 +70,24 @@ func (p *HeadPoller[HEAD]) pollingLoop(ctx context.Context) { for { select { - case <-ctx.Done(): - p.errCh <- ctx.Err() - return case <-p.stopCh: return case <-ticker.C: result, err := p.pollingFunc() if err != nil { - p.logger.Error("error occurred when calling polling function:", err) - p.errCh <- err - continue - } - - // TODO: If channel is full, should we drop the message? - // TODO: Or maybe stop polling until the channel has room? - sendResult: - for { select { - case p.channel <- result: - break sendResult + case p.errCh <- err: + continue case <-p.stopCh: return } } + + select { + case p.channel <- result: + case <-p.stopCh: + return + } } } } diff --git a/common/client/head_poller_test.go b/common/client/head_poller_test.go index 6bbf553c114..187de2ebbfe 100644 --- a/common/client/head_poller_test.go +++ b/common/client/head_poller_test.go @@ -1,15 +1,13 @@ package client import ( - "context" + "github.com/pkg/errors" "math/big" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func Test_Poller(t *testing.T) { @@ -28,10 +26,9 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) // Create poller and subscribe to receive data - poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel, logger.Test(t)) - ctx := context.Background() + poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel) - require.NoError(t, poller.Start(ctx)) + require.NoError(t, poller.Start()) defer poller.Unsubscribe() // Create goroutine to receive updates from the poller @@ -49,5 +46,33 @@ func Test_Poller(t *testing.T) { require.Equal(t, pollMax, pollCount) } -// TODO: Test error in pollingFunc -// TODO: Test context cancellation +func Test_Poller_Error(t *testing.T) { + // Mock polling function that returns an error every time it's called + pollFunc := func() (Head, error) { + return nil, errors.New("polling error") + } + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + + // Create poller and subscribe to receive data + poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel) + + require.NoError(t, poller.Start()) + defer poller.Unsubscribe() + + // Create goroutine to receive updates from the poller + pollCount := 0 + pollMax := 50 + go func() { + for ; pollCount < pollMax; pollCount++ { + err := <-poller.Err() + require.Error(t, err) + require.Equal(t, "polling error", err.Error()) + } + }() + + // Wait for a short duration to allow for some polling iterations + time.Sleep(100 * time.Millisecond) + require.Equal(t, pollMax, pollCount) +} From 5309ccb4730a46f5803f5e8ae08771abe564ef49 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 23 Apr 2024 14:48:48 -0400 Subject: [PATCH 08/19] lint --- common/client/head_poller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/client/head_poller_test.go b/common/client/head_poller_test.go index 187de2ebbfe..5c26e7a0d81 100644 --- a/common/client/head_poller_test.go +++ b/common/client/head_poller_test.go @@ -1,7 +1,7 @@ package client import ( - "github.com/pkg/errors" + "errors" "math/big" "testing" "time" @@ -47,7 +47,7 @@ func Test_Poller(t *testing.T) { } func Test_Poller_Error(t *testing.T) { - // Mock polling function that returns an error every time it's called + // Mock polling function that returns an error pollFunc := func() (Head, error) { return nil, errors.New("polling error") } From 8de40ace2f4e97b4de7b71d077897bb2de597ff7 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 24 Apr 2024 15:48:40 -0400 Subject: [PATCH 09/19] Refactor Poller --- common/client/head_poller.go | 93 -------------------------- common/client/head_poller_test.go | 78 ---------------------- common/client/poller.go | 106 ++++++++++++++++++++++++++++++ common/client/poller_test.go | 100 ++++++++++++++++++++++++++++ 4 files changed, 206 insertions(+), 171 deletions(-) delete mode 100644 common/client/head_poller.go delete mode 100644 common/client/head_poller_test.go create mode 100644 common/client/poller.go create mode 100644 common/client/poller_test.go diff --git a/common/client/head_poller.go b/common/client/head_poller.go deleted file mode 100644 index fef7fdd91b3..00000000000 --- a/common/client/head_poller.go +++ /dev/null @@ -1,93 +0,0 @@ -package client - -import ( - "sync" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/services" - - "github.com/smartcontractkit/chainlink/v2/common/types" -) - -// HeadPoller is a component that polls a function at a given interval -// and delivers the result to a channel. It is used to poll for new heads -// and implements the Subscription interface. -type HeadPoller[ - HEAD Head, -] struct { - services.StateMachine - pollingInterval time.Duration - pollingFunc func() (HEAD, error) - channel chan<- HEAD - errCh chan error - - stopCh chan struct{} - wg sync.WaitGroup -} - -// NewHeadPoller creates a new HeadPoller instance -func NewHeadPoller[ - HEAD Head, -](pollingInterval time.Duration, pollingFunc func() (HEAD, error), channel chan<- HEAD) HeadPoller[HEAD] { - return HeadPoller[HEAD]{ - pollingInterval: pollingInterval, - pollingFunc: pollingFunc, - channel: channel, - errCh: make(chan error), - stopCh: make(chan struct{}), - } -} - -var _ types.Subscription = &HeadPoller[Head]{} - -func (p *HeadPoller[HEAD]) Start() error { - return p.StartOnce("HeadPoller", func() error { - p.wg.Add(1) - go p.pollingLoop() - return nil - }) -} - -// Unsubscribe cancels the sending of events to the data channel -func (p *HeadPoller[HEAD]) Unsubscribe() { - _ = p.StopOnce("HeadPoller", func() error { - close(p.stopCh) - p.wg.Wait() - close(p.errCh) - return nil - }) -} - -func (p *HeadPoller[HEAD]) Err() <-chan error { - return p.errCh -} - -func (p *HeadPoller[HEAD]) pollingLoop() { - defer p.wg.Done() - - ticker := time.NewTicker(p.pollingInterval) - defer ticker.Stop() - - for { - select { - case <-p.stopCh: - return - case <-ticker.C: - result, err := p.pollingFunc() - if err != nil { - select { - case p.errCh <- err: - continue - case <-p.stopCh: - return - } - } - - select { - case p.channel <- result: - case <-p.stopCh: - return - } - } - } -} diff --git a/common/client/head_poller_test.go b/common/client/head_poller_test.go deleted file mode 100644 index 5c26e7a0d81..00000000000 --- a/common/client/head_poller_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package client - -import ( - "errors" - "math/big" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_Poller(t *testing.T) { - // Mock polling function that returns a new value every time it's called - var pollNumber int - pollFunc := func() (Head, error) { - pollNumber++ - h := head{ - BlockNumber: int64(pollNumber), - BlockDifficulty: big.NewInt(int64(pollNumber)), - } - return h.ToMockHead(t), nil - } - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - - // Create poller and subscribe to receive data - poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel) - - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Create goroutine to receive updates from the poller - pollCount := 0 - pollMax := 50 - go func() { - for ; pollCount < pollMax; pollCount++ { - h := <-channel - assert.Equal(t, int64(pollNumber), h.BlockNumber()) - } - }() - - // Wait for a short duration to allow for some polling iterations - time.Sleep(100 * time.Millisecond) - require.Equal(t, pollMax, pollCount) -} - -func Test_Poller_Error(t *testing.T) { - // Mock polling function that returns an error - pollFunc := func() (Head, error) { - return nil, errors.New("polling error") - } - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - - // Create poller and subscribe to receive data - poller := NewHeadPoller[Head](time.Millisecond, pollFunc, channel) - - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Create goroutine to receive updates from the poller - pollCount := 0 - pollMax := 50 - go func() { - for ; pollCount < pollMax; pollCount++ { - err := <-poller.Err() - require.Error(t, err) - require.Equal(t, "polling error", err.Error()) - } - }() - - // Wait for a short duration to allow for some polling iterations - time.Sleep(100 * time.Millisecond) - require.Equal(t, pollMax, pollCount) -} diff --git a/common/client/poller.go b/common/client/poller.go new file mode 100644 index 00000000000..6410fa9905c --- /dev/null +++ b/common/client/poller.go @@ -0,0 +1,106 @@ +package client + +import ( + "context" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + + "github.com/smartcontractkit/chainlink/v2/common/types" +) + +// Poller is a component that polls a function at a given interval +// and delivers the result to a channel. It is used to poll for new heads +// and implements the Subscription interface. +type Poller[ + T any, +] struct { + services.StateMachine + pollingInterval time.Duration + pollingFunc func(ctx context.Context) (T, error) + pollingTimeout *time.Duration + logger logger.Logger + channel chan<- T + errCh chan error + + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewPoller creates a new Poller instance +func NewPoller[ + T any, +](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout *time.Duration, channel chan<- T, logger logger.Logger) Poller[T] { + return Poller[T]{ + pollingInterval: pollingInterval, + pollingFunc: pollingFunc, + pollingTimeout: pollingTimeout, + channel: channel, + logger: logger, + errCh: make(chan error), + stopCh: make(chan struct{}), + } +} + +var _ types.Subscription = &Poller[any]{} + +func (p *Poller[T]) Start() error { + return p.StartOnce("Poller", func() error { + p.wg.Add(1) + go p.pollingLoop() + return nil + }) +} + +// Unsubscribe cancels the sending of events to the data channel +func (p *Poller[T]) Unsubscribe() { + _ = p.StopOnce("Poller", func() error { + close(p.stopCh) + p.wg.Wait() + close(p.errCh) + return nil + }) +} + +func (p *Poller[T]) Err() <-chan error { + return p.errCh +} + +func (p *Poller[T]) pollingLoop() { + defer p.wg.Done() + + ticker := time.NewTicker(p.pollingInterval) + defer ticker.Stop() + + for { + select { + case <-p.stopCh: + return + case <-ticker.C: + ctx := context.Background() + cancel := context.CancelFunc(func() {}) + if p.pollingTimeout != nil { + ctx, cancel = context.WithTimeout(context.Background(), *p.pollingTimeout) + } + result, err := p.pollingFunc(ctx) + cancel() + if err != nil { + p.logger.Warnw("Polling error", "error", err) + select { + case p.errCh <- err: + continue + case <-p.stopCh: + return + } + } + + select { + case p.channel <- result: + case <-p.stopCh: + return + } + } + } +} diff --git a/common/client/poller_test.go b/common/client/poller_test.go new file mode 100644 index 00000000000..2481f60f81d --- /dev/null +++ b/common/client/poller_test.go @@ -0,0 +1,100 @@ +package client + +import ( + "context" + "errors" + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func Test_Poller(t *testing.T) { + pollingTimeout := 10 * time.Millisecond + lggr, err := logger.New() + require.NoError(t, err) + + t.Run("Test Polling for Heads", func(t *testing.T) { + // Mock polling function that returns a new value every time it's called + var pollNumber int + pollFunc := func(ctx context.Context) (Head, error) { + pollNumber++ + h := head{ + BlockNumber: int64(pollNumber), + BlockDifficulty: big.NewInt(int64(pollNumber)), + } + return h.ToMockHead(t), nil + } + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + + // Create poller and start to receive data + poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, lggr) + require.NoError(t, poller.Start()) + defer poller.Unsubscribe() + + done := make(chan struct{}) + // Create goroutine to receive updates from the poller + pollCount := 0 + pollMax := 50 + go func() { + for ; pollCount < pollMax; pollCount++ { + h := <-channel + assert.Equal(t, int64(pollNumber), h.BlockNumber()) + } + close(done) + }() + + <-done + }) + + t.Run("Test polling errors", func(t *testing.T) { + // Mock polling function that returns an error + pollFunc := func(ctx context.Context) (Head, error) { + return nil, errors.New("polling error") + } + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + + // Create poller and subscribe to receive data + poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, lggr) + + require.NoError(t, poller.Start()) + defer poller.Unsubscribe() + + done := make(chan struct{}) + + // Create goroutine to receive updates from the poller + pollCount := 0 + pollMax := 50 + go func() { + for ; pollCount < pollMax; pollCount++ { + err := <-poller.Err() + require.Error(t, err) + require.Equal(t, "polling error", err.Error()) + } + close(done) + }() + + <-done + }) +} + +func Test_Poller_Unsubscribe(t *testing.T) { + t.Run("Test multiple unsubscribe", func(t *testing.T) { + // TODO: to the p.channel. And one that ensure we can call Unsubscribe twice without panic. + poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) + poller.Unsubscribe() + poller.Unsubscribe() + }) + + t.Run("Test exit with no subscribers", func(t *testing.T) { + // TODO: Add test case that ensures Unsubscribe exits even if no one is listening + }) +} From e05b7396dd744183de27e252ec04bb4405f69666 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 24 Apr 2024 15:57:23 -0400 Subject: [PATCH 10/19] Update poller_test.go --- common/client/poller_test.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/common/client/poller_test.go b/common/client/poller_test.go index 2481f60f81d..c97f76df073 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -18,7 +18,7 @@ func Test_Poller(t *testing.T) { lggr, err := logger.New() require.NoError(t, err) - t.Run("Test Polling for Heads", func(t *testing.T) { + t.Run("Test polling for heads", func(t *testing.T) { // Mock polling function that returns a new value every time it's called var pollNumber int pollFunc := func(ctx context.Context) (Head, error) { @@ -75,9 +75,13 @@ func Test_Poller(t *testing.T) { pollMax := 50 go func() { for ; pollCount < pollMax; pollCount++ { - err := <-poller.Err() - require.Error(t, err) - require.Equal(t, "polling error", err.Error()) + select { + case <-channel: + require.Fail(t, "should not receive any data") + case err := <-poller.Err(): + require.Error(t, err) + require.Equal(t, "polling error", err.Error()) + } } close(done) }() @@ -90,11 +94,17 @@ func Test_Poller_Unsubscribe(t *testing.T) { t.Run("Test multiple unsubscribe", func(t *testing.T) { // TODO: to the p.channel. And one that ensure we can call Unsubscribe twice without panic. poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) + err := poller.Start() + require.NoError(t, err) poller.Unsubscribe() poller.Unsubscribe() }) - t.Run("Test exit with no subscribers", func(t *testing.T) { + t.Run("Test unsubscribe with no subscribers", func(t *testing.T) { // TODO: Add test case that ensures Unsubscribe exits even if no one is listening + poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) + err := poller.Start() + require.NoError(t, err) + poller.Unsubscribe() }) } From daa79f010c45920599593555b774248d25271d86 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 24 Apr 2024 16:07:11 -0400 Subject: [PATCH 11/19] Update poller --- common/client/poller.go | 8 +++++--- common/client/poller_test.go | 23 ++++++++++------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/common/client/poller.go b/common/client/poller.go index 6410fa9905c..507ac0d6e30 100644 --- a/common/client/poller.go +++ b/common/client/poller.go @@ -21,7 +21,7 @@ type Poller[ pollingInterval time.Duration pollingFunc func(ctx context.Context) (T, error) pollingTimeout *time.Duration - logger logger.Logger + logger *logger.Logger channel chan<- T errCh chan error @@ -32,7 +32,7 @@ type Poller[ // NewPoller creates a new Poller instance func NewPoller[ T any, -](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout *time.Duration, channel chan<- T, logger logger.Logger) Poller[T] { +](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout *time.Duration, channel chan<- T, logger *logger.Logger) Poller[T] { return Poller[T]{ pollingInterval: pollingInterval, pollingFunc: pollingFunc, @@ -87,7 +87,9 @@ func (p *Poller[T]) pollingLoop() { result, err := p.pollingFunc(ctx) cancel() if err != nil { - p.logger.Warnw("Polling error", "error", err) + if p.logger != nil { + (*p.logger).Warnw("Polling error", "error", err) + } select { case p.errCh <- err: continue diff --git a/common/client/poller_test.go b/common/client/poller_test.go index c97f76df073..b6afa190bb2 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -13,6 +13,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" ) +// TODO: Fix race conditions in tests! + func Test_Poller(t *testing.T) { pollingTimeout := 10 * time.Millisecond lggr, err := logger.New() @@ -34,22 +36,21 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) // Create poller and start to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() - done := make(chan struct{}) // Create goroutine to receive updates from the poller - pollCount := 0 - pollMax := 50 + done := make(chan struct{}) go func() { + pollCount := 0 + pollMax := 50 for ; pollCount < pollMax; pollCount++ { h := <-channel assert.Equal(t, int64(pollNumber), h.BlockNumber()) } close(done) }() - <-done }) @@ -63,17 +64,15 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, lggr) - + poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() - done := make(chan struct{}) - // Create goroutine to receive updates from the poller - pollCount := 0 - pollMax := 50 + done := make(chan struct{}) go func() { + pollCount := 0 + pollMax := 50 for ; pollCount < pollMax; pollCount++ { select { case <-channel: @@ -85,14 +84,12 @@ func Test_Poller(t *testing.T) { } close(done) }() - <-done }) } func Test_Poller_Unsubscribe(t *testing.T) { t.Run("Test multiple unsubscribe", func(t *testing.T) { - // TODO: to the p.channel. And one that ensure we can call Unsubscribe twice without panic. poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) err := poller.Start() require.NoError(t, err) From 41c3ad4bd05f572e0605f3f3185b8012b0dca545 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 25 Apr 2024 10:18:13 -0400 Subject: [PATCH 12/19] Synchronize tests --- common/client/poller_test.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/common/client/poller_test.go b/common/client/poller_test.go index b6afa190bb2..dec343f3bc8 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/big" + "sync" "testing" "time" @@ -16,14 +17,16 @@ import ( // TODO: Fix race conditions in tests! func Test_Poller(t *testing.T) { - pollingTimeout := 10 * time.Millisecond lggr, err := logger.New() require.NoError(t, err) t.Run("Test polling for heads", func(t *testing.T) { // Mock polling function that returns a new value every time it's called var pollNumber int + pollLock := sync.Mutex{} pollFunc := func(ctx context.Context) (Head, error) { + pollLock.Lock() + defer pollLock.Unlock() pollNumber++ h := head{ BlockNumber: int64(pollNumber), @@ -36,7 +39,7 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) // Create poller and start to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() @@ -64,7 +67,7 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() @@ -86,6 +89,17 @@ func Test_Poller(t *testing.T) { }() <-done }) + + /* TODO + t.Run("Test polling timeout", func(t *testing.T) { + pollingTimeout := 10 * time.Millisecond + pollFunc := func(ctx context.Context) (Head, error) { + <-ctx.Done() + return nil, ctx.Err() + } + }) + */ + } func Test_Poller_Unsubscribe(t *testing.T) { From 4e809ede2babe406134ec7ec327fa46bc8d82ec2 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 25 Apr 2024 12:37:09 -0400 Subject: [PATCH 13/19] Refactor with timeout --- common/client/poller.go | 71 +++++++++++++------ common/client/poller_test.go | 129 ++++++++++++++++++++++++++++------- 2 files changed, 154 insertions(+), 46 deletions(-) diff --git a/common/client/poller.go b/common/client/poller.go index 507ac0d6e30..c8133fcc863 100644 --- a/common/client/poller.go +++ b/common/client/poller.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -14,12 +16,11 @@ import ( // Poller is a component that polls a function at a given interval // and delivers the result to a channel. It is used to poll for new heads // and implements the Subscription interface. -type Poller[ - T any, -] struct { +type Poller[T any] struct { services.StateMachine pollingInterval time.Duration - pollingFunc func(ctx context.Context) (T, error) + pollingFunc func(ctx context.Context, args ...interface{}) (T, error) + pollingArgs []interface{} pollingTimeout *time.Duration logger *logger.Logger channel chan<- T @@ -32,10 +33,11 @@ type Poller[ // NewPoller creates a new Poller instance func NewPoller[ T any, -](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout *time.Duration, channel chan<- T, logger *logger.Logger) Poller[T] { +](pollingInterval time.Duration, pollingFunc func(ctx context.Context, args ...interface{}) (T, error), pollingTimeout *time.Duration, channel chan<- T, logger *logger.Logger, args ...interface{}) Poller[T] { return Poller[T]{ pollingInterval: pollingInterval, pollingFunc: pollingFunc, + pollingArgs: args, pollingTimeout: pollingTimeout, channel: channel, logger: logger, @@ -79,30 +81,57 @@ func (p *Poller[T]) pollingLoop() { case <-p.stopCh: return case <-ticker.C: - ctx := context.Background() - cancel := context.CancelFunc(func() {}) + // Set polling timeout + pollingCtx := context.Background() + cancelPolling := context.CancelFunc(func() {}) if p.pollingTimeout != nil { - ctx, cancel = context.WithTimeout(context.Background(), *p.pollingTimeout) + pollingCtx, cancelPolling = context.WithTimeout(pollingCtx, *p.pollingTimeout) } - result, err := p.pollingFunc(ctx) - cancel() - if err != nil { - if p.logger != nil { - (*p.logger).Warnw("Polling error", "error", err) + + // Execute polling function in goroutine + var result T + var err error + pollingDone := make(chan struct{}) + go func() { + defer func() { + if r := recover(); r != nil { + err = errors.Errorf("panic: %v", r) + } + close(pollingDone) + }() + result, err = p.pollingFunc(pollingCtx, p.pollingArgs...) + }() + + // Wait for polling to complete or timeout + select { + case <-pollingCtx.Done(): + cancelPolling() + p.writeError(errors.New("polling timeout exceeded")) + case <-pollingDone: + cancelPolling() + if err != nil { + p.writeError(err) + continue } + // Send result to channel or block if channel is full select { - case p.errCh <- err: - continue + case p.channel <- result: case <-p.stopCh: return } } - - select { - case p.channel <- result: - case <-p.stopCh: - return - } } } } + +func (p *Poller[T]) writeError(err error) { + if p.logger != nil { + (*p.logger).Warnw("Polling error", "error", err) + } + // Send error to channel or block if channel is full + select { + case p.errCh <- err: + case <-p.stopCh: + return + } +} diff --git a/common/client/poller_test.go b/common/client/poller_test.go index dec343f3bc8..90e8634dc0c 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -2,7 +2,7 @@ package client import ( "context" - "errors" + "fmt" "math/big" "sync" "testing" @@ -14,8 +14,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" ) -// TODO: Fix race conditions in tests! - func Test_Poller(t *testing.T) { lggr, err := logger.New() require.NoError(t, err) @@ -24,7 +22,7 @@ func Test_Poller(t *testing.T) { // Mock polling function that returns a new value every time it's called var pollNumber int pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context) (Head, error) { + pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { pollLock.Lock() defer pollLock.Unlock() pollNumber++ @@ -37,34 +35,43 @@ func Test_Poller(t *testing.T) { // data channel to receive updates from the poller channel := make(chan Head, 1) + defer close(channel) // Create poller and start to receive data poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Create goroutine to receive updates from the poller + // Monitor error channel done := make(chan struct{}) - go func() { + defer close(done) + monitorPollingErrors(t, poller.Err(), done) + + // Receive updates from the poller + func() { pollCount := 0 pollMax := 50 for ; pollCount < pollMax; pollCount++ { h := <-channel - assert.Equal(t, int64(pollNumber), h.BlockNumber()) + assert.Equal(t, int64(pollCount+1), h.BlockNumber()) } - close(done) }() - <-done }) t.Run("Test polling errors", func(t *testing.T) { // Mock polling function that returns an error - pollFunc := func(ctx context.Context) (Head, error) { - return nil, errors.New("polling error") + var pollNumber int + pollLock := sync.Mutex{} + pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { + pollLock.Lock() + defer pollLock.Unlock() + pollNumber++ + return nil, fmt.Errorf("polling error %d", pollNumber) } // data channel to receive updates from the poller channel := make(chan Head, 1) + defer close(channel) // Create poller and subscribe to receive data poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) @@ -72,8 +79,7 @@ func Test_Poller(t *testing.T) { defer poller.Unsubscribe() // Create goroutine to receive updates from the poller - done := make(chan struct{}) - go func() { + func() { pollCount := 0 pollMax := 50 for ; pollCount < pollMax; pollCount++ { @@ -82,24 +88,88 @@ func Test_Poller(t *testing.T) { require.Fail(t, "should not receive any data") case err := <-poller.Err(): require.Error(t, err) - require.Equal(t, "polling error", err.Error()) + require.Equal(t, fmt.Sprintf("polling error %d", pollCount+1), err.Error()) } } - close(done) }() - <-done }) - /* TODO t.Run("Test polling timeout", func(t *testing.T) { - pollingTimeout := 10 * time.Millisecond - pollFunc := func(ctx context.Context) (Head, error) { - <-ctx.Done() - return nil, ctx.Err() + pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { + time.Sleep(10 * time.Millisecond) + return nil, nil } + + // Set instant timeout + pollingTimeout := time.Duration(0) + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + defer close(channel) + + // Create poller and subscribe to receive data + poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &lggr) + require.NoError(t, poller.Start()) + defer poller.Unsubscribe() + + // Create goroutine to receive updates from the poller + func() { + err := <-poller.Err() + require.Error(t, err) + require.Equal(t, "polling timeout exceeded", err.Error()) + }() }) - */ + t.Run("Test polling with args", func(t *testing.T) { + pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { + require.Equal(t, args[0], "arg1") + require.Equal(t, args[1], "arg2") + require.Equal(t, args[2], "arg3") + return nil, nil + } + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + defer close(channel) + + // Create poller and subscribe to receive data + args := []interface{}{"arg1", "arg2", "arg3"} + poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr, args...) + require.NoError(t, poller.Start()) + defer poller.Unsubscribe() + + // Ensure no errors are received + done := make(chan struct{}) + defer close(done) + monitorPollingErrors(t, poller.Err(), done) + + // Create goroutine to receive updates from the poller + func() { + h := <-channel + require.Nil(t, h) + }() + }) + + t.Run("Test panic in polling function", func(t *testing.T) { + pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { + panic("panic test") + } + + // data channel to receive updates from the poller + channel := make(chan Head, 1) + defer close(channel) + + // Create poller and subscribe to receive data + poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) + require.NoError(t, poller.Start()) + defer poller.Unsubscribe() + + // Create goroutine to receive updates from the poller + func() { + err := <-poller.Err() + require.Equal(t, "panic: panic test", err.Error()) + }() + }) } func Test_Poller_Unsubscribe(t *testing.T) { @@ -112,10 +182,19 @@ func Test_Poller_Unsubscribe(t *testing.T) { }) t.Run("Test unsubscribe with no subscribers", func(t *testing.T) { - // TODO: Add test case that ensures Unsubscribe exits even if no one is listening poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) - err := poller.Start() - require.NoError(t, err) poller.Unsubscribe() }) } + +// monitorPollingErrors fails the test if an error is received on the error channel +func monitorPollingErrors(t *testing.T, errCh <-chan error, done <-chan struct{}) { + go func() { + select { + case err := <-errCh: + require.NoError(t, err) + case <-done: + return + } + }() +} From dea8dd02f4bc24cac5ecf5032ac7a2ae2ed1aeaf Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 25 Apr 2024 13:52:06 -0400 Subject: [PATCH 14/19] Check test logs --- common/client/poller.go | 14 ++---- common/client/poller_test.go | 85 +++++++++++++----------------------- 2 files changed, 35 insertions(+), 64 deletions(-) diff --git a/common/client/poller.go b/common/client/poller.go index c8133fcc863..1c608d1462c 100644 --- a/common/client/poller.go +++ b/common/client/poller.go @@ -106,11 +106,11 @@ func (p *Poller[T]) pollingLoop() { select { case <-pollingCtx.Done(): cancelPolling() - p.writeError(errors.New("polling timeout exceeded")) + p.logError(errors.New("polling timeout exceeded")) case <-pollingDone: cancelPolling() if err != nil { - p.writeError(err) + p.logError(err) continue } // Send result to channel or block if channel is full @@ -124,14 +124,8 @@ func (p *Poller[T]) pollingLoop() { } } -func (p *Poller[T]) writeError(err error) { +func (p *Poller[T]) logError(err error) { if p.logger != nil { - (*p.logger).Warnw("Polling error", "error", err) - } - // Send error to channel or block if channel is full - select { - case p.errCh <- err: - case <-p.stopCh: - return + (*p.logger).Errorf("polling error: %v", err) } } diff --git a/common/client/poller_test.go b/common/client/poller_test.go index 90e8634dc0c..bb424b12a7a 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -10,13 +10,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func Test_Poller(t *testing.T) { - lggr, err := logger.New() - require.NoError(t, err) + lggr := logger.Test(t) t.Run("Test polling for heads", func(t *testing.T) { // Mock polling function that returns a new value every time it's called @@ -42,11 +42,6 @@ func Test_Poller(t *testing.T) { require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Monitor error channel - done := make(chan struct{}) - defer close(done) - monitorPollingErrors(t, poller.Err(), done) - // Receive updates from the poller func() { pollCount := 0 @@ -73,25 +68,24 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) defer close(channel) + olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) + // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Create goroutine to receive updates from the poller - func() { - pollCount := 0 - pollMax := 50 - for ; pollCount < pollMax; pollCount++ { - select { - case <-channel: - require.Fail(t, "should not receive any data") - case err := <-poller.Err(): - require.Error(t, err) - require.Equal(t, fmt.Sprintf("polling error %d", pollCount+1), err.Error()) + // Ensure that all errors were logged as expected + logsSeen := func() bool { + for pollCount := 0; pollCount < 50; pollCount++ { + numLogs := observedLogs.FilterMessage(fmt.Sprintf("polling error: polling error %d", pollCount+1)).Len() + if numLogs != 1 { + return false } } - }() + return true + } + require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) t.Run("Test polling timeout", func(t *testing.T) { @@ -107,17 +101,18 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) defer close(channel) + olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) + // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() // Create goroutine to receive updates from the poller - func() { - err := <-poller.Err() - require.Error(t, err) - require.Equal(t, "polling timeout exceeded", err.Error()) - }() + logsSeen := func() bool { + return observedLogs.FilterMessage("polling error: polling timeout exceeded").Len() > 10 + } + require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) t.Run("Test polling with args", func(t *testing.T) { @@ -138,16 +133,8 @@ func Test_Poller(t *testing.T) { require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Ensure no errors are received - done := make(chan struct{}) - defer close(done) - monitorPollingErrors(t, poller.Err(), done) - - // Create goroutine to receive updates from the poller - func() { - h := <-channel - require.Nil(t, h) - }() + h := <-channel + require.Nil(t, h) }) t.Run("Test panic in polling function", func(t *testing.T) { @@ -159,16 +146,18 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) defer close(channel) + olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) + // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Create goroutine to receive updates from the poller - func() { - err := <-poller.Err() - require.Equal(t, "panic: panic test", err.Error()) - }() + // Ensure that panic is caught and logged + logsSeen := func() bool { + return observedLogs.FilterMessage("polling error: panic: panic test").Len() > 10 + } + require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) } @@ -186,15 +175,3 @@ func Test_Poller_Unsubscribe(t *testing.T) { poller.Unsubscribe() }) } - -// monitorPollingErrors fails the test if an error is received on the error channel -func monitorPollingErrors(t *testing.T, errCh <-chan error, done <-chan struct{}) { - go func() { - select { - case err := <-errCh: - require.NoError(t, err) - case <-done: - return - } - }() -} From fb068a7e843874169d8aa6dd78cd01d18b8a6d4f Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 26 Apr 2024 10:20:49 -0400 Subject: [PATCH 15/19] Update Poller --- common/client/poller.go | 63 +++++------------ common/client/poller_test.go | 130 ++++++++++++++++++++++------------- 2 files changed, 100 insertions(+), 93 deletions(-) diff --git a/common/client/poller.go b/common/client/poller.go index 1c608d1462c..9b6ff6f1355 100644 --- a/common/client/poller.go +++ b/common/client/poller.go @@ -5,8 +5,6 @@ import ( "sync" "time" - "github.com/pkg/errors" - "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -14,30 +12,28 @@ import ( ) // Poller is a component that polls a function at a given interval -// and delivers the result to a channel. It is used to poll for new heads -// and implements the Subscription interface. +// and delivers the result to a channel. It is used by multinode to poll +// for new heads and implements the Subscription interface. type Poller[T any] struct { services.StateMachine pollingInterval time.Duration - pollingFunc func(ctx context.Context, args ...interface{}) (T, error) - pollingArgs []interface{} - pollingTimeout *time.Duration + pollingFunc func(ctx context.Context) (T, error) + pollingTimeout time.Duration logger *logger.Logger channel chan<- T errCh chan error - stopCh chan struct{} + stopCh services.StopChan wg sync.WaitGroup } // NewPoller creates a new Poller instance func NewPoller[ T any, -](pollingInterval time.Duration, pollingFunc func(ctx context.Context, args ...interface{}) (T, error), pollingTimeout *time.Duration, channel chan<- T, logger *logger.Logger, args ...interface{}) Poller[T] { +](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, channel chan<- T, logger *logger.Logger) Poller[T] { return Poller[T]{ pollingInterval: pollingInterval, pollingFunc: pollingFunc, - pollingArgs: args, pollingTimeout: pollingTimeout, channel: channel, logger: logger, @@ -82,43 +78,20 @@ func (p *Poller[T]) pollingLoop() { return case <-ticker.C: // Set polling timeout - pollingCtx := context.Background() - cancelPolling := context.CancelFunc(func() {}) - if p.pollingTimeout != nil { - pollingCtx, cancelPolling = context.WithTimeout(pollingCtx, *p.pollingTimeout) + pollingCtx, cancelPolling := context.WithTimeout(context.Background(), p.pollingTimeout) + p.stopCh.CtxCancel(pollingCtx, cancelPolling) + // Execute polling function + result, err := p.pollingFunc(pollingCtx) + cancelPolling() + if err != nil { + p.logError(err) + continue } - - // Execute polling function in goroutine - var result T - var err error - pollingDone := make(chan struct{}) - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("panic: %v", r) - } - close(pollingDone) - }() - result, err = p.pollingFunc(pollingCtx, p.pollingArgs...) - }() - - // Wait for polling to complete or timeout + // Send result to channel or block if channel is full select { - case <-pollingCtx.Done(): - cancelPolling() - p.logError(errors.New("polling timeout exceeded")) - case <-pollingDone: - cancelPolling() - if err != nil { - p.logError(err) - continue - } - // Send result to channel or block if channel is full - select { - case p.channel <- result: - case <-p.stopCh: - return - } + case p.channel <- result: + case <-p.stopCh: + return } } } diff --git a/common/client/poller_test.go b/common/client/poller_test.go index bb424b12a7a..2e868ef74f4 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -18,11 +18,28 @@ import ( func Test_Poller(t *testing.T) { lggr := logger.Test(t) + t.Run("Test multiple start", func(t *testing.T) { + pollFunc := func(ctx context.Context) (Head, error) { + return nil, nil + } + + channel := make(chan Head, 1) + defer close(channel) + + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) + err := poller.Start() + require.NoError(t, err) + + err = poller.Start() + require.Error(t, err) + poller.Unsubscribe() + }) + t.Run("Test polling for heads", func(t *testing.T) { // Mock polling function that returns a new value every time it's called var pollNumber int pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { + pollFunc := func(ctx context.Context) (Head, error) { pollLock.Lock() defer pollLock.Unlock() pollNumber++ @@ -38,26 +55,24 @@ func Test_Poller(t *testing.T) { defer close(channel) // Create poller and start to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() // Receive updates from the poller - func() { - pollCount := 0 - pollMax := 50 - for ; pollCount < pollMax; pollCount++ { - h := <-channel - assert.Equal(t, int64(pollCount+1), h.BlockNumber()) - } - }() + pollCount := 0 + pollMax := 50 + for ; pollCount < pollMax; pollCount++ { + h := <-channel + assert.Equal(t, int64(pollCount+1), h.BlockNumber()) + } }) t.Run("Test polling errors", func(t *testing.T) { // Mock polling function that returns an error var pollNumber int pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { + pollFunc := func(ctx context.Context) (Head, error) { pollLock.Lock() defer pollLock.Unlock() pollNumber++ @@ -71,7 +86,7 @@ func Test_Poller(t *testing.T) { olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &olggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() @@ -89,8 +104,10 @@ func Test_Poller(t *testing.T) { }) t.Run("Test polling timeout", func(t *testing.T) { - pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { - time.Sleep(10 * time.Millisecond) + pollFunc := func(ctx context.Context) (Head, error) { + if <-ctx.Done(); true { + return nil, ctx.Err() + } return nil, nil } @@ -104,43 +121,34 @@ func Test_Poller(t *testing.T) { olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, &pollingTimeout, channel, &olggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, &olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Create goroutine to receive updates from the poller + // Ensure that timeout errors were logged as expected logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: polling timeout exceeded").Len() > 10 + return observedLogs.FilterMessage("polling error: context deadline exceeded").Len() > 10 } require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) - t.Run("Test polling with args", func(t *testing.T) { - pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { - require.Equal(t, args[0], "arg1") - require.Equal(t, args[1], "arg2") - require.Equal(t, args[2], "arg3") - return nil, nil + t.Run("Test unsubscribe during polling", func(t *testing.T) { + pollFunc := func(ctx context.Context) (Head, error) { + time.Sleep(10 * time.Millisecond) + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + h := head{ + BlockNumber: 0, + BlockDifficulty: big.NewInt(0), + } + return h.ToMockHead(t), nil + } } - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - // Create poller and subscribe to receive data - args := []interface{}{"arg1", "arg2", "arg3"} - poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &lggr, args...) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - h := <-channel - require.Nil(t, h) - }) - - t.Run("Test panic in polling function", func(t *testing.T) { - pollFunc := func(ctx context.Context, args ...interface{}) (Head, error) { - panic("panic test") - } + // Set long timeout + pollingTimeout := time.Minute // data channel to receive updates from the poller channel := make(chan Head, 1) @@ -149,29 +157,55 @@ func Test_Poller(t *testing.T) { olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, nil, channel, &olggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, &olggr) require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - // Ensure that panic is caught and logged + // Unsubscribe during polling + time.Sleep(10 * time.Millisecond) + poller.Unsubscribe() + + // Ensure that timeout errors were logged as expected logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: panic: panic test").Len() > 10 + return observedLogs.FilterMessage("polling error: context canceled").Len() > 1 } require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) } func Test_Poller_Unsubscribe(t *testing.T) { + lggr := logger.Test(t) + pollFunc := func(ctx context.Context) (Head, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + h := head{ + BlockNumber: 0, + BlockDifficulty: big.NewInt(0), + } + return h.ToMockHead(t), nil + } + } + t.Run("Test multiple unsubscribe", func(t *testing.T) { - poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) + channel := make(chan Head, 1) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) err := poller.Start() require.NoError(t, err) + + <-channel poller.Unsubscribe() poller.Unsubscribe() }) - t.Run("Test unsubscribe with no subscribers", func(t *testing.T) { - poller := NewPoller[Head](time.Millisecond, nil, nil, nil, nil) + t.Run("Test unsubscribe with closed channel", func(t *testing.T) { + channel := make(chan Head, 1) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) + err := poller.Start() + require.NoError(t, err) + + <-channel + close(channel) poller.Unsubscribe() }) } From d16555e59505517c6a1a41db6cd50acd3dd37b1a Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 26 Apr 2024 10:33:24 -0400 Subject: [PATCH 16/19] Update poller_test.go --- common/client/poller_test.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/common/client/poller_test.go b/common/client/poller_test.go index 2e868ef74f4..090b3bae7ce 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -134,17 +134,11 @@ func Test_Poller(t *testing.T) { t.Run("Test unsubscribe during polling", func(t *testing.T) { pollFunc := func(ctx context.Context) (Head, error) { - time.Sleep(10 * time.Millisecond) - select { - case <-ctx.Done(): + // Block in polling function until context is cancelled + if <-ctx.Done(); true { return nil, ctx.Err() - default: - h := head{ - BlockNumber: 0, - BlockDifficulty: big.NewInt(0), - } - return h.ToMockHead(t), nil } + return nil, nil } // Set long timeout @@ -160,13 +154,13 @@ func Test_Poller(t *testing.T) { poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, &olggr) require.NoError(t, poller.Start()) - // Unsubscribe during polling - time.Sleep(10 * time.Millisecond) + // Unsubscribe while blocked in polling function + time.Sleep(20 * time.Millisecond) poller.Unsubscribe() - // Ensure that timeout errors were logged as expected + // Ensure error was logged logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: context canceled").Len() > 1 + return observedLogs.FilterMessage("polling error: context canceled").Len() >= 1 } require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) From 351a083d8ef1bf66f4a220ad74cce3eba954fdbd Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 26 Apr 2024 10:38:09 -0400 Subject: [PATCH 17/19] Update poller_test.go --- common/client/poller_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/client/poller_test.go b/common/client/poller_test.go index 090b3bae7ce..c6a4b8796c1 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -127,13 +127,15 @@ func Test_Poller(t *testing.T) { // Ensure that timeout errors were logged as expected logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: context deadline exceeded").Len() > 10 + return observedLogs.FilterMessage("polling error: context deadline exceeded").Len() >= 1 } require.Eventually(t, logsSeen, time.Second, time.Millisecond) }) t.Run("Test unsubscribe during polling", func(t *testing.T) { + wait := make(chan struct{}) pollFunc := func(ctx context.Context) (Head, error) { + close(wait) // Block in polling function until context is cancelled if <-ctx.Done(); true { return nil, ctx.Err() @@ -155,7 +157,7 @@ func Test_Poller(t *testing.T) { require.NoError(t, poller.Start()) // Unsubscribe while blocked in polling function - time.Sleep(20 * time.Millisecond) + <-wait poller.Unsubscribe() // Ensure error was logged From 04c4cfadc787bba4d640eaef0e50687aea4ee067 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 26 Apr 2024 11:06:18 -0400 Subject: [PATCH 18/19] Simplify poller --- common/client/poller.go | 12 +++--------- common/client/poller_test.go | 14 +++++++------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/common/client/poller.go b/common/client/poller.go index 9b6ff6f1355..e25faddb7a8 100644 --- a/common/client/poller.go +++ b/common/client/poller.go @@ -19,7 +19,7 @@ type Poller[T any] struct { pollingInterval time.Duration pollingFunc func(ctx context.Context) (T, error) pollingTimeout time.Duration - logger *logger.Logger + logger logger.Logger channel chan<- T errCh chan error @@ -30,7 +30,7 @@ type Poller[T any] struct { // NewPoller creates a new Poller instance func NewPoller[ T any, -](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, channel chan<- T, logger *logger.Logger) Poller[T] { +](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, channel chan<- T, logger logger.Logger) Poller[T] { return Poller[T]{ pollingInterval: pollingInterval, pollingFunc: pollingFunc, @@ -84,7 +84,7 @@ func (p *Poller[T]) pollingLoop() { result, err := p.pollingFunc(pollingCtx) cancelPolling() if err != nil { - p.logError(err) + p.logger.Errorf("polling error: %v", err) continue } // Send result to channel or block if channel is full @@ -96,9 +96,3 @@ func (p *Poller[T]) pollingLoop() { } } } - -func (p *Poller[T]) logError(err error) { - if p.logger != nil { - (*p.logger).Errorf("polling error: %v", err) - } -} diff --git a/common/client/poller_test.go b/common/client/poller_test.go index c6a4b8796c1..3f11c759adb 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -26,7 +26,7 @@ func Test_Poller(t *testing.T) { channel := make(chan Head, 1) defer close(channel) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) err := poller.Start() require.NoError(t, err) @@ -55,7 +55,7 @@ func Test_Poller(t *testing.T) { defer close(channel) // Create poller and start to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() @@ -86,7 +86,7 @@ func Test_Poller(t *testing.T) { olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &olggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() @@ -121,7 +121,7 @@ func Test_Poller(t *testing.T) { olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, &olggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr) require.NoError(t, poller.Start()) defer poller.Unsubscribe() @@ -153,7 +153,7 @@ func Test_Poller(t *testing.T) { olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, &olggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr) require.NoError(t, poller.Start()) // Unsubscribe while blocked in polling function @@ -185,7 +185,7 @@ func Test_Poller_Unsubscribe(t *testing.T) { t.Run("Test multiple unsubscribe", func(t *testing.T) { channel := make(chan Head, 1) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) err := poller.Start() require.NoError(t, err) @@ -196,7 +196,7 @@ func Test_Poller_Unsubscribe(t *testing.T) { t.Run("Test unsubscribe with closed channel", func(t *testing.T) { channel := make(chan Head, 1) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, &lggr) + poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) err := poller.Start() require.NoError(t, err) From 8f3619dda5676f3ab27c977dbb647a0ac3b2ec8c Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 26 Apr 2024 11:12:17 -0400 Subject: [PATCH 19/19] Set logging to warn --- common/client/poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/client/poller.go b/common/client/poller.go index e25faddb7a8..b21f28fe604 100644 --- a/common/client/poller.go +++ b/common/client/poller.go @@ -84,7 +84,7 @@ func (p *Poller[T]) pollingLoop() { result, err := p.pollingFunc(pollingCtx) cancelPolling() if err != nil { - p.logger.Errorf("polling error: %v", err) + p.logger.Warnf("polling error: %v", err) continue } // Send result to channel or block if channel is full