Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixFan1992 committed May 28, 2024
1 parent c7c8377 commit a0915fa
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
Expand Down Expand Up @@ -401,16 +402,21 @@ func (t *txStatusStore) Start(_ context.Context) error {
for k, v := range t.uuids.Items() {
t.lggr.Infof("querying tx status with UUID %s for upkeep ID %s", k, v.Object.(*big.Int))
// query TXM and update the data structure
//status := t.txm.queryTxStatus(k)
//if status == terminally_error {
// err := t.uids.Add(v.Object.(*big.Int).String(), true, cache.DefaultExpiration)
// if err != nil {
// t.lggr.Errorf("failed to add")
// continue
// }
//} else if status != pending {
// t.uuids.Delete(k)
//}
status, err := t.txm.GetTransactionStatus(ctx, uuid.MustParse(k))
if err != nil {
t.lggr.Errorf("failed to query TXM for tx status due to %s", err.Error())
} else {
if status == commontxmgr.Fatal {
uid := v.Object.(*big.Int).String()
err = t.uids.Add(uid, true, cache.DefaultExpiration)
if err != nil {
t.lggr.Errorf("failed to add upkeep ID %s due to %s", uid, err.Error())
continue
}
} else if status != commontxmgr.Unconfirmed {
t.uuids.Delete(k)
}
}
}
case <-ctx.Done():
return
Expand Down
48 changes: 43 additions & 5 deletions core/services/ocrcommon/transmitter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package ocrcommon

import (
"bytes"
"context"
"math/big"
"slices"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"

//"github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
//txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -211,8 +217,13 @@ func (t *ocr2FeedsTransmitter) forwarderAddress(ctx context.Context, eoa, ocr2Ag
return forwarderAddress, nil
}

type saveIdempotencyKey func(uuid uuid.UUID, uid *big.Int) error

type ocr3AutomationTransmitter struct {
transmitter
ChainType config.ChainType
lggr logger.Logger
saveIdempotencyKey
}

// NewOCR3AutomationTransmitter creates a new eth transmitter
Expand All @@ -225,6 +236,9 @@ func NewOCR3AutomationTransmitter(
checker txmgr.TransmitCheckerSpec,
chainID *big.Int,
keystore roundRobinKeystore,
chainType config.ChainType,
saveIdempotencyKey func(uuid uuid.UUID, uid *big.Int) error,
lggr logger.Logger,
) (Transmitter, error) {
// Ensure that a keystore is provided.
if keystore == nil {
Expand All @@ -242,6 +256,9 @@ func NewOCR3AutomationTransmitter(
chainID: chainID,
keystore: keystore,
},
ChainType: chainType,
saveIdempotencyKey: saveIdempotencyKey,
lggr: lggr,
}, nil
}

Expand All @@ -251,14 +268,35 @@ func (t *ocr3AutomationTransmitter) CreateEthTransaction(ctx context.Context, to
return errors.Wrap(err, "skipped OCR transmission, error getting round-robin address")
}

var key *string
if txMeta != nil && txMeta.UpkeepID != nil {
key = txMeta.UpkeepID
var id uuid.UUID
var key string
var keyPtr *string
if t.ChainType == config.ChainScroll || t.ChainType == config.ChainZkEvm || t.ChainType == config.ChainZkSync {
if txMeta != nil && txMeta.UpkeepID != nil {
id, err = uuid.NewRandomFromReader(bytes.NewReader([]byte(*txMeta.UpkeepID + time.Now().String())))
if err != nil {
t.lggr.Errorf("failed to create UUID from %s", *(txMeta.UpkeepID))
} else {
uid, ok := new(big.Int).SetString(*(txMeta.UpkeepID), 10)
if !ok {
t.lggr.Errorf("failed to convert upkeep ID %s to big int", *(txMeta.UpkeepID))
} else {
err = t.saveIdempotencyKey(id, uid)
if err != nil {
t.lggr.Errorf("failed to save idempotency key %s due to %s", id.String(), err.Error())
} else {
key = id.String()
keyPtr = &key
}
}
}
} else {
t.lggr.Errorf("failed to retrieve upkeep ID from tx meta")
}
}


_, err = t.txm.CreateTransaction(ctx, txmgr.TxRequest{
IdempotencyKey: key,
IdempotencyKey: keyPtr,
FromAddress: roundRobinFromAddress,
ToAddress: toAddress,
EncodedPayload: payload,
Expand Down
102 changes: 88 additions & 14 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"time"
Expand All @@ -27,6 +28,7 @@ import (
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/common/config"
txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -201,7 +203,7 @@ func (r *Relayer) NewPluginProvider(rargs commontypes.RelayArgs, pargs commontyp
return nil, err
}

transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil)
transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -517,7 +519,7 @@ type configTransmitterOpts struct {
}

// newOnChainContractTransmitter creates a new contract transmitter.
func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI, transmissionContractRetention time.Duration, reportToEvmTxMeta ReportToEthMetadata) (*contractTransmitter, error) {
func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI, transmissionContractRetention time.Duration) (*contractTransmitter, error) {
var relayConfig types.RelayConfig
if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil {
return nil, err
Expand Down Expand Up @@ -582,17 +584,6 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg
configWatcher.chain.ID(),
ethKeystore,
)
case commontypes.OCR2Keeper:
transmitter, err = ocrcommon.NewOCR3AutomationTransmitter(
configWatcher.chain.TxManager(),
fromAddresses,
gasLimit,
effectiveTransmitterAddress,
strategy,
checker,
configWatcher.chain.ID(),
ethKeystore,
)
default:
transmitter, err = ocrcommon.NewTransmitter(
configWatcher.chain.TxManager(),
Expand All @@ -609,6 +600,89 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg
return nil, pkgerrors.Wrap(err, "failed to create transmitter")
}

return NewOCRContractTransmitterWithRetention(
ctx,
configWatcher.contractAddress,
configWatcher.chain.Client(),
transmissionContractABI,
transmitter,
configWatcher.chain.LogPoller(),
lggr,
nil,
transmissionContractRetention,
)
}

// newOnChainAutomationContractTransmitter creates a new contract transmitter.
func newOnChainAutomationContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI, transmissionContractRetention time.Duration, reportToEvmTxMeta ReportToEthMetadata, chainType config.ChainType, saveIdempotencyKey func(uuid uuid.UUID, uid *big.Int) error) (*contractTransmitter, error) {
var relayConfig types.RelayConfig
if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil {
return nil, err
}
var fromAddresses []common.Address
sendingKeys := relayConfig.SendingKeys
if !relayConfig.EffectiveTransmitterID.Valid {
return nil, pkgerrors.New("EffectiveTransmitterID must be specified")
}
effectiveTransmitterAddress := common.HexToAddress(relayConfig.EffectiveTransmitterID.String)

sendingKeysLength := len(sendingKeys)
if sendingKeysLength == 0 {
return nil, pkgerrors.New("no sending keys provided")
}

// If we are using multiple sending keys, then a forwarder is needed to rotate transmissions.
// Ensure that this forwarder is not set to a local sending key, and ensure our sending keys are enabled.
for _, s := range sendingKeys {
if sendingKeysLength > 1 && s == effectiveTransmitterAddress.String() {
return nil, pkgerrors.New("the transmitter is a local sending key with transaction forwarding enabled")
}
if err := ethKeystore.CheckEnabled(ctx, common.HexToAddress(s), configWatcher.chain.Config().EVM().ChainID()); err != nil {
return nil, pkgerrors.Wrap(err, "one of the sending keys given is not enabled")
}
fromAddresses = append(fromAddresses, common.HexToAddress(s))
}

subject := rargs.ExternalJobID
if opts.subjectID != nil {
subject = *opts.subjectID
}
strategy := txmgrcommon.NewQueueingTxStrategy(subject, relayConfig.DefaultTransactionQueueDepth)

var checker txm.TransmitCheckerSpec
if relayConfig.SimulateTransactions {
checker.CheckerType = txm.TransmitCheckerTypeSimulate
}

gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()
ocr2Limit := configWatcher.chain.Config().EVM().GasEstimator().LimitJobType().OCR2()
if ocr2Limit != nil {
gasLimit = uint64(*ocr2Limit)
}
if opts.pluginGasLimit != nil {
gasLimit = uint64(*opts.pluginGasLimit)
}

var transmitter Transmitter
var err error

transmitter, err = ocrcommon.NewOCR3AutomationTransmitter(
configWatcher.chain.TxManager(),
fromAddresses,
gasLimit,
effectiveTransmitterAddress,
strategy,
checker,
configWatcher.chain.ID(),
ethKeystore,
chainType,
saveIdempotencyKey,
lggr,
)
if err != nil {
return nil, pkgerrors.Wrap(err, "failed to create transmitter")
}

return NewOCRContractTransmitterWithRetention(
ctx,
configWatcher.contractAddress,
Expand Down Expand Up @@ -658,7 +732,7 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp

reportCodec := evmreportcodec.ReportCodec{}

contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil)
contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions core/services/relay/evm/ocr2keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,16 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p
return nil, err
}

client := r.chain
services := new(ocr2keeperProvider)
services.txStatusStore = upkeepstate.NewTxStatusStore(r.lggr, client.TxManager())
gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit()
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI, 0, reportToUpkeepID)
ct := cfgWatcher.chain.Config().EVM().ChainType()
contractTransmitter, err := newOnChainAutomationContractTransmitter(ctx, r.lggr, rargs, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI, 0, reportToUpkeepID, ct, services.txStatusStore.SaveTxInfo)
if err != nil {
return nil, err
}

client := r.chain

services := new(ocr2keeperProvider)
services.configWatcher = cfgWatcher
services.contractTransmitter = contractTransmitter

Expand Down Expand Up @@ -150,7 +151,6 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p
al := evm.NewActiveUpkeepList()
services.payloadBuilder = evm.NewPayloadBuilder(al, logRecoverer, r.lggr)

services.txStatusStore = upkeepstate.NewTxStatusStore(r.lggr, client.TxManager())
services.registry = evm.NewEvmRegistry(r.lggr, addr, client,
registryContract, rargs.MercuryCredentials, al, logProvider,
packer, blockSubscriber, finalityDepth, services.txStatusStore)
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/ocr2vrf.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo
if err != nil {
return nil, err
}
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil)
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +89,7 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c
if err != nil {
return nil, err
}
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0, nil)
contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a0915fa

Please sign in to comment.