From c274c23eabc7fade1b1a5e7649187c56f218cc77 Mon Sep 17 00:00:00 2001 From: Sri Kidambi <1702865+kidambisrinivas@users.noreply.github.com> Date: Wed, 13 Dec 2023 23:45:38 +0000 Subject: [PATCH 1/2] Introduce job spec flag for custom reverted pipeline (#11529) * Introduce job spec flag for custom reverted pipeline * Disable the flag for V2+ * Rename file after merge conflict --- core/services/job/job_orm_test.go | 21 +++-- core/services/job/models.go | 3 + core/services/job/orm.go | 4 +- core/services/job/validate.go | 1 + core/services/vrf/delegate.go | 3 + .../0214_add_custom_reverts_vrf.sql | 5 ++ core/testdata/testspecs/v2_specs.go | 3 + core/web/presenters/job.go | 38 ++++---- core/web/presenters/job_test.go | 87 +++++++++++++++++++ core/web/resolver/spec.go | 5 ++ core/web/resolver/spec_test.go | 3 + core/web/schema/type/spec.graphql | 1 + 12 files changed, 149 insertions(+), 25 deletions(-) create mode 100644 core/store/migrate/migrations/0214_add_custom_reverts_vrf.sql diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index c0622ba8066..768d16ddeb7 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -454,6 +454,9 @@ func TestORM_CreateJob_VRFV2(t *testing.T) { var batchFulfillmentEnabled bool require.NoError(t, db.Get(&batchFulfillmentEnabled, `SELECT batch_fulfillment_enabled FROM vrf_specs LIMIT 1`)) require.False(t, batchFulfillmentEnabled) + var customRevertsPipelineEnabled bool + require.NoError(t, db.Get(&customRevertsPipelineEnabled, `SELECT custom_reverts_pipeline_enabled FROM vrf_specs LIMIT 1`)) + require.False(t, customRevertsPipelineEnabled) var batchFulfillmentGasMultiplier float64 require.NoError(t, db.Get(&batchFulfillmentGasMultiplier, `SELECT batch_fulfillment_gas_multiplier FROM vrf_specs LIMIT 1`)) require.Equal(t, float64(1.0), batchFulfillmentGasMultiplier) @@ -514,13 +517,14 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) { fromAddresses := []string{cltest.NewEIP55Address().String(), cltest.NewEIP55Address().String()} jb, err := vrfcommon.ValidatedVRFSpec(testspecs.GenerateVRFSpec( testspecs.VRFSpecParams{ - VRFVersion: vrfcommon.V2Plus, - RequestedConfsDelay: 10, - FromAddresses: fromAddresses, - ChunkSize: 25, - BackoffInitialDelay: time.Minute, - BackoffMaxDelay: time.Hour, - GasLanePrice: assets.GWei(100), + VRFVersion: vrfcommon.V2Plus, + RequestedConfsDelay: 10, + FromAddresses: fromAddresses, + ChunkSize: 25, + BackoffInitialDelay: time.Minute, + BackoffMaxDelay: time.Hour, + GasLanePrice: assets.GWei(100), + CustomRevertsPipelineEnabled: true, }). Toml()) require.NoError(t, err) @@ -534,6 +538,9 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) { var batchFulfillmentEnabled bool require.NoError(t, db.Get(&batchFulfillmentEnabled, `SELECT batch_fulfillment_enabled FROM vrf_specs LIMIT 1`)) require.False(t, batchFulfillmentEnabled) + var customRevertsPipelineEnabled bool + require.NoError(t, db.Get(&customRevertsPipelineEnabled, `SELECT custom_reverts_pipeline_enabled FROM vrf_specs LIMIT 1`)) + require.True(t, customRevertsPipelineEnabled) var batchFulfillmentGasMultiplier float64 require.NoError(t, db.Get(&batchFulfillmentGasMultiplier, `SELECT batch_fulfillment_gas_multiplier FROM vrf_specs LIMIT 1`)) require.Equal(t, float64(1.0), batchFulfillmentGasMultiplier) diff --git a/core/services/job/models.go b/core/services/job/models.go index 3c869f76056..f60e0a12981 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -503,6 +503,9 @@ type VRFSpec struct { // for fulfilling requests. If set to true, batchCoordinatorAddress must be set in // the job spec. BatchFulfillmentEnabled bool `toml:"batchFulfillmentEnabled"` + // CustomRevertsPipelineEnabled indicates to the vrf job to run the + // custom reverted txns pipeline along with VRF listener + CustomRevertsPipelineEnabled bool `toml:"customRevertsPipelineEnabled"` // BatchFulfillmentGasMultiplier is used to determine the final gas estimate for the batch // fulfillment. BatchFulfillmentGasMultiplier tomlutils.Float64 `toml:"batchFulfillmentGasMultiplier"` diff --git a/core/services/job/orm.go b/core/services/job/orm.go index b2cf2b2af4b..6c5a879ebd0 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -326,14 +326,14 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error { evm_chain_id, from_addresses, poll_period, requested_confs_delay, request_timeout, chunk_size, batch_coordinator_address, batch_fulfillment_enabled, batch_fulfillment_gas_multiplier, backoff_initial_delay, backoff_max_delay, gas_lane_price, - vrf_owner_address, + vrf_owner_address, custom_reverts_pipeline_enabled, created_at, updated_at) VALUES ( :coordinator_address, :public_key, :min_incoming_confirmations, :evm_chain_id, :from_addresses, :poll_period, :requested_confs_delay, :request_timeout, :chunk_size, :batch_coordinator_address, :batch_fulfillment_enabled, :batch_fulfillment_gas_multiplier, :backoff_initial_delay, :backoff_max_delay, :gas_lane_price, - :vrf_owner_address, + :vrf_owner_address, :custom_reverts_pipeline_enabled, NOW(), NOW()) RETURNING id;` diff --git a/core/services/job/validate.go b/core/services/job/validate.go index 8f559fdb02d..b7a1dca3616 100644 --- a/core/services/job/validate.go +++ b/core/services/job/validate.go @@ -63,6 +63,7 @@ func ValidateSpec(ts string) (Type, error) { if jb.Pipeline.RequiresPreInsert() && !jb.Type.SupportsAsync() { return "", errors.Errorf("async=true tasks are not supported for %v", jb.Type) } + // spec.CustomRevertsPipelineEnabled == false, default is custom reverted txns pipeline disabled if strings.Contains(ts, "<{}>") { return "", errors.Errorf("'<{}>' syntax is not supported. Please use \"{}\" instead") diff --git a/core/services/vrf/delegate.go b/core/services/vrf/delegate.go index 03e40614a10..ba28e83bf3f 100644 --- a/core/services/vrf/delegate.go +++ b/core/services/vrf/delegate.go @@ -142,6 +142,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { if vrfOwner != nil { return nil, errors.New("VRF Owner is not supported for VRF V2 Plus") } + if jb.VRFSpec.CustomRevertsPipelineEnabled { + return nil, errors.New("Custom Reverted Txns Pipeline is not supported for VRF V2 Plus") + } // Get the LINKNATIVEFEED address with retries // This is needed because the RPC endpoint may be down so we need to diff --git a/core/store/migrate/migrations/0214_add_custom_reverts_vrf.sql b/core/store/migrate/migrations/0214_add_custom_reverts_vrf.sql new file mode 100644 index 00000000000..a2865fce816 --- /dev/null +++ b/core/store/migrate/migrations/0214_add_custom_reverts_vrf.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE vrf_specs ADD COLUMN custom_reverts_pipeline_enabled boolean DEFAULT FALSE NOT NULL; + +-- +goose Down +ALTER TABLE vrf_specs DROP COLUMN custom_reverts_pipeline_enabled; diff --git a/core/testdata/testspecs/v2_specs.go b/core/testdata/testspecs/v2_specs.go index 00297ebdb12..e9dd25fa0f7 100644 --- a/core/testdata/testspecs/v2_specs.go +++ b/core/testdata/testspecs/v2_specs.go @@ -234,6 +234,7 @@ type VRFSpecParams struct { BatchCoordinatorAddress string VRFOwnerAddress string BatchFulfillmentEnabled bool + CustomRevertsPipelineEnabled bool BatchFulfillmentGasMultiplier float64 MinIncomingConfirmations int FromAddresses []string @@ -403,6 +404,7 @@ evmChainID = "%s" batchCoordinatorAddress = "%s" batchFulfillmentEnabled = %v batchFulfillmentGasMultiplier = %s +customRevertsPipelineEnabled = %v minIncomingConfirmations = %d requestedConfsDelay = %d requestTimeout = "%s" @@ -419,6 +421,7 @@ observationSource = """ toml := fmt.Sprintf(template, jobID, name, coordinatorAddress, params.EVMChainID, batchCoordinatorAddress, params.BatchFulfillmentEnabled, strconv.FormatFloat(batchFulfillmentGasMultiplier, 'f', 2, 64), + params.CustomRevertsPipelineEnabled, confirmations, params.RequestedConfsDelay, requestTimeout.String(), publicKey, chunkSize, params.BackoffInitialDelay.String(), params.BackoffMaxDelay.String(), gasLanePrice.String(), pollPeriod.String(), observationSource) diff --git a/core/web/presenters/job.go b/core/web/presenters/job.go index a2a9e70c793..e9b63c73615 100644 --- a/core/web/presenters/job.go +++ b/core/web/presenters/job.go @@ -267,6 +267,7 @@ func NewCronSpec(spec *job.CronSpec) *CronSpec { type VRFSpec struct { BatchCoordinatorAddress *ethkey.EIP55Address `json:"batchCoordinatorAddress"` BatchFulfillmentEnabled bool `json:"batchFulfillmentEnabled"` + CustomRevertsPipelineEnabled *bool `json:"customRevertsPipelineEnabled,omitempty"` BatchFulfillmentGasMultiplier float64 `json:"batchFulfillmentGasMultiplier"` CoordinatorAddress ethkey.EIP55Address `json:"coordinatorAddress"` PublicKey secp256k1.PublicKey `json:"publicKey"` @@ -281,26 +282,31 @@ type VRFSpec struct { BackoffInitialDelay models.Duration `json:"backoffInitialDelay"` BackoffMaxDelay models.Duration `json:"backoffMaxDelay"` GasLanePrice *assets.Wei `json:"gasLanePrice"` - VRFOwnerAddress *ethkey.EIP55Address `json:"vrfOwnerAddress"` + RequestedConfsDelay int64 `json:"requestedConfsDelay"` + VRFOwnerAddress *ethkey.EIP55Address `json:"vrfOwnerAddress,omitempty"` } func NewVRFSpec(spec *job.VRFSpec) *VRFSpec { return &VRFSpec{ - BatchCoordinatorAddress: spec.BatchCoordinatorAddress, - BatchFulfillmentEnabled: spec.BatchFulfillmentEnabled, - CoordinatorAddress: spec.CoordinatorAddress, - PublicKey: spec.PublicKey, - FromAddresses: spec.FromAddresses, - PollPeriod: models.MustMakeDuration(spec.PollPeriod), - MinIncomingConfirmations: spec.MinIncomingConfirmations, - CreatedAt: spec.CreatedAt, - UpdatedAt: spec.UpdatedAt, - EVMChainID: spec.EVMChainID, - ChunkSize: spec.ChunkSize, - RequestTimeout: models.MustMakeDuration(spec.RequestTimeout), - BackoffInitialDelay: models.MustMakeDuration(spec.BackoffInitialDelay), - BackoffMaxDelay: models.MustMakeDuration(spec.BackoffMaxDelay), - GasLanePrice: spec.GasLanePrice, + BatchCoordinatorAddress: spec.BatchCoordinatorAddress, + BatchFulfillmentEnabled: spec.BatchFulfillmentEnabled, + BatchFulfillmentGasMultiplier: float64(spec.BatchFulfillmentGasMultiplier), + CustomRevertsPipelineEnabled: &spec.CustomRevertsPipelineEnabled, + CoordinatorAddress: spec.CoordinatorAddress, + PublicKey: spec.PublicKey, + FromAddresses: spec.FromAddresses, + PollPeriod: models.MustMakeDuration(spec.PollPeriod), + MinIncomingConfirmations: spec.MinIncomingConfirmations, + CreatedAt: spec.CreatedAt, + UpdatedAt: spec.UpdatedAt, + EVMChainID: spec.EVMChainID, + ChunkSize: spec.ChunkSize, + RequestTimeout: models.MustMakeDuration(spec.RequestTimeout), + BackoffInitialDelay: models.MustMakeDuration(spec.BackoffInitialDelay), + BackoffMaxDelay: models.MustMakeDuration(spec.BackoffMaxDelay), + GasLanePrice: spec.GasLanePrice, + RequestedConfsDelay: spec.RequestedConfsDelay, + VRFOwnerAddress: spec.VRFOwnerAddress, } } diff --git a/core/web/presenters/job_test.go b/core/web/presenters/job_test.go index a5d6db0df18..b782452948b 100644 --- a/core/web/presenters/job_test.go +++ b/core/web/presenters/job_test.go @@ -14,11 +14,13 @@ import ( "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/assets" + evmassets "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" clnull "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/signatures/secp256k1" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/web/presenters" ) @@ -58,6 +60,7 @@ func TestJob(t *testing.T) { trustedBlockhashStoreBatchSize := int32(20) var specGasLimit uint32 = 1000 + vrfPubKey, _ := secp256k1.NewPublicKeyFromHex("0xede539e216e3a50e69d1c68aa9cc472085876c4002f6e1e6afee0ea63b50a78b00") testCases := []struct { name string @@ -469,6 +472,90 @@ func TestJob(t *testing.T) { } }`, }, + { + name: "vrf job spec", + job: job.Job{ + ID: 1, + Name: null.StringFrom("vrf_test"), + Type: job.VRF, + SchemaVersion: 1, + ExternalJobID: uuid.MustParse("0eec7e1d-d0d2-476c-a1a8-72dfb6633f47"), + VRFSpec: &job.VRFSpec{ + BatchCoordinatorAddress: &contractAddress, + BatchFulfillmentEnabled: true, + CustomRevertsPipelineEnabled: true, + MinIncomingConfirmations: 1, + CoordinatorAddress: contractAddress, + CreatedAt: timestamp, + UpdatedAt: timestamp, + EVMChainID: evmChainID, + FromAddresses: []ethkey.EIP55Address{fromAddress}, + PublicKey: vrfPubKey, + RequestedConfsDelay: 10, + ChunkSize: 25, + BatchFulfillmentGasMultiplier: 1, + GasLanePrice: evmassets.GWei(200), + VRFOwnerAddress: nil, + }, + PipelineSpec: &pipeline.Spec{ + ID: 1, + DotDagSource: "", + }, + }, + want: fmt.Sprintf(` + { + "data": { + "type": "jobs", + "id": "1", + "attributes": { + "name": "vrf_test", + "type": "vrf", + "schemaVersion": 1, + "maxTaskDuration": "0s", + "externalJobID": "0eec7e1d-d0d2-476c-a1a8-72dfb6633f47", + "directRequestSpec": null, + "fluxMonitorSpec": null, + "gasLimit": null, + "forwardingAllowed": false, + "cronSpec": null, + "offChainReportingOracleSpec": null, + "offChainReporting2OracleSpec": null, + "keeperSpec": null, + "vrfSpec": { + "batchCoordinatorAddress": "%s", + "batchFulfillmentEnabled": true, + "customRevertsPipelineEnabled": true, + "confirmations": 1, + "coordinatorAddress": "%s", + "createdAt": "2000-01-01T00:00:00Z", + "updatedAt": "2000-01-01T00:00:00Z", + "evmChainID": "42", + "fromAddresses": ["%s"], + "pollPeriod": "0s", + "publicKey": "%s", + "requestedConfsDelay": 10, + "requestTimeout": "0s", + "chunkSize": 25, + "batchFulfillmentGasMultiplier": 1, + "backoffInitialDelay": "0s", + "backoffMaxDelay": "0s", + "gasLanePrice": "200 gwei" + }, + "webhookSpec": null, + "blockhashStoreSpec": null, + "blockHeaderFeederSpec": null, + "bootstrapSpec": null, + "pipelineSpec": { + "id": 1, + "jobID": 0, + "dotDagSource": "" + }, + "gatewaySpec": null, + "errors": [] + } + } + }`, contractAddress, contractAddress, fromAddress, vrfPubKey.String()), + }, { name: "blockhash store spec", job: job.Job{ diff --git a/core/web/resolver/spec.go b/core/web/resolver/spec.go index 3e93ea38eab..5e8937dbb96 100644 --- a/core/web/resolver/spec.go +++ b/core/web/resolver/spec.go @@ -641,6 +641,11 @@ func (r *VRFSpecResolver) BatchFulfillmentGasMultiplier() float64 { return float64(r.spec.BatchFulfillmentGasMultiplier) } +// CustomRevertsPipelineEnabled resolves the spec's custom reverts pipeline enabled flag. +func (r *VRFSpecResolver) CustomRevertsPipelineEnabled() *bool { + return &r.spec.CustomRevertsPipelineEnabled +} + // ChunkSize resolves the spec's chunk size. func (r *VRFSpecResolver) ChunkSize() int32 { return int32(r.spec.ChunkSize) diff --git a/core/web/resolver/spec_test.go b/core/web/resolver/spec_test.go index 277aac851a6..a4fb9cdb338 100644 --- a/core/web/resolver/spec_test.go +++ b/core/web/resolver/spec_test.go @@ -581,6 +581,7 @@ func TestResolver_VRFSpec(t *testing.T) { VRFSpec: &job.VRFSpec{ BatchCoordinatorAddress: &batchCoordinatorAddress, BatchFulfillmentEnabled: true, + CustomRevertsPipelineEnabled: true, MinIncomingConfirmations: 1, CoordinatorAddress: coordinatorAddress, CreatedAt: f.Timestamp(), @@ -617,6 +618,7 @@ func TestResolver_VRFSpec(t *testing.T) { batchCoordinatorAddress batchFulfillmentEnabled batchFulfillmentGasMultiplier + customRevertsPipelineEnabled chunkSize backoffInitialDelay backoffMaxDelay @@ -644,6 +646,7 @@ func TestResolver_VRFSpec(t *testing.T) { "batchCoordinatorAddress": "0x0ad9FE7a58216242a8475ca92F222b0640E26B63", "batchFulfillmentEnabled": true, "batchFulfillmentGasMultiplier": 1, + "customRevertsPipelineEnabled": true, "chunkSize": 25, "backoffInitialDelay": "1m0s", "backoffMaxDelay": "1h0m0s", diff --git a/core/web/schema/type/spec.graphql b/core/web/schema/type/spec.graphql index 81244478637..5e24f7c3fa8 100644 --- a/core/web/schema/type/spec.graphql +++ b/core/web/schema/type/spec.graphql @@ -97,6 +97,7 @@ type VRFSpec { batchCoordinatorAddress: String batchFulfillmentEnabled: Boolean! batchFulfillmentGasMultiplier: Float! + customRevertsPipelineEnabled: Boolean chunkSize: Int! backoffInitialDelay: String! backoffMaxDelay: String! From 917b74f3a67395f4106f60f70949043f0d989bdf Mon Sep 17 00:00:00 2001 From: Anindita Ghosh <88458927+AnieeG@users.noreply.github.com> Date: Wed, 13 Dec 2023 15:47:26 -0800 Subject: [PATCH 2/2] options to include customized pg and chainlink image (#11570) --- integration-tests/docker/test_env/cl_node.go | 23 ++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/integration-tests/docker/test_env/cl_node.go b/integration-tests/docker/test_env/cl_node.go index b79bb91c70e..d6ab22957a2 100644 --- a/integration-tests/docker/test_env/cl_node.go +++ b/integration-tests/docker/test_env/cl_node.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink-testing-framework/logging" "github.com/smartcontractkit/chainlink-testing-framework/logstream" "github.com/smartcontractkit/chainlink-testing-framework/utils/testcontext" + "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" @@ -83,6 +84,28 @@ func WithLogStream(ls *logstream.LogStream) ClNodeOption { } } +func WithImage(image string) ClNodeOption { + return func(c *ClNode) { + c.ContainerImage = image + } +} + +func WithVersion(version string) ClNodeOption { + return func(c *ClNode) { + c.ContainerVersion = version + } +} + +func WithPgDBOptions(opts ...test_env.PostgresDbOption) ClNodeOption { + return func(c *ClNode) { + var err error + c.PostgresDb, err = test_env.NewPostgresDb(c.EnvComponent.Networks, opts...) + if err != nil { + c.t.Fatalf("failed to create postgres db: %v", err) + } + } +} + func NewClNode(networks []string, imageName, imageVersion string, nodeConfig *chainlink.Config, opts ...ClNodeOption) (*ClNode, error) { nodeDefaultCName := fmt.Sprintf("%s-%s", "cl-node", uuid.NewString()[0:8]) pgDefaultCName := fmt.Sprintf("pg-%s", nodeDefaultCName)