Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfixes (histogram buckets + hex name + engine beholder metrics) #15626

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate"
Expand Down Expand Up @@ -109,6 +110,10 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
AuthPublicKeyHex: csaPubKeyHex,
AuthHeaders: beholderAuthHeaders,
}
// note: due to the OTEL specification, all histogram buckets
// must be defined when the beholder client is created
clientCfg.MetricViews = append(clientCfg.MetricViews, workflows.MetricViews()...)

if tracingCfg.Enabled {
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
Expand Down
84 changes: 56 additions & 28 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ func (e *Engine) Start(_ context.Context) error {
// create a new context, since the one passed in via Start is short-lived.
ctx, _ := e.stopCh.NewCtx()

// spin up monitoring resources
err := initMonitoringResources()
if err != nil {
return fmt.Errorf("could not initialize monitoring resources: %w", err)
}
e.metrics.incrementWorkflowInitializationCounter(ctx)

e.wg.Add(e.maxWorkerLimit)
for i := 0; i < e.maxWorkerLimit; i++ {
Expand Down Expand Up @@ -358,6 +354,7 @@ func (e *Engine) init(ctx context.Context) {

e.logger.Info("engine initialized")
logCustMsg(ctx, e.cma, "workflow registered", e.logger)
e.metrics.incrementWorkflowRegisteredCounter(ctx)
e.afterInit(true)
}

Expand Down Expand Up @@ -439,7 +436,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.name,
WorkflowName: e.workflow.hexName,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: t.Ref,
Expand Down Expand Up @@ -678,7 +675,6 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error {
l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status)
metrics := e.metrics.with("status", status)

l.Info("finishing execution")

Expand All @@ -692,18 +688,28 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
return err
}

executionDuration := execState.FinishedAt.Sub(*execState.CreatedAt).Milliseconds()

e.stepUpdatesChMap.remove(executionID)
metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration)

executionDuration := int64(execState.FinishedAt.Sub(*execState.CreatedAt).Seconds())
switch status {
case store.StatusCompleted:
e.metrics.updateWorkflowCompletedDurationHistogram(ctx, executionDuration)
case store.StatusCompletedEarlyExit:
e.metrics.updateWorkflowEarlyExitDurationHistogram(ctx, executionDuration)
case store.StatusErrored:
e.metrics.updateWorkflowErrorDurationHistogram(ctx, executionDuration)
case store.StatusTimeout:
// should expect the same values unless the timeout is adjusted.
// using histogram as it gives count of executions for free
e.metrics.updateWorkflowTimeoutDurationHistogram(ctx, executionDuration)
}

if executionDuration > fifteenMinutesMs {
logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l)
l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration)
logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration), l)
l.Warnf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration)
}
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l)
l.Infof("execution duration: %d", executionDuration)
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
l.Infof("execution duration: %d (seconds)", executionDuration)
e.onExecutionFinished(executionID)
return nil
}
Expand Down Expand Up @@ -747,6 +753,7 @@ func (e *Engine) worker(ctx context.Context) {
if err != nil {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
e.metrics.with(platform.KeyTriggerID, te.ID).incrementTriggerWorkflowStarterErrorCounter(ctx)
} else {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
Expand All @@ -770,10 +777,21 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
Ref: msg.stepRef,
}

// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)

stepExecutionStartTime := time.Now()
inputs, outputs, err := e.executeStep(ctx, l, msg)
stepExecutionDuration := time.Since(stepExecutionStartTime).Seconds()

curStepID := "UNSET"
curStep, verr := e.workflow.Vertex(msg.stepRef)
if verr == nil {
curStepID = curStep.ID
} else {
l.Errorf("failed to resolve step in workflow; error %v", verr)
}
e.metrics.with(platform.KeyCapabilityID, curStepID).updateWorkflowStepDurationHistogram(ctx, int64(stepExecutionDuration))

var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
Expand Down Expand Up @@ -850,7 +868,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.hexName)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down Expand Up @@ -894,16 +912,16 @@ func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *st

// executeStep executes the referenced capability within a step and returns the result.
func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRequest) (*values.Map, values.Value, error) {
step, err := e.workflow.Vertex(msg.stepRef)
curStep, err := e.workflow.Vertex(msg.stepRef)
if err != nil {
return nil, nil, err
}

var inputs any
if step.Inputs.OutputRef != "" {
inputs = step.Inputs.OutputRef
if curStep.Inputs.OutputRef != "" {
inputs = curStep.Inputs.OutputRef
} else {
inputs = step.Inputs.Mapping
inputs = curStep.Inputs.Mapping
}

i, err := exec.FindAndInterpolateAllKeys(inputs, msg.state)
Expand All @@ -916,7 +934,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
return nil, nil, err
}

config, err := e.configForStep(ctx, lggr, step)
config, err := e.configForStep(ctx, lggr, curStep)
if err != nil {
return nil, nil, err
}
Expand All @@ -942,7 +960,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
WorkflowID: msg.state.WorkflowID,
WorkflowExecutionID: msg.state.ExecutionID,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.name,
WorkflowName: e.workflow.hexName,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: msg.stepRef,
Expand All @@ -952,9 +970,10 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration)
defer cancel()

e.metrics.incrementCapabilityInvocationCounter(stepCtx)
output, err := step.capability.Execute(stepCtx, tr)
e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx)
output, err := curStep.capability.Execute(stepCtx, tr)
if err != nil {
e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, curStep.ID).incrementCapabilityFailureCounter(ctx)
return inputsMap, nil, err
}

Expand All @@ -967,7 +986,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr
WorkflowID: e.workflow.id,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
WorkflowName: e.workflow.name,
WorkflowName: e.workflow.hexName,
WorkflowOwner: e.workflow.owner,
ReferenceID: t.Ref,
},
Expand Down Expand Up @@ -1074,6 +1093,7 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
return workflowProcessed, store.StatusCompleted, nil
}

// heartbeat runs by default every defaultHeartbeatCadence minutes
func (e *Engine) heartbeat(ctx context.Context) {
defer e.wg.Done()

Expand All @@ -1087,6 +1107,7 @@ func (e *Engine) heartbeat(ctx context.Context) {
return
case <-ticker.C:
e.metrics.incrementEngineHeartbeatCounter(ctx)
e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
}
}
Expand Down Expand Up @@ -1153,6 +1174,7 @@ func (e *Engine) Close() error {
return err
}
logCustMsg(ctx, e.cma, "workflow unregistered", e.logger)
e.metrics.incrementWorkflowUnregisteredCounter(ctx)
return nil
})
}
Expand Down Expand Up @@ -1249,6 +1271,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

// spin up monitoring resources
em, err := initMonitoringResources()
if err != nil {
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
}

cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
Expand All @@ -1258,12 +1286,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {

workflow.id = cfg.WorkflowID
workflow.owner = cfg.WorkflowOwner
workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))
workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))

engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, workflow.name)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName), *em},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down
6 changes: 3 additions & 3 deletions core/services/workflows/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
// treated differently due to their nature of being the starting
// point of a workflow.
type workflow struct {
id string
owner string
name string
id string
owner string
hexName string
graph.Graph[string, *step]

triggers []*triggerCapability
Expand Down
Loading
Loading