Commit
Start N workers and have them poll instead of waiting for a worker when the task is ready. This removes the possibility of an unlucky step being starved if the box is busy (#13767)
nolag authored Jul 8, 2024
1 parent 39ce0ec commit bf9122a
Showing 1 changed file with 27 additions and 31 deletions.
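For orientation, here is a minimal, hypothetical Go sketch of the two dispatch strategies the commit message describes: the old approach, where the event loop must win a token from a semaphore channel within a timeout before spawning a per-request goroutine, and the new approach, where a fixed pool of long-lived workers polls the shared queue until shutdown. The names stepRequest, pendingStepRequests, and maxWorkerLimit mirror identifiers in the diff below; the timeout, queue size, and worker count are arbitrary stand-ins, and none of this is the engine's actual code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type stepRequest struct{ stepRef string }

// Old strategy (simplified): the single event loop must acquire a semaphore
// token within a timeout before it can spawn a goroutine for the request; on
// timeout the request is pushed back onto the queue. On a busy box an unlucky
// request can lose this race repeatedly and be starved.
func dispatchWithSemaphore(ctx context.Context, pending chan stepRequest, tokens chan struct{}, wg *sync.WaitGroup, handle func(stepRequest)) {
	for {
		select {
		case req := <-pending:
			select {
			case <-tokens:
				wg.Add(1)
				go func() {
					defer func() { tokens <- struct{}{} }() // return the token
					defer wg.Done()
					handle(req)
				}()
			case <-time.After(100 * time.Millisecond): // stand-in for newWorkerTimeout
				pending <- req // re-queue and keep looping
			}
		case <-ctx.Done():
			return
		}
	}
}

// New strategy (simplified): start maxWorkerLimit long-lived workers up front;
// each polls the shared queue until the context is cancelled, so a ready step
// never waits for a fresh goroutine to be spun up.
func startWorkerPool(ctx context.Context, pending chan stepRequest, maxWorkerLimit int, wg *sync.WaitGroup, handle func(stepRequest)) {
	wg.Add(maxWorkerLimit)
	for i := 0; i < maxWorkerLimit; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case req := <-pending:
					handle(req)
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	pending := make(chan stepRequest, 10) // stand-in for cfg.QueueSize
	var wg sync.WaitGroup

	startWorkerPool(ctx, pending, 3, &wg, func(r stepRequest) {
		fmt.Println("handled", r.stepRef)
	})

	for i := 0; i < 5; i++ {
		pending <- stepRequest{stepRef: fmt.Sprintf("step-%d", i)}
	}

	time.Sleep(50 * time.Millisecond) // crude wait for the queue to drain
	cancel()
	wg.Wait()
}
```

The worker count is still bounded by MaxWorkerLimit, but the goroutines are created once at Start rather than per step, so the timeout and re-queue path in the old loop disappears entirely, as the deleted newWorkerTimeout handling in the diff below shows.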
58 changes: 27 additions & 31 deletions core/services/workflows/engine.go
@@ -16,6 +16,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
+
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
@@ -37,7 +38,6 @@ type Engine struct {
executionStates store.Store
pendingStepRequests chan stepRequest
triggerEvents chan capabilities.CapabilityResponse
- newWorkerCh chan struct{}
stepUpdateCh chan store.WorkflowExecutionStep
wg sync.WaitGroup
stopCh services.StopChan
@@ -55,6 +55,8 @@ type Engine struct {
// when initializing the engine.
retryMs int

+ maxWorkerLimit int
+
clock clockwork.Clock
}

@@ -63,6 +65,11 @@ func (e *Engine) Start(ctx context.Context) error {
// create a new context, since the one passed in via Start is short-lived.
ctx, _ := e.stopCh.NewCtx()

+ e.wg.Add(e.maxWorkerLimit)
+ for i := 0; i < e.maxWorkerLimit; i++ {
+ go e.worker(ctx)
+ }
+
e.wg.Add(2)
go e.init(ctx)
go e.loop(ctx)
@@ -431,21 +438,6 @@ func (e *Engine) loop(ctx context.Context) {
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
}
- case pendingStepRequest := <-e.pendingStepRequests:
- // Wait for a new worker to be available before dispatching a new one.
- // We'll do this up to newWorkerTimeout. If this expires, we'll put the
- // message back on the queue and keep going.
- t := e.clock.NewTimer(e.newWorkerTimeout)
- select {
- case <-e.newWorkerCh:
- e.wg.Add(1)
- go e.workerForStepRequest(ctx, pendingStepRequest)
- case <-t.Chan():
- e.logger.With(eIDKey, pendingStepRequest.state.ExecutionID, sRKey, pendingStepRequest.stepRef).
- Errorf("timed out when spinning off worker for pending step request %+v", pendingStepRequest)
- e.pendingStepRequests <- pendingStepRequest
- }
- t.Stop()
case stepUpdate := <-e.stepUpdateCh:
// Executed synchronously to ensure we correctly schedule subsequent tasks.
err := e.handleStepUpdate(ctx, stepUpdate)
@@ -631,10 +623,20 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status
return nil
}

- func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
- defer func() { e.newWorkerCh <- struct{}{} }()
+ func (e *Engine) worker(ctx context.Context) {
defer e.wg.Done()

+ for {
+ select {
+ case pendingStepRequest := <-e.pendingStepRequests:
+ e.workerForStepRequest(ctx, pendingStepRequest)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+
+ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
@@ -905,32 +907,26 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
workflow.owner = cfg.WorkflowOwner
workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))

- // Instantiate semaphore to put a limit on the number of workers
- newWorkerCh := make(chan struct{}, cfg.MaxWorkerLimit)
- for i := 0; i < cfg.MaxWorkerLimit; i++ {
- newWorkerCh <- struct{}{}
- }
-
engine = &Engine{
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
registry: cfg.Registry,
workflow: workflow,
getLocalNode: cfg.GetLocalNode,
executionStates: cfg.Store,
pendingStepRequests: make(chan stepRequest, cfg.QueueSize),
- newWorkerCh: newWorkerCh,
stepUpdateCh: make(chan store.WorkflowExecutionStep),
triggerEvents: make(chan capabilities.CapabilityResponse),
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
maxExecutionDuration: cfg.MaxExecutionDuration,

- onExecutionFinished: cfg.onExecutionFinished,
- afterInit: cfg.afterInit,
- maxRetries: cfg.maxRetries,
- retryMs: cfg.retryMs,
- clock: cfg.clock,
+ onExecutionFinished: cfg.onExecutionFinished,
+ afterInit: cfg.afterInit,
+ maxRetries: cfg.maxRetries,
+ retryMs: cfg.retryMs,
+ maxWorkerLimit: cfg.MaxWorkerLimit,
+ clock: cfg.clock,
}

return engine, nil
}

