diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go index b86752e745a..9b80289e478 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" @@ -347,16 +348,18 @@ type txStatusStore struct { mu sync.RWMutex uuids *cache.Cache // uuid to uid uids *cache.Cache // uid to bool + txm txmgr.TxManager threadCtrl utils.ThreadControl } // NewTxStatusStore creates a new tx status store -func NewTxStatusStore(lggr logger.Logger) *txStatusStore { +func NewTxStatusStore(lggr logger.Logger, txm txmgr.TxManager) *txStatusStore { return &txStatusStore{ lggr: lggr, uuids: cache.New(24 * time.Hour, 15 * time.Minute), uids: cache.New(24 * time.Hour, 15 * time.Minute), + txm: txm, threadCtrl: utils.NewThreadControl(), } } @@ -398,7 +401,7 @@ func (t *txStatusStore) Start(_ context.Context) error { for k, v := range t.uuids.Items() { t.lggr.Infof("querying tx status with UUID %s for upkeep ID %s", k, v.Object.(*big.Int)) // query TXM and update the data structure - //status := txm.queryTxStatus(k) + //status := t.txm.queryTxStatus(k) //if status == terminally_error { // err := t.uids.Add(v.Object.(*big.Int).String(), true, cache.DefaultExpiration) // if err != nil { diff --git a/core/services/ocrcommon/transmitter.go b/core/services/ocrcommon/transmitter.go index fd06279eb87..c0993b2dffa 100644 --- a/core/services/ocrcommon/transmitter.go +++ b/core/services/ocrcommon/transmitter.go @@ -93,7 +93,12 @@ func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common return errors.Wrap(err, "skipped OCR transmission, error getting round-robin address") } + var key *string + if txMeta.UpkeepID != nil { + key = txMeta.UpkeepID + } _, err = t.txm.CreateTransaction(ctx, txmgr.TxRequest{ + IdempotencyKey: key, FromAddress: roundRobinFromAddress, ToAddress: toAddress, EncodedPayload: payload, diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 5a0ccffaf71..c4572883b11 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -201,7 +201,7 @@ func (r *Relayer) NewPluginProvider(rargs commontypes.RelayArgs, pargs commontyp return nil, err } - transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) + transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil) if err != nil { return nil, err } @@ -517,7 +517,7 @@ type configTransmitterOpts struct { } // newOnChainContractTransmitter creates a new contract transmitter. -func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI, transmissionContractRetention time.Duration) (*contractTransmitter, error) { +func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI, transmissionContractRetention time.Duration, reportToEvmTxMeta ReportToEthMetadata) (*contractTransmitter, error) { var relayConfig types.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -589,7 +589,7 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg transmitter, configWatcher.chain.LogPoller(), lggr, - nil, + reportToEvmTxMeta, transmissionContractRetention, ) } @@ -620,7 +620,7 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp reportCodec := evmreportcodec.ReportCodec{} - contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) + contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 9a3a9dab5e4..4303fb00eb4 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -18,6 +18,7 @@ import ( commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" ac "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_automation_v21_plus_common" @@ -35,6 +36,7 @@ var ( _ OCR2KeeperRelayer = (*ocr2keeperRelayer)(nil) _ OCR2KeeperProvider = (*ocr2keeperProvider)(nil) ErrInitializationFailure = fmt.Errorf("failed to initialize registry") + packer = encoding.NewAbiPacker() ) // OCR2KeeperProviderOpts is the custom options to create a keeper provider @@ -82,6 +84,18 @@ func NewOCR2KeeperRelayer(ds sqlutil.DataSource, chain legacyevm.Chain, lggr log } } +func reportToUpkeepID(report []byte) (*txmgr.TxMeta, error) { + r, err := packer.UnpackReport(report) + if err != nil { + return nil, err + } + + uid := r.UpkeepIds[0].String() + return &txmgr.TxMeta{ + UpkeepID: &uid, + }, nil +} + func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (OCR2KeeperProvider, error) { // TODO https://smartcontract-it.atlassian.net/browse/BCF-2887 ctx := context.Background() @@ -92,7 +106,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p } gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI, 0) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI, 0, reportToUpkeepID) if err != nil { return nil, err } @@ -119,7 +133,6 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p services.transmitEventProvider = transmitEventProvider - packer := encoding.NewAbiPacker() services.encoder = encoding.NewReportEncoder(packer) finalityDepth := client.Config().EVM().FinalityDepth() @@ -137,7 +150,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p al := evm.NewActiveUpkeepList() services.payloadBuilder = evm.NewPayloadBuilder(al, logRecoverer, r.lggr) - services.txStatusStore = upkeepstate.NewTxStatusStore(r.lggr) + services.txStatusStore = upkeepstate.NewTxStatusStore(r.lggr, client.TxManager()) services.registry = evm.NewEvmRegistry(r.lggr, addr, client, registryContract, rargs.MercuryCredentials, al, logProvider, packer, blockSubscriber, finalityDepth, services.txStatusStore) @@ -155,7 +168,7 @@ type ocr3keeperProviderContractTransmitter struct { var _ ocr3types.ContractTransmitter[plugin.AutomationReportInfo] = &ocr3keeperProviderContractTransmitter{} -func NewKeepersOCR3ContractTransmitter(ocr2ContractTransmitter ocrtypes.ContractTransmitter, txStatusStore automation.TxStatusStore, lggr logger.SugaredLogger,) *ocr3keeperProviderContractTransmitter { +func NewKeepersOCR3ContractTransmitter(ocr2ContractTransmitter ocrtypes.ContractTransmitter, txStatusStore automation.TxStatusStore, lggr logger.SugaredLogger) *ocr3keeperProviderContractTransmitter { return &ocr3keeperProviderContractTransmitter{contractTransmitter: ocr2ContractTransmitter, txStatusStore: txStatusStore, lggr: lggr.Named("ocr3keeperProviderContractTransmitter")} } diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index 3f9fb11bfc9..36594d010d6 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -63,7 +63,7 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo if err != nil { return nil, err } - contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c if err != nil { return nil, err } - contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil) if err != nil { return nil, err }