From 5050b794452bfcff2a37a2fa7224848029cfd46d Mon Sep 17 00:00:00 2001 From: Cedric Date: Fri, 1 Nov 2024 11:28:48 +0000 Subject: [PATCH] Add context to message emitter (#15059) * [CAPPL-182] Add context to message emitter * Run go generate --- .../logeventcap/trigger_builders_generated.go | 74 ++++++++++++------ .../webapicap/trigger_builders_generated.go | 78 ++++++++++++------- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- core/services/workflows/delegate.go | 9 ++- core/services/workflows/engine.go | 35 +++++---- core/services/workflows/engine_test.go | 2 +- deployment/go.mod | 2 +- deployment/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- 15 files changed, 139 insertions(+), 89 deletions(-) diff --git a/core/capabilities/triggers/logevent/logeventcap/trigger_builders_generated.go b/core/capabilities/triggers/logevent/logeventcap/trigger_builders_generated.go index 8788f005d63..c256050fc08 100644 --- a/core/capabilities/triggers/logevent/logeventcap/trigger_builders_generated.go +++ b/core/capabilities/triggers/logevent/logeventcap/trigger_builders_generated.go @@ -22,7 +22,17 @@ func (cfg Config) New(w *sdk.WorkflowSpecFactory, id string) OutputCap { } step := sdk.Step[Output]{Definition: def} - return OutputCapFromStep(w, step) + raw := step.AddTo(w) + return OutputWrapper(raw) +} + +// HeadWrapper allows access to field from an sdk.CapDefinition[Head] +func HeadWrapper(raw sdk.CapDefinition[Head]) HeadCap { + wrapped, ok := raw.(HeadCap) + if ok { + return wrapped + } + return &headCap{CapDefinition: raw} } type HeadCap interface { @@ -33,27 +43,25 @@ type HeadCap interface { private() } -// HeadCapFromStep should only be called from generated code to assure type safety -func HeadCapFromStep(w *sdk.WorkflowSpecFactory, step sdk.Step[Head]) HeadCap { - raw := step.AddTo(w) - return &head{CapDefinition: raw} -} - -type head struct { +type headCap struct { sdk.CapDefinition[Head] } -func (*head) private() {} -func (c *head) Hash() sdk.CapDefinition[string] { +func (*headCap) private() {} +func (c *headCap) Hash() sdk.CapDefinition[string] { return sdk.AccessField[Head, string](c.CapDefinition, "Hash") } -func (c *head) Height() sdk.CapDefinition[string] { +func (c *headCap) Height() sdk.CapDefinition[string] { return sdk.AccessField[Head, string](c.CapDefinition, "Height") } -func (c *head) Timestamp() sdk.CapDefinition[uint64] { +func (c *headCap) Timestamp() sdk.CapDefinition[uint64] { return sdk.AccessField[Head, uint64](c.CapDefinition, "Timestamp") } +func ConstantHead(value Head) HeadCap { + return &headCap{CapDefinition: sdk.ConstantDefinition(value)} +} + func NewHeadFromFields( hash sdk.CapDefinition[string], height sdk.CapDefinition[string], @@ -89,6 +97,15 @@ func (c *simpleHead) Timestamp() sdk.CapDefinition[uint64] { func (c *simpleHead) private() {} +// OutputWrapper allows access to field from an sdk.CapDefinition[Output] +func OutputWrapper(raw sdk.CapDefinition[Output]) OutputCap { + wrapped, ok := raw.(OutputCap) + if ok { + return wrapped + } + return &outputCap{CapDefinition: raw} +} + type OutputCap interface { sdk.CapDefinition[Output] Cursor() sdk.CapDefinition[string] @@ -97,25 +114,23 @@ type OutputCap interface { private() } -// OutputCapFromStep should only be called from generated code to assure type safety -func OutputCapFromStep(w *sdk.WorkflowSpecFactory, step sdk.Step[Output]) OutputCap { - raw := step.AddTo(w) - return &output{CapDefinition: raw} -} - -type output struct { +type outputCap struct { sdk.CapDefinition[Output] } -func (*output) private() {} -func (c *output) Cursor() sdk.CapDefinition[string] { +func (*outputCap) private() {} +func (c *outputCap) Cursor() sdk.CapDefinition[string] { return sdk.AccessField[Output, string](c.CapDefinition, "Cursor") } -func (c *output) Data() OutputDataCap { - return OutputDataCap(sdk.AccessField[Output, OutputData](c.CapDefinition, "Data")) +func (c *outputCap) Data() OutputDataCap { + return OutputDataWrapper(sdk.AccessField[Output, OutputData](c.CapDefinition, "Data")) } -func (c *output) Head() HeadCap { - return &head{CapDefinition: sdk.AccessField[Output, Head](c.CapDefinition, "Head")} +func (c *outputCap) Head() HeadCap { + return HeadWrapper(sdk.AccessField[Output, Head](c.CapDefinition, "Head")) +} + +func ConstantOutput(value Output) OutputCap { + return &outputCap{CapDefinition: sdk.ConstantDefinition(value)} } func NewOutputFromFields( @@ -153,4 +168,13 @@ func (c *simpleOutput) Head() HeadCap { func (c *simpleOutput) private() {} +// OutputDataWrapper allows access to field from an sdk.CapDefinition[OutputData] +func OutputDataWrapper(raw sdk.CapDefinition[OutputData]) OutputDataCap { + wrapped, ok := raw.(OutputDataCap) + if ok { + return wrapped + } + return OutputDataCap(raw) +} + type OutputDataCap sdk.CapDefinition[OutputData] diff --git a/core/capabilities/webapi/webapicap/trigger_builders_generated.go b/core/capabilities/webapi/webapicap/trigger_builders_generated.go index 296146d4666..95f78ce5cb4 100644 --- a/core/capabilities/webapi/webapicap/trigger_builders_generated.go +++ b/core/capabilities/webapi/webapicap/trigger_builders_generated.go @@ -22,7 +22,17 @@ func (cfg TriggerConfig) New(w *sdk.WorkflowSpecFactory) TriggerRequestPayloadCa } step := sdk.Step[TriggerRequestPayload]{Definition: def} - return TriggerRequestPayloadCapFromStep(w, step) + raw := step.AddTo(w) + return TriggerRequestPayloadWrapper(raw) +} + +// RateLimiterConfigWrapper allows access to field from an sdk.CapDefinition[RateLimiterConfig] +func RateLimiterConfigWrapper(raw sdk.CapDefinition[RateLimiterConfig]) RateLimiterConfigCap { + wrapped, ok := raw.(RateLimiterConfigCap) + if ok { + return wrapped + } + return &rateLimiterConfigCap{CapDefinition: raw} } type RateLimiterConfigCap interface { @@ -34,30 +44,28 @@ type RateLimiterConfigCap interface { private() } -// RateLimiterConfigCapFromStep should only be called from generated code to assure type safety -func RateLimiterConfigCapFromStep(w *sdk.WorkflowSpecFactory, step sdk.Step[RateLimiterConfig]) RateLimiterConfigCap { - raw := step.AddTo(w) - return &rateLimiterConfig{CapDefinition: raw} -} - -type rateLimiterConfig struct { +type rateLimiterConfigCap struct { sdk.CapDefinition[RateLimiterConfig] } -func (*rateLimiterConfig) private() {} -func (c *rateLimiterConfig) GlobalBurst() sdk.CapDefinition[int64] { +func (*rateLimiterConfigCap) private() {} +func (c *rateLimiterConfigCap) GlobalBurst() sdk.CapDefinition[int64] { return sdk.AccessField[RateLimiterConfig, int64](c.CapDefinition, "globalBurst") } -func (c *rateLimiterConfig) GlobalRPS() sdk.CapDefinition[float64] { +func (c *rateLimiterConfigCap) GlobalRPS() sdk.CapDefinition[float64] { return sdk.AccessField[RateLimiterConfig, float64](c.CapDefinition, "globalRPS") } -func (c *rateLimiterConfig) PerSenderBurst() sdk.CapDefinition[int64] { +func (c *rateLimiterConfigCap) PerSenderBurst() sdk.CapDefinition[int64] { return sdk.AccessField[RateLimiterConfig, int64](c.CapDefinition, "perSenderBurst") } -func (c *rateLimiterConfig) PerSenderRPS() sdk.CapDefinition[float64] { +func (c *rateLimiterConfigCap) PerSenderRPS() sdk.CapDefinition[float64] { return sdk.AccessField[RateLimiterConfig, float64](c.CapDefinition, "perSenderRPS") } +func ConstantRateLimiterConfig(value RateLimiterConfig) RateLimiterConfigCap { + return &rateLimiterConfigCap{CapDefinition: sdk.ConstantDefinition(value)} +} + func NewRateLimiterConfigFromFields( globalBurst sdk.CapDefinition[int64], globalRPS sdk.CapDefinition[float64], @@ -100,6 +108,15 @@ func (c *simpleRateLimiterConfig) PerSenderRPS() sdk.CapDefinition[float64] { func (c *simpleRateLimiterConfig) private() {} +// TriggerRequestPayloadWrapper allows access to field from an sdk.CapDefinition[TriggerRequestPayload] +func TriggerRequestPayloadWrapper(raw sdk.CapDefinition[TriggerRequestPayload]) TriggerRequestPayloadCap { + wrapped, ok := raw.(TriggerRequestPayloadCap) + if ok { + return wrapped + } + return &triggerRequestPayloadCap{CapDefinition: raw} +} + type TriggerRequestPayloadCap interface { sdk.CapDefinition[TriggerRequestPayload] Params() TriggerRequestPayloadParamsCap @@ -110,33 +127,31 @@ type TriggerRequestPayloadCap interface { private() } -// TriggerRequestPayloadCapFromStep should only be called from generated code to assure type safety -func TriggerRequestPayloadCapFromStep(w *sdk.WorkflowSpecFactory, step sdk.Step[TriggerRequestPayload]) TriggerRequestPayloadCap { - raw := step.AddTo(w) - return &triggerRequestPayload{CapDefinition: raw} -} - -type triggerRequestPayload struct { +type triggerRequestPayloadCap struct { sdk.CapDefinition[TriggerRequestPayload] } -func (*triggerRequestPayload) private() {} -func (c *triggerRequestPayload) Params() TriggerRequestPayloadParamsCap { - return TriggerRequestPayloadParamsCap(sdk.AccessField[TriggerRequestPayload, TriggerRequestPayloadParams](c.CapDefinition, "params")) +func (*triggerRequestPayloadCap) private() {} +func (c *triggerRequestPayloadCap) Params() TriggerRequestPayloadParamsCap { + return TriggerRequestPayloadParamsWrapper(sdk.AccessField[TriggerRequestPayload, TriggerRequestPayloadParams](c.CapDefinition, "params")) } -func (c *triggerRequestPayload) Timestamp() sdk.CapDefinition[int64] { +func (c *triggerRequestPayloadCap) Timestamp() sdk.CapDefinition[int64] { return sdk.AccessField[TriggerRequestPayload, int64](c.CapDefinition, "timestamp") } -func (c *triggerRequestPayload) Topics() sdk.CapDefinition[[]string] { +func (c *triggerRequestPayloadCap) Topics() sdk.CapDefinition[[]string] { return sdk.AccessField[TriggerRequestPayload, []string](c.CapDefinition, "topics") } -func (c *triggerRequestPayload) TriggerEventId() sdk.CapDefinition[string] { +func (c *triggerRequestPayloadCap) TriggerEventId() sdk.CapDefinition[string] { return sdk.AccessField[TriggerRequestPayload, string](c.CapDefinition, "trigger_event_id") } -func (c *triggerRequestPayload) TriggerId() sdk.CapDefinition[string] { +func (c *triggerRequestPayloadCap) TriggerId() sdk.CapDefinition[string] { return sdk.AccessField[TriggerRequestPayload, string](c.CapDefinition, "trigger_id") } +func ConstantTriggerRequestPayload(value TriggerRequestPayload) TriggerRequestPayloadCap { + return &triggerRequestPayloadCap{CapDefinition: sdk.ConstantDefinition(value)} +} + func NewTriggerRequestPayloadFromFields( params TriggerRequestPayloadParamsCap, timestamp sdk.CapDefinition[int64], @@ -186,4 +201,13 @@ func (c *simpleTriggerRequestPayload) TriggerId() sdk.CapDefinition[string] { func (c *simpleTriggerRequestPayload) private() {} +// TriggerRequestPayloadParamsWrapper allows access to field from an sdk.CapDefinition[TriggerRequestPayloadParams] +func TriggerRequestPayloadParamsWrapper(raw sdk.CapDefinition[TriggerRequestPayloadParams]) TriggerRequestPayloadParamsCap { + wrapped, ok := raw.(TriggerRequestPayloadParamsCap) + if ok { + return wrapped + } + return TriggerRequestPayloadParamsCap(raw) +} + type TriggerRequestPayloadParamsCap sdk.CapDefinition[TriggerRequestPayloadParams] diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 45a09161e8c..8e1e4d33fdf 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -24,7 +24,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v0.8.1 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 456f5ec5c38..6f49d99ee94 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1092,8 +1092,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 h1:F+WjcRCHZ4lVk69qDXMb4GpZ2+8kx/j93PreuK+8xe8= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 8da7b937b6b..7cba967115e 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" "github.com/pelletier/go-toml" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -41,19 +42,19 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName) sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx) if err != nil { - logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger) + logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger) return nil, err } binary, err := spec.WorkflowSpec.RawSpec(ctx) if err != nil { - logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to fetch workflow spec binary: %v", err), d.logger) + logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to fetch workflow spec binary: %v", err), d.logger) return nil, err } config, err := spec.WorkflowSpec.GetConfig(ctx) if err != nil { - logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow spec config: %v", err), d.logger) + logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow spec config: %v", err), d.logger) return nil, err } @@ -69,7 +70,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser Binary: binary, SecretsFetcher: d.secretsFetcher, } - engine, err := NewEngine(cfg) + engine, err := NewEngine(ctx, cfg) if err != nil { return nil, err } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 7550d06cf95..142ca49f369 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -169,7 +169,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { if err != nil { log := e.logger.With(cIDKey, t.ID) log.Errorf("failed to get trigger capability: %s", err) - logCustMsg(e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log) + logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log) // we don't immediately return here, since we want to retry all triggers // to notify the user of all errors at once. triggersInitialized = false @@ -200,6 +200,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { err := e.initializeCapability(ctx, s) if err != nil { logCustMsg( + ctx, e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref), fmt.Sprintf("failed to initialize capability for step: %s", err), e.logger, @@ -325,7 +326,7 @@ func (e *Engine) init(ctx context.Context) { if retryErr != nil { e.logger.Errorf("initialization failed: %s", retryErr) - logCustMsg(e.cma, fmt.Sprintf("workflow registration failed: %s", retryErr), e.logger) + logCustMsg(ctx, e.cma, fmt.Sprintf("workflow registration failed: %s", retryErr), e.logger) e.afterInit(false) return } @@ -342,12 +343,12 @@ func (e *Engine) init(ctx context.Context) { if terr != nil { log := e.logger.With(cIDKey, t.ID) log.Errorf("failed to register trigger: %s", terr) - logCustMsg(e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log) + logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log) } } e.logger.Info("engine initialized") - logCustMsg(e.cma, "workflow registered", e.logger) + logCustMsg(ctx, e.cma, "workflow registered", e.logger) e.afterInit(true) } @@ -617,7 +618,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow // This is to ensure that any side effects are executed consistently, since otherwise // the async nature of the workflow engine would provide no guarantees. } - logCustMsg(cma, "execution status: "+status, l) + logCustMsg(ctx, cma, "execution status: "+status, l) return e.finishExecution(ctx, state.ExecutionID, status) } @@ -725,10 +726,10 @@ func (e *Engine) worker(ctx context.Context) { err = e.startExecution(ctx, executionID, resp.Event.Outputs) if err != nil { e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err) - logCustMsg(cma, fmt.Sprintf("failed to start execution: %s", err), e.logger) + logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger) } else { e.logger.With(eIDKey, executionID).Debug("execution started") - logCustMsg(cma, "execution started", e.logger) + logCustMsg(ctx, cma, "execution started", e.logger) } case <-ctx.Done(): return @@ -750,7 +751,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { } // TODO ks-462 inputs - logCustMsg(cma, "executing step", l) + logCustMsg(ctx, cma, "executing step", l) inputs, outputs, err := e.executeStep(ctx, l, msg) var stepStatus string @@ -758,18 +759,18 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { case errors.Is(capabilities.ErrStopExecution, err): lmsg := "step executed successfully with a termination" l.Info(lmsg) - logCustMsg(cma, lmsg, l) + logCustMsg(ctx, cma, lmsg, l) stepStatus = store.StatusCompletedEarlyExit case err != nil: lmsg := fmt.Sprintf("error executing step request: %s", err) l.Error(lmsg) - logCustMsg(cma, lmsg, l) + logCustMsg(ctx, cma, lmsg, l) stepStatus = store.StatusErrored default: lmsg := "step executed successfully" l.With("outputs", outputs).Info(lmsg) // TODO ks-462 emit custom message with outputs - logCustMsg(cma, lmsg, l) + logCustMsg(ctx, cma, lmsg, l) stepStatus = store.StatusCompleted } @@ -1050,7 +1051,7 @@ func (e *Engine) heartbeat(ctx context.Context) { return case <-ticker.C: e.metrics.incrementEngineHeartbeatCounter(ctx) - logCustMsg(e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger) + logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger) } } } @@ -1114,7 +1115,7 @@ func (e *Engine) Close() error { if err != nil { return err } - logCustMsg(e.cma, "workflow unregistered", e.logger) + logCustMsg(ctx, e.cma, "workflow unregistered", e.logger) return nil }) } @@ -1152,7 +1153,7 @@ const ( defaultHeartbeatCadence = 5 * time.Minute ) -func NewEngine(cfg Config) (engine *Engine, err error) { +func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { if cfg.Store == nil { return nil, &workflowError{reason: "store is nil", labels: map[string]string{ @@ -1208,7 +1209,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) { cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName) workflow, err := Parse(cfg.Workflow) if err != nil { - logCustMsg(cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr) + logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr) return nil, err } @@ -1278,8 +1279,8 @@ func (e *workflowError) Error() string { return errStr } -func logCustMsg(cma custmsg.MessageEmitter, msg string, log logger.Logger) { - err := cma.Emit(msg) +func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { + err := cma.Emit(ctx, msg) if err != nil { log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err) } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 9eea5a8f826..13f7bbd9d49 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -184,7 +184,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec if cfg.Store == nil { cfg.Store = newTestDBStore(t, cfg.clock) } - eng, err := NewEngine(cfg) + eng, err := NewEngine(testutils.Context(t), cfg) require.NoError(t, err) return eng, &testHooks{initSuccessful: initSuccessful, initFailed: initFailed, executionFinished: executionFinished} } diff --git a/deployment/go.mod b/deployment/go.mod index 4498a9f1ddc..dadaf29c2e8 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -24,7 +24,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240926212305-a6deabdfce86 github.com/smartcontractkit/chain-selectors v1.0.27 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 github.com/smartcontractkit/chainlink-protos/job-distributor v0.4.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/deployment/go.sum b/deployment/go.sum index 1c167d1f4f8..300c49658b9 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1384,8 +1384,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 h1:F+WjcRCHZ4lVk69qDXMb4GpZ2+8kx/j93PreuK+8xe8= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/go.mod b/go.mod index f097eaa6a68..21a41eec8c9 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.27 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e github.com/smartcontractkit/chainlink-feeds v0.1.1 diff --git a/go.sum b/go.sum index 1b2662e7768..161af1b06a1 100644 --- a/go.sum +++ b/go.sum @@ -1077,8 +1077,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 h1:F+WjcRCHZ4lVk69qDXMb4GpZ2+8kx/j93PreuK+8xe8= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 7530e47a7ac..fb95fcc84d2 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -36,7 +36,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.27 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 github.com/smartcontractkit/chainlink-protos/job-distributor v0.4.0 github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.2 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 688b791c8b2..09e8ef95805 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1405,8 +1405,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 h1:F+WjcRCHZ4lVk69qDXMb4GpZ2+8kx/j93PreuK+8xe8= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index c5d84ae6427..508df8845c4 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -17,7 +17,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.33.0 github.com/slack-go/slack v0.15.0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.5 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.2 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 20e46b9d689..c205c5005ad 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1394,8 +1394,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73 h1:F+WjcRCHZ4lVk69qDXMb4GpZ2+8kx/j93PreuK+8xe8= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241031135640-ac3278008a73/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=