Skip to content

Commit

Permalink
Add dual contract transmitter (#15202)
Browse files Browse the repository at this point in the history
* -Remove adaptive send
-Add dual transmitter relay config
-Add dual transmitter as contract transmitter

* -Fix lint

* Add tx.Meta

* Revert ocr2 helpers

* Fix lint

* Fix lint

* Change tx.meta to pointers
Send secondary tx even if primary fails

* Add meta fields to test

* Remove unused code
Change DualTransmissionConfig to pointer

* Implement feedback

* Fix failing test
  • Loading branch information
george-dorin authored Nov 15, 2024
1 parent 5f6d46d commit a4d3c22
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 136 deletions.
4 changes: 4 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ type TxMeta[ADDR types.Hashable, TX_HASH types.Hashable] struct {
MessageIDs []string `json:"MessageIDs,omitempty"`
// SeqNumbers is used by CCIP for tx to committed sequence numbers correlation in logs
SeqNumbers []uint64 `json:"SeqNumbers,omitempty"`

// Dual Broadcast
DualBroadcast *bool `json:"DualBroadcast,omitempty"`
DualBroadcastParams *string `json:"DualBroadcastParams,omitempty"`
}

type TxAttempt[
Expand Down
63 changes: 51 additions & 12 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,7 @@ func mustInsertPipelineRun(t *testing.T, orm pipeline.ORM, j job.Job) pipeline.R
return run
}

func TestORM_CreateJob_OCR2_With_AdaptiveSend(t *testing.T) {
func TestORM_CreateJob_OCR2_With_DualTransmission(t *testing.T) {
ctx := testutils.Context(t)
customChainID := big.New(testutils.NewRandomEVMChainID())

Expand All @@ -2030,27 +2030,66 @@ func TestORM_CreateJob_OCR2_With_AdaptiveSend(t *testing.T) {
db := pgtest.NewSqlxDB(t)
keyStore := cltest.NewKeyStore(t, db)
require.NoError(t, keyStore.OCR2().Add(ctx, cltest.DefaultOCR2Key))

_, transmitterID := cltest.MustInsertRandomKey(t, keyStore.Eth())

baseJobSpec := fmt.Sprintf(testspecs.OCR2EVMDualTransmissionSpecMinimalTemplate, transmitterID.String())

lggr := logger.TestLogger(t)
pipelineORM := pipeline.NewORM(db, lggr, config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db)

jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, keyStore)

adaptiveSendKey := cltest.MustGenerateRandomKey(t)
// Enabled but no config set
enabledDualTransmissionSpec := `
enableDualTransmission=true`

jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMWithAdaptiveSendSpecMinimal(cltest.DefaultOCR2Key.ID(), transmitterID.String(), adaptiveSendKey.EIP55Address.String()), nil)
jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+enabledDualTransmissionSpec, nil)
require.NoError(t, err)
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "dual transmission is enabled but no dual transmission config present")

// ContractAddress not set
emptyContractAddress := `
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress=""
`
jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+emptyContractAddress, nil)
require.NoError(t, err)
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "invalid contract address in dual transmission config")

// Transmitter address not set
emptyTransmitterAddress := `
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = ''
`
jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+emptyTransmitterAddress, nil)
require.NoError(t, err)
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "invalid transmitter address in dual transmission config")

dtTransmitterAddress := cltest.MustGenerateRandomKey(t)
completeDualTransmissionSpec := fmt.Sprintf(`
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = '%s'
[relayConfig.dualTransmission.meta]
key1 = 'val1'
key2 = ['val2','val3']
`,
dtTransmitterAddress.Address.String())

jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+completeDualTransmissionSpec, nil)
require.NoError(t, err)
require.Equal(t, "arbitrary-value", jb.AdaptiveSendSpec.Metadata["arbitraryParam"])

t.Run("unknown transmitter address", func(t *testing.T) {
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "failed to validate AdaptiveSendSpec.TransmitterAddress: no EVM key matching")
})
jb.OCR2OracleSpec.TransmitterID = null.StringFrom(transmitterID.String())

t.Run("multiple jobs", func(t *testing.T) {
keyStore.Eth().XXXTestingOnlyAdd(ctx, adaptiveSendKey)
require.NoError(t, jobORM.CreateJob(ctx, &jb), "failed to validate AdaptiveSendSpec.TransmitterAddress: no EVM key matching")
})
// Unknown transmitter address
require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "unknown dual transmission transmitterAddress: no EVM key matching:")

