diff --git a/core/services/relay/evm/address.go b/core/services/relay/evm/address.go index 0a78270c714..1a3e93ed3ca 100644 --- a/core/services/relay/evm/address.go +++ b/core/services/relay/evm/address.go @@ -5,7 +5,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) @@ -13,10 +13,10 @@ func AccountToAddress(accounts []types.Account) (addresses []common.Address, err for _, signer := range accounts { bytes, err := hexutil.Decode(string(signer)) if err != nil { - return []common.Address{}, pkgerrors.Wrap(err, fmt.Sprintf("given address is not valid %s", signer)) + return []common.Address{}, errors.Wrap(err, fmt.Sprintf("given address is not valid %s", signer)) } if len(bytes) != 20 { - return []common.Address{}, pkgerrors.Errorf("address is not 20 bytes %s", signer) + return []common.Address{}, errors.Errorf("address is not 20 bytes %s", signer) } addresses = append(addresses, common.BytesToAddress(bytes)) } @@ -26,7 +26,7 @@ func AccountToAddress(accounts []types.Account) (addresses []common.Address, err func OnchainPublicKeyToAddress(publicKeys []types.OnchainPublicKey) (addresses []common.Address, err error) { for _, signer := range publicKeys { if len(signer) != 20 { - return []common.Address{}, pkgerrors.Errorf("address is not 20 bytes %s", signer) + return []common.Address{}, errors.Errorf("address is not 20 bytes %s", signer) } addresses = append(addresses, common.BytesToAddress(signer)) } diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 02e9d4e3f6a..64d9f9f1cac 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -52,19 +52,21 @@ func TestChainReader(t *testing.T) { it := &chainReaderInterfaceTester{} RunChainReaderInterfaceTests(t, it) RunChainReaderInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it)) + t.Run("Dynamically typed topics can be used to filter and have type correct in return", func(t *testing.T) { it.Setup(t) + // bind event before firing it to avoid log poller race + ctx := testutils.Context(t) + cr := it.GetChainReader(t) + require.NoError(t, cr.Bind(ctx, it.GetBindings(t))) + anyString := "foo" tx, err := it.evmTest.LatestValueHolderTransactor.TriggerEventWithDynamicTopic(it.auth, anyString) require.NoError(t, err) it.sim.Commit() it.incNonce() it.awaitTx(t, tx) - ctx := testutils.Context(t) - - cr := it.GetChainReader(t) - require.NoError(t, cr.Bind(ctx, it.GetBindings(t))) input := struct{ Field string }{Field: anyString} tp := cr.(clcommontypes.ContractTypeProvider) @@ -84,20 +86,24 @@ func TestChainReader(t *testing.T) { t.Run("Multiple topics can filter together", func(t *testing.T) { it.Setup(t) + + // bind event before firing it to avoid log poller race + ctx := testutils.Context(t) + cr := it.GetChainReader(t) + require.NoError(t, cr.Bind(ctx, it.GetBindings(t))) + triggerFourTopics(t, it, int32(1), int32(2), int32(3)) triggerFourTopics(t, it, int32(2), int32(2), int32(3)) triggerFourTopics(t, it, int32(1), int32(3), int32(3)) triggerFourTopics(t, it, int32(1), int32(2), int32(4)) - ctx := testutils.Context(t) - cr := it.GetChainReader(t) - require.NoError(t, cr.Bind(ctx, it.GetBindings(t))) var latest struct{ Field1, Field2, Field3 int32 } params := struct{ Field1, Field2, Field3 int32 }{Field1: 1, Field2: 2, Field3: 3} - time.Sleep(it.MaxWaitTimeForEvents()) + require.Eventually(t, func() bool { + return cr.GetLatestValue(ctx, AnyContractName, triggerWithAllTopics, params, &latest) == nil + }, it.MaxWaitTimeForEvents(), time.Millisecond*10) - require.NoError(t, cr.GetLatestValue(ctx, AnyContractName, triggerWithAllTopics, params, &latest)) assert.Equal(t, int32(1), latest.Field1) assert.Equal(t, int32(2), latest.Field2) assert.Equal(t, int32(3), latest.Field3) @@ -257,7 +263,7 @@ func (it *chainReaderInterfaceTester) GetChainReader(t *testing.T) clcommontypes lggr := logger.NullLogger db := pgtest.NewSqlxDB(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), it.chain.Client(), lggr, time.Millisecond, false, 0, 1, 1, 10000) + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), it.chain.Client(), lggr, time.Millisecond, false, 0, 1, 1, 10000, 0) require.NoError(t, lp.Start(ctx)) it.chain.On("LogPoller").Return(lp) cr, err := evm.NewChainReaderService(lggr, lp, it.chain, it.chainConfig) diff --git a/core/services/relay/evm/codec_test.go b/core/services/relay/evm/codec_test.go index b13051cb010..2f9a4639d41 100644 --- a/core/services/relay/evm/codec_test.go +++ b/core/services/relay/evm/codec_test.go @@ -50,6 +50,31 @@ func TestCodec(t *testing.T) { }) } +func TestCodec_SimpleEncode(t *testing.T) { + codecName := "my_codec" + input := map[string]any{ + "Report": int32(6), + "Meta": "abcdefg", + } + evmEncoderConfig := `[{"Name":"Report","Type":"int32"},{"Name":"Meta","Type":"string"}]` + + codecConfig := types.CodecConfig{Configs: map[string]types.ChainCodecConfig{ + codecName: {TypeABI: evmEncoderConfig}, + }} + c, err := evm.NewCodec(codecConfig) + require.NoError(t, err) + + result, err := c.Encode(testutils.Context(t), input, codecName) + require.NoError(t, err) + expected := + "0000000000000000000000000000000000000000000000000000000000000006" + // int32(6) + "0000000000000000000000000000000000000000000000000000000000000040" + // total bytes occupied by the string (64) + "0000000000000000000000000000000000000000000000000000000000000007" + // length of the string (7 chars) + "6162636465666700000000000000000000000000000000000000000000000000" // actual string + + require.Equal(t, expected, hexutil.Encode(result)[2:]) +} + type codecInterfaceTester struct{} func (it *codecInterfaceTester) Setup(_ *testing.T) {} diff --git a/core/services/relay/evm/config_poller.go b/core/services/relay/evm/config_poller.go index ef9934f4e10..bb962fc6ed5 100644 --- a/core/services/relay/evm/config_poller.go +++ b/core/services/relay/evm/config_poller.go @@ -5,10 +5,9 @@ import ( "database/sql" "fmt" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -34,60 +33,9 @@ var ( ) ) -var ( - // ConfigSet Common to all OCR2 evm based contracts: https://github.com/smartcontractkit/libocr/blob/master/contract2/dev/OCR2Abstract.sol - ConfigSet common.Hash - - defaultABI abi.ABI -) - -const configSetEventName = "ConfigSet" - -func init() { - var err error - abiPointer, err := ocr2aggregator.OCR2AggregatorMetaData.GetAbi() - if err != nil { - panic(err) - } - defaultABI = *abiPointer - ConfigSet = defaultABI.Events[configSetEventName].ID -} - -func unpackLogData(d []byte) (*ocr2aggregator.OCR2AggregatorConfigSet, error) { - unpacked := new(ocr2aggregator.OCR2AggregatorConfigSet) - err := defaultABI.UnpackIntoInterface(unpacked, configSetEventName, d) - if err != nil { - return nil, pkgerrors.Wrap(err, "failed to unpack log data") - } - return unpacked, nil -} - -func configFromLog(logData []byte) (ocrtypes.ContractConfig, error) { - unpacked, err := unpackLogData(logData) - if err != nil { - return ocrtypes.ContractConfig{}, err - } - - var transmitAccounts []ocrtypes.Account - for _, addr := range unpacked.Transmitters { - transmitAccounts = append(transmitAccounts, ocrtypes.Account(addr.Hex())) - } - var signers []ocrtypes.OnchainPublicKey - for _, addr := range unpacked.Signers { - addr := addr - signers = append(signers, addr[:]) - } - - return ocrtypes.ContractConfig{ - ConfigDigest: unpacked.ConfigDigest, - ConfigCount: unpacked.ConfigCount, - Signers: signers, - Transmitters: transmitAccounts, - F: unpacked.F, - OnchainConfig: unpacked.OnchainConfig, - OffchainConfigVersion: unpacked.OffchainConfigVersion, - OffchainConfig: unpacked.OffchainConfig, - }, nil +type LogDecoder interface { + EventSig() common.Hash + Decode(rawLog []byte) (ocrtypes.ContractConfig, error) } type configPoller struct { @@ -105,18 +53,30 @@ type configPoller struct { // contract allows us work around such restrictions. configStoreContractAddr *common.Address configStoreContract *ocrconfigurationstoreevmsimple.OCRConfigurationStoreEVMSimple + + // Depending on the exact contract used, the raw config log may be shaped + // in different ways + ld LogDecoder } func configPollerFilterName(addr common.Address) string { return logpoller.FilterName("OCR2ConfigPoller", addr.String()) } -func NewConfigPoller(lggr logger.Logger, client client.Client, destChainPoller logpoller.LogPoller, aggregatorContractAddr common.Address, configStoreAddr *common.Address) (evmRelayTypes.ConfigPoller, error) { - return newConfigPoller(lggr, client, destChainPoller, aggregatorContractAddr, configStoreAddr) +type CPConfig struct { + Client client.Client + DestinationChainPoller logpoller.LogPoller + AggregatorContractAddress common.Address + ConfigStoreAddress *common.Address + LogDecoder LogDecoder +} + +func NewConfigPoller(lggr logger.Logger, cfg CPConfig) (evmRelayTypes.ConfigPoller, error) { + return newConfigPoller(lggr, cfg.Client, cfg.DestinationChainPoller, cfg.AggregatorContractAddress, cfg.ConfigStoreAddress, cfg.LogDecoder) } -func newConfigPoller(lggr logger.Logger, client client.Client, destChainPoller logpoller.LogPoller, aggregatorContractAddr common.Address, configStoreAddr *common.Address) (*configPoller, error) { - err := destChainPoller.RegisterFilter(logpoller.Filter{Name: configPollerFilterName(aggregatorContractAddr), EventSigs: []common.Hash{ConfigSet}, Addresses: []common.Address{aggregatorContractAddr}}) +func newConfigPoller(lggr logger.Logger, client client.Client, destChainPoller logpoller.LogPoller, aggregatorContractAddr common.Address, configStoreAddr *common.Address, ld LogDecoder) (*configPoller, error) { + err := destChainPoller.RegisterFilter(logpoller.Filter{Name: configPollerFilterName(aggregatorContractAddr), EventSigs: []common.Hash{ld.EventSig()}, Addresses: []common.Address{aggregatorContractAddr}}) if err != nil { return nil, err } @@ -133,6 +93,7 @@ func newConfigPoller(lggr logger.Logger, client client.Client, destChainPoller l aggregatorContractAddr: aggregatorContractAddr, client: client, aggregatorContract: aggregatorContract, + ld: ld, } if configStoreAddr != nil { @@ -164,9 +125,9 @@ func (cp *configPoller) Replay(ctx context.Context, fromBlock int64) error { // LatestConfigDetails returns the latest config details from the logs func (cp *configPoller) LatestConfigDetails(ctx context.Context) (changedInBlock uint64, configDigest ocrtypes.ConfigDigest, err error) { - latest, err := cp.destChainLogPoller.LatestLogByEventSigWithConfs(ConfigSet, cp.aggregatorContractAddr, 1, pg.WithParentCtx(ctx)) + latest, err := cp.destChainLogPoller.LatestLogByEventSigWithConfs(cp.ld.EventSig(), cp.aggregatorContractAddr, 1, pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { if cp.isConfigStoreAvailable() { // Fallback to RPC call in case logs have been pruned and configStoreContract is available return cp.callLatestConfigDetails(ctx) @@ -176,7 +137,7 @@ func (cp *configPoller) LatestConfigDetails(ctx context.Context) (changedInBlock } return 0, ocrtypes.ConfigDigest{}, err } - latestConfigSet, err := configFromLog(latest.Data) + latestConfigSet, err := cp.ld.Decode(latest.Data) if err != nil { return 0, ocrtypes.ConfigDigest{}, err } @@ -185,7 +146,7 @@ func (cp *configPoller) LatestConfigDetails(ctx context.Context) (changedInBlock // LatestConfig returns the latest config from the logs on a certain block func (cp *configPoller) LatestConfig(ctx context.Context, changedInBlock uint64) (ocrtypes.ContractConfig, error) { - lgs, err := cp.destChainLogPoller.Logs(int64(changedInBlock), int64(changedInBlock), ConfigSet, cp.aggregatorContractAddr, pg.WithParentCtx(ctx)) + lgs, err := cp.destChainLogPoller.Logs(int64(changedInBlock), int64(changedInBlock), cp.ld.EventSig(), cp.aggregatorContractAddr, pg.WithParentCtx(ctx)) if err != nil { return ocrtypes.ContractConfig{}, err } @@ -196,7 +157,7 @@ func (cp *configPoller) LatestConfig(ctx context.Context, changedInBlock uint64) } return ocrtypes.ContractConfig{}, fmt.Errorf("no logs found for config on contract %s (chain %s) at block %d", cp.aggregatorContractAddr.Hex(), cp.client.ConfiguredChainID().String(), changedInBlock) } - latestConfigSet, err := configFromLog(lgs[len(lgs)-1].Data) + latestConfigSet, err := cp.ld.Decode(lgs[len(lgs)-1].Data) if err != nil { return ocrtypes.ContractConfig{}, err } @@ -208,7 +169,7 @@ func (cp *configPoller) LatestConfig(ctx context.Context, changedInBlock uint64) func (cp *configPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint64, err error) { latest, err := cp.destChainLogPoller.LatestBlock(pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { return 0, nil } return 0, err diff --git a/core/services/relay/evm/config_poller_test.go b/core/services/relay/evm/config_poller_test.go index 28687573760..089db6decd5 100644 --- a/core/services/relay/evm/config_poller_test.go +++ b/core/services/relay/evm/config_poller_test.go @@ -14,7 +14,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/onsi/gomega" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -28,6 +28,7 @@ import ( ocrtypes2 "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmClientMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" @@ -55,6 +56,8 @@ func TestConfigPoller(t *testing.T) { var linkTokenAddress common.Address var accessAddress common.Address + ld := OCR2AggregatorLogDecoder + { key, err := crypto.GenerateKey() require.NoError(t, err) @@ -87,12 +90,12 @@ func TestConfigPoller(t *testing.T) { cfg := pgtest.NewQConfig(false) ethClient = evmclient.NewSimulatedBackendClient(t, b, testutils.SimulatedChainID) lorm := logpoller.NewORM(testutils.SimulatedChainID, db, lggr, cfg) - lp = logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000) + lp = logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) servicetest.Run(t, lp) } t.Run("LatestConfig errors if there is no config in logs and config store is unconfigured", func(t *testing.T) { - cp, err := NewConfigPoller(lggr, ethClient, lp, ocrAddress, nil) + cp, err := NewConfigPoller(lggr, CPConfig{ethClient, lp, ocrAddress, nil, ld}) require.NoError(t, err) _, err = cp.LatestConfig(testutils.Context(t), 0) @@ -101,7 +104,7 @@ func TestConfigPoller(t *testing.T) { }) t.Run("happy path (with config store)", func(t *testing.T) { - cp, err := NewConfigPoller(lggr, ethClient, lp, ocrAddress, &configStoreContractAddr) + cp, err := NewConfigPoller(lggr, CPConfig{ethClient, lp, ocrAddress, &configStoreContractAddr, ld}) require.NoError(t, err) // Should have no config to begin with. _, configDigest, err := cp.LatestConfigDetails(testutils.Context(t)) @@ -172,7 +175,7 @@ func TestConfigPoller(t *testing.T) { mp.On("LatestLogByEventSigWithConfs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, sql.ErrNoRows) t.Run("if callLatestConfigDetails succeeds", func(t *testing.T) { - cp, err := newConfigPoller(lggr, ethClient, mp, ocrAddress, &configStoreContractAddr) + cp, err := newConfigPoller(lggr, ethClient, mp, ocrAddress, &configStoreContractAddr, ld) require.NoError(t, err) t.Run("when config has not been set, returns zero values", func(t *testing.T) { @@ -208,8 +211,8 @@ func TestConfigPoller(t *testing.T) { t.Run("returns error if callLatestConfigDetails fails", func(t *testing.T) { failingClient := new(evmClientMocks.Client) failingClient.On("ConfiguredChainID").Return(big.NewInt(42)) - failingClient.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(nil, pkgerrors.New("something exploded")) - cp, err := newConfigPoller(lggr, failingClient, mp, ocrAddress, &configStoreContractAddr) + failingClient.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("something exploded")) + cp, err := newConfigPoller(lggr, failingClient, mp, ocrAddress, &configStoreContractAddr, ld) require.NoError(t, err) cp.configStoreContractAddr = &configStoreContractAddr @@ -248,7 +251,7 @@ func TestConfigPoller(t *testing.T) { mp.On("LatestLogByEventSigWithConfs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, sql.ErrNoRows) t.Run("if callReadConfig succeeds", func(t *testing.T) { - cp, err := newConfigPoller(lggr, ethClient, mp, ocrAddress, &configStoreContractAddr) + cp, err := newConfigPoller(lggr, ethClient, mp, ocrAddress, &configStoreContractAddr, ld) require.NoError(t, err) t.Run("when config has not been set, returns error", func(t *testing.T) { @@ -309,8 +312,8 @@ func TestConfigPoller(t *testing.T) { failingClient.On("CallContract", mock.Anything, mock.MatchedBy(func(callArgs ethereum.CallMsg) bool { // initial call to retrieve config store address from aggregator return *callArgs.To == ocrAddress - }), mock.Anything).Return(nil, pkgerrors.New("something exploded")).Once() - cp, err := newConfigPoller(lggr, failingClient, mp, ocrAddress, &configStoreContractAddr) + }), mock.Anything).Return(nil, errors.New("something exploded")).Once() + cp, err := newConfigPoller(lggr, failingClient, mp, ocrAddress, &configStoreContractAddr, ld) require.NoError(t, err) _, err = cp.LatestConfig(testutils.Context(t), 0) diff --git a/core/services/relay/evm/contract_transmitter.go b/core/services/relay/evm/contract_transmitter.go index 767a8f98a6d..76360e34e1a 100644 --- a/core/services/relay/evm/contract_transmitter.go +++ b/core/services/relay/evm/contract_transmitter.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" gethcommon "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -66,7 +66,7 @@ func NewOCRContractTransmitter( ) (*contractTransmitter, error) { transmitted, ok := contractABI.Events["Transmitted"] if !ok { - return nil, pkgerrors.New("invalid ABI, missing transmitted") + return nil, errors.New("invalid ABI, missing transmitted") } err := lp.RegisterFilter(logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}}) @@ -94,7 +94,7 @@ func (oc *contractTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes. var ss [][32]byte var vs [32]byte if len(signatures) > 32 { - return pkgerrors.New("too many signatures, maximum is 32") + return errors.New("too many signatures, maximum is 32") } for i, as := range signatures { r, s, v, err := evmutil.SplitSignature(as.Signature) @@ -116,10 +116,10 @@ func (oc *contractTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes. payload, err := oc.contractABI.Pack("transmit", rawReportCtx, []byte(report), rs, ss, vs) if err != nil { - return pkgerrors.Wrap(err, "abi.Pack failed") + return errors.Wrap(err, "abi.Pack failed") } - return pkgerrors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload, txMeta), "failed to send Eth transaction") + return errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload, txMeta), "failed to send Eth transaction") } type contractReader interface { @@ -142,7 +142,7 @@ func parseTransmitted(log []byte) ([32]byte, uint32, error) { return [32]byte{}, 0, err } if len(transmitted) < 2 { - return [32]byte{}, 0, pkgerrors.New("transmitted event log has too few arguments") + return [32]byte{}, 0, errors.New("transmitted event log has too few arguments") } configDigest := *abi.ConvertType(transmitted[0], new([32]byte)).(*[32]byte) epoch := *abi.ConvertType(transmitted[1], new(uint32)).(*uint32) @@ -184,7 +184,7 @@ func (oc *contractTransmitter) LatestConfigDigestAndEpoch(ctx context.Context) ( latest, err := oc.lp.LatestLogByEventSigWithConfs( oc.transmittedEventSig, oc.contractAddress, 1, pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { // No transmissions yet return configDigest, 0, nil } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 596f53308b6..a02885cb556 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -17,7 +17,6 @@ import ( "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median/evmreportcodec" - "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -29,6 +28,9 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/services/llo" + "github.com/smartcontractkit/chainlink/v2/core/services/llo/bm" + lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config" mercuryconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pg" @@ -42,6 +44,28 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" ) +var ( + OCR2AggregatorTransmissionContractABI abi.ABI + OCR2AggregatorLogDecoder LogDecoder + ChannelVerifierLogDecoder LogDecoder +) + +func init() { + var err error + OCR2AggregatorTransmissionContractABI, err = abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorMetaData.ABI)) + if err != nil { + panic(err) + } + OCR2AggregatorLogDecoder, err = newOCR2AggregatorLogDecoder() + if err != nil { + panic(err) + } + ChannelVerifierLogDecoder, err = newChannelVerifierLogDecoder() + if err != nil { + panic(err) + } +} + var _ commontypes.Relayer = &Relayer{} //nolint:staticcheck type Relayer struct { @@ -53,6 +77,10 @@ type Relayer struct { pgCfg pg.QConfig chainReader commontypes.ChainReader codec commontypes.Codec + + // LLO/data streams + cdcFactory llo.ChannelDefinitionCacheFactory + orm llo.ORM } type CSAETHKeystore interface { @@ -91,6 +119,9 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R return nil, fmt.Errorf("cannot create evm relayer: %w", err) } lggr = lggr.Named("Relayer") + + orm := llo.NewORM(pg.NewQ(opts.DB, lggr, opts.QConfig), chain.ID()) + cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, orm, chain.LogPoller()) return &Relayer{ db: opts.DB, chain: chain, @@ -98,6 +129,8 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R ks: opts.CSAETHKeystore, mercuryPool: opts.MercuryPool, pgCfg: opts.QConfig, + cdcFactory: cdcFactory, + orm: orm, }, nil } @@ -125,6 +158,32 @@ func (r *Relayer) HealthReport() (report map[string]error) { return } +func (r *Relayer) NewPluginProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (commontypes.PluginProvider, error) { + + // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 + ctx := context.Background() + + lggr := r.lggr.Named("PluginProvider").Named(rargs.ExternalJobID.String()) + + configWatcher, err := newStandardConfigProvider(r.lggr, r.chain, types.NewRelayOpts(rargs)) + if err != nil { + return nil, err + } + + transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) + if err != nil { + return nil, err + } + + return NewPluginProvider( + r.chainReader, + r.codec, + transmitter, + configWatcher, + lggr, + ), nil +} + func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (commontypes.MercuryProvider, error) { lggr := r.lggr.Named("MercuryProvider").Named(rargs.ExternalJobID.String()) relayOpts := types.NewRelayOpts(rargs) @@ -146,7 +205,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty if relayConfig.ChainID.String() != r.chain.ID().String() { return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String()) } - cw, err := newConfigProvider(lggr, r.chain, relayOpts) + cp, err := newMercuryConfigProvider(lggr, r.chain, relayOpts) if err != nil { return nil, pkgerrors.WithStack(err) } @@ -182,22 +241,80 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty default: return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) } - transmitter := mercury.NewTransmitter(lggr, cw.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec) + transmitter := mercury.NewTransmitter(lggr, client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec) - return NewMercuryProvider(cw, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil + return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil } func (r *Relayer) NewLLOProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (commontypes.LLOProvider, error) { - return nil, errors.New("not implemented") + relayOpts := types.NewRelayOpts(rargs) + var relayConfig types.RelayConfig + { + var err error + relayConfig, err = relayOpts.RelayConfig() + if err != nil { + return nil, fmt.Errorf("failed to get relay config: %w", err) + } + } + + var lloCfg lloconfig.PluginConfig + if err := json.Unmarshal(pargs.PluginConfig, &lloCfg); err != nil { + return nil, pkgerrors.WithStack(err) + } + if err := lloCfg.Validate(); err != nil { + return nil, err + } + + if relayConfig.ChainID.String() != r.chain.ID().String() { + return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String()) + } + cp, err := newLLOConfigProvider(r.lggr, r.chain, relayOpts) + if err != nil { + return nil, pkgerrors.WithStack(err) + } + + if !relayConfig.EffectiveTransmitterID.Valid { + return nil, pkgerrors.New("EffectiveTransmitterID must be specified") + } + privKey, err := r.ks.CSA().Get(relayConfig.EffectiveTransmitterID.String) + if err != nil { + return nil, pkgerrors.Wrap(err, "failed to get CSA key for mercury connection") + } + + // FIXME: Remove after benchmarking is done + // https://smartcontract-it.atlassian.net/browse/MERC-3487 + var transmitter llo.Transmitter + if lloCfg.BenchmarkMode { + r.lggr.Info("Benchmark mode enabled, using dummy transmitter. NOTE: THIS WILL NOT TRANSMIT ANYTHING") + transmitter = bm.NewTransmitter(r.lggr, privKey.PublicKey) + } else { + var client wsrpc.Client + client, err = r.mercuryPool.Checkout(context.Background(), privKey, lloCfg.ServerPubKey, lloCfg.ServerURL()) + if err != nil { + return nil, err + } + transmitter = llo.NewTransmitter(r.lggr, client, privKey.PublicKey) + } + + cdc, err := r.cdcFactory.NewCache(lloCfg) + if err != nil { + return nil, err + } + return NewLLOProvider(cp, transmitter, r.lggr, cdc), nil } func (r *Relayer) NewFunctionsProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (commontypes.FunctionsProvider, error) { + + // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 + ctx := context.Background() + lggr := r.lggr.Named("FunctionsProvider").Named(rargs.ExternalJobID.String()) // TODO(FUN-668): Not ready yet (doesn't implement FunctionsEvents() properly) - return NewFunctionsProvider(r.chain, rargs, pargs, lggr, r.ks.Eth(), functions.FunctionsPlugin) + return NewFunctionsProvider(ctx, r.chain, rargs, pargs, lggr, r.ks.Eth(), functions.FunctionsPlugin) } -func (r *Relayer) NewConfigProvider(args commontypes.RelayArgs) (commontypes.ConfigProvider, error) { +// NewConfigProvider is called by bootstrap jobs +func (r *Relayer) NewConfigProvider(args commontypes.RelayArgs) (configProvider commontypes.ConfigProvider, err error) { lggr := r.lggr.Named("ConfigProvider").Named(args.ExternalJobID.String()) relayOpts := types.NewRelayOpts(args) relayConfig, err := relayOpts.RelayConfig() @@ -209,7 +326,27 @@ func (r *Relayer) NewConfigProvider(args commontypes.RelayArgs) (commontypes.Con return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String()) } - configProvider, err := newConfigProvider(lggr, r.chain, relayOpts) + // Handle legacy jobs which did not yet specify provider type and + // switched between median/mercury based on presence of feed ID + if args.ProviderType == "" { + if relayConfig.FeedID == nil { + args.ProviderType = "median" + } else { + args.ProviderType = "mercury" + } + } + + switch args.ProviderType { + case "median": + configProvider, err = newStandardConfigProvider(lggr, r.chain, relayOpts) + case "mercury": + configProvider, err = newMercuryConfigProvider(lggr, r.chain, relayOpts) + case "llo": + configProvider, err = newLLOConfigProvider(lggr, r.chain, relayOpts) + default: + return nil, fmt.Errorf("unrecognized provider type: %q", args.ProviderType) + } + if err != nil { // Never return (*configProvider)(nil) return nil, err @@ -239,7 +376,6 @@ type configWatcher struct { services.StateMachine lggr logger.Logger contractAddress common.Address - contractABI abi.ABI offchainDigester ocrtypes.OffchainConfigDigester configPoller types.ConfigPoller chain legacyevm.Chain @@ -252,7 +388,6 @@ type configWatcher struct { func newConfigWatcher(lggr logger.Logger, contractAddress common.Address, - contractABI abi.ABI, offchainDigester ocrtypes.OffchainConfigDigester, configPoller types.ConfigPoller, chain legacyevm.Chain, @@ -263,7 +398,6 @@ func newConfigWatcher(lggr logger.Logger, return &configWatcher{ lggr: lggr.Named("ConfigWatcher").Named(contractAddress.String()), contractAddress: contractAddress, - contractABI: contractABI, offchainDigester: offchainDigester, configPoller: configPoller, chain: chain, @@ -319,63 +453,12 @@ func (c *configWatcher) ContractConfigTracker() ocrtypes.ContractConfigTracker { return c.configPoller } -func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.RelayOpts) (*configWatcher, error) { - if !common.IsHexAddress(opts.ContractID) { - return nil, pkgerrors.Errorf("invalid contractID, expected hex address") - } - - aggregatorAddress := common.HexToAddress(opts.ContractID) - contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorMetaData.ABI)) - if err != nil { - return nil, pkgerrors.Wrap(err, "could not get contract ABI JSON") - } - var cp types.ConfigPoller - - relayConfig, err := opts.RelayConfig() - if err != nil { - return nil, fmt.Errorf("failed to get relay config: %w", err) - } - if relayConfig.FeedID != nil { - cp, err = mercury.NewConfigPoller( - lggr.Named(relayConfig.FeedID.String()), - chain.LogPoller(), - aggregatorAddress, - *relayConfig.FeedID, - // TODO: Does mercury need to support config contract? DF-19182 - ) - } else { - cp, err = NewConfigPoller( - lggr, - chain.Client(), - chain.LogPoller(), - aggregatorAddress, - relayConfig.ConfigContractAddress, - ) - } - if err != nil { - return nil, err - } - - var offchainConfigDigester ocrtypes.OffchainConfigDigester - if relayConfig.FeedID != nil { - // Mercury - offchainConfigDigester = mercury.NewOffchainConfigDigester(*relayConfig.FeedID, chain.Config().EVM().ChainID(), aggregatorAddress) - } else { - // Non-mercury - offchainConfigDigester = evmutil.EVMOffchainConfigDigester{ - ChainID: chain.Config().EVM().ChainID().Uint64(), - ContractAddress: aggregatorAddress, - } - } - return newConfigWatcher(lggr, aggregatorAddress, contractABI, offchainConfigDigester, cp, chain, relayConfig.FromBlock, opts.New), nil -} - type configTransmitterOpts struct { // override the gas limit default provided in the config watcher pluginGasLimit *uint32 } -func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts) (*contractTransmitter, error) { +func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI) (*contractTransmitter, error) { var relayConfig types.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -398,7 +481,7 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra if sendingKeysLength > 1 && s == effectiveTransmitterAddress.String() { return nil, pkgerrors.New("the transmitter is a local sending key with transaction forwarding enabled") } - if err := ethKeystore.CheckEnabled(common.HexToAddress(s), configWatcher.chain.Config().EVM().ChainID()); err != nil { + if err := ethKeystore.CheckEnabled(ctx, common.HexToAddress(s), configWatcher.chain.Config().EVM().ChainID()); err != nil { return nil, pkgerrors.Wrap(err, "one of the sending keys given is not enabled") } fromAddresses = append(fromAddresses, common.HexToAddress(s)) @@ -439,7 +522,7 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra return NewOCRContractTransmitter( configWatcher.contractAddress, configWatcher.chain.Client(), - configWatcher.contractABI, + transmissionContractABI, transmitter, configWatcher.chain.LogPoller(), lggr, @@ -448,6 +531,9 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra } func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (commontypes.MedianProvider, error) { + // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 + ctx := context.Background() + lggr := r.lggr.Named("MedianProvider").Named(rargs.ExternalJobID.String()) relayOpts := types.NewRelayOpts(rargs) relayConfig, err := relayOpts.RelayConfig() @@ -463,13 +549,14 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp } contractID := common.HexToAddress(relayOpts.ContractID) - configWatcher, err := newConfigProvider(lggr, r.chain, relayOpts) + configWatcher, err := newStandardConfigProvider(lggr, r.chain, relayOpts) if err != nil { return nil, err } reportCodec := evmreportcodec.ReportCodec{} - contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}) + + contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) if err != nil { return nil, err } diff --git a/core/services/relay/evm/functions.go b/core/services/relay/evm/functions.go index d11ab274c06..c10134f3acc 100644 --- a/core/services/relay/evm/functions.go +++ b/core/services/relay/evm/functions.go @@ -4,13 +4,9 @@ import ( "context" "encoding/json" "fmt" - "strings" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" - - "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" + "github.com/pkg/errors" "go.uber.org/multierr" @@ -94,7 +90,7 @@ func (p *functionsProvider) Codec() commontypes.Codec { return nil } -func NewFunctionsProvider(chain legacyevm.Chain, rargs commontypes.RelayArgs, pargs commontypes.PluginArgs, lggr logger.Logger, ethKeystore keystore.Eth, pluginType functionsRelay.FunctionsPluginType) (evmRelayTypes.FunctionsProvider, error) { +func NewFunctionsProvider(ctx context.Context, chain legacyevm.Chain, rargs commontypes.RelayArgs, pargs commontypes.PluginArgs, lggr logger.Logger, ethKeystore keystore.Eth, pluginType functionsRelay.FunctionsPluginType) (evmRelayTypes.FunctionsProvider, error) { relayOpts := evmRelayTypes.NewRelayOpts(rargs) relayConfig, err := relayOpts.RelayConfig() if err != nil { @@ -108,7 +104,7 @@ func NewFunctionsProvider(chain legacyevm.Chain, rargs commontypes.RelayArgs, pa return nil, err } if !common.IsHexAddress(rargs.ContractID) { - return nil, pkgerrors.Errorf("invalid contractID, expected hex address") + return nil, errors.Errorf("invalid contractID, expected hex address") } var pluginConfig config.PluginConfig if err2 := json.Unmarshal(pargs.PluginConfig, &pluginConfig); err2 != nil { @@ -125,7 +121,7 @@ func NewFunctionsProvider(chain legacyevm.Chain, rargs commontypes.RelayArgs, pa } var contractTransmitter ContractTransmitter if relayConfig.SendingKeys != nil { - contractTransmitter, err = newFunctionsContractTransmitter(pluginConfig.ContractVersion, rargs, pargs.TransmitterID, configWatcher, ethKeystore, logPollerWrapper, lggr) + contractTransmitter, err = newFunctionsContractTransmitter(ctx, pluginConfig.ContractVersion, rargs, pargs.TransmitterID, configWatcher, ethKeystore, logPollerWrapper, lggr) if err != nil { return nil, err } @@ -141,14 +137,10 @@ func NewFunctionsProvider(chain legacyevm.Chain, rargs commontypes.RelayArgs, pa func newFunctionsConfigProvider(pluginType functionsRelay.FunctionsPluginType, chain legacyevm.Chain, args commontypes.RelayArgs, fromBlock uint64, logPollerWrapper evmRelayTypes.LogPollerWrapper, lggr logger.Logger) (*configWatcher, error) { if !common.IsHexAddress(args.ContractID) { - return nil, pkgerrors.Errorf("invalid contractID, expected hex address") + return nil, errors.Errorf("invalid contractID, expected hex address") } routerContractAddress := common.HexToAddress(args.ContractID) - contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorMetaData.ABI)) - if err != nil { - return nil, pkgerrors.Wrap(err, "could not get contract ABI JSON") - } cp, err := functionsRelay.NewFunctionsConfigPoller(pluginType, chain.LogPoller(), lggr) if err != nil { @@ -159,10 +151,10 @@ func newFunctionsConfigProvider(pluginType functionsRelay.FunctionsPluginType, c offchainConfigDigester := functionsRelay.NewFunctionsOffchainConfigDigester(pluginType, chain.ID().Uint64()) logPollerWrapper.SubscribeToUpdates("FunctionsOffchainConfigDigester", offchainConfigDigester) - return newConfigWatcher(lggr, routerContractAddress, contractABI, offchainConfigDigester, cp, chain, fromBlock, args.New), nil + return newConfigWatcher(lggr, routerContractAddress, offchainConfigDigester, cp, chain, fromBlock, args.New), nil } -func newFunctionsContractTransmitter(contractVersion uint32, rargs commontypes.RelayArgs, transmitterID string, configWatcher *configWatcher, ethKeystore keystore.Eth, logPollerWrapper evmRelayTypes.LogPollerWrapper, lggr logger.Logger) (ContractTransmitter, error) { +func newFunctionsContractTransmitter(ctx context.Context, contractVersion uint32, rargs commontypes.RelayArgs, transmitterID string, configWatcher *configWatcher, ethKeystore keystore.Eth, logPollerWrapper evmRelayTypes.LogPollerWrapper, lggr logger.Logger) (ContractTransmitter, error) { var relayConfig evmRelayTypes.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -170,23 +162,23 @@ func newFunctionsContractTransmitter(contractVersion uint32, rargs commontypes.R var fromAddresses []common.Address sendingKeys := relayConfig.SendingKeys if !relayConfig.EffectiveTransmitterID.Valid { - return nil, pkgerrors.New("EffectiveTransmitterID must be specified") + return nil, errors.New("EffectiveTransmitterID must be specified") } effectiveTransmitterAddress := common.HexToAddress(relayConfig.EffectiveTransmitterID.String) sendingKeysLength := len(sendingKeys) if sendingKeysLength == 0 { - return nil, pkgerrors.New("no sending keys provided") + return nil, errors.New("no sending keys provided") } // If we are using multiple sending keys, then a forwarder is needed to rotate transmissions. // Ensure that this forwarder is not set to a local sending key, and ensure our sending keys are enabled. for _, s := range sendingKeys { if sendingKeysLength > 1 && s == effectiveTransmitterAddress.String() { - return nil, pkgerrors.New("the transmitter is a local sending key with transaction forwarding enabled") + return nil, errors.New("the transmitter is a local sending key with transaction forwarding enabled") } - if err := ethKeystore.CheckEnabled(common.HexToAddress(s), configWatcher.chain.Config().EVM().ChainID()); err != nil { - return nil, pkgerrors.Wrap(err, "one of the sending keys given is not enabled") + if err := ethKeystore.CheckEnabled(ctx, common.HexToAddress(s), configWatcher.chain.Config().EVM().ChainID()); err != nil { + return nil, errors.Wrap(err, "one of the sending keys given is not enabled") } fromAddresses = append(fromAddresses, common.HexToAddress(s)) } @@ -217,12 +209,12 @@ func newFunctionsContractTransmitter(contractVersion uint32, rargs commontypes.R ) if err != nil { - return nil, pkgerrors.Wrap(err, "failed to create transmitter") + return nil, errors.Wrap(err, "failed to create transmitter") } functionsTransmitter, err := functionsRelay.NewFunctionsContractTransmitter( configWatcher.chain.Client(), - configWatcher.contractABI, + OCR2AggregatorTransmissionContractABI, transmitter, configWatcher.chain.LogPoller(), lggr, diff --git a/core/services/relay/evm/functions/config_poller.go b/core/services/relay/evm/functions/config_poller.go index a1975171fd7..7a59d499898 100644 --- a/core/services/relay/evm/functions/config_poller.go +++ b/core/services/relay/evm/functions/config_poller.go @@ -8,7 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2/types" @@ -58,7 +58,7 @@ func unpackLogData(d []byte) (*ocr2aggregator.OCR2AggregatorConfigSet, error) { unpacked := new(ocr2aggregator.OCR2AggregatorConfigSet) err := defaultABI.UnpackIntoInterface(unpacked, configSetEventName, d) if err != nil { - return nil, pkgerrors.Wrap(err, "failed to unpack log data") + return nil, errors.Wrap(err, "failed to unpack log data") } return unpacked, nil } @@ -88,7 +88,7 @@ func configFromLog(logData []byte, pluginType FunctionsPluginType) (ocrtypes.Con case S4Plugin: binary.BigEndian.PutUint16(unpacked.ConfigDigest[:2], uint16(S4DigestPrefix)) default: - return ocrtypes.ContractConfig{}, pkgerrors.New("unknown plugin type") + return ocrtypes.ContractConfig{}, errors.New("unknown plugin type") } return ocrtypes.ContractConfig{ @@ -138,7 +138,7 @@ func (cp *configPoller) LatestConfigDetails(ctx context.Context) (changedInBlock latest, err := cp.destChainLogPoller.LatestLogByEventSigWithConfs(ConfigSet, *contractAddr, 1, pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { return 0, ocrtypes.ConfigDigest{}, nil } return 0, ocrtypes.ConfigDigest{}, err @@ -155,7 +155,7 @@ func (cp *configPoller) LatestConfig(ctx context.Context, changedInBlock uint64) // (unlikely), we'll return an error here and libocr will re-try. contractAddr := cp.targetContract.Load() if contractAddr == nil { - return ocrtypes.ContractConfig{}, pkgerrors.New("no target contract address set yet") + return ocrtypes.ContractConfig{}, errors.New("no target contract address set yet") } lgs, err := cp.destChainLogPoller.Logs(int64(changedInBlock), int64(changedInBlock), ConfigSet, *contractAddr, pg.WithParentCtx(ctx)) @@ -163,7 +163,7 @@ func (cp *configPoller) LatestConfig(ctx context.Context, changedInBlock uint64) return ocrtypes.ContractConfig{}, err } if len(lgs) == 0 { - return ocrtypes.ContractConfig{}, pkgerrors.New("no logs found") + return ocrtypes.ContractConfig{}, errors.New("no logs found") } latestConfigSet, err := configFromLog(lgs[len(lgs)-1].Data, cp.pluginType) if err != nil { @@ -176,7 +176,7 @@ func (cp *configPoller) LatestConfig(ctx context.Context, changedInBlock uint64) func (cp *configPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint64, err error) { latest, err := cp.destChainLogPoller.LatestBlock(pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { return 0, nil } return 0, err diff --git a/core/services/relay/evm/functions/config_poller_test.go b/core/services/relay/evm/functions/config_poller_test.go index 2cf373d2e86..6a8c682a81b 100644 --- a/core/services/relay/evm/functions/config_poller_test.go +++ b/core/services/relay/evm/functions/config_poller_test.go @@ -81,7 +81,7 @@ func runTest(t *testing.T, pluginType functions.FunctionsPluginType, expectedDig defer ethClient.Close() lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) - lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000) + lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) servicetest.Run(t, lp) configPoller, err := functions.NewFunctionsConfigPoller(pluginType, lp, lggr) require.NoError(t, err) diff --git a/core/services/relay/evm/functions/contract_transmitter.go b/core/services/relay/evm/functions/contract_transmitter.go index 672179c2506..78a5ff39bb7 100644 --- a/core/services/relay/evm/functions/contract_transmitter.go +++ b/core/services/relay/evm/functions/contract_transmitter.go @@ -12,7 +12,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -73,7 +73,7 @@ func NewFunctionsContractTransmitter( ) (*contractTransmitter, error) { transmitted, ok := contractABI.Events["Transmitted"] if !ok { - return nil, pkgerrors.New("invalid ABI, missing transmitted") + return nil, errors.New("invalid ABI, missing transmitted") } if contractVersion != 1 { @@ -106,7 +106,7 @@ func (oc *contractTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes. var ss [][32]byte var vs [32]byte if len(signatures) > 32 { - return pkgerrors.New("too many signatures, maximum is 32") + return errors.New("too many signatures, maximum is 32") } for i, as := range signatures { r, s, v, err := evmutil.SplitSignature(as.Signature) @@ -125,21 +125,22 @@ func (oc *contractTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes. } var destinationContract common.Address - if oc.contractVersion == 1 { + switch oc.contractVersion { + case 1: oc.lggr.Debugw("FunctionsContractTransmitter: start", "reportLenBytes", len(report)) requests, err2 := oc.reportCodec.DecodeReport(report) if err2 != nil { - return pkgerrors.Wrap(err2, "FunctionsContractTransmitter: DecodeReport failed") + return errors.Wrap(err2, "FunctionsContractTransmitter: DecodeReport failed") } if len(requests) == 0 { - return pkgerrors.New("FunctionsContractTransmitter: no requests in report") + return errors.New("FunctionsContractTransmitter: no requests in report") } if len(requests[0].CoordinatorContract) != common.AddressLength { return fmt.Errorf("FunctionsContractTransmitter: incorrect length of CoordinatorContract field: %d", len(requests[0].CoordinatorContract)) } destinationContract.SetBytes(requests[0].CoordinatorContract) if destinationContract == (common.Address{}) { - return pkgerrors.New("FunctionsContractTransmitter: destination coordinator contract is zero") + return errors.New("FunctionsContractTransmitter: destination coordinator contract is zero") } // Sanity check - every report should contain requests with the same coordinator contract. for _, req := range requests[1:] { @@ -152,16 +153,16 @@ func (oc *contractTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes. } } oc.lggr.Debugw("FunctionsContractTransmitter: ready", "nRequests", len(requests), "coordinatorContract", destinationContract.Hex()) - } else { + default: return fmt.Errorf("unsupported contract version: %d", oc.contractVersion) } payload, err := oc.contractABI.Pack("transmit", rawReportCtx, []byte(report), rs, ss, vs) if err != nil { - return pkgerrors.Wrap(err, "abi.Pack failed") + return errors.Wrap(err, "abi.Pack failed") } oc.lggr.Debugw("FunctionsContractTransmitter: transmitting report", "contractAddress", destinationContract, "txMeta", txMeta, "payloadSize", len(payload)) - return pkgerrors.Wrap(oc.transmitter.CreateEthTransaction(ctx, destinationContract, payload, txMeta), "failed to send Eth transaction") + return errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, destinationContract, payload, txMeta), "failed to send Eth transaction") } type contractReader interface { @@ -184,7 +185,7 @@ func parseTransmitted(log []byte) ([32]byte, uint32, error) { return [32]byte{}, 0, err } if len(transmitted) < 2 { - return [32]byte{}, 0, pkgerrors.New("transmitted event log has too few arguments") + return [32]byte{}, 0, errors.New("transmitted event log has too few arguments") } configDigest := *abi.ConvertType(transmitted[0], new([32]byte)).(*[32]byte) epoch := *abi.ConvertType(transmitted[1], new(uint32)).(*uint32) @@ -209,7 +210,7 @@ func callContract(ctx context.Context, addr common.Address, contractABI abi.ABI, func (oc *contractTransmitter) LatestConfigDigestAndEpoch(ctx context.Context) (ocrtypes.ConfigDigest, uint32, error) { contractAddr := oc.contractAddress.Load() if contractAddr == nil { - return ocrtypes.ConfigDigest{}, 0, pkgerrors.New("destination contract address not set") + return ocrtypes.ConfigDigest{}, 0, errors.New("destination contract address not set") } latestConfigDigestAndEpoch, err := callContract(ctx, *contractAddr, oc.contractABI, "latestConfigDigestAndEpoch", nil, oc.contractReader) if err != nil { @@ -230,7 +231,7 @@ func (oc *contractTransmitter) LatestConfigDigestAndEpoch(ctx context.Context) ( latest, err := oc.lp.LatestLogByEventSigWithConfs( oc.transmittedEventSig, *contractAddr, 1, pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { // No transmissions yet return configDigest, 0, nil } diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index a9da88deedc..f11b6bee1e0 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -8,7 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -92,7 +92,7 @@ func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig conf } if blockOffset >= pastBlocksToPoll || requestBlockOffset >= pastBlocksToPoll || responseBlockOffset >= pastBlocksToPoll { lggr.Errorw("invalid config: number of required confirmation blocks >= pastBlocksToPoll", "pastBlocksToPoll", pastBlocksToPoll, "minIncomingConfirmations", pluginConfig.MinIncomingConfirmations, "minRequestConfirmations", pluginConfig.MinRequestConfirmations, "minResponseConfirmations", pluginConfig.MinResponseConfirmations) - return nil, pkgerrors.Errorf("invalid config: number of required confirmation blocks >= pastBlocksToPoll") + return nil, errors.Errorf("invalid config: number of required confirmation blocks >= pastBlocksToPoll") } return &logPollerWrapper{ @@ -118,7 +118,7 @@ func (l *logPollerWrapper) Start(context.Context) error { l.mu.Lock() defer l.mu.Unlock() if l.pluginConfig.ContractVersion != 1 { - return pkgerrors.New("only contract version 1 is supported") + return errors.New("only contract version 1 is supported") } l.closeWait.Add(1) go l.checkForRouteUpdates() @@ -168,7 +168,7 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR resultsResp := []evmRelayTypes.OracleResponse{} if len(coordinators) == 0 { l.lggr.Debug("LatestEvents: no non-zero coordinators to check") - return resultsReq, resultsResp, pkgerrors.New("no non-zero coordinators to check") + return resultsReq, resultsResp, errors.New("no non-zero coordinators to check") } for _, coordinator := range coordinators { @@ -304,12 +304,11 @@ func (l *logPollerWrapper) filterPreviouslyDetectedEvents(logs []logpoller.Log, expiredRequests := 0 for _, detectedEvent := range detectedEvents.detectedEventsOrdered { expirationTime := time.Now().Add(-time.Second * time.Duration(l.logPollerCacheDurationSec)) - if detectedEvent.timeDetected.Before(expirationTime) { - delete(detectedEvents.isPreviouslyDetected, detectedEvent.requestId) - expiredRequests++ - } else { + if !detectedEvent.timeDetected.Before(expirationTime) { break } + delete(detectedEvents.isPreviouslyDetected, detectedEvent.requestId) + expiredRequests++ } detectedEvents.detectedEventsOrdered = detectedEvents.detectedEventsOrdered[expiredRequests:] l.lggr.Debugw("filterPreviouslyDetectedEvents: done", "filterType", filterType, "nLogs", len(logs), "nFilteredLogs", len(filteredLogs), "nExpiredRequests", expiredRequests, "previouslyDetectedCacheSize", len(detectedEvents.detectedEventsOrdered)) diff --git a/core/services/relay/evm/median.go b/core/services/relay/evm/median.go index 756dc56371f..e3200d8e867 100644 --- a/core/services/relay/evm/median.go +++ b/core/services/relay/evm/median.go @@ -8,7 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -34,17 +34,17 @@ func newMedianContract(configTracker types.ContractConfigTracker, contractAddres lggr = lggr.Named("MedianContract") contract, err := offchain_aggregator_wrapper.NewOffchainAggregator(contractAddress, chain.Client()) if err != nil { - return nil, pkgerrors.Wrap(err, "could not instantiate NewOffchainAggregator") + return nil, errors.Wrap(err, "could not instantiate NewOffchainAggregator") } contractFilterer, err := ocr2aggregator.NewOCR2AggregatorFilterer(contractAddress, chain.Client()) if err != nil { - return nil, pkgerrors.Wrap(err, "could not instantiate NewOffchainAggregatorFilterer") + return nil, errors.Wrap(err, "could not instantiate NewOffchainAggregatorFilterer") } contractCaller, err := ocr2aggregator.NewOCR2AggregatorCaller(contractAddress, chain.Client()) if err != nil { - return nil, pkgerrors.Wrap(err, "could not instantiate NewOffchainAggregatorCaller") + return nil, errors.Wrap(err, "could not instantiate NewOffchainAggregatorCaller") } return &medianContract{ @@ -86,7 +86,7 @@ func (oc *medianContract) HealthReport() map[string]error { func (oc *medianContract) LatestTransmissionDetails(ctx context.Context) (ocrtypes.ConfigDigest, uint32, uint8, *big.Int, time.Time, error) { opts := bind.CallOpts{Context: ctx, Pending: false} result, err := oc.contractCaller.LatestTransmissionDetails(&opts) - return result.ConfigDigest, result.Epoch, result.Round, result.LatestAnswer, time.Unix(int64(result.LatestTimestamp), 0), pkgerrors.Wrap(err, "error getting LatestTransmissionDetails") + return result.ConfigDigest, result.Epoch, result.Round, result.LatestAnswer, time.Unix(int64(result.LatestTimestamp), 0), errors.Wrap(err, "error getting LatestTransmissionDetails") } // LatestRoundRequested returns the configDigest, epoch, and round from the latest diff --git a/core/services/relay/evm/mercury/config_digest.go b/core/services/relay/evm/mercury/config_digest.go index b9431fe923f..291a723ee3a 100644 --- a/core/services/relay/evm/mercury/config_digest.go +++ b/core/services/relay/evm/mercury/config_digest.go @@ -61,7 +61,7 @@ func configDigest( panic("copy too little data") } binary.BigEndian.PutUint16(configDigest[:2], uint16(types.ConfigDigestPrefixMercuryV02)) - if !(configDigest[0] == 0 || configDigest[1] == 6) { + if !(configDigest[0] == 0 && configDigest[1] == 6) { // assertion panic("unexpected mismatch") } diff --git a/core/services/relay/evm/mercury/config_poller.go b/core/services/relay/evm/mercury/config_poller.go index dbc29754fed..98ef78020c7 100644 --- a/core/services/relay/evm/mercury/config_poller.go +++ b/core/services/relay/evm/mercury/config_poller.go @@ -8,7 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -48,7 +48,7 @@ func unpackLogData(d []byte) (*verifier.VerifierConfigSet, error) { err := verifierABI.UnpackIntoInterface(unpacked, configSetEventName, d) if err != nil { - return nil, pkgerrors.Wrap(err, "failed to unpack log data") + return nil, errors.Wrap(err, "failed to unpack log data") } return unpacked, nil @@ -168,7 +168,7 @@ func (cp *ConfigPoller) LatestConfig(ctx context.Context, changedInBlock uint64) func (cp *ConfigPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint64, err error) { latest, err := cp.destChainLogPoller.LatestBlock(pg.WithParentCtx(ctx)) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { return 0, nil } return 0, err diff --git a/core/services/relay/evm/mercury/config_poller_test.go b/core/services/relay/evm/mercury/config_poller_test.go index 44c61712b6d..f828938f954 100644 --- a/core/services/relay/evm/mercury/config_poller_test.go +++ b/core/services/relay/evm/mercury/config_poller_test.go @@ -7,7 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/onsi/gomega" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" confighelper2 "github.com/smartcontractkit/libocr/offchainreporting2plus/confighelper" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ocrtypes2 "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -115,7 +115,7 @@ func TestMercuryConfigPoller(t *testing.T) { func onchainPublicKeyToAddress(publicKeys []types.OnchainPublicKey) (addresses []common.Address, err error) { for _, signer := range publicKeys { if len(signer) != 20 { - return []common.Address{}, pkgerrors.Errorf("address is not 20 bytes %s", signer) + return []common.Address{}, errors.Errorf("address is not 20 bytes %s", signer) } addresses = append(addresses, common.BytesToAddress(signer)) } diff --git a/core/services/relay/evm/mercury/helpers_test.go b/core/services/relay/evm/mercury/helpers_test.go index f1686ee00c8..8283e80916e 100644 --- a/core/services/relay/evm/mercury/helpers_test.go +++ b/core/services/relay/evm/mercury/helpers_test.go @@ -167,7 +167,7 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { ethClient := evmclient.NewSimulatedBackendClient(t, b, big.NewInt(1337)) lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) - lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000) + lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) servicetest.Run(t, lp) configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID) diff --git a/core/services/relay/evm/mercury/offchain_config_digester.go b/core/services/relay/evm/mercury/offchain_config_digester.go index 5188605d0e5..f9ba0b23095 100644 --- a/core/services/relay/evm/mercury/offchain_config_digester.go +++ b/core/services/relay/evm/mercury/offchain_config_digester.go @@ -6,7 +6,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/wsrpc/credentials" @@ -32,7 +32,7 @@ func (d OffchainConfigDigester) ConfigDigest(cc ocrtypes.ContractConfig) (ocrtyp signers := []common.Address{} for i, signer := range cc.Signers { if len(signer) != 20 { - return ocrtypes.ConfigDigest{}, pkgerrors.Errorf("%v-th evm signer should be a 20 byte address, but got %x", i, signer) + return ocrtypes.ConfigDigest{}, errors.Errorf("%v-th evm signer should be a 20 byte address, but got %x", i, signer) } a := common.BytesToAddress(signer) signers = append(signers, a) @@ -40,12 +40,12 @@ func (d OffchainConfigDigester) ConfigDigest(cc ocrtypes.ContractConfig) (ocrtyp transmitters := []credentials.StaticSizedPublicKey{} for i, transmitter := range cc.Transmitters { if len(transmitter) != 2*ed25519.PublicKeySize { - return ocrtypes.ConfigDigest{}, pkgerrors.Errorf("%v-th evm transmitter should be a 64 character hex-encoded ed25519 public key, but got '%v' (%d chars)", i, transmitter, len(transmitter)) + return ocrtypes.ConfigDigest{}, errors.Errorf("%v-th evm transmitter should be a 64 character hex-encoded ed25519 public key, but got '%v' (%d chars)", i, transmitter, len(transmitter)) } var t credentials.StaticSizedPublicKey b, err := hex.DecodeString(string(transmitter)) if err != nil { - return ocrtypes.ConfigDigest{}, pkgerrors.Wrapf(err, "%v-th evm transmitter is not valid hex, got: %q", i, transmitter) + return ocrtypes.ConfigDigest{}, errors.Wrapf(err, "%v-th evm transmitter is not valid hex, got: %q", i, transmitter) } copy(t[:], b) diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 6a4bfeb6dd0..9444b904b89 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -108,7 +108,6 @@ type mercuryTransmitter struct { services.StateMachine lggr logger.Logger rpcClient wsrpc.Client - cfgTracker ConfigTracker persistenceManager *PersistenceManager codec TransmitterReportDecoder @@ -149,14 +148,13 @@ func getPayloadTypes() abi.Arguments { }) } -func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, db *sqlx.DB, cfg pg.QConfig, codec TransmitterReportDecoder) *mercuryTransmitter { +func NewTransmitter(lggr logger.Logger, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, db *sqlx.DB, cfg pg.QConfig, codec TransmitterReportDecoder) *mercuryTransmitter { feedIDHex := fmt.Sprintf("0x%x", feedID[:]) persistenceManager := NewPersistenceManager(lggr, NewORM(db, lggr, cfg), jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency) return &mercuryTransmitter{ services.StateMachine{}, lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), rpcClient, - cfgTracker, persistenceManager, codec, feedID, diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 3be9fa0f407..188beff5113 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common/hexutil" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,6 +27,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) q := NewTransmitQueue(lggr, "", 0, nil, nil) + codec := new(mockCodec) t.Run("v1 report transmission successfully enqueued", func(t *testing.T) { report := sampleV1Report @@ -40,7 +41,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -58,7 +59,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -76,7 +77,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -88,6 +89,8 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) db := pgtest.NewSqlxDB(t) + var jobID int32 + codec := new(mockCodec) t.Run("successful query", func(t *testing.T) { c := mocks.MockWSRPCClient{ @@ -101,7 +104,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -116,7 +119,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -126,10 +129,10 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { t.Run("failing query", func(t *testing.T) { c := mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { - return nil, pkgerrors.New("something exploded") + return nil, errors.New("something exploded") }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) _, err := mt.LatestTimestamp(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -151,6 +154,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) db := pgtest.NewSqlxDB(t) + var jobID int32 codec := new(mockCodec) @@ -167,7 +171,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), codec) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) { codec.val = originalPrice @@ -180,7 +184,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }) t.Run("BenchmarkPriceFromReport fails", func(t *testing.T) { codec.val = nil - codec.err = pkgerrors.New("something exploded") + codec.err = errors.New("something exploded") _, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.Error(t, err) @@ -197,7 +201,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.NoError(t, err) @@ -207,10 +211,10 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { t.Run("failing query", func(t *testing.T) { c := mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { - return nil, pkgerrors.New("something exploded") + return nil, errors.New("something exploded") }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) _, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -222,6 +226,8 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { lggr := logger.TestLogger(t) db := pgtest.NewSqlxDB(t) + var jobID int32 + codec := new(mockCodec) t.Run("successful query", func(t *testing.T) { c := mocks.MockWSRPCClient{ @@ -235,7 +241,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -250,7 +256,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -259,10 +265,10 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { t.Run("failing query", func(t *testing.T) { c := mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { - return nil, pkgerrors.New("something exploded") + return nil, errors.New("something exploded") }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -279,7 +285,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt := NewTransmitter(lggr, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x") diff --git a/core/services/relay/evm/mercury/v1/data_source_test.go b/core/services/relay/evm/mercury/v1/data_source_test.go index c96030297d7..e0769fe5b64 100644 --- a/core/services/relay/evm/mercury/v1/data_source_test.go +++ b/core/services/relay/evm/mercury/v1/data_source_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -140,7 +140,7 @@ func TestMercury_Observe(t *testing.T) { }) t.Run("if querying latest report fails", func(t *testing.T) { orm.report = nil - orm.err = pkgerrors.New("something exploded") + orm.err = errors.New("something exploded") obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -164,7 +164,7 @@ func TestMercury_Observe(t *testing.T) { t.Run("without latest report in database", func(t *testing.T) { t.Run("if FetchInitialMaxFinalizedBlockNumber returns error", func(t *testing.T) { - fetcher.err = pkgerrors.New("mock fetcher error") + fetcher.err = errors.New("mock fetcher error") obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -228,7 +228,7 @@ func TestMercury_Observe(t *testing.T) { t.Cleanup(func() { runner.Err = nil }) - runner.Err = pkgerrors.New("run execution failed") + runner.Err = errors.New("run execution failed") _, err := ds.Observe(ctx, repts, false) assert.EqualError(t, err, "Observe failed while executing run: error executing run for spec ID 0: run execution failed") diff --git a/core/services/relay/evm/mercury/v2/data_source_test.go b/core/services/relay/evm/mercury/v2/data_source_test.go index df736f76428..c9ae37ae018 100644 --- a/core/services/relay/evm/mercury/v2/data_source_test.go +++ b/core/services/relay/evm/mercury/v2/data_source_test.go @@ -5,7 +5,7 @@ import ( "math/big" "testing" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -111,7 +111,7 @@ func Test_Datasource(t *testing.T) { }) t.Run("if querying latest report fails", func(t *testing.T) { orm.report = nil - orm.err = pkgerrors.New("something exploded") + orm.err = errors.New("something exploded") obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -134,7 +134,7 @@ func Test_Datasource(t *testing.T) { orm.err = nil t.Run("if LatestTimestamp returns error", func(t *testing.T) { - fetcher.tsErr = pkgerrors.New("some error") + fetcher.tsErr = errors.New("some error") obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -202,7 +202,7 @@ func Test_Datasource(t *testing.T) { ds.pipelineRunner = &mercurymocks.MockRunner{ Trrs: goodTrrs, - Err: pkgerrors.New("run execution failed"), + Err: errors.New("run execution failed"), } _, err := ds.Observe(ctx, repts, false) @@ -221,7 +221,7 @@ func Test_Datasource(t *testing.T) { badTrrs := []pipeline.TaskRunResult{ { // benchmark price - Result: pipeline.Result{Error: pkgerrors.New("some error with bp")}, + Result: pipeline.Result{Error: errors.New("some error with bp")}, Task: &mercurymocks.MockTask{}, }, } @@ -263,8 +263,8 @@ func Test_Datasource(t *testing.T) { fetcher.nativePriceErr = nil }) - fetcher.linkPriceErr = pkgerrors.New("some error fetching link price") - fetcher.nativePriceErr = pkgerrors.New("some error fetching native price") + fetcher.linkPriceErr = errors.New("some error fetching link price") + fetcher.nativePriceErr = errors.New("some error fetching native price") obs, err := ds.Observe(ctx, repts, false) assert.NoError(t, err) diff --git a/core/services/relay/evm/mercury/v3/data_source_test.go b/core/services/relay/evm/mercury/v3/data_source_test.go index 27b3c0d62cf..4ff713abb21 100644 --- a/core/services/relay/evm/mercury/v3/data_source_test.go +++ b/core/services/relay/evm/mercury/v3/data_source_test.go @@ -5,7 +5,7 @@ import ( "math/big" "testing" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" mercurytypes "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" @@ -121,7 +121,7 @@ func Test_Datasource(t *testing.T) { }) t.Run("if querying latest report fails", func(t *testing.T) { orm.report = nil - orm.err = pkgerrors.New("something exploded") + orm.err = errors.New("something exploded") obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -144,7 +144,7 @@ func Test_Datasource(t *testing.T) { orm.err = nil t.Run("if LatestTimestamp returns error", func(t *testing.T) { - fetcher.tsErr = pkgerrors.New("some error") + fetcher.tsErr = errors.New("some error") obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -216,7 +216,7 @@ func Test_Datasource(t *testing.T) { ds.pipelineRunner = &mercurymocks.MockRunner{ Trrs: goodTrrs, - Err: pkgerrors.New("run execution failed"), + Err: errors.New("run execution failed"), } _, err := ds.Observe(ctx, repts, false) @@ -245,7 +245,7 @@ func Test_Datasource(t *testing.T) { }, { // ask - Result: pipeline.Result{Error: pkgerrors.New("some error with ask")}, + Result: pipeline.Result{Error: errors.New("some error with ask")}, Task: &mercurymocks.MockTask{}, }, } @@ -291,8 +291,8 @@ func Test_Datasource(t *testing.T) { fetcher.nativePriceErr = nil }) - fetcher.linkPriceErr = pkgerrors.New("some error fetching link price") - fetcher.nativePriceErr = pkgerrors.New("some error fetching native price") + fetcher.linkPriceErr = errors.New("some error fetching link price") + fetcher.nativePriceErr = errors.New("some error fetching native price") obs, err := ds.Observe(ctx, repts, false) assert.NoError(t, err) diff --git a/core/services/relay/evm/mercury/wsrpc/client.go b/core/services/relay/evm/mercury/wsrpc/client.go index 6812dd44c11..d420a17a1a4 100644 --- a/core/services/relay/evm/mercury/wsrpc/client.go +++ b/core/services/relay/evm/mercury/wsrpc/client.go @@ -8,7 +8,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -156,7 +156,7 @@ func (w *client) dial(ctx context.Context, opts ...wsrpc.DialOption) error { if err != nil { w.dialErrorCountMetric.Inc() setLivenessMetric(false) - return pkgerrors.Wrap(err, "failed to dial wsrpc client") + return errors.Wrap(err, "failed to dial wsrpc client") } w.dialSuccessCountMetric.Inc() setLivenessMetric(true) @@ -193,16 +193,16 @@ func (w *client) resetTransport() { b := utils.NewRedialBackoff() for { // Will block until successful dial, or context is canceled (i.e. on close) - if err := w.dial(ctx, wsrpc.WithBlock()); err != nil { - if ctx.Err() != nil { - w.logger.Debugw("ResetTransport exiting due to client Close", "err", err) - return - } - w.logger.Errorw("ResetTransport failed to redial", "err", err) - time.Sleep(b.Duration()) - } else { + err := w.dial(ctx, wsrpc.WithBlock()) + if err == nil { break } + if ctx.Err() != nil { + w.logger.Debugw("ResetTransport exiting due to client Close", "err", err) + return + } + w.logger.Errorw("ResetTransport failed to redial", "err", err) + time.Sleep(b.Duration()) } w.logger.Info("ResetTransport successfully redialled") } @@ -231,7 +231,7 @@ func (w *client) Healthy() (err error) { } state := w.conn.GetState() if state != connectivity.Ready { - return pkgerrors.Errorf("client state should be %s; got %s", connectivity.Ready, state) + return errors.Errorf("client state should be %s; got %s", connectivity.Ready, state) } return nil } @@ -239,12 +239,12 @@ func (w *client) Healthy() (err error) { func (w *client) waitForReady(ctx context.Context) (err error) { ok := w.IfStarted(func() { if ready := w.conn.WaitForReady(ctx); !ready { - err = pkgerrors.Errorf("websocket client not ready; got state: %v", w.conn.GetState()) + err = errors.Errorf("websocket client not ready; got state: %v", w.conn.GetState()) return } }) if !ok { - return pkgerrors.New("client is not started") + return errors.New("client is not started") } return } @@ -253,7 +253,7 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p w.logger.Trace("Transmit") start := time.Now() if err = w.waitForReady(ctx); err != nil { - return nil, pkgerrors.Wrap(err, "Transmit call failed") + return nil, errors.Wrap(err, "Transmit call failed") } resp, err = w.rawClient.Transmit(ctx, req) w.handleTimeout(err) @@ -269,7 +269,7 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p } func (w *client) handleTimeout(err error) { - if pkgerrors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, context.DeadlineExceeded) { w.timeoutCountMetric.Inc() cnt := w.consecutiveTimeoutCnt.Add(1) if cnt == MaxConsecutiveRequestFailures { @@ -305,7 +305,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) lggr := w.logger.With("req.FeedId", hexutil.Encode(req.FeedId)) lggr.Trace("LatestReport") if err = w.waitForReady(ctx); err != nil { - return nil, pkgerrors.Wrap(err, "LatestReport failed") + return nil, errors.Wrap(err, "LatestReport failed") } var cached bool if w.cache == nil { diff --git a/core/services/relay/evm/mercury/wsrpc/pb/generate.go b/core/services/relay/evm/mercury/wsrpc/pb/generate.go new file mode 100644 index 00000000000..2bb95012d1c --- /dev/null +++ b/core/services/relay/evm/mercury/wsrpc/pb/generate.go @@ -0,0 +1,2 @@ +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-wsrpc_out=. --go-wsrpc_opt=paths=source_relative mercury.proto +package pb diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go index 4ffe41860e6..ab4d2f68dad 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v3.21.12 +// protoc-gen-go v1.32.0 +// protoc v4.25.1 // source: mercury.proto package pb @@ -25,7 +25,8 @@ type TransmitRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + ReportFormat uint32 `protobuf:"varint,2,opt,name=reportFormat,proto3" json:"reportFormat,omitempty"` } func (x *TransmitRequest) Reset() { @@ -67,6 +68,13 @@ func (x *TransmitRequest) GetPayload() []byte { return nil } +func (x *TransmitRequest) GetReportFormat() uint32 { + if x != nil { + return x.ReportFormat + } + return 0 +} + type TransmitResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -442,73 +450,76 @@ var File_mercury_proto protoreflect.FileDescriptor var file_mercury_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x02, 0x70, 0x62, 0x22, 0x2b, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, + 0x02, 0x70, 0x62, 0x22, 0x4f, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, - 0x22, 0x3c, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, - 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, - 0x0a, 0x13, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x65, 0x65, 0x64, 0x49, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x65, 0x65, 0x64, 0x49, 0x64, 0x22, 0x50, 0x0a, - 0x14, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x22, 0x0a, 0x06, 0x72, - 0x65, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, - 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x06, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x22, - 0xa1, 0x04, 0x0a, 0x06, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x65, - 0x65, 0x64, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x65, 0x65, 0x64, - 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, - 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, - 0x61, 0x64, 0x12, 0x32, 0x0a, 0x14, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x42, - 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x14, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, - 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2e, 0x0a, 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, - 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, - 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x10, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, - 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x10, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, - 0x73, 0x68, 0x12, 0x34, 0x0a, 0x15, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, - 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x15, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x34, 0x0a, 0x15, 0x6f, 0x62, 0x73, 0x65, - 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x22, - 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x09, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, - 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, 0x0a, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x6f, 0x75, 0x6e, - 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x22, - 0x0a, 0x0c, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0c, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x4e, 0x61, - 0x6d, 0x65, 0x12, 0x32, 0x0a, 0x14, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x74, 0x69, - 0x6e, 0x67, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x14, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x4f, 0x70, - 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2b, 0x0a, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, - 0x64, 0x41, 0x74, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x70, 0x62, 0x2e, 0x54, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, - 0x64, 0x41, 0x74, 0x22, 0x3b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, - 0x6e, 0x6f, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6e, 0x61, 0x6e, 0x6f, 0x73, - 0x32, 0x83, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x12, 0x35, 0x0a, 0x08, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, - 0x70, 0x62, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x0c, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, - 0x6f, 0x72, 0x74, 0x12, 0x17, 0x2e, 0x70, 0x62, 0x2e, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, - 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x70, - 0x62, 0x2e, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x4e, 0x5a, 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, - 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, - 0x76, 0x32, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6c, 0x61, - 0x79, 0x2f, 0x65, 0x76, 0x6d, 0x2f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2f, 0x77, 0x73, - 0x72, 0x70, 0x63, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x22, 0x0a, 0x0c, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x6f, + 0x72, 0x6d, 0x61, 0x74, 0x22, 0x3c, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x13, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, + 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x65, 0x65, + 0x64, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x65, 0x65, 0x64, 0x49, + 0x64, 0x22, 0x50, 0x0a, 0x14, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, + 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, + 0x22, 0x0a, 0x06, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x06, 0x72, 0x65, 0x70, + 0x6f, 0x72, 0x74, 0x22, 0xa1, 0x04, 0x0a, 0x06, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x16, + 0x0a, 0x06, 0x66, 0x65, 0x65, 0x64, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, + 0x66, 0x65, 0x65, 0x64, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x32, 0x0a, 0x14, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x46, + 0x72, 0x6f, 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x14, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2e, 0x0a, 0x12, 0x63, 0x75, + 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x10, 0x63, 0x75, + 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x12, 0x34, 0x0a, 0x15, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x34, 0x0a, 0x15, + 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, 0x6f, 0x62, 0x73, + 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, + 0x73, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x44, 0x69, 0x67, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, + 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x12, 0x14, 0x0a, 0x05, + 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x72, 0x6f, 0x75, + 0x6e, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x4e, 0x61, + 0x6d, 0x65, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, + 0x6f, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x32, 0x0a, 0x14, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x6d, + 0x69, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x0d, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x14, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x74, 0x69, + 0x6e, 0x67, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2b, 0x0a, 0x09, 0x63, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, + 0x70, 0x62, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x3b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x14, + 0x0a, 0x05, 0x6e, 0x61, 0x6e, 0x6f, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6e, + 0x61, 0x6e, 0x6f, 0x73, 0x32, 0x83, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, + 0x12, 0x35, 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x70, + 0x62, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x0c, 0x4c, 0x61, 0x74, 0x65, 0x73, + 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x17, 0x2e, 0x70, 0x62, 0x2e, 0x4c, 0x61, 0x74, + 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x18, 0x2e, 0x70, 0x62, 0x2e, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x70, 0x6f, + 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x4e, 0x5a, 0x4c, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, + 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, + 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2f, 0x65, 0x76, 0x6d, 0x2f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, + 0x79, 0x2f, 0x77, 0x73, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto b/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto index 184b0572046..6b71404a6a6 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto @@ -11,7 +11,7 @@ service Mercury { message TransmitRequest { bytes payload = 1; - string reportFormat = 2; + uint32 reportFormat = 2; } message TransmitResponse { diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go index 23c78abf533..0c31a1d7ac9 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-wsrpc. DO NOT EDIT. // versions: // - protoc-gen-go-wsrpc v0.0.1 -// - protoc v4.23.2 +// - protoc v4.25.1 package pb diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index efb518f73ea..6563604945c 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -4,36 +4,30 @@ import ( "context" "encoding/json" "fmt" - "strings" - iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" - evm "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/transmit" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" - - "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" - "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin" - commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/automation" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" + iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + evm "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/transmit" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" ) @@ -90,13 +84,17 @@ func NewOCR2KeeperRelayer(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger } func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (OCR2KeeperProvider, error) { + + // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 + ctx := context.Background() + cfgWatcher, err := newOCR2KeeperConfigProvider(r.lggr, r.chain, rargs) if err != nil { return nil, err } gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI) if err != nil { return nil, err } @@ -221,21 +219,20 @@ func newOCR2KeeperConfigProvider(lggr logger.Logger, chain legacyevm.Chain, rarg } contractAddress := common.HexToAddress(rargs.ContractID) - contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorMetaData.ABI)) - if err != nil { - return nil, pkgerrors.Wrap(err, "could not get OCR2Aggregator ABI JSON") - } configPoller, err := NewConfigPoller( lggr.With("contractID", rargs.ContractID), - chain.Client(), - chain.LogPoller(), - contractAddress, - // TODO: Does ocr2keeper need to support config contract? DF-19182 - nil, + CPConfig{ + chain.Client(), + chain.LogPoller(), + contractAddress, + // TODO: Does ocr2keeper need to support config contract? DF-19182 + nil, + OCR2AggregatorLogDecoder, + }, ) if err != nil { - return nil, pkgerrors.Wrap(err, "failed to create config poller") + return nil, errors.Wrap(err, "failed to create config poller") } offchainConfigDigester := evmutil.EVMOffchainConfigDigester{ @@ -246,7 +243,6 @@ func newOCR2KeeperConfigProvider(lggr logger.Logger, chain legacyevm.Chain, rarg return newConfigWatcher( lggr, contractAddress, - contractABI, offchainConfigDigester, configPoller, chain, diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index ed02ba06f91..98753655550 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -1,16 +1,13 @@ package evm import ( + "context" "encoding/json" "fmt" - "strings" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" - pkgerrors "github.com/pkg/errors" - "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -63,11 +60,15 @@ func NewOCR2VRFRelayer(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, e } func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (DKGProvider, error) { + + // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 + ctx := context.Background() + configWatcher, err := newOCR2VRFConfigProvider(r.lggr, r.chain, rargs) if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) if err != nil { return nil, err } @@ -86,11 +87,15 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo } func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (OCR2VRFProvider, error) { + + // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 + ctx := context.Background() + configWatcher, err := newOCR2VRFConfigProvider(r.lggr, r.chain, rargs) if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) if err != nil { return nil, err } @@ -146,17 +151,16 @@ func newOCR2VRFConfigProvider(lggr logger.Logger, chain legacyevm.Chain, rargs c } contractAddress := common.HexToAddress(rargs.ContractID) - contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI)) - if err != nil { - return nil, pkgerrors.Wrap(err, "could not get OCR2Aggregator ABI JSON") - } configPoller, err := NewConfigPoller( lggr.With("contractID", rargs.ContractID), - chain.Client(), - chain.LogPoller(), - contractAddress, - // TODO: Does ocr2vrf need to support config contract? DF-19182 - nil, + CPConfig{ + chain.Client(), + chain.LogPoller(), + contractAddress, + // TODO: Does ocr2vrf need to support config contract? DF-19182 + nil, + OCR2AggregatorLogDecoder, + }, ) if err != nil { return nil, err @@ -170,7 +174,6 @@ func newOCR2VRFConfigProvider(lggr logger.Logger, chain legacyevm.Chain, rargs c return newConfigWatcher( lggr, contractAddress, - contractABI, offchainConfigDigester, configPoller, chain, diff --git a/core/services/relay/evm/relayer_extender.go b/core/services/relay/evm/relayer_extender.go index 79800a14b1a..83f03b47f9e 100644 --- a/core/services/relay/evm/relayer_extender.go +++ b/core/services/relay/evm/relayer_extender.go @@ -5,7 +5,7 @@ import ( "fmt" "math/big" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "go.uber.org/multierr" "github.com/smartcontractkit/chainlink-common/pkg/loop" @@ -16,7 +16,7 @@ import ( ) // ErrNoChains indicates that no EVM chains have been started -var ErrNoChains = pkgerrors.New("no EVM chains loaded") +var ErrNoChains = errors.New("no EVM chains loaded") type EVMChainRelayerExtender interface { loop.RelayerExt @@ -94,7 +94,7 @@ func (s *ChainRelayerExt) Chain() legacyevm.Chain { return s.chain } -var ErrCorruptEVMChain = pkgerrors.New("corrupt evm chain") +var ErrCorruptEVMChain = errors.New("corrupt evm chain") func (s *ChainRelayerExt) Start(ctx context.Context) error { return s.chain.Start(ctx) diff --git a/core/services/relay/evm/request_round_db.go b/core/services/relay/evm/request_round_db.go index a28c24837a3..b3a5b01bc2c 100644 --- a/core/services/relay/evm/request_round_db.go +++ b/core/services/relay/evm/request_round_db.go @@ -4,7 +4,7 @@ import ( "database/sql" "encoding/json" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -35,7 +35,7 @@ func NewRoundRequestedDB(sqldb *sql.DB, oracleSpecID int32, lggr logger.Logger) func (d *requestRoundDB) SaveLatestRoundRequested(tx pg.Queryer, rr ocr2aggregator.OCR2AggregatorRoundRequested) error { rawLog, err := json.Marshal(rr.Raw) if err != nil { - return pkgerrors.Wrap(err, "could not marshal log as JSON") + return errors.Wrap(err, "could not marshal log as JSON") } _, err = tx.Exec(` INSERT INTO ocr2_latest_round_requested (ocr2_oracle_spec_id, requester, config_digest, epoch, round, raw) @@ -47,7 +47,7 @@ VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT (ocr2_oracle_spec_id) DO UPDATE SET raw = EXCLUDED.raw `, d.oracleSpecID, rr.Requester, rr.ConfigDigest[:], rr.Epoch, rr.Round, rawLog) - return pkgerrors.Wrap(err, "could not save latest round requested") + return errors.Wrap(err, "could not save latest round requested") } func (d *requestRoundDB) LoadLatestRoundRequested() (ocr2aggregator.OCR2AggregatorRoundRequested, error) { @@ -59,7 +59,7 @@ WHERE ocr2_oracle_spec_id = $1 LIMIT 1 `, d.oracleSpecID) if err != nil { - return rr, pkgerrors.Wrap(err, "LoadLatestRoundRequested failed to query rows") + return rr, errors.Wrap(err, "LoadLatestRoundRequested failed to query rows") } defer rows.Close() @@ -69,17 +69,17 @@ LIMIT 1 err = rows.Scan(&rr.Requester, &configDigest, &rr.Epoch, &rr.Round, &rawLog) if err != nil { - return rr, pkgerrors.Wrap(err, "LoadLatestRoundRequested failed to scan row") + return rr, errors.Wrap(err, "LoadLatestRoundRequested failed to scan row") } rr.ConfigDigest, err = ocrtypes.BytesToConfigDigest(configDigest) if err != nil { - return rr, pkgerrors.Wrap(err, "LoadLatestRoundRequested failed to decode config digest") + return rr, errors.Wrap(err, "LoadLatestRoundRequested failed to decode config digest") } err = json.Unmarshal(rawLog, &rr.Raw) if err != nil { - return rr, pkgerrors.Wrap(err, "LoadLatestRoundRequested failed to unmarshal raw log") + return rr, errors.Wrap(err, "LoadLatestRoundRequested failed to unmarshal raw log") } } diff --git a/core/services/relay/evm/request_round_tracker.go b/core/services/relay/evm/request_round_tracker.go index 6fe105adcd3..1f1ed71fc31 100644 --- a/core/services/relay/evm/request_round_tracker.go +++ b/core/services/relay/evm/request_round_tracker.go @@ -7,7 +7,7 @@ import ( gethCommon "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/jmoiron/sqlx" @@ -83,7 +83,7 @@ func (t *RequestRoundTracker) Start() error { return t.StartOnce("RequestRoundTracker", func() (err error) { t.latestRoundRequested, err = t.odb.LoadLatestRoundRequested() if err != nil { - return pkgerrors.Wrap(err, "RequestRoundTracker#Start: failed to load latest round requested") + return errors.Wrap(err, "RequestRoundTracker#Start: failed to load latest round requested") } t.unsubscribeLogs = t.logBroadcaster.Register(t, log.ListenerOpts{ diff --git a/core/services/relay/evm/request_round_tracker_test.go b/core/services/relay/evm/request_round_tracker_test.go index cd9eca822aa..cb2ee2a8d72 100644 --- a/core/services/relay/evm/request_round_tracker_test.go +++ b/core/services/relay/evm/request_round_tracker_test.go @@ -6,7 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" gethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -244,7 +244,7 @@ func Test_OCRContractTracker_HandleLog_OCRContractLatestRoundRequested(t *testin logBroadcast.On("String").Return("").Maybe() uni.lb.On("WasAlreadyConsumed", mock.Anything, mock.Anything).Return(false, nil) - uni.db.On("SaveLatestRoundRequested", mock.Anything, mock.Anything).Return(pkgerrors.New("something exploded")) + uni.db.On("SaveLatestRoundRequested", mock.Anything, mock.Anything).Return(errors.New("something exploded")) uni.requestRoundTracker.HandleLog(logBroadcast) diff --git a/core/services/relay/evm/types/abi_types.go b/core/services/relay/evm/types/abi_types.go index 34b12d885b4..4d1328bcc12 100644 --- a/core/services/relay/evm/types/abi_types.go +++ b/core/services/relay/evm/types/abi_types.go @@ -53,6 +53,10 @@ var typeMap = map[string]*ABIEncodingType{ native: reflect.TypeOf(common.Address{}), checked: reflect.TypeOf(common.Address{}), }, + "bytes": { + native: reflect.TypeOf([]byte{}), + checked: reflect.TypeOf([]byte{}), + }, } type ABIEncodingType struct { diff --git a/core/services/relay/evm/types/codec_entry.go b/core/services/relay/evm/types/codec_entry.go index b87f7ced721..21e5ac59847 100644 --- a/core/services/relay/evm/types/codec_entry.go +++ b/core/services/relay/evm/types/codec_entry.go @@ -222,7 +222,7 @@ func getNativeAndCheckedTypes(curType *abi.Type) (reflect.Type, reflect.Type, er func createTupleType(curType *abi.Type, converter func(reflect.Type) reflect.Type) (reflect.Type, reflect.Type, error) { if len(curType.TupleElems) == 0 { if curType.TupleType == nil { - return nil, nil, fmt.Errorf("%w: unsupported solitidy type: %v", commontypes.ErrInvalidType, curType.String()) + return nil, nil, fmt.Errorf("%w: unsupported solidity type: %v", commontypes.ErrInvalidType, curType.String()) } return curType.TupleType, curType.TupleType, nil }