Skip to content

Commit

Permalink
Auto 4601/add log retry integration test (#10355)
Browse files Browse the repository at this point in the history
* Log Trigger Pipeline Retry Integration Test

This commit offers a proposal for completing an integration test that runs log
trigger upkeeps, a full check pipeline, fails mercury server calls, and allows
the application to retry the upkeeps.

Additions include:
- mock mercury server that can have a custom handler per test
- upkeep setup helpers for creating/registering log triggered upkeeps
- integration test for log event retry flow

* successful run of retry integration test

* cleaned up test and packer and added tests

* corrected pipeline error types and retryable states

* fix test and streams name change

* fix streams lookup test

* fix AllowedToUseMercury test

* fix mercury test

* apply code suggestions
  • Loading branch information
EasterTheBunny authored Sep 13, 2023
1 parent 4a2fc41 commit 50134ee
Show file tree
Hide file tree
Showing 7 changed files with 719 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ type Packer interface {
UnpackLogTriggerConfig(raw []byte) (automation_utils_2_1.LogTriggerConfig, error)
PackReport(report automation_utils_2_1.KeeperRegistryBase21Report) ([]byte, error)
UnpackReport(raw []byte) (automation_utils_2_1.KeeperRegistryBase21Report, error)
PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error)
UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error)
}
15 changes: 15 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func (p *abiPacker) UnpackCheckResult(payload ocr2keepers.UpkeepPayload, raw str
return result, nil
}

func (p *abiPacker) PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error) {
return p.abi.Pack("getUpkeepPrivilegeConfig", upkeepId)
}

func (p *abiPacker) UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error) {
out, err := p.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.UnpackValues(resp)
if err != nil {
return nil, fmt.Errorf("%w: unpack getUpkeepPrivilegeConfig return", err)
}

bts := *abi.ConvertType(out[0], new([]byte)).(*[]byte)

return bts, nil
}

func (p *abiPacker) UnpackCheckCallbackResult(callbackResp []byte) (PipelineExecutionState, bool, []byte, uint8, *big.Int, error) {
out, err := p.abi.Methods["checkCallback"].Outputs.UnpackValues(callbackResp)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package encoding

import (
"encoding/json"
"fmt"
"math/big"
"strings"
Expand Down Expand Up @@ -355,6 +356,97 @@ func TestPacker_PackReport_UnpackReport(t *testing.T) {
assert.Equal(t, hexutil.Encode(res), expected)
}

func TestPacker_PackGetUpkeepPrivilegeConfig(t *testing.T) {
tests := []struct {
name string
upkeepId *big.Int
raw []byte
errored bool
}{
{
name: "happy path",
upkeepId: func() *big.Int {
id, _ := new(big.Int).SetString("52236098515066839510538748191966098678939830769967377496848891145101407612976", 10)

return id
}(),
raw: func() []byte {
b, _ := hexutil.Decode("0x19d97a94737c9583000000000000000000000001ea8ed6d0617dd5b3b87374020efaf030")

return b
}(),
errored: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
packer, err := newPacker()
require.NoError(t, err, "valid packer required for test")

b, err := packer.PackGetUpkeepPrivilegeConfig(test.upkeepId)

if !test.errored {
require.NoError(t, err, "no error expected from packing")

assert.Equal(t, test.raw, b, "raw bytes for output should match expected")
} else {
assert.NotNil(t, err, "error expected from packing function")
}
})
}
}

func TestPacker_UnpackGetUpkeepPrivilegeConfig(t *testing.T) {
tests := []struct {
name string
raw []byte
errored bool
}{
{
name: "happy path",
raw: func() []byte {
b, _ := hexutil.Decode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000177b226d657263757279456e61626c6564223a747275657d000000000000000000")

return b
}(),
errored: false,
},
{
name: "error empty config",
raw: func() []byte {
b, _ := hexutil.Decode("0x")

return b
}(),
errored: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
packer, err := newPacker()
require.NoError(t, err, "valid packer required for test")

b, err := packer.UnpackGetUpkeepPrivilegeConfig(test.raw)

if !test.errored {
require.NoError(t, err, "should unpack bytes from abi encoded value")

// the actual struct to unmarshal into is not available to this
// package so basic json encoding is the limit of the following test
var data map[string]interface{}
err = json.Unmarshal(b, &data)

assert.NoError(t, err, "packed data should unmarshal using json encoding")
assert.Equal(t, []byte(`{"mercuryEnabled":true}`), b)
} else {
assert.NotNil(t, err, "error expected from unpack function")
}
})
}
}

func newPacker() (*abiPacker, error) {
keepersABI, err := abi.JSON(strings.NewReader(iregistry21.IKeeperRegistryMasterABI))
if err != nil {
Expand Down
34 changes: 28 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,44 @@ func (r *EvmRegistry) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int
return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, allowed.(bool), nil
}

cfg, err := r.registry.GetUpkeepPrivilegeConfig(opts, upkeepId)
payload, err := r.packer.PackGetUpkeepPrivilegeConfig(upkeepId)
if err != nil {
// pack error, no retryable
r.lggr.Warnf("failed to pack getUpkeepPrivilegeConfig data for upkeepId %s: %s", upkeepId, err)

return encoding.PackUnpackDecodeFailed, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to pack upkeepId: %w", err)
}

var resultBytes hexutil.Bytes
args := map[string]interface{}{
"to": r.addr.Hex(),
"data": hexutil.Bytes(payload),
}

// call checkCallback function at the block which OCR3 has agreed upon
err = r.client.CallContext(opts.Context, &resultBytes, "eth_call", args, opts.BlockNumber)
if err != nil {
return encoding.RpcFlakyFailure, encoding.UpkeepFailureReasonNone, true, false, fmt.Errorf("failed to get upkeep privilege config: %v", err)
}

cfg, err := r.packer.UnpackGetUpkeepPrivilegeConfig(resultBytes)
if err != nil {
return encoding.PackUnpackDecodeFailed, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to get upkeep privilege config: %v", err)
}

if len(cfg) == 0 {
r.mercury.allowListCache.Set(upkeepId.String(), false, cache.DefaultExpiration)
return encoding.NoPipelineError, encoding.UpkeepFailureReasonMercuryAccessNotAllowed, false, false, fmt.Errorf("upkeep privilege config is empty")
}

var a UpkeepPrivilegeConfig
err = json.Unmarshal(cfg, &a)
if err != nil {
var privilegeConfig UpkeepPrivilegeConfig
if err := json.Unmarshal(cfg, &privilegeConfig); err != nil {
return encoding.MercuryUnmarshalError, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to unmarshal privilege config: %v", err)
}
r.mercury.allowListCache.Set(upkeepId.String(), a.MercuryEnabled, cache.DefaultExpiration)
return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, a.MercuryEnabled, nil

r.mercury.allowListCache.Set(upkeepId.String(), privilegeConfig.MercuryEnabled, cache.DefaultExpiration)

return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, privilegeConfig.MercuryEnabled, nil
}

// decodeStreamsLookup decodes the revert error StreamsLookup(string feedParamKey, string[] feeds, string feedParamKey, uint256 time, byte[] extraData)
Expand Down
82 changes: 66 additions & 16 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
Expand Down Expand Up @@ -184,14 +185,30 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
client := new(evmClientMocks.Client)
r.client = client

if !tt.cachedAdminCfg && !tt.hasError {
mockReg := mocks.NewRegistry(t)
cfg := UpkeepPrivilegeConfig{MercuryEnabled: tt.hasPermission}
b, err := json.Marshal(cfg)
assert.Nil(t, err)
mockReg.On("GetUpkeepPrivilegeConfig", mock.Anything, upkeepId).Return(b, nil)
r.registry = mockReg
bCfg, err := json.Marshal(cfg)
require.Nil(t, err)

bContractCfg, err := r.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.PackValues([]interface{}{bCfg})
require.Nil(t, err)

payload, err := r.abi.Pack("getUpkeepPrivilegeConfig", upkeepId)
require.Nil(t, err)

args := map[string]interface{}{
"to": r.addr.Hex(),
"data": hexutil.Bytes(payload),
}

client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, mock.AnythingOfType("*big.Int")).Return(nil).
Run(func(args mock.Arguments) {
b := args.Get(1).(*hexutil.Bytes)
*b = bContractCfg
}).Once()
}

if len(tt.blobs) > 0 {
Expand Down Expand Up @@ -227,13 +244,11 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) {
"to": r.addr.Hex(),
"data": hexutil.Bytes(payload),
}
client := new(evmClientMocks.Client)
client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, hexutil.EncodeUint64(uint64(blockNum))).Return(nil).
Run(func(args mock.Arguments) {
b := args.Get(1).(*hexutil.Bytes)
*b = tt.checkCallbackResp
}).Once()
r.client = client
}

got := r.streamsLookup(context.Background(), tt.input)
Expand Down Expand Up @@ -337,24 +352,59 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)

client := new(evmClientMocks.Client)
r.client = client

if tt.cached {
r.mercury.allowListCache.Set(upkeepId.String(), tt.allowed, cache.DefaultExpiration)
} else {
if tt.err != nil {
mockReg := mocks.NewRegistry(t)
mockReg.On("GetUpkeepPrivilegeConfig", mock.Anything, upkeepId).Return(tt.config, tt.ethCallErr)
r.registry = mockReg
bContractCfg, err := r.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.PackValues([]interface{}{tt.config})
require.Nil(t, err)

payload, err := r.abi.Pack("getUpkeepPrivilegeConfig", upkeepId)
require.Nil(t, err)

args := map[string]interface{}{
"to": r.addr.Hex(),
"data": hexutil.Bytes(payload),
}

client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, mock.AnythingOfType("*big.Int")).
Return(tt.ethCallErr).
Run(func(args mock.Arguments) {
b := args.Get(1).(*hexutil.Bytes)
*b = bContractCfg
}).Once()
} else {
mockReg := mocks.NewRegistry(t)
cfg := UpkeepPrivilegeConfig{MercuryEnabled: tt.allowed}
b, err := json.Marshal(cfg)
assert.Nil(t, err)
mockReg.On("GetUpkeepPrivilegeConfig", mock.Anything, upkeepId).Return(b, nil)
r.registry = mockReg
bCfg, err := json.Marshal(cfg)
require.Nil(t, err)

bContractCfg, err := r.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.PackValues([]interface{}{bCfg})
require.Nil(t, err)

payload, err := r.abi.Pack("getUpkeepPrivilegeConfig", upkeepId)
require.Nil(t, err)

args := map[string]interface{}{
"to": r.addr.Hex(),
"data": hexutil.Bytes(payload),
}

client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, mock.AnythingOfType("*big.Int")).Return(nil).
Run(func(args mock.Arguments) {
b := args.Get(1).(*hexutil.Bytes)
*b = bContractCfg
}).Once()
}
}

state, reason, retryable, allowed, err := r.allowedToUseMercury(nil, upkeepId)
opts := &bind.CallOpts{
BlockNumber: big.NewInt(10),
}

state, reason, retryable, allowed, err := r.allowedToUseMercury(opts, upkeepId)
assert.Equal(t, tt.err, err)
assert.Equal(t, tt.allowed, allowed)
assert.Equal(t, tt.state, state)
Expand Down
Loading

0 comments on commit 50134ee

Please sign in to comment.