Skip to content

Commit

Permalink
Always wait for doCheck to complete before returning (#10878)
Browse files Browse the repository at this point in the history
* Always wait for doCheck to complete before returning
Use threadctrl to avail of context cancellation but still rely on a wait group to blocl until the lookups finish
Update tests

* Revert async streamsLookup to fix test

* Rearrange thread control as a test

* Copy index

* WIP
  • Loading branch information
ferglor authored Nov 13, 2023
1 parent 35e146f commit de50273
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke
}

chResult := make(chan checkResult, 1)
go r.doCheck(ctx, keys, chResult)

r.threadCtrl.Go(func(ctx context.Context) {
r.doCheck(ctx, keys, chResult)
})

select {
case rs := <-chResult:
Expand Down
16 changes: 13 additions & 3 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,15 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep
}

var wg sync.WaitGroup

for i, lookup := range lookups {
i := i
wg.Add(1)
go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr)
r.threadCtrl.Go(func(ctx context.Context) {
r.doLookup(ctx, &wg, lookup, i, checkResults, lggr)
})
}

wg.Wait()

// don't surface error to plugin bc StreamsLookup process should be self-contained.
Expand Down Expand Up @@ -289,14 +294,19 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, p
if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber {
// only mercury v0.2
for i := range sl.Feeds {
go r.singleFeedRequest(ctx, ch, i, sl, lggr)
i := i
r.threadCtrl.Go(func(ctx context.Context) {
r.singleFeedRequest(ctx, ch, i, sl, lggr)
})
}
} else if sl.FeedParamKey == feedIDs {
// only mercury v0.3
resultLen = 1
isMercuryV03 = true
ch = make(chan MercuryData, resultLen)
go r.multiFeedsRequest(ctx, ch, sl, lggr)
r.threadCtrl.Go(func(ctx context.Context) {
r.multiFeedsRequest(ctx, ch, sl, lggr)
})
} else {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mocks"
"github.com/smartcontractkit/chainlink/v2/core/utils"

evmClientMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -70,7 +71,8 @@ func setupEVMRegistry(t *testing.T) *EvmRegistry {
allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval),
pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval),
},
hc: mockHttpClient,
hc: mockHttpClient,
threadCtrl: utils.NewThreadControl(),
}
return r
}
Expand Down Expand Up @@ -220,6 +222,7 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()
client := new(evmClientMocks.Client)
r.client = client

Expand Down Expand Up @@ -362,6 +365,7 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

client := new(evmClientMocks.Client)
r.client = client
Expand Down Expand Up @@ -576,9 +580,12 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

if tt.pluginRetries != 0 {
r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration)
}

hc := mocks.NewHttpClient(t)

for _, blob := range tt.mockChainlinkBlobs {
Expand Down Expand Up @@ -812,6 +819,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

hc := mocks.NewHttpClient(t)

mr := MercuryV02Response{ChainlinkBlob: tt.blob}
Expand Down Expand Up @@ -1157,6 +1166,8 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

if tt.pluginRetries != 0 {
r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration)
}
Expand Down Expand Up @@ -1319,6 +1330,8 @@ func TestEvmRegistry_CheckCallback(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
client := new(evmClientMocks.Client)
r := setupEVMRegistry(t)
defer r.Close()

payload, err := r.abi.Pack("checkCallback", tt.lookup.upkeepId, values, tt.lookup.ExtraData)
require.Nil(t, err)
args := map[string]interface{}{
Expand Down

0 comments on commit de50273

Please sign in to comment.