From 21290c2d535b2163e4cc614ecf6b67ed9cf78907 Mon Sep 17 00:00:00 2001 From: Vyzaldy Sanchez Date: Thu, 31 Oct 2024 13:37:19 -0400 Subject: [PATCH] Workflow Engine beholder logging (#14958) * Adds heartbeat logging * Refactors `logCustMsg` to use `.Emit()` instead * Fixes error * Adds `Engine` beholder logging * Fixes logging error * Adds more logging * Removes redundant logging * Addressing review comments * Adds heartbeat metric --- core/services/workflows/delegate.go | 5 +++ core/services/workflows/engine.go | 44 +++++++++++++++++++++++++-- core/services/workflows/monitoring.go | 11 +++++++ 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 8289671916e..8da7b937b6b 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" "github.com/pelletier/go-toml" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -37,18 +38,22 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil } // ServicesForSpec satisfies the job.Delegate interface. func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) { + cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName) sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx) if err != nil { + logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger) return nil, err } binary, err := spec.WorkflowSpec.RawSpec(ctx) if err != nil { + logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to fetch workflow spec binary: %v", err), d.logger) return nil, err } config, err := spec.WorkflowSpec.GetConfig(ctx) if err != nil { + logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow spec config: %v", err), d.logger) return nil, err } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index a814b6a9007..7550d06cf95 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -111,6 +111,7 @@ type Engine struct { stopCh services.StopChan newWorkerTimeout time.Duration maxExecutionDuration time.Duration + heartbeatCadence time.Duration // testing lifecycle hook to signal when an execution is finished. onExecutionFinished func(string) @@ -147,6 +148,9 @@ func (e *Engine) Start(_ context.Context) error { e.wg.Add(1) go e.init(ctx) + e.wg.Add(1) + go e.heartbeat(ctx) + return nil }) } @@ -321,6 +325,7 @@ func (e *Engine) init(ctx context.Context) { if retryErr != nil { e.logger.Errorf("initialization failed: %s", retryErr) + logCustMsg(e.cma, fmt.Sprintf("workflow registration failed: %s", retryErr), e.logger) e.afterInit(false) return } @@ -342,6 +347,7 @@ func (e *Engine) init(ctx context.Context) { } e.logger.Info("engine initialized") + logCustMsg(e.cma, "workflow registered", e.logger) e.afterInit(true) } @@ -715,9 +721,14 @@ func (e *Engine) worker(ctx context.Context) { continue } + cma := e.cma.With(eIDKey, executionID) err = e.startExecution(ctx, executionID, resp.Event.Outputs) if err != nil { e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err) + logCustMsg(cma, fmt.Sprintf("failed to start execution: %s", err), e.logger) + } else { + e.logger.With(eIDKey, executionID).Debug("execution started") + logCustMsg(cma, "execution started", e.logger) } case <-ctx.Done(): return @@ -1026,6 +1037,24 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf return workflowProcessed, store.StatusCompleted, nil } +func (e *Engine) heartbeat(ctx context.Context) { + defer e.wg.Done() + + ticker := time.NewTicker(e.heartbeatCadence) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + e.logger.Info("shutting down heartbeat") + return + case <-ticker.C: + e.metrics.incrementEngineHeartbeatCounter(ctx) + logCustMsg(e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger) + } + } +} + func (e *Engine) Close() error { return e.StopOnce("Engine", func() error { e.logger.Info("shutting down engine") @@ -1085,7 +1114,7 @@ func (e *Engine) Close() error { if err != nil { return err } - + logCustMsg(e.cma, "workflow unregistered", e.logger) return nil }) } @@ -1105,6 +1134,7 @@ type Config struct { Config []byte Binary []byte SecretsFetcher secretsFetcher + HeartbeatCadence time.Duration // For testing purposes only maxRetries int @@ -1119,6 +1149,7 @@ const ( defaultQueueSize = 100000 defaultNewWorkerTimeout = 2 * time.Second defaultMaxExecutionDuration = 10 * time.Minute + defaultHeartbeatCadence = 5 * time.Minute ) func NewEngine(cfg Config) (engine *Engine, err error) { @@ -1146,6 +1177,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) { cfg.MaxExecutionDuration = defaultMaxExecutionDuration } + if cfg.HeartbeatCadence == 0 { + cfg.HeartbeatCadence = defaultHeartbeatCadence + } + if cfg.retryMs == 0 { cfg.retryMs = 5000 } @@ -1170,8 +1205,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) { // - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist) // - etc. + cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName) workflow, err := Parse(cfg.Workflow) if err != nil { + logCustMsg(cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr) return nil, err } @@ -1180,8 +1217,8 @@ func NewEngine(cfg Config) (engine *Engine, err error) { workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName)) engine = &Engine{ + cma: cma, logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - cma: custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name), metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)}, registry: cfg.Registry, workflow: workflow, @@ -1197,6 +1234,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) { stopCh: make(chan struct{}), newWorkerTimeout: cfg.NewWorkerTimeout, maxExecutionDuration: cfg.MaxExecutionDuration, + heartbeatCadence: cfg.HeartbeatCadence, onExecutionFinished: cfg.onExecutionFinished, afterInit: cfg.afterInit, maxRetries: cfg.maxRetries, @@ -1243,6 +1281,6 @@ func (e *workflowError) Error() string { func logCustMsg(cma custmsg.MessageEmitter, msg string, log logger.Logger) { err := cma.Emit(msg) if err != nil { - log.Errorf("failed to send custom message with msg: %s", msg) + log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err) } } diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index 8f5601554ed..bd448afd9e5 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -17,6 +17,7 @@ var workflowsRunningGauge metric.Int64Gauge var capabilityInvocationCounter metric.Int64Counter var workflowExecutionLatencyGauge metric.Int64Gauge // ms var workflowStepErrorCounter metric.Int64Counter +var engineHeartbeatCounter metric.Int64UpDownCounter func initMonitoringResources() (err error) { registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("RegisterTriggerFailure") @@ -44,6 +45,11 @@ func initMonitoringResources() (err error) { return fmt.Errorf("failed to register workflow step error counter: %w", err) } + engineHeartbeatCounter, err = beholder.GetMeter().Int64UpDownCounter("EngineHeartbeat") + if err != nil { + return fmt.Errorf("failed to register engine heartbeat counter: %w", err) + } + return nil } @@ -82,6 +88,11 @@ func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, v workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) } +func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { + otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) + engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + // Observability keys const ( cIDKey = "capabilityID"