From eb2f2bc67b8f3e8bc0bb3b3eed5f774e356899e6 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Tue, 10 Dec 2024 08:05:42 -0800 Subject: [PATCH 1/4] [Keystone] Increase default OCR phase size limit (#969) --- pkg/capabilities/consensus/ocr3/factory.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/capabilities/consensus/ocr3/factory.go b/pkg/capabilities/consensus/ocr3/factory.go index d634121b7..04f062eec 100644 --- a/pkg/capabilities/consensus/ocr3/factory.go +++ b/pkg/capabilities/consensus/ocr3/factory.go @@ -22,7 +22,8 @@ type factory struct { } const ( - defaultMaxPhaseOutputBytes = 100000 + // TODO(KS-617): read this from contract config + defaultMaxPhaseOutputBytes = 1000000 // 1 MB defaultMaxReportCount = 20 ) From a9c706f99e83ac0ec0e3508930138e4e06d5b160 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Tue, 10 Dec 2024 11:26:53 -0800 Subject: [PATCH 2/4] [INFOPLAT-1592] Address high CPU utilization when telemetry is enabled (#967) * [loop/EnvConfig] parse sets TelemetryEmitterBatchProcessor, TelemetryEmitterExportTimeout * [beholder/client] BatchProcessor ExportTimeout option is non-zero value * [loop/EnvConfig] Use maps.Equal in tests --------- Co-authored-by: Patrick --- pkg/beholder/client.go | 20 +++++-- pkg/beholder/httpclient.go | 20 +++++-- pkg/loop/config.go | 8 +++ pkg/loop/config_test.go | 108 +++++++++++++++++++++++++++++-------- 4 files changed, 123 insertions(+), 33 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index f707d3346..c6bac815a 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -14,7 +14,6 @@ import ( sdklog "go.opentelemetry.io/otel/sdk/log" sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/credentials" @@ -112,9 +111,13 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // Logger var loggerProcessor sdklog.Processor if cfg.LogBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.LogExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + batchProcessorOpts..., ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -152,9 +155,13 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // Message Emitter var messageLogProcessor sdklog.Processor if cfg.EmitterBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.EmitterExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + batchProcessorOpts..., ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -314,9 +321,12 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred if err != nil { return nil, err } - + batcherOpts := []sdktrace.BatchSpanProcessorOption{} + if config.TraceBatchTimeout > 0 { + batcherOpts = append(batcherOpts, 
sdktrace.WithBatchTimeout(config.TraceBatchTimeout)) // Default is 5s + } opts := []sdktrace.TracerProviderOption{ - sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s + sdktrace.WithBatcher(exporter, batcherOpts...), sdktrace.WithResource(resource), sdktrace.WithSampler( sdktrace.ParentBased( diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index ca2ebd014..0df44e647 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -14,7 +14,6 @@ import ( sdklog "go.opentelemetry.io/otel/sdk/log" sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -77,9 +76,13 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro // Logger var loggerProcessor sdklog.Processor if cfg.LogBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.LogExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + batchProcessorOpts..., ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -117,9 +120,13 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro // Message Emitter var messageLogProcessor sdklog.Processor if cfg.EmitterBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.EmitterExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + batchProcessorOpts..., // Default is 30s ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -181,9 +188,12 @@ func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsCon if err != nil { return nil, err } - + batcherOpts := []sdktrace.BatchSpanProcessorOption{} + if config.TraceBatchTimeout > 0 { + batcherOpts = append(batcherOpts, sdktrace.WithBatchTimeout(config.TraceBatchTimeout)) // Default is 5s + } opts := []sdktrace.TracerProviderOption{ - sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s + sdktrace.WithBatcher(exporter, batcherOpts...), sdktrace.WithResource(resource), sdktrace.WithSampler( sdktrace.ParentBased( diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 6e18e60ae..e63f72f2f 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -142,6 +142,14 @@ func (e *EnvConfig) parse() error { e.TelemetryTraceSampleRatio = getFloat64OrZero(envTelemetryTraceSampleRatio) e.TelemetryAuthHeaders = getMap(envTelemetryAuthHeader) e.TelemetryAuthPubKeyHex = os.Getenv(envTelemetryAuthPubKeyHex) + e.TelemetryEmitterBatchProcessor, err = getBool(envTelemetryEmitterBatchProcessor) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterBatchProcessor, err) + } + e.TelemetryEmitterExportTimeout, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportTimeout)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err) + } } return nil } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 
b2eb18745..78d177aa4 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -1,6 +1,7 @@ package loop import ( + "maps" "net/url" "os" "strconv" @@ -18,34 +19,65 @@ import ( func TestEnvConfig_parse(t *testing.T) { cases := []struct { - name string - envVars map[string]string - expectError bool - expectedDatabaseURL string - expectedPrometheusPort int - expectedTracingEnabled bool - expectedTracingCollectorTarget string - expectedTracingSamplingRatio float64 - expectedTracingTLSCertPath string + name string + envVars map[string]string + expectError bool + expectedDatabaseURL string + expectedPrometheusPort int + expectedTracingEnabled bool + expectedTracingCollectorTarget string + expectedTracingSamplingRatio float64 + expectedTracingTLSCertPath string + expectedTelemetryEnabled bool + expectedTelemetryEndpoint string + expectedTelemetryInsecureConn bool + expectedTelemetryCACertFile string + expectedTelemetryAttributes OtelAttributes + expectedTelemetryTraceSampleRatio float64 + expectedTelemetryAuthHeaders map[string]string + expectedTelemetryAuthPubKeyHex string + expectedTelemetryEmitterBatchProcessor bool + expectedTelemetryEmitterExportTimeout time.Duration }{ { name: "All variables set correctly", envVars: map[string]string{ - envDatabaseURL: "postgres://user:password@localhost:5432/db", - envPromPort: "8080", - envTracingEnabled: "true", - envTracingCollectorTarget: "some:target", - envTracingSamplingRatio: "1.0", - envTracingTLSCertPath: "internal/test/fixtures/client.pem", - envTracingAttribute + "XYZ": "value", + envDatabaseURL: "postgres://user:password@localhost:5432/db", + envPromPort: "8080", + envTracingEnabled: "true", + envTracingCollectorTarget: "some:target", + envTracingSamplingRatio: "1.0", + envTracingTLSCertPath: "internal/test/fixtures/client.pem", + envTracingAttribute + "XYZ": "value", + envTelemetryEnabled: "true", + envTelemetryEndpoint: "example.com/beholder", + envTelemetryInsecureConn: "true", + envTelemetryCACertFile: "foo/bar", + envTelemetryAttribute + "foo": "bar", + envTelemetryAttribute + "baz": "42", + envTelemetryTraceSampleRatio: "0.42", + envTelemetryAuthHeader + "header-key": "header-value", + envTelemetryAuthPubKeyHex: "pub-key-hex", + envTelemetryEmitterBatchProcessor: "true", + envTelemetryEmitterExportTimeout: "1s", }, - expectError: false, - expectedDatabaseURL: "postgres://user:password@localhost:5432/db", - expectedPrometheusPort: 8080, - expectedTracingEnabled: true, - expectedTracingCollectorTarget: "some:target", - expectedTracingSamplingRatio: 1.0, - expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectError: false, + expectedDatabaseURL: "postgres://user:password@localhost:5432/db", + expectedPrometheusPort: 8080, + expectedTracingEnabled: true, + expectedTracingCollectorTarget: "some:target", + expectedTracingSamplingRatio: 1.0, + expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectedTelemetryEnabled: true, + expectedTelemetryEndpoint: "example.com/beholder", + expectedTelemetryInsecureConn: true, + expectedTelemetryCACertFile: "foo/bar", + expectedTelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, + expectedTelemetryTraceSampleRatio: 0.42, + expectedTelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, + expectedTelemetryAuthPubKeyHex: "pub-key-hex", + expectedTelemetryEmitterBatchProcessor: true, + expectedTelemetryEmitterExportTimeout: 1 * time.Second, }, { name: "CL_DATABASE_URL parse error", @@ -106,6 +138,36 @@ func TestEnvConfig_parse(t 
*testing.T) { if config.TracingTLSCertPath != tc.expectedTracingTLSCertPath { t.Errorf("Expected tracingTLSCertPath %s, got %s", tc.expectedTracingTLSCertPath, config.TracingTLSCertPath) } + if config.TelemetryEnabled != tc.expectedTelemetryEnabled { + t.Errorf("Expected telemetryEnabled %v, got %v", tc.expectedTelemetryEnabled, config.TelemetryEnabled) + } + if config.TelemetryEndpoint != tc.expectedTelemetryEndpoint { + t.Errorf("Expected telemetryEndpoint %s, got %s", tc.expectedTelemetryEndpoint, config.TelemetryEndpoint) + } + if config.TelemetryInsecureConnection != tc.expectedTelemetryInsecureConn { + t.Errorf("Expected telemetryInsecureConn %v, got %v", tc.expectedTelemetryInsecureConn, config.TelemetryInsecureConnection) + } + if config.TelemetryCACertFile != tc.expectedTelemetryCACertFile { + t.Errorf("Expected telemetryCACertFile %s, got %s", tc.expectedTelemetryCACertFile, config.TelemetryCACertFile) + } + if !maps.Equal(config.TelemetryAttributes, tc.expectedTelemetryAttributes) { + t.Errorf("Expected telemetryAttributes %v, got %v", tc.expectedTelemetryAttributes, config.TelemetryAttributes) + } + if config.TelemetryTraceSampleRatio != tc.expectedTelemetryTraceSampleRatio { + t.Errorf("Expected telemetryTraceSampleRatio %f, got %f", tc.expectedTelemetryTraceSampleRatio, config.TelemetryTraceSampleRatio) + } + if !maps.Equal(config.TelemetryAuthHeaders, tc.expectedTelemetryAuthHeaders) { + t.Errorf("Expected telemetryAuthHeaders %v, got %v", tc.expectedTelemetryAuthHeaders, config.TelemetryAuthHeaders) + } + if config.TelemetryAuthPubKeyHex != tc.expectedTelemetryAuthPubKeyHex { + t.Errorf("Expected telemetryAuthPubKeyHex %s, got %s", tc.expectedTelemetryAuthPubKeyHex, config.TelemetryAuthPubKeyHex) + } + if config.TelemetryEmitterBatchProcessor != tc.expectedTelemetryEmitterBatchProcessor { + t.Errorf("Expected telemetryEmitterBatchProcessor %v, got %v", tc.expectedTelemetryEmitterBatchProcessor, config.TelemetryEmitterBatchProcessor) + } + if config.TelemetryEmitterExportTimeout != tc.expectedTelemetryEmitterExportTimeout { + t.Errorf("Expected telemetryEmitterExportTimeout %v, got %v", tc.expectedTelemetryEmitterExportTimeout, config.TelemetryEmitterExportTimeout) + } } } }) From 9087f5e8daf9a3e693a726839f789e6590e7ce09 Mon Sep 17 00:00:00 2001 From: Lei Date: Wed, 11 Dec 2024 11:06:13 -0800 Subject: [PATCH 3/4] add cron trigger and readcontract action (#971) Signed-off-by: Lei --- .../readcontract/action_builders_generated.go | 90 ++++++++++++++ .../readcontract_action-schema.json | 58 +++++++++ .../readcontract_action_generated.go | 115 ++++++++++++++++++ .../readcontracttest/action_mock_generated.go | 27 ++++ .../triggers/cron/cron_trigger-schema.json | 43 +++++++ .../triggers/cron/cron_trigger_generated.go | 92 ++++++++++++++ .../cron/crontest/trigger_mock_generated.go | 17 +++ .../cron/trigger_builders_generated.go | 84 +++++++++++++ 8 files changed, 526 insertions(+) create mode 100644 pkg/capabilities/actions/readcontract/action_builders_generated.go create mode 100644 pkg/capabilities/actions/readcontract/readcontract_action-schema.json create mode 100644 pkg/capabilities/actions/readcontract/readcontract_action_generated.go create mode 100644 pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go create mode 100644 pkg/capabilities/triggers/cron/cron_trigger-schema.json create mode 100644 pkg/capabilities/triggers/cron/cron_trigger_generated.go create mode 100644 pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go create mode 
100644 pkg/capabilities/triggers/cron/trigger_builders_generated.go diff --git a/pkg/capabilities/actions/readcontract/action_builders_generated.go b/pkg/capabilities/actions/readcontract/action_builders_generated.go new file mode 100644 index 000000000..dad3f4d49 --- /dev/null +++ b/pkg/capabilities/actions/readcontract/action_builders_generated.go @@ -0,0 +1,90 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package readcontract + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +func (cfg Config) New(w *sdk.WorkflowSpecFactory, ref string, input ActionInput) OutputCap { + + def := sdk.StepDefinition{ + ID: "read-contract-action@1.0.0", Ref: ref, + Inputs: input.ToSteps(), + Config: map[string]any{ + "ContractAddress": cfg.ContractAddress, + "ContractName": cfg.ContractName, + "ContractReaderConfig": cfg.ContractReaderConfig, + "ReadIdentifier": cfg.ReadIdentifier, + }, + CapabilityType: capabilities.CapabilityTypeAction, + } + + step := sdk.Step[Output]{Definition: def} + raw := step.AddTo(w) + return OutputWrapper(raw) +} + +// OutputWrapper allows access to field from an sdk.CapDefinition[Output] +func OutputWrapper(raw sdk.CapDefinition[Output]) OutputCap { + wrapped, ok := raw.(OutputCap) + if ok { + return wrapped + } + return &outputCap{CapDefinition: raw} +} + +type OutputCap interface { + sdk.CapDefinition[Output] + LatestValue() sdk.CapDefinition[any] + private() +} + +type outputCap struct { + sdk.CapDefinition[Output] +} + +func (*outputCap) private() {} +func (c *outputCap) LatestValue() sdk.CapDefinition[any] { + return sdk.AccessField[Output, any](c.CapDefinition, "LatestValue") +} + +func ConstantOutput(value Output) OutputCap { + return &outputCap{CapDefinition: sdk.ConstantDefinition(value)} +} + +func NewOutputFromFields( + latestValue sdk.CapDefinition[any]) OutputCap { + return &simpleOutput{ + CapDefinition: sdk.ComponentCapDefinition[Output]{ + "LatestValue": latestValue.Ref(), + }, + latestValue: latestValue, + } +} + +type simpleOutput struct { + sdk.CapDefinition[Output] + latestValue sdk.CapDefinition[any] +} + +func (c *simpleOutput) LatestValue() sdk.CapDefinition[any] { + return c.latestValue +} + +func (c *simpleOutput) private() {} + +type ActionInput struct { + ConfidenceLevel sdk.CapDefinition[string] + Params sdk.CapDefinition[InputParams] +} + +func (input ActionInput) ToSteps() sdk.StepInputs { + return sdk.StepInputs{ + Mapping: map[string]any{ + "ConfidenceLevel": input.ConfidenceLevel.Ref(), + "Params": input.Params.Ref(), + }, + } +} diff --git a/pkg/capabilities/actions/readcontract/readcontract_action-schema.json b/pkg/capabilities/actions/readcontract/readcontract_action-schema.json new file mode 100644 index 000000000..de94037d6 --- /dev/null +++ b/pkg/capabilities/actions/readcontract/readcontract_action-schema.json @@ -0,0 +1,58 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/readcontract/read-contract-action@1.0.0", + "$defs": { + "Config": { + "type": "object", + "properties": { + "ContractReaderConfig": { + "type": "string" + }, + "ReadIdentifier": { + "type": "string" + }, + "ContractAddress": { + "type": "string" + }, + "ContractName": { + "type": "string" + } + }, + "required": ["ContractReaderConfig", "ReadIdentifier", "ContractAddress", "ContractName"] + }, + "Input": { + 
"type": "object", + "properties": { + "ConfidenceLevel": { + "type": "string" + }, + "Params": { + "type": "object", + "additionalProperties": true + } + }, + "required": ["ConfidenceLevel", "Params"] + }, + "Output": { + "type": "object", + "properties": { + "LatestValue": { + "type": ["object", "string", "boolean", "null", "array"] + } + }, + "required": ["LatestValue"] + } + }, + "type": "object", + "properties": { + "Config": { + "$ref": "#/$defs/Config" + }, + "Inputs": { + "$ref": "#/$defs/Input" + }, + "Outputs": { + "$ref": "#/$defs/Output" + } + } +} \ No newline at end of file diff --git a/pkg/capabilities/actions/readcontract/readcontract_action_generated.go b/pkg/capabilities/actions/readcontract/readcontract_action_generated.go new file mode 100644 index 000000000..8f19f8da4 --- /dev/null +++ b/pkg/capabilities/actions/readcontract/readcontract_action_generated.go @@ -0,0 +1,115 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package readcontract + +import ( + "encoding/json" + "fmt" +) + +type Action struct { + // Config corresponds to the JSON schema field "Config". + Config *Config `json:"Config,omitempty" yaml:"Config,omitempty" mapstructure:"Config,omitempty"` + + // Inputs corresponds to the JSON schema field "Inputs". + Inputs *Input `json:"Inputs,omitempty" yaml:"Inputs,omitempty" mapstructure:"Inputs,omitempty"` + + // Outputs corresponds to the JSON schema field "Outputs". + Outputs *Output `json:"Outputs,omitempty" yaml:"Outputs,omitempty" mapstructure:"Outputs,omitempty"` +} + +type Config struct { + // ContractAddress corresponds to the JSON schema field "ContractAddress". + ContractAddress string `json:"ContractAddress" yaml:"ContractAddress" mapstructure:"ContractAddress"` + + // ContractName corresponds to the JSON schema field "ContractName". + ContractName string `json:"ContractName" yaml:"ContractName" mapstructure:"ContractName"` + + // ContractReaderConfig corresponds to the JSON schema field + // "ContractReaderConfig". + ContractReaderConfig string `json:"ContractReaderConfig" yaml:"ContractReaderConfig" mapstructure:"ContractReaderConfig"` + + // ReadIdentifier corresponds to the JSON schema field "ReadIdentifier". + ReadIdentifier string `json:"ReadIdentifier" yaml:"ReadIdentifier" mapstructure:"ReadIdentifier"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Config) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["ContractAddress"]; raw != nil && !ok { + return fmt.Errorf("field ContractAddress in Config: required") + } + if _, ok := raw["ContractName"]; raw != nil && !ok { + return fmt.Errorf("field ContractName in Config: required") + } + if _, ok := raw["ContractReaderConfig"]; raw != nil && !ok { + return fmt.Errorf("field ContractReaderConfig in Config: required") + } + if _, ok := raw["ReadIdentifier"]; raw != nil && !ok { + return fmt.Errorf("field ReadIdentifier in Config: required") + } + type Plain Config + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Config(plain) + return nil +} + +type Input struct { + // ConfidenceLevel corresponds to the JSON schema field "ConfidenceLevel". + ConfidenceLevel string `json:"ConfidenceLevel" yaml:"ConfidenceLevel" mapstructure:"ConfidenceLevel"` + + // Params corresponds to the JSON schema field "Params". 
+ Params InputParams `json:"Params" yaml:"Params" mapstructure:"Params"` +} + +type InputParams map[string]interface{} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Input) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["ConfidenceLevel"]; raw != nil && !ok { + return fmt.Errorf("field ConfidenceLevel in Input: required") + } + if _, ok := raw["Params"]; raw != nil && !ok { + return fmt.Errorf("field Params in Input: required") + } + type Plain Input + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Input(plain) + return nil +} + +type Output struct { + // LatestValue corresponds to the JSON schema field "LatestValue". + LatestValue interface{} `json:"LatestValue" yaml:"LatestValue" mapstructure:"LatestValue"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Output) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["LatestValue"]; raw != nil && !ok { + return fmt.Errorf("field LatestValue in Output: required") + } + type Plain Output + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Output(plain) + return nil +} diff --git a/pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go b/pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go new file mode 100644 index 000000000..ca5f1b321 --- /dev/null +++ b/pkg/capabilities/actions/readcontract/readcontracttest/action_mock_generated.go @@ -0,0 +1,27 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package readcontracttest + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/readcontract" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/testutils" +) + +// Action registers a new capability mock with the runner +// if another mock is registered for the same capability with for a step, it will take priority for that step. +func Action(runner *testutils.Runner, fn func(input readcontract.Input) (readcontract.Output, error)) *testutils.Mock[readcontract.Input, readcontract.Output] { + mock := testutils.MockCapability[readcontract.Input, readcontract.Output]("read-contract-action@1.0.0", fn) + runner.MockCapability("read-contract-action@1.0.0", nil, mock) + return mock +} + +// ActionForStep registers a new capability mock with the runner, but only for a given step. +// if another mock was registered for the same capability without a step, this mock will take priority for that step. 
+func ActionForStep(runner *testutils.Runner, step string, mockFn func(input readcontract.Input) (readcontract.Output, error)) *testutils.Mock[readcontract.Input, readcontract.Output] { + fn := mockFn + mock := testutils.MockCapability[readcontract.Input, readcontract.Output]("read-contract-action@1.0.0", fn) + runner.MockCapability("read-contract-action@1.0.0", &step, mock) + return mock +} diff --git a/pkg/capabilities/triggers/cron/cron_trigger-schema.json b/pkg/capabilities/triggers/cron/cron_trigger-schema.json new file mode 100644 index 000000000..5710378a4 --- /dev/null +++ b/pkg/capabilities/triggers/cron/cron_trigger-schema.json @@ -0,0 +1,43 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/cron/cron-trigger@1.0.0", + "$defs": { + "Payload": { + "type": "object", + "properties": { + "ActualExecutionTime": { + "type": "string", + "description": "Time that cron trigger's task execution occurred (RFC3339Nano formatted)" + }, + "ScheduledExecutionTime": { + "type": "string", + "description": "Time that cron trigger's task execution had been scheduled to occur (RFC3339Nano formatted)" + } + }, + "required": ["ActualExecutionTime", "ScheduledExecutionTime"], + "additionalProperties": false + }, + "Config": { + "type": "object", + "properties": { + "schedule": { + "type": "string" + } + }, + "required": ["schedule"], + "additionalProperties": false + } + }, + "type": "object", + "properties": { + "config": { + "$ref": "#/$defs/Config" + }, + "outputs": { + "$ref": "#/$defs/Payload" + } + }, + "required": ["config", "outputs"], + "additionalProperties": false, + "description": "A trigger that uses a cron schedule to run periodically at fixed times, dates, or intervals." +} \ No newline at end of file diff --git a/pkg/capabilities/triggers/cron/cron_trigger_generated.go b/pkg/capabilities/triggers/cron/cron_trigger_generated.go new file mode 100644 index 000000000..5c2e2a5c0 --- /dev/null +++ b/pkg/capabilities/triggers/cron/cron_trigger_generated.go @@ -0,0 +1,92 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package cron + +import ( + "encoding/json" + "fmt" +) + +type Config struct { + // Schedule corresponds to the JSON schema field "schedule". + Schedule string `json:"schedule" yaml:"schedule" mapstructure:"schedule"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Config) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["schedule"]; raw != nil && !ok { + return fmt.Errorf("field schedule in Config: required") + } + type Plain Config + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Config(plain) + return nil +} + +type Payload struct { + // Time that cron trigger's task execution occurred (RFC3339Nano formatted) + ActualExecutionTime string `json:"ActualExecutionTime" yaml:"ActualExecutionTime" mapstructure:"ActualExecutionTime"` + + // Time that cron trigger's task execution had been scheduled to occur + // (RFC3339Nano formatted) + ScheduledExecutionTime string `json:"ScheduledExecutionTime" yaml:"ScheduledExecutionTime" mapstructure:"ScheduledExecutionTime"` +} + +// UnmarshalJSON implements json.Unmarshaler. 
+func (j *Payload) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["ActualExecutionTime"]; raw != nil && !ok { + return fmt.Errorf("field ActualExecutionTime in Payload: required") + } + if _, ok := raw["ScheduledExecutionTime"]; raw != nil && !ok { + return fmt.Errorf("field ScheduledExecutionTime in Payload: required") + } + type Plain Payload + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Payload(plain) + return nil +} + +// A trigger that uses a cron schedule to run periodically at fixed times, dates, +// or intervals. +type Trigger struct { + // Config corresponds to the JSON schema field "config". + Config Config `json:"config" yaml:"config" mapstructure:"config"` + + // Outputs corresponds to the JSON schema field "outputs". + Outputs Payload `json:"outputs" yaml:"outputs" mapstructure:"outputs"` +} + +// UnmarshalJSON implements json.Unmarshaler. +func (j *Trigger) UnmarshalJSON(b []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + if _, ok := raw["config"]; raw != nil && !ok { + return fmt.Errorf("field config in Trigger: required") + } + if _, ok := raw["outputs"]; raw != nil && !ok { + return fmt.Errorf("field outputs in Trigger: required") + } + type Plain Trigger + var plain Plain + if err := json.Unmarshal(b, &plain); err != nil { + return err + } + *j = Trigger(plain) + return nil +} diff --git a/pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go b/pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go new file mode 100644 index 000000000..ad683b1d5 --- /dev/null +++ b/pkg/capabilities/triggers/cron/crontest/trigger_mock_generated.go @@ -0,0 +1,17 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. + +package crontest + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/cron" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/testutils" +) + +// Trigger registers a new capability mock with the runner +func Trigger(runner *testutils.Runner, fn func() (cron.Payload, error)) *testutils.TriggerMock[cron.Payload] { + mock := testutils.MockTrigger[cron.Payload]("cron-trigger@1.0.0", fn) + runner.MockCapability("cron-trigger@1.0.0", nil, mock) + return mock +} diff --git a/pkg/capabilities/triggers/cron/trigger_builders_generated.go b/pkg/capabilities/triggers/cron/trigger_builders_generated.go new file mode 100644 index 000000000..e84dac176 --- /dev/null +++ b/pkg/capabilities/triggers/cron/trigger_builders_generated.go @@ -0,0 +1,84 @@ +// Code generated by github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli, DO NOT EDIT. 
+ +package cron + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +func (cfg Config) New(w *sdk.WorkflowSpecFactory) PayloadCap { + ref := "trigger" + def := sdk.StepDefinition{ + ID: "cron-trigger@1.0.0", Ref: ref, + Inputs: sdk.StepInputs{}, + Config: map[string]any{ + "schedule": cfg.Schedule, + }, + CapabilityType: capabilities.CapabilityTypeTrigger, + } + + step := sdk.Step[Payload]{Definition: def} + raw := step.AddTo(w) + return PayloadWrapper(raw) +} + +// PayloadWrapper allows access to field from an sdk.CapDefinition[Payload] +func PayloadWrapper(raw sdk.CapDefinition[Payload]) PayloadCap { + wrapped, ok := raw.(PayloadCap) + if ok { + return wrapped + } + return &payloadCap{CapDefinition: raw} +} + +type PayloadCap interface { + sdk.CapDefinition[Payload] + ActualExecutionTime() sdk.CapDefinition[string] + ScheduledExecutionTime() sdk.CapDefinition[string] + private() +} + +type payloadCap struct { + sdk.CapDefinition[Payload] +} + +func (*payloadCap) private() {} +func (c *payloadCap) ActualExecutionTime() sdk.CapDefinition[string] { + return sdk.AccessField[Payload, string](c.CapDefinition, "ActualExecutionTime") +} +func (c *payloadCap) ScheduledExecutionTime() sdk.CapDefinition[string] { + return sdk.AccessField[Payload, string](c.CapDefinition, "ScheduledExecutionTime") +} + +func ConstantPayload(value Payload) PayloadCap { + return &payloadCap{CapDefinition: sdk.ConstantDefinition(value)} +} + +func NewPayloadFromFields( + actualExecutionTime sdk.CapDefinition[string], + scheduledExecutionTime sdk.CapDefinition[string]) PayloadCap { + return &simplePayload{ + CapDefinition: sdk.ComponentCapDefinition[Payload]{ + "ActualExecutionTime": actualExecutionTime.Ref(), + "ScheduledExecutionTime": scheduledExecutionTime.Ref(), + }, + actualExecutionTime: actualExecutionTime, + scheduledExecutionTime: scheduledExecutionTime, + } +} + +type simplePayload struct { + sdk.CapDefinition[Payload] + actualExecutionTime sdk.CapDefinition[string] + scheduledExecutionTime sdk.CapDefinition[string] +} + +func (c *simplePayload) ActualExecutionTime() sdk.CapDefinition[string] { + return c.actualExecutionTime +} +func (c *simplePayload) ScheduledExecutionTime() sdk.CapDefinition[string] { + return c.scheduledExecutionTime +} + +func (c *simplePayload) private() {} From 0b03fa331a49577ad30b8b780e0bc8070bd58328 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Wed, 11 Dec 2024 20:22:25 +0100 Subject: [PATCH 4/4] BCFR-1086 finality violation (#966) * define finality violation error Signed-off-by: Dmytro Haidashenko * rename finality violation Signed-off-by: Dmytro Haidashenko * Test ContainsError Signed-off-by: Dmytro Haidashenko --------- Signed-off-by: Dmytro Haidashenko Co-authored-by: Domino Valdano --- pkg/services/health.go | 11 +++++++ pkg/services/health_test.go | 58 ++++++++++++++++++++++++++++++++++++ pkg/types/contract_reader.go | 8 +++++ 3 files changed, 77 insertions(+) create mode 100644 pkg/services/health_test.go diff --git a/pkg/services/health.go b/pkg/services/health.go index 7bdfb5113..7108e53b6 100644 --- a/pkg/services/health.go +++ b/pkg/services/health.go @@ -257,3 +257,14 @@ func (c *HealthChecker) IsHealthy() (healthy bool, errors map[string]error) { return } + +// ContainsError - returns true if report contains targetErr +func ContainsError(report map[string]error, targetErr error) bool { + for _, err := range report { 
+ if errors.Is(err, targetErr) { + return true + } + } + + return false +} diff --git a/pkg/services/health_test.go b/pkg/services/health_test.go new file mode 100644 index 000000000..325d2cf20 --- /dev/null +++ b/pkg/services/health_test.go @@ -0,0 +1,58 @@ +package services + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestContainsError(t *testing.T) { + anError := errors.New("an error") + anotherError := errors.New("another error") + testCases := []struct { + Name string + Report map[string]error + Target error + ExpectedResult bool + }{ + { + Name: "nil map", + Report: nil, + Target: anError, + ExpectedResult: false, + }, + { + Name: "report contains service, but it's healthy", + Report: map[string]error{"service": nil}, + Target: anError, + ExpectedResult: false, + }, + { + Name: "service is not healthy, but it's not caused by target error", + Report: map[string]error{"service": anotherError}, + Target: anError, + ExpectedResult: false, + }, + { + Name: "service is not healthy and contains wrapper target", + Report: map[string]error{"service": fmt.Errorf("wrapped error: %w", anError)}, + Target: anError, + ExpectedResult: true, + }, + { + Name: "service is not healthy due to multiple errors including target", + Report: map[string]error{"service": errors.Join(anError, anotherError)}, + Target: anError, + ExpectedResult: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + actualResult := ContainsError(tc.Report, tc.Target) + assert.Equal(t, tc.ExpectedResult, actualResult) + }) + } +} diff --git a/pkg/types/contract_reader.go b/pkg/types/contract_reader.go index 206e77ae4..5d317de77 100644 --- a/pkg/types/contract_reader.go +++ b/pkg/types/contract_reader.go @@ -16,6 +16,7 @@ const ( ErrContractReaderConfigMissing = UnimplementedError("ContractReader entry missing from RelayConfig") ErrInternal = InternalError("internal error") ErrNotFound = NotFoundError("not found") + ErrFinalityViolated = InternalError("finality violated") ) // ContractReader defines essential read operations a chain should implement for reading contract values and events. @@ -70,6 +71,13 @@ type ContractReader interface { // The iterator returns a pair of key and sequence. QueryKeys(ctx context.Context, filters []ContractKeyFilter, limitAndSort query.LimitAndSort) (iter.Seq2[string, Sequence], error) + // HealthReport returns a full health report of the callee including its dependencies. + // Keys are based on Name(), with nil values when healthy or errors otherwise. + // Use CopyHealth to collect reports from sub-services. + // This should run very fast, so avoid doing computation and instead prefer reporting pre-calculated state. + // On finality violation report must contain at least one ErrFinalityViolation. + HealthReport() map[string]error + mustEmbedUnimplementedContractReader() }
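
As a usage illustration for the health-report additions in the last patch, the sketch below shows how a caller might combine the new ContractReader.HealthReport method with services.ContainsError to detect ErrFinalityViolated. The checkFinality helper, its package name, and its reaction are hypothetical and not part of this patch series; only HealthReport, ContainsError, and ErrFinalityViolated come from the changes above.

package example

import (
	"log"

	"github.com/smartcontractkit/chainlink-common/pkg/services"
	"github.com/smartcontractkit/chainlink-common/pkg/types"
)

// checkFinality is a hypothetical helper: it collects the reader's health
// report and checks whether any sub-service reported a finality violation.
func checkFinality(reader types.ContractReader) bool {
	report := reader.HealthReport()
	if services.ContainsError(report, types.ErrFinalityViolated) {
		// Assumed reaction: stop trusting "finalized" reads until the
		// violation is resolved; the actual policy is up to the caller.
		log.Println("contract reader reported a finality violation")
		return true
	}
	return false
}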