// Should not error
keyStore.Eth().XXXTestingOnlyAdd(ctx, dtTransmitterAddress)
require.NoError(t, jobORM.CreateJob(ctx, &jb))
}
24 changes: 0 additions & 24 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ type Job struct {
CCIPSpecID *int32
CCIPSpec *CCIPSpec
CCIPBootstrapSpecID *int32
AdaptiveSendSpec *AdaptiveSendSpec `toml:"adaptiveSend"`
JobSpecErrors []SpecError
Type Type `toml:"type"`
SchemaVersion uint32 `toml:"schemaVersion"`
Expand Down Expand Up @@ -1061,26 +1060,3 @@ type CCIPSpec struct {
// and RMN network info for offchain blessing.
PluginConfig JSONConfig `toml:"pluginConfig"`
}

type AdaptiveSendSpec struct {
TransmitterAddress *evmtypes.EIP55Address `toml:"transmitterAddress"`
ContractAddress *evmtypes.EIP55Address `toml:"contractAddress"`
Delay time.Duration `toml:"delay"`
Metadata JSONConfig `toml:"metadata"`
}

func (o *AdaptiveSendSpec) Validate() error {
if o.TransmitterAddress == nil {
return errors.New("no AdaptiveSendSpec.TransmitterAddress found")
}

if o.ContractAddress == nil {
return errors.New("no AdaptiveSendSpec.ContractAddress found")
}

if o.Delay.Seconds() <= 1 {
return errors.New("AdaptiveSendSpec.Delay not set or smaller than 1s")
}

return nil
}
61 changes: 0 additions & 61 deletions core/services/job/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
Expand Down Expand Up @@ -352,62 +350,3 @@ func TestWorkflowSpec_Validate(t *testing.T) {
require.NotEmpty(t, w.WorkflowID)
})
}

func TestAdaptiveSendConfig(t *testing.T) {
tests := []struct {
name string
shouldError bool
expectedErrorMessage string
config job.AdaptiveSendSpec
}{
{
name: "AdaptiveSendSpec.TransmitterAddress not set",
shouldError: true,
expectedErrorMessage: "no AdaptiveSendSpec.TransmitterAddress found",
config: job.AdaptiveSendSpec{
TransmitterAddress: nil,
ContractAddress: ptr(cltest.NewEIP55Address()),
Delay: time.Second * 30,
},
},
{
name: "AdaptiveSendSpec.ContractAddress not set",
shouldError: true,
expectedErrorMessage: "no AdaptiveSendSpec.ContractAddress found",
config: job.AdaptiveSendSpec{
TransmitterAddress: ptr(cltest.NewEIP55Address()),
ContractAddress: nil,
Delay: time.Second * 30,
},
},
{
name: "AdaptiveSendSpec.Delay not set",
shouldError: true,
expectedErrorMessage: "AdaptiveSendSpec.Delay not set or smaller than 1s",
config: job.AdaptiveSendSpec{
TransmitterAddress: ptr(cltest.NewEIP55Address()),
ContractAddress: ptr(cltest.NewEIP55Address()),
},
},
{
name: "AdaptiveSendSpec.Delay set to 50ms",
shouldError: true,
expectedErrorMessage: "AdaptiveSendSpec.Delay not set or smaller than 1s",
config: job.AdaptiveSendSpec{
TransmitterAddress: ptr(cltest.NewEIP55Address()),
ContractAddress: ptr(cltest.NewEIP55Address()),
Delay: time.Millisecond * 50,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.shouldError {
require.ErrorContains(t, test.config.Validate(), test.expectedErrorMessage)
} else {
require.NoError(t, test.config.Validate())
}
})
}
}
28 changes: 23 additions & 5 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
evmconfig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -304,10 +303,29 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
}
}

