Skip to content

Commit

Permalink
Workflow Engine beholder logging (#14958)
Browse files Browse the repository at this point in the history
* Adds heartbeat logging

* Refactors `logCustMsg` to use `.Emit()` instead

* Fixes error

* Adds `Engine` beholder logging

* Fixes logging error

* Adds more logging

* Removes redundant logging

* Addressing review comments

* Adds heartbeat metric
  • Loading branch information
vyzaldysanchez authored Oct 31, 2024
1 parent 885baff commit 21290c2
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
5 changes: 5 additions & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/google/uuid"
"github.com/pelletier/go-toml"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"

Expand Down Expand Up @@ -37,18 +38,22 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
return nil, err
}

binary, err := spec.WorkflowSpec.RawSpec(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to fetch workflow spec binary: %v", err), d.logger)
return nil, err
}

config, err := spec.WorkflowSpec.GetConfig(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow spec config: %v", err), d.logger)
return nil, err
}

Expand Down
44 changes: 41 additions & 3 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Engine struct {
stopCh services.StopChan
newWorkerTimeout time.Duration
maxExecutionDuration time.Duration
heartbeatCadence time.Duration

// testing lifecycle hook to signal when an execution is finished.
onExecutionFinished func(string)
Expand Down Expand Up @@ -147,6 +148,9 @@ func (e *Engine) Start(_ context.Context) error {
e.wg.Add(1)
go e.init(ctx)

e.wg.Add(1)
go e.heartbeat(ctx)

return nil
})
}
Expand Down Expand Up @@ -321,6 +325,7 @@ func (e *Engine) init(ctx context.Context) {

if retryErr != nil {
e.logger.Errorf("initialization failed: %s", retryErr)
logCustMsg(e.cma, fmt.Sprintf("workflow registration failed: %s", retryErr), e.logger)
e.afterInit(false)
return
}
Expand All @@ -342,6 +347,7 @@ func (e *Engine) init(ctx context.Context) {
}

e.logger.Info("engine initialized")
logCustMsg(e.cma, "workflow registered", e.logger)
e.afterInit(true)
}

Expand Down Expand Up @@ -715,9 +721,14 @@ func (e *Engine) worker(ctx context.Context) {
continue
}

cma := e.cma.With(eIDKey, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
logCustMsg(cma, "execution started", e.logger)
}
case <-ctx.Done():
return
Expand Down Expand Up @@ -1026,6 +1037,24 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
return workflowProcessed, store.StatusCompleted, nil
}

// heartbeat periodically signals that the engine is alive. On every tick it
// increments the engine heartbeat metric and emits a custom message carrying
// the current time reported by e.clock, formatted as RFC 3339. It runs until
// ctx is cancelled.
//
// The caller is expected to have called e.wg.Add(1) before launching this as a
// goroutine; heartbeat calls e.wg.Done when it returns.
func (e *Engine) heartbeat(ctx context.Context) {
	defer e.wg.Done()

	// time.NewTicker panics on a non-positive interval. NewEngine only
	// substitutes the default for a zero cadence, so guard here against
	// negative values and against an Engine constructed without NewEngine.
	cadence := e.heartbeatCadence
	if cadence <= 0 {
		cadence = defaultHeartbeatCadence
	}

	ticker := time.NewTicker(cadence)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			e.logger.Info("shutting down heartbeat")
			return
		case <-ticker.C:
			e.metrics.incrementEngineHeartbeatCounter(ctx)
			logCustMsg(e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
		}
	}
}

func (e *Engine) Close() error {
return e.StopOnce("Engine", func() error {
e.logger.Info("shutting down engine")
Expand Down Expand Up @@ -1085,7 +1114,7 @@ func (e *Engine) Close() error {
if err != nil {
return err
}

logCustMsg(e.cma, "workflow unregistered", e.logger)
return nil
})
}
Expand All @@ -1105,6 +1134,7 @@ type Config struct {
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration

// For testing purposes only
maxRetries int
Expand All @@ -1119,6 +1149,7 @@ const (
defaultQueueSize = 100000
defaultNewWorkerTimeout = 2 * time.Second
defaultMaxExecutionDuration = 10 * time.Minute
defaultHeartbeatCadence = 5 * time.Minute
)

func NewEngine(cfg Config) (engine *Engine, err error) {
Expand Down Expand Up @@ -1146,6 +1177,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
cfg.MaxExecutionDuration = defaultMaxExecutionDuration
}

if cfg.HeartbeatCadence == 0 {
cfg.HeartbeatCadence = defaultHeartbeatCadence
}

if cfg.retryMs == 0 {
cfg.retryMs = 5000
}
Expand All @@ -1170,8 +1205,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
return nil, err
}

Expand All @@ -1180,8 +1217,8 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))

engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
cma: custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
Expand All @@ -1197,6 +1234,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
maxExecutionDuration: cfg.MaxExecutionDuration,
heartbeatCadence: cfg.HeartbeatCadence,
onExecutionFinished: cfg.onExecutionFinished,
afterInit: cfg.afterInit,
maxRetries: cfg.maxRetries,
Expand Down Expand Up @@ -1243,6 +1281,6 @@ func (e *workflowError) Error() string {
// logCustMsg emits msg through cma. If emission fails, the failure is reported
// once on log together with the underlying error; the error is not returned to
// the caller.
func logCustMsg(cma custmsg.MessageEmitter, msg string, log logger.Logger) {
	if err := cma.Emit(msg); err != nil {
		log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
	}
}
11 changes: 11 additions & 0 deletions core/services/workflows/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var workflowsRunningGauge metric.Int64Gauge
// Package-level metric instruments. NOTE(review): presumably all of these are
// assigned in initMonitoringResources — only some registrations are visible here; confirm.
var capabilityInvocationCounter metric.Int64Counter
var workflowExecutionLatencyGauge metric.Int64Gauge // ms
var workflowStepErrorCounter metric.Int64Counter
// engineHeartbeatCounter is registered under the instrument name
// "EngineHeartbeat" and is incremented by incrementEngineHeartbeatCounter.
var engineHeartbeatCounter metric.Int64UpDownCounter

func initMonitoringResources() (err error) {
registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("RegisterTriggerFailure")
Expand Down Expand Up @@ -44,6 +45,11 @@ func initMonitoringResources() (err error) {
return fmt.Errorf("failed to register workflow step error counter: %w", err)
}

engineHeartbeatCounter, err = beholder.GetMeter().Int64UpDownCounter("EngineHeartbeat")
if err != nil {
return fmt.Errorf("failed to register engine heartbeat counter: %w", err)
}

return nil
}

Expand Down Expand Up @@ -82,6 +88,11 @@ func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, v
workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
}

// incrementEngineHeartbeatCounter adds one to the engine heartbeat instrument,
// tagging the measurement with this labeler's labels converted to OTel
// attributes.
//
// NOTE(review): the instrument is declared as an Int64UpDownCounter but this
// method only ever adds +1; a monotonic Int64Counter may be the better fit —
// confirm no other call site decrements it.
func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) {
	attrs := metric.WithAttributes(localMonitoring.KvMapToOtelAttributes(c.Labels)...)
	engineHeartbeatCounter.Add(ctx, 1, attrs)
}

// Observability keys
const (
cIDKey = "capabilityID"
Expand Down

0 comments on commit 21290c2

Please sign in to comment.