From 31d77579690cd1f7e31bcf00d55e9b7a986376c0 Mon Sep 17 00:00:00 2001 From: Amir Y <83904651+amirylm@users.noreply.github.com> Date: Wed, 29 Mar 2023 20:06:19 +0300 Subject: [PATCH] Linking messages to transaction hashes in logs (#8817) Co-authored-by: Connor Stein --- core/chains/evm/txmgr/models.go | 15 ++++++++++++ .../features/ocr2/features_ocr2_test.go | 4 ++-- core/services/ocr/contract_transmitter.go | 2 +- core/services/ocrcommon/transmitter.go | 5 ++-- .../ocrcommon/transmitter_pipeline.go | 2 +- .../ocrcommon/transmitter_pipeline_test.go | 2 +- core/services/ocrcommon/transmitter_test.go | 8 +++---- .../relay/evm/contract_transmitter.go | 24 ++++++++++++++++--- .../relay/evm/contract_transmitter_test.go | 7 ++++-- core/services/relay/evm/evm.go | 2 ++ 10 files changed, 55 insertions(+), 16 deletions(-) diff --git a/core/chains/evm/txmgr/models.go b/core/chains/evm/txmgr/models.go index 212f76cb891..0d5b6c1b126 100644 --- a/core/chains/evm/txmgr/models.go +++ b/core/chains/evm/txmgr/models.go @@ -56,6 +56,11 @@ type EthTxMeta struct { // Used only for forwarded txs, tracks the original destination address. // When this is set, it indicates tx is forwarded through To address. FwdrDestAddress *common.Address `json:"ForwarderDestAddress,omitempty"` + + // MessageIDs is used by CCIP for tx to executed messages correlation in logs + MessageIDs []string `json:"MessageIDs,omitempty"` + // SeqNumbers is used by CCIP for tx to committed sequence numbers correlation in logs + SeqNumbers []uint64 `json:"SeqNumbers,omitempty"` } // TransmitCheckerSpec defines the check that should be performed before a transaction is submitted @@ -263,6 +268,16 @@ func (e EthTx) GetLogger(lgr logger.Logger) logger.Logger { if meta.FwdrDestAddress != nil { lgr = lgr.With("FwdrDestAddress", *meta.FwdrDestAddress) } + + if len(meta.MessageIDs) > 0 { + for _, mid := range meta.MessageIDs { + lgr = lgr.With("messageID", mid) + } + } + + if len(meta.SeqNumbers) > 0 { + lgr = lgr.With("SeqNumbers", meta.SeqNumbers) + } } return lgr diff --git a/core/internal/features/ocr2/features_ocr2_test.go b/core/internal/features/ocr2/features_ocr2_test.go index 14962c40901..0eb5a28d118 100644 --- a/core/internal/features/ocr2/features_ocr2_test.go +++ b/core/internal/features/ocr2/features_ocr2_test.go @@ -430,7 +430,7 @@ juelsPerFeeCoinSource = """ // Assert we can read the latest config digest and epoch after a report has been submitted. contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI)) require.NoError(t, err) - ct, err := evm.NewOCRContractTransmitter(ocrContractAddress, apps[0].Chains.EVM.Chains()[0].Client(), contractABI, nil, apps[0].Chains.EVM.Chains()[0].LogPoller(), lggr) + ct, err := evm.NewOCRContractTransmitter(ocrContractAddress, apps[0].Chains.EVM.Chains()[0].Client(), contractABI, nil, apps[0].Chains.EVM.Chains()[0].LogPoller(), lggr, nil) require.NoError(t, err) configDigest, epoch, err := ct.LatestConfigDigestAndEpoch(testutils.Context(t)) require.NoError(t, err) @@ -698,7 +698,7 @@ juelsPerFeeCoinSource = """ // Assert we can read the latest config digest and epoch after a report has been submitted. contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI)) require.NoError(t, err) - ct, err := evm.NewOCRContractTransmitter(ocrContractAddress, apps[0].Chains.EVM.Chains()[0].Client(), contractABI, nil, apps[0].Chains.EVM.Chains()[0].LogPoller(), lggr) + ct, err := evm.NewOCRContractTransmitter(ocrContractAddress, apps[0].Chains.EVM.Chains()[0].Client(), contractABI, nil, apps[0].Chains.EVM.Chains()[0].LogPoller(), lggr, nil) require.NoError(t, err) configDigest, epoch, err := ct.LatestConfigDigestAndEpoch(testutils.Context(t)) require.NoError(t, err) diff --git a/core/services/ocr/contract_transmitter.go b/core/services/ocr/contract_transmitter.go index 94a53e934a2..4939256ac59 100644 --- a/core/services/ocr/contract_transmitter.go +++ b/core/services/ocr/contract_transmitter.go @@ -60,7 +60,7 @@ func (oc *OCRContractTransmitter) Transmit(ctx context.Context, report []byte, r return errors.Wrap(err, "abi.Pack failed") } - return errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload), "failed to send Eth transaction") + return errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload, nil), "failed to send Eth transaction") } func (oc *OCRContractTransmitter) LatestTransmissionDetails(ctx context.Context) (configDigest ocrtypes.ConfigDigest, epoch uint32, round uint8, latestAnswer ocrtypes.Observation, latestTimestamp time.Time, err error) { diff --git a/core/services/ocrcommon/transmitter.go b/core/services/ocrcommon/transmitter.go index 0e6be237631..6eb9925694e 100644 --- a/core/services/ocrcommon/transmitter.go +++ b/core/services/ocrcommon/transmitter.go @@ -21,7 +21,7 @@ type txManager interface { } type Transmitter interface { - CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte) error + CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.EthTxMeta) error FromAddress() common.Address } @@ -65,7 +65,7 @@ func NewTransmitter( }, nil } -func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte) error { +func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.EthTxMeta) error { roundRobinFromAddress, err := t.keystore.GetRoundRobinAddress(t.chainID, t.fromAddresses...) if err != nil { @@ -80,6 +80,7 @@ func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common ForwarderAddress: t.forwarderAddress(), Strategy: t.strategy, Checker: t.checker, + Meta: txMeta, }, pg.WithParentCtx(ctx)) return errors.Wrap(err, "skipped OCR transmission") } diff --git a/core/services/ocrcommon/transmitter_pipeline.go b/core/services/ocrcommon/transmitter_pipeline.go index 35fbd80c11e..31df2d5e78d 100644 --- a/core/services/ocrcommon/transmitter_pipeline.go +++ b/core/services/ocrcommon/transmitter_pipeline.go @@ -64,7 +64,7 @@ func NewPipelineTransmitter( } } -func (t *pipelineTransmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte) error { +func (t *pipelineTransmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, _ *txmgr.EthTxMeta) error { // t.strategy is ignored currently as pipeline does not support passing this (sc-55115) vars := pipeline.NewVarsFrom(map[string]interface{}{ "jobSpec": map[string]interface{}{ diff --git a/core/services/ocrcommon/transmitter_pipeline_test.go b/core/services/ocrcommon/transmitter_pipeline_test.go index cf62fc8652d..c4d3559e373 100644 --- a/core/services/ocrcommon/transmitter_pipeline_test.go +++ b/core/services/ocrcommon/transmitter_pipeline_test.go @@ -73,5 +73,5 @@ func Test_PipelineTransmitter_CreateEthTransaction(t *testing.T) { run.State = pipeline.RunStatusCompleted }).Once() - require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload)) + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) } diff --git a/core/services/ocrcommon/transmitter_test.go b/core/services/ocrcommon/transmitter_test.go index 5b0dfc558c1..16410d0643c 100644 --- a/core/services/ocrcommon/transmitter_test.go +++ b/core/services/ocrcommon/transmitter_test.go @@ -59,7 +59,7 @@ func Test_DefaultTransmitter_CreateEthTransaction(t *testing.T) { Meta: nil, Strategy: strategy, }, mock.Anything).Return(txmgr.EthTx{}, nil).Once() - require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload)) + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) } func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction(t *testing.T) { @@ -108,8 +108,8 @@ func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction(t *testing. Meta: nil, Strategy: strategy, }, mock.Anything).Return(txmgr.EthTx{}, nil).Once() - require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload)) - require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload)) + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) + require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) } func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction_Round_Robin_Error(t *testing.T) { @@ -140,7 +140,7 @@ func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction_Round_Robin ethKeyStore, ) require.NoError(t, err) - require.Error(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload)) + require.Error(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) } func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction_No_Keystore_Error(t *testing.T) { diff --git a/core/services/relay/evm/contract_transmitter.go b/core/services/relay/evm/contract_transmitter.go index 8be6a674cc8..6c0924902e3 100644 --- a/core/services/relay/evm/contract_transmitter.go +++ b/core/services/relay/evm/contract_transmitter.go @@ -16,6 +16,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -29,10 +30,16 @@ type ContractTransmitter interface { var _ ContractTransmitter = &contractTransmitter{} type Transmitter interface { - CreateEthTransaction(ctx context.Context, toAddress gethcommon.Address, payload []byte) error + CreateEthTransaction(ctx context.Context, toAddress gethcommon.Address, payload []byte, txMeta *txmgr.EthTxMeta) error FromAddress() gethcommon.Address } +type ReportToEthMetadata func([]byte) (*txmgr.EthTxMeta, error) + +func reportToEthTxMetaNoop([]byte) (*txmgr.EthTxMeta, error) { + return nil, nil +} + type contractTransmitter struct { contractAddress gethcommon.Address contractABI abi.ABI @@ -41,6 +48,7 @@ type contractTransmitter struct { contractReader contractReader lp logpoller.LogPoller lggr logger.Logger + reportToEthTxMeta ReportToEthMetadata } func transmitterFilterName(addr common.Address) string { @@ -54,6 +62,7 @@ func NewOCRContractTransmitter( transmitter Transmitter, lp logpoller.LogPoller, lggr logger.Logger, + reportToEthTxMeta ReportToEthMetadata, ) (*contractTransmitter, error) { transmitted, ok := contractABI.Events["Transmitted"] if !ok { @@ -64,6 +73,9 @@ func NewOCRContractTransmitter( if err != nil { return nil, err } + if reportToEthTxMeta == nil { + reportToEthTxMeta = reportToEthTxMetaNoop + } return &contractTransmitter{ contractAddress: address, contractABI: contractABI, @@ -72,6 +84,7 @@ func NewOCRContractTransmitter( lp: lp, contractReader: caller, lggr: lggr.Named("OCRContractTransmitter"), + reportToEthTxMeta: reportToEthTxMeta, }, nil } @@ -91,14 +104,19 @@ func (oc *contractTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes. } rawReportCtx := evmutil.RawReportContext(reportCtx) - oc.lggr.Debugw("Transmitting report", "report", hex.EncodeToString(report), "rawReportCtx", rawReportCtx, "contractAddress", oc.contractAddress) + txMeta, err := oc.reportToEthTxMeta(report) + if err != nil { + oc.lggr.Warnw("failed to generate tx metadata for report", "err", err) + } + + oc.lggr.Debugw("Transmitting report", "report", hex.EncodeToString(report), "rawReportCtx", rawReportCtx, "contractAddress", oc.contractAddress, "txMeta", txMeta) payload, err := oc.contractABI.Pack("transmit", rawReportCtx, []byte(report), rs, ss, vs) if err != nil { return errors.Wrap(err, "abi.Pack failed") } - return errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload), "failed to send Eth transaction") + return errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload, txMeta), "failed to send Eth transaction") } type contractReader interface { diff --git a/core/services/relay/evm/contract_transmitter_test.go b/core/services/relay/evm/contract_transmitter_test.go index 9de5c93c7a0..1b4c12dddb8 100644 --- a/core/services/relay/evm/contract_transmitter_test.go +++ b/core/services/relay/evm/contract_transmitter_test.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" lpmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" evmmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -25,7 +26,7 @@ var sampleAddress = testutils.NewAddress() type mockTransmitter struct{} -func (mockTransmitter) CreateEthTransaction(ctx context.Context, toAddress gethcommon.Address, payload []byte) error { +func (mockTransmitter) CreateEthTransaction(ctx context.Context, toAddress gethcommon.Address, payload []byte, _ *txmgr.EthTxMeta) error { return nil } func (mockTransmitter) FromAddress() gethcommon.Address { return sampleAddress } @@ -44,7 +45,9 @@ func TestContractTransmitter(t *testing.T) { c.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(digestAndEpochDontScanLogs, nil).Once() contractABI, _ := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI)) lp.On("RegisterFilter", mock.Anything).Return(nil) - ot, err := NewOCRContractTransmitter(gethcommon.Address{}, c, contractABI, mockTransmitter{}, lp, lggr) + ot, err := NewOCRContractTransmitter(gethcommon.Address{}, c, contractABI, mockTransmitter{}, lp, lggr, func(b []byte) (*txmgr.EthTxMeta, error) { + return &txmgr.EthTxMeta{}, nil + }) require.NoError(t, err) digest, epoch, err := ot.LatestConfigDigestAndEpoch(testutils.Context(t)) require.NoError(t, err) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 53db94427fd..dc8a811a0aa 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -360,6 +360,7 @@ func newContractTransmitter(lggr logger.Logger, rargs relaytypes.RelayArgs, tran transmitter, configWatcher.chain.LogPoller(), lggr, + nil, ) } @@ -407,6 +408,7 @@ func newPipelineContractTransmitter(lggr logger.Logger, rargs relaytypes.RelayAr ), configWatcher.chain.LogPoller(), lggr, + nil, ) }