if jb.AdaptiveSendSpec != nil {
err = validateKeyStoreMatchForRelay(ctx, jb.OCR2OracleSpec.Relay, tx.keyStore, jb.AdaptiveSendSpec.TransmitterAddress.String())
if err != nil {
return fmt.Errorf("failed to validate AdaptiveSendSpec.TransmitterAddress: %w", err)
if enableDualTransmission, ok := jb.OCR2OracleSpec.RelayConfig["enableDualTransmission"]; ok && enableDualTransmission != nil {
rawDualTransmissionConfig, ok := jb.OCR2OracleSpec.RelayConfig["dualTransmission"]
if !ok {
return errors.New("dual transmission is enabled but no dual transmission config present")
}

dualTransmissionConfig, ok := rawDualTransmissionConfig.(map[string]interface{})
if !ok {
return errors.New("invalid dual transmission config")
}

dtContractAddress, ok := dualTransmissionConfig["contractAddress"].(string)
if !ok || !common.IsHexAddress(dtContractAddress) {
return errors.New("invalid contract address in dual transmission config")
}

dtTransmitterAddress, ok := dualTransmissionConfig["transmitterAddress"].(string)
if !ok || !common.IsHexAddress(dtTransmitterAddress) {
return errors.New("invalid transmitter address in dual transmission config")
}

if err = validateKeyStoreMatchForRelay(ctx, jb.OCR2OracleSpec.Relay, tx.keyStore, dtTransmitterAddress); err != nil {
return errors.Wrap(err, "unknown dual transmission transmitterAddress")
}
}

Expand Down
10 changes: 0 additions & 10 deletions core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ func ValidatedOracleSpecToml(ctx context.Context, config OCR2Config, insConf Ins
if err = validateTimingParameters(config, insConf, spec); err != nil {
return jb, err
}
if err = validateAdaptiveSendSpec(ctx, jb); err != nil {
return jb, err
}
return jb, nil
}

Expand Down Expand Up @@ -380,10 +377,3 @@ func validateOCR2LLOSpec(jsonConfig job.JSONConfig) error {
}
return pkgerrors.Wrap(pluginConfig.Validate(), "LLO PluginConfig is invalid")
}

func validateAdaptiveSendSpec(ctx context.Context, spec job.Job) error {
if spec.AdaptiveSendSpec != nil {
return spec.AdaptiveSendSpec.Validate()
}
return nil
}
73 changes: 71 additions & 2 deletions core/services/ocrcommon/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package ocrcommon

import (
"context"
errors2 "errors"
"fmt"
"math/big"
"net/url"
"slices"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -11,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
types2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

type roundRobinKeystore interface {
Expand Down Expand Up @@ -88,13 +92,14 @@ func NewOCR2FeedsTransmitter(
checker txmgr.TransmitCheckerSpec,
chainID *big.Int,
keystore roundRobinKeystore,
dualTransmissionConfig *types2.DualTransmissionConfig,
) (Transmitter, error) {
// Ensure that a keystore is provided.
if keystore == nil {
return nil, errors.New("nil keystore provided to transmitter")
}

return &ocr2FeedsTransmitter{
baseTransmitter := &ocr2FeedsTransmitter{
ocr2Aggregator: ocr2Aggregator,
txManagerOCR2: txm,
transmitter: transmitter{
Expand All @@ -107,7 +112,17 @@ func NewOCR2FeedsTransmitter(
chainID: chainID,
keystore: keystore,
},
}, nil
}

if dualTransmissionConfig != nil {
return &ocr2FeedsDualTransmission{
transmitter: *baseTransmitter,
secondaryContractAddress: dualTransmissionConfig.ContractAddress,
secondaryFromAddress: dualTransmissionConfig.TransmitterAddress,
secondaryMeta: dualTransmissionConfig.Meta,
}, nil
}
return baseTransmitter, nil
}

func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.TxMeta) error {
Expand Down Expand Up @@ -203,3 +218,57 @@ func (t *ocr2FeedsTransmitter) forwarderAddress(ctx context.Context, eoa, ocr2Ag

return forwarderAddress, nil
}

type ocr2FeedsDualTransmission struct {
transmitter ocr2FeedsTransmitter

secondaryContractAddress common.Address
secondaryFromAddress common.Address
secondaryMeta map[string][]string
}

func (t *ocr2FeedsDualTransmission) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, txMeta *txmgr.TxMeta) error {
// Primary transmission
errPrimary := t.transmitter.CreateEthTransaction(ctx, toAddress, payload, txMeta)
if errPrimary != nil {
errPrimary = fmt.Errorf("skipped primary transmission: %w", errPrimary)
}

if txMeta == nil {
txMeta = &txmgr.TxMeta{}
}

dualBroadcast := true
dualBroadcastParams := t.urlParams()

txMeta.DualBroadcast = &dualBroadcast
txMeta.DualBroadcastParams = &dualBroadcastParams

// Secondary transmission
_, errSecondary := t.transmitter.txm.CreateTransaction(ctx, txmgr.TxRequest{
FromAddress: t.secondaryFromAddress,
ToAddress: t.secondaryContractAddress,
EncodedPayload: payload,
FeeLimit: t.transmitter.gasLimit,
Strategy: t.transmitter.strategy,
Checker: t.transmitter.checker,
Meta: txMeta,
})

errSecondary = errors.Wrap(errSecondary, "skipped secondary transmission")
return errors2.Join(errPrimary, errSecondary)
}

func (t *ocr2FeedsDualTransmission) FromAddress(ctx context.Context) common.Address {
return t.transmitter.FromAddress(ctx)
}

func (t *ocr2FeedsDualTransmission) urlParams() string {
values := url.Values{}
for k, v := range t.secondaryMeta {
for _, p := range v {
values.Add(k, p)
}
}
return values.Encode()
}
Loading

0 comments on commit a4d3c22

Please sign in to comment.