From 76f078567f29812b8173f6ecaa51cf610392d275 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 6 Oct 2023 16:54:27 +0100 Subject: [PATCH 1/5] Always wait for doCheck to complete before returning Use threadctrl to avail of context cancellation but still rely on a wait group to blocl until the lookups finish Update tests --- .../ocr2keeper/evm21/registry_check_pipeline.go | 5 ++++- .../plugins/ocr2keeper/evm21/streams_lookup.go | 15 ++++++++++++--- .../ocr2keeper/evm21/streams_lookup_test.go | 15 ++++++++++++++- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go index d3530994702..c9752ea14db 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go @@ -41,7 +41,10 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke } chResult := make(chan checkResult, 1) - go r.doCheck(ctx, keys, chResult) + + r.threadCtrl.Go(func(ctx context.Context) { + r.doCheck(ctx, keys, chResult) + }) select { case rs := <-chResult: diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 660550afe97..4c7488910d6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -149,10 +149,14 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep } var wg sync.WaitGroup + for i, lookup := range lookups { wg.Add(1) - go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + r.threadCtrl.Go(func(ctx context.Context) { + r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + }) } + wg.Wait() // don't surface error to plugin bc StreamsLookup process should be self-contained. @@ -289,14 +293,19 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, p if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber { // only mercury v0.2 for i := range sl.Feeds { - go r.singleFeedRequest(ctx, ch, i, sl, lggr) + i := i + r.threadCtrl.Go(func(ctx context.Context) { + r.singleFeedRequest(ctx, ch, i, sl, lggr) + }) } } else if sl.FeedParamKey == feedIDs { // only mercury v0.3 resultLen = 1 isMercuryV03 = true ch = make(chan MercuryData, resultLen) - go r.multiFeedsRequest(ctx, ch, sl, lggr) + r.threadCtrl.Go(func(ctx context.Context) { + r.multiFeedsRequest(ctx, ch, sl, lggr) + }) } else { return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go index 8d7c67d80ce..145d701454d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils" evmClientMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -70,7 +71,8 @@ func setupEVMRegistry(t *testing.T) *EvmRegistry { allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval), pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval), }, - hc: mockHttpClient, + hc: mockHttpClient, + threadCtrl: utils.NewThreadControl(), } return r } @@ -220,6 +222,7 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() client := new(evmClientMocks.Client) r.client = client @@ -362,6 +365,7 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() client := new(evmClientMocks.Client) r.client = client @@ -576,9 +580,12 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + if tt.pluginRetries != 0 { r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) } + hc := mocks.NewHttpClient(t) for _, blob := range tt.mockChainlinkBlobs { @@ -812,6 +819,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + hc := mocks.NewHttpClient(t) mr := MercuryV02Response{ChainlinkBlob: tt.blob} @@ -1157,6 +1166,8 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + if tt.pluginRetries != 0 { r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) } @@ -1319,6 +1330,8 @@ func TestEvmRegistry_CheckCallback(t *testing.T) { t.Run(tt.name, func(t *testing.T) { client := new(evmClientMocks.Client) r := setupEVMRegistry(t) + defer r.Close() + payload, err := r.abi.Pack("checkCallback", tt.lookup.upkeepId, values, tt.lookup.ExtraData) require.Nil(t, err) args := map[string]interface{}{ From 4c824d84aec10e0a86fac8fbe7d99b39089aca7a Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Wed, 8 Nov 2023 20:41:34 +0000 Subject: [PATCH 2/5] Revert async streamsLookup to fix test --- core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 4c7488910d6..79486986e9e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -152,9 +152,7 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep for i, lookup := range lookups { wg.Add(1) - r.threadCtrl.Go(func(ctx context.Context) { - r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) - }) + go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) } wg.Wait() From b21971fedd84e656c0258848d3c6121618cd26fe Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Wed, 8 Nov 2023 21:22:31 +0000 Subject: [PATCH 3/5] Rearrange thread control as a test --- .../ocr2/plugins/ocr2keeper/evm21/streams_lookup.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 79486986e9e..ce29a693700 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -150,10 +150,13 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep var wg sync.WaitGroup - for i, lookup := range lookups { - wg.Add(1) - go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) - } + wg.Add(len(lookups)) + + r.threadCtrl.Go(func(ctx context.Context) { + for i, lookup := range lookups { + r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + } + }) wg.Wait() From f243563749b75c8bb53a2e24364a6b8a838f6454 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Wed, 8 Nov 2023 23:04:57 +0000 Subject: [PATCH 4/5] Copy index --- .../plugins/ocr2keeper/evm21/streams_lookup.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index ce29a693700..5e5da10997a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -150,13 +150,13 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep var wg sync.WaitGroup - wg.Add(len(lookups)) - - r.threadCtrl.Go(func(ctx context.Context) { - for i, lookup := range lookups { - r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) - } - }) + for i, lookup := range lookups { + idx := i + wg.Add(1) + r.threadCtrl.Go(func(ctx context.Context) { + r.doLookup(ctx, &wg, lookup, idx, checkResults, lggr) + }) + } wg.Wait() From c6e09f1e84f44eb14cb4e127f1d1e11ec3a5a431 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Wed, 8 Nov 2023 23:16:52 +0000 Subject: [PATCH 5/5] WIP --- core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 5e5da10997a..fb2821a74b7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -151,10 +151,10 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep var wg sync.WaitGroup for i, lookup := range lookups { - idx := i + i := i wg.Add(1) r.threadCtrl.Go(func(ctx context.Context) { - r.doLookup(ctx, &wg, lookup, idx, checkResults, lggr) + r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) }) }