diff --git a/pkg/v3/flows/conditional.go b/pkg/v3/flows/conditional.go deleted file mode 100644 index db6f9136..00000000 --- a/pkg/v3/flows/conditional.go +++ /dev/null @@ -1,146 +0,0 @@ -package flows - -import ( - "context" - "fmt" - "log" - "time" - - common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - - ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/random" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" -) - -const ( - // This is the ticker interval for sampling conditional flow - SamplingConditionInterval = 3 * time.Second - // Maximum number of upkeeps to be sampled in every round - MaxSampledConditionals = 300 - // This is the ticker interval for final conditional flow - FinalConditionalInterval = 1 * time.Second - // These are the maximum number of conditional upkeeps dequeued on every tick from proposal queue in FinalConditionalFlow - // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round - FinalConditionalBatchSize = 50 -) - -func newSampleProposalFlow( - pre []ocr2keepersv3.PreProcessor[common.UpkeepPayload], - ratio types.Ratio, - getter common.ConditionalUpkeepProvider, - ms types.MetadataStore, - runner ocr2keepersv3.Runner, - interval time.Duration, - logger *log.Logger, -) service.Recoverable { - pre = append(pre, preprocessors.NewProposalFilterer(ms, types.LogTrigger)) - postprocessors := postprocessors.NewAddProposalToMetadataStorePostprocessor(ms) - - observer := ocr2keepersv3.NewRunnableObserver( - pre, - postprocessors, - runner, - ObservationProcessLimit, - log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), - ) - - return tickers.NewTimeTicker[[]common.UpkeepPayload](interval, observer, func(ctx context.Context, _ time.Time) (tickers.Tick[[]common.UpkeepPayload], error) { - return NewSampler(ratio, getter, logger), nil - }, log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) -} - -func NewSampler( - ratio types.Ratio, - getter common.ConditionalUpkeepProvider, - logger *log.Logger, -) *sampler { - return &sampler{ - logger: logger, - getter: getter, - ratio: ratio, - shuffler: random.Shuffler[common.UpkeepPayload]{Source: random.NewCryptoRandSource()}, - } -} - -type shuffler[T any] interface { - Shuffle([]T) []T -} - -type sampler struct { - logger *log.Logger - - ratio types.Ratio - getter common.ConditionalUpkeepProvider - shuffler shuffler[common.UpkeepPayload] -} - -func (s *sampler) Value(ctx context.Context) ([]common.UpkeepPayload, error) { - upkeeps, err := s.getter.GetActiveUpkeeps(ctx) - if err != nil { - return nil, err - } - if len(upkeeps) == 0 { - return nil, nil - } - - upkeeps = s.shuffler.Shuffle(upkeeps) - size := s.ratio.OfInt(len(upkeeps)) - - if size <= 0 { - return nil, nil - } - if size > MaxSampledConditionals { - s.logger.Printf("Required sample size %d exceeds max allowed conditional samples %d, limiting to max", size, MaxSampledConditionals) - size = MaxSampledConditionals - 
} - if len(upkeeps) < size { - size = len(upkeeps) - } - s.logger.Printf("sampled %d upkeeps", size) - return upkeeps[:size], nil -} - -func newFinalConditionalFlow( - preprocessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload], - resultStore types.ResultStore, - runner ocr2keepersv3.Runner, - interval time.Duration, - proposalQ types.ProposalQueue, - builder common.PayloadBuilder, - retryQ types.RetryQueue, - stateUpdater common.UpkeepStateUpdater, - logger *log.Logger, -) service.Recoverable { - post := postprocessors.NewCombinedPostprocessor( - postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "conditional-final-eligible-postprocessor")), - postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "conditional-final-retryable-postprocessor")), - ) - // create observer that only pushes results to result stores. everything at - // this point can be dropped. this process is only responsible for running - // conditional proposals that originate from network agreements - observer := ocr2keepersv3.NewRunnableObserver( - preprocessors, - post, - runner, - ObservationProcessLimit, - log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), - ) - - ticker := tickers.NewTimeTicker[[]common.UpkeepPayload](interval, observer, func(ctx context.Context, _ time.Time) (tickers.Tick[[]common.UpkeepPayload], error) { - return coordinatedProposalsTick{ - logger: logger, - builder: builder, - q: proposalQ, - utype: types.ConditionTrigger, - batchSize: FinalConditionalBatchSize, - }, nil - }, log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) - - return ticker -} diff --git a/pkg/v3/flows/conditional_test.go b/pkg/v3/flows/conditional/conditional_test.go similarity index 97% rename from pkg/v3/flows/conditional_test.go rename to pkg/v3/flows/conditional/conditional_test.go index cc9fb8d7..50c7becd 100644 --- a/pkg/v3/flows/conditional_test.go +++ b/pkg/v3/flows/conditional/conditional_test.go @@ -1,4 +1,4 @@ -package flows +package conditional import ( "context" @@ -83,7 +83,7 @@ func TestConditionalFinalization(t *testing.T) { // set the ticker time lower to reduce the test time interval := 50 * time.Millisecond pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} - svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger) + svc := NewFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger) var wg sync.WaitGroup wg.Add(1) @@ -185,7 +185,7 @@ func TestSamplingProposal(t *testing.T) { upkeepProvider.On("GetActiveUpkeeps", mock.Anything).Return([]common.UpkeepPayload{}, nil) // set the ticker time lower to reduce the test time pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} - svc := newSampleProposalFlow(pre, ratio, upkeepProvider, mStore, runner, time.Millisecond*100, logger) + svc := NewSampleProposalFlow(pre, ratio, upkeepProvider, mStore, runner, time.Millisecond*100, logger) var wg sync.WaitGroup wg.Add(1) diff --git a/pkg/v3/flows/conditional/final_flow.go b/pkg/v3/flows/conditional/final_flow.go new file mode 100644 index 00000000..3803a647 --- /dev/null +++ b/pkg/v3/flows/conditional/final_flow.go @@ -0,0 +1,103 @@ +package conditional + +import ( + "context" + "fmt" + "log" + "time" + + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" 
+ "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + // This is the ticker interval for final conditional flow + finalConditionalInterval = 1 * time.Second + // These are the maximum number of conditional upkeeps dequeued on every tick from proposal queue in FinalConditionalFlow + // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round + finalConditionalBatchSize = 50 + + observationProcessLimit = 20 * time.Second +) + +func NewFinalConditionalFlow( + preprocessors []ocr2keepersv3.PreProcessor, + resultStore types.ResultStore, + runner ocr2keepersv3.Runner, + proposalQ types.ProposalQueue, + builder common.PayloadBuilder, + retryQ types.RetryQueue, + stateUpdater common.UpkeepStateUpdater, + logger *log.Logger, +) service.Recoverable { + post := postprocessors.NewCombinedPostprocessor( + postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "conditional-final-eligible-postprocessor")), + postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "conditional-final-retryable-postprocessor")), + ) + // create observer that only pushes results to result stores. everything at + // this point can be dropped. this process is only responsible for running + // conditional proposals that originate from network agreements + observer := ocr2keepersv3.NewRunnableObserver( + preprocessors, + post, + runner, + observationProcessLimit, + log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), + ) + + getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) { + return coordinatedProposalsTick{ + logger: logger, + builder: builder, + q: proposalQ, + utype: types.ConditionTrigger, + batchSize: finalConditionalBatchSize, + }, nil + } + + ticker := tickers.NewTimeTicker(finalConditionalInterval, observer, getterFn, log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) + + return ticker +} + +// coordinatedProposalsTick is used to push proposals from the proposal queue to some observer +type coordinatedProposalsTick struct { + logger *log.Logger + builder common.PayloadBuilder + q types.ProposalQueue + utype types.UpkeepType + batchSize int +} + +func (t coordinatedProposalsTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) { + if t.q == nil { + return nil, nil + } + + proposals, err := t.q.Dequeue(t.utype, t.batchSize) + if err != nil { + return nil, fmt.Errorf("failed to dequeue from retry queue: %w", err) + } + t.logger.Printf("%d proposals returned from queue", len(proposals)) + + builtPayloads, err := t.builder.BuildPayloads(ctx, proposals...) 
+ if err != nil { + return nil, fmt.Errorf("failed to build payloads from proposals: %w", err) + } + payloads := []common.UpkeepPayload{} + filtered := 0 + for _, p := range builtPayloads { + if p.IsEmpty() { + filtered++ + continue + } + payloads = append(payloads, p) + } + t.logger.Printf("%d payloads built from %d proposals, %d filtered", len(payloads), len(proposals), filtered) + return payloads, nil +} diff --git a/pkg/v3/flows/conditional/proposal_flow.go b/pkg/v3/flows/conditional/proposal_flow.go new file mode 100644 index 00000000..00f14cb8 --- /dev/null +++ b/pkg/v3/flows/conditional/proposal_flow.go @@ -0,0 +1,97 @@ +package conditional + +import ( + "context" + "fmt" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/random" + "log" + "time" + + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + // This is the ticker interval for sampling conditional flow + samplingConditionInterval = 3 * time.Second + // Maximum number of upkeeps to be sampled in every round + maxSampledConditionals = 300 +) + +type shuffler[T any] interface { + Shuffle([]T) []T +} + +type sampler struct { + logger *log.Logger + + ratio types.Ratio + getter common.ConditionalUpkeepProvider + shuffler shuffler[common.UpkeepPayload] +} + +func NewSampleProposalFlow( + pre []ocr2keepersv3.PreProcessor, + ratio types.Ratio, + getter common.ConditionalUpkeepProvider, + metadataStore types.MetadataStore, + runner ocr2keepersv3.Runner, + logger *log.Logger, +) service.Recoverable { + pre = append(pre, preprocessors.NewProposalFilterer(metadataStore, types.LogTrigger)) + postprocessors := postprocessors.NewAddProposalToMetadataStorePostprocessor(metadataStore) + + observer := ocr2keepersv3.NewRunnableObserver( + pre, + postprocessors, + runner, + observationProcessLimit, + log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), + ) + + getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) { + return &sampler{ + logger: logger, + getter: getter, + ratio: ratio, + shuffler: random.Shuffler[common.UpkeepPayload]{Source: random.NewCryptoRandSource()}, + }, nil + } + + lggrPrefix := fmt.Sprintf("[%s | sample-proposal-ticker]", telemetry.ServiceName) + lggr := log.New(logger.Writer(), lggrPrefix, telemetry.LogPkgStdFlags) + + return tickers.NewTimeTicker(samplingConditionInterval, observer, getterFn, lggr) +} + +func (s *sampler) Value(ctx context.Context) ([]common.UpkeepPayload, error) { + upkeeps, err := s.getter.GetActiveUpkeeps(ctx) + if err != nil { + return nil, err + } + if len(upkeeps) == 0 { + return nil, nil + } + + upkeeps = s.shuffler.Shuffle(upkeeps) + size := s.ratio.OfInt(len(upkeeps)) + + if size <= 0 { + return nil, nil + } + if size > maxSampledConditionals { + s.logger.Printf("Required sample size %d exceeds max allowed conditional samples %d, limiting to max", size, maxSampledConditionals) + size = maxSampledConditionals + } + if len(upkeeps) < size { + size = len(upkeeps) + } + 
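// For a sense of scale (hypothetical numbers): with a 5% ratio, 1,000 active
// upkeeps yield a sample of 50, while 10,000 would yield 500 and be capped to
// maxSampledConditionals (300) before the shuffled slice is truncated below.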
s.logger.Printf("sampled %d upkeeps", size) + return upkeeps[:size], nil +} diff --git a/pkg/v3/flows/factory.go b/pkg/v3/flows/factory.go index d733b3ca..8d9766af 100644 --- a/pkg/v3/flows/factory.go +++ b/pkg/v3/flows/factory.go @@ -2,16 +2,18 @@ package flows import ( "log" - "time" ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + conditionalflows "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows/conditional" + logflows "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows/log" + recoveryflows "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows/log/recovery" common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) -func ConditionalTriggerFlows( - coord ocr2keepersv3.PreProcessor[common.UpkeepPayload], +func NewConditionalTriggerFlows( + coord ocr2keepersv3.PreProcessor, ratio types.Ratio, getter common.ConditionalUpkeepProvider, subscriber common.BlockSubscriber, @@ -24,29 +26,26 @@ func ConditionalTriggerFlows( stateUpdater common.UpkeepStateUpdater, logger *log.Logger, ) []service.Recoverable { - preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} - - // runs full check pipeline on a coordinated block with coordinated upkeeps - conditionalFinal := newFinalConditionalFlow(preprocessors, resultStore, runner, FinalConditionalInterval, proposalQ, builder, retryQ, stateUpdater, logger) + preprocessors := []ocr2keepersv3.PreProcessor{coord} // the sampling proposal flow takes random samples of active upkeeps, checks // them and surfaces the ids if the items are eligible - conditionalProposal := newSampleProposalFlow(preprocessors, ratio, getter, metadataStore, runner, SamplingConditionInterval, logger) + conditionalProposal := conditionalflows.NewConditionalProposalFlow(preprocessors, ratio, getter, metadataStore, runner, logger) + + // runs full check pipeline on a coordinated block with coordinated upkeeps + conditionalFinal := conditionalflows.NewConditionalFinalWorkflow(preprocessors, resultStore, runner, proposalQ, builder, retryQ, logger) - return []service.Recoverable{conditionalFinal, conditionalProposal} + return []service.Recoverable{conditionalProposal, conditionalFinal} } -func LogTriggerFlows( - coord ocr2keepersv3.PreProcessor[common.UpkeepPayload], +func NewLogTriggerFlows( + coord ocr2keepersv3.PreProcessor, resultStore types.ResultStore, metadataStore types.MetadataStore, runner ocr2keepersv3.Runner, logProvider common.LogEventProvider, rp common.RecoverableProvider, builder common.PayloadBuilder, - logInterval time.Duration, - recoveryProposalInterval time.Duration, - recoveryFinalInterval time.Duration, retryQ types.RetryQueue, proposals types.ProposalQueue, stateUpdater common.UpkeepStateUpdater, @@ -54,27 +53,27 @@ func LogTriggerFlows( ) []service.Recoverable { // all flows use the same preprocessor based on the coordinator // each flow can add preprocessors to this provided slice - preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} + preprocessors := []ocr2keepersv3.PreProcessor{coord} + + // the log trigger flow is the happy path for log trigger payloads. 
all + // retryables that are encountered in this flow are elevated to the retry + // flow + logTriggerFlow := logflows.NewLogTriggerFlow(preprocessors, logProvider, retryQ, resultStore, stateUpdater, runner, logger) // the recovery proposal flow is for nodes to surface payloads that should // be recovered. these values are passed to the network and the network // votes on the proposed values - rcvProposal := newRecoveryProposalFlow(preprocessors, runner, metadataStore, rp, recoveryProposalInterval, stateUpdater, logger) + recoveryProposalFlow := recoveryflows.NewProposalRecoveryFlow(preprocessors, metadataStore, stateUpdater, runner, rp, logger) // the final recovery flow takes recoverable payloads merged with the latest // blocks and runs the pipeline for them. these values to run are derived // from node coordination and it can be assumed that all values should be // run. - rcvFinal := newFinalRecoveryFlow(preprocessors, resultStore, runner, retryQ, recoveryFinalInterval, proposals, builder, stateUpdater, logger) - - // the log trigger flow is the happy path for log trigger payloads. all - // retryables that are encountered in this flow are elevated to the retry - // flow - logTrigger := newLogTriggerFlow(preprocessors, resultStore, runner, logProvider, logInterval, retryQ, stateUpdater, logger) + finalRecoveryFlow := recoveryflows.NewFinalRecoveryFlow(preprocessors, resultStore, stateUpdater, retryQ, proposals, builder, runner, logger) return []service.Recoverable{ - rcvProposal, - rcvFinal, - logTrigger, + logTriggerFlow, + recoveryProposalFlow, + finalRecoveryFlow, } } diff --git a/pkg/v3/flows/factory_test.go b/pkg/v3/flows/factory_test.go index d1363bfe..3ce0f4b8 100644 --- a/pkg/v3/flows/factory_test.go +++ b/pkg/v3/flows/factory_test.go @@ -13,7 +13,7 @@ import ( ) func TestConditionalTriggerFlows(t *testing.T) { - flows := ConditionalTriggerFlows( + flows := NewConditionalTriggerFlows( nil, nil, nil, @@ -39,7 +39,7 @@ func TestConditionalTriggerFlows(t *testing.T) { } func TestLogTriggerFlows(t *testing.T) { - flows := LogTriggerFlows( + flows := NewLogTriggerFlows( nil, nil, nil, diff --git a/pkg/v3/flows/logtrigger.go b/pkg/v3/flows/log/log_trigger_flow.go similarity index 70% rename from pkg/v3/flows/logtrigger.go rename to pkg/v3/flows/log/log_trigger_flow.go index bfbe127a..c19858ce 100644 --- a/pkg/v3/flows/logtrigger.go +++ b/pkg/v3/flows/log/log_trigger_flow.go @@ -1,4 +1,4 @@ -package flows +package log import ( "context" @@ -21,20 +21,19 @@ var ( const ( // This is the ticker interval for log trigger flow - LogCheckInterval = 1 * time.Second + logCheckInterval = 1 * time.Second // Limit for processing a whole observer flow given a payload. 
The main component of this // is the checkPipeline which involves some RPC, DB and Mercury calls, this is limited // to 20 seconds for now - ObservationProcessLimit = 20 * time.Second + observationProcessLimit = 20 * time.Second ) // log trigger flow is the happy path entry point for log triggered upkeeps -func newLogTriggerFlow( - preprocessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload], +func NewLogTriggerFlow( + preprocessors []ocr2keepersv3.PreProcessor, rs types.ResultStore, rn ocr2keepersv3.Runner, logProvider common.LogEventProvider, - logInterval time.Duration, retryQ types.RetryQueue, stateUpdater common.UpkeepStateUpdater, logger *log.Logger, @@ -45,17 +44,22 @@ func newLogTriggerFlow( postprocessors.NewIneligiblePostProcessor(stateUpdater, telemetry.WrapLogger(logger, "retry-ineligible-postprocessor")), ) - obs := ocr2keepersv3.NewRunnableObserver( + observer := ocr2keepersv3.NewRunnableObserver( preprocessors, post, rn, - ObservationProcessLimit, + observationProcessLimit, log.New(logger.Writer(), fmt.Sprintf("[%s | log-trigger-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), ) - timeTick := tickers.NewTimeTicker[[]common.UpkeepPayload](logInterval, obs, func(ctx context.Context, _ time.Time) (tickers.Tick[[]common.UpkeepPayload], error) { + getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) { return logTick{logger: logger, logProvider: logProvider}, nil - }, log.New(logger.Writer(), fmt.Sprintf("[%s | log-trigger-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) + } + + lgrPrefix := fmt.Sprintf("[%s | log-trigger-ticker]", telemetry.ServiceName) + lggr := log.New(logger.Writer(), lgrPrefix, telemetry.LogPkgStdFlags) + + timeTick := tickers.NewTimeTicker(logCheckInterval, observer, getterFn, lggr) return timeTick } @@ -65,14 +69,14 @@ type logTick struct { logger *log.Logger } -func (et logTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) { - if et.logProvider == nil { +func (t logTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) { + if t.logProvider == nil { return nil, nil } - logs, err := et.logProvider.GetLatestPayloads(ctx) + logs, err := t.logProvider.GetLatestPayloads(ctx) - et.logger.Printf("%d logs returned by log provider", len(logs)) + t.logger.Printf("%d logs returned by log provider", len(logs)) return logs, err } diff --git a/pkg/v3/flows/logtrigger_test.go b/pkg/v3/flows/log/log_trigger_flow_test.go similarity index 97% rename from pkg/v3/flows/logtrigger_test.go rename to pkg/v3/flows/log/log_trigger_flow_test.go index 2cb6c6cd..a83e4e91 100644 --- a/pkg/v3/flows/logtrigger_test.go +++ b/pkg/v3/flows/log/log_trigger_flow_test.go @@ -1,4 +1,4 @@ -package flows +package log import ( "context" @@ -68,7 +68,7 @@ func TestLogTriggerFlow(t *testing.T) { // set the ticker time lower to reduce the test time logInterval := 50 * time.Millisecond - svc := newLogTriggerFlow([]ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}, + svc := NewLogTriggerFlow([]ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}, rStore, runner, lp, logInterval, retryQ, upkeepStateUpdater, logger) var wg sync.WaitGroup diff --git a/pkg/v3/flows/recovery.go b/pkg/v3/flows/log/recovery_final_flow.go similarity index 54% rename from pkg/v3/flows/recovery.go rename to pkg/v3/flows/log/recovery_final_flow.go index 77e2529e..2be80abf 100644 --- a/pkg/v3/flows/recovery.go +++ b/pkg/v3/flows/log/recovery_final_flow.go @@ -1,4 +1,4 @@ -package flows +package log import ( "context" @@ -10,7 +10,6 @@ import ( 
ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" @@ -19,20 +18,17 @@ import ( const ( // This is the ticker interval for recovery final flow - RecoveryFinalInterval = 1 * time.Second + recoveryFinalInterval = 1 * time.Second // These are the maximum number of log upkeeps dequeued on every tick from proposal queue in FinalRecoveryFlow // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round - FinalRecoveryBatchSize = 50 - // This is the ticker interval for recovery proposal flow - RecoveryProposalInterval = 1 * time.Second + finalRecoveryBatchSize = 50 ) -func newFinalRecoveryFlow( - preprocessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload], +func NewFinalRecoveryFlow( + preprocessors []ocr2keepersv3.PreProcessor, resultStore types.ResultStore, runner ocr2keepersv3.Runner, retryQ types.RetryQueue, - recoveryFinalizationInterval time.Duration, proposalQ types.ProposalQueue, builder common.PayloadBuilder, stateUpdater common.UpkeepStateUpdater, @@ -43,6 +39,10 @@ func newFinalRecoveryFlow( postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "recovery-final-retryable-postprocessor")), postprocessors.NewIneligiblePostProcessor(stateUpdater, telemetry.WrapLogger(logger, "retry-ineligible-postprocessor")), ) + + observerLggrPrefix := fmt.Sprintf("[%s | recovery-final-observer]", telemetry.ServiceName) + observerLggr := log.New(logger.Writer(), observerLggrPrefix, telemetry.LogPkgStdFlags) + // create observer that only pushes results to result stores. everything at // this point can be dropped. 
this process is only responsible for running // recovery proposals that originate from network agreements @@ -50,19 +50,24 @@ func newFinalRecoveryFlow( preprocessors, post, runner, - ObservationProcessLimit, - log.New(logger.Writer(), fmt.Sprintf("[%s | recovery-final-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), + observationProcessLimit, + observerLggr, ) - ticker := tickers.NewTimeTicker[[]common.UpkeepPayload](recoveryFinalizationInterval, recoveryObserver, func(ctx context.Context, _ time.Time) (tickers.Tick[[]common.UpkeepPayload], error) { + getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) { return coordinatedProposalsTick{ logger: logger, builder: builder, q: proposalQ, utype: types.LogTrigger, - batchSize: FinalRecoveryBatchSize, + batchSize: finalRecoveryBatchSize, }, nil - }, log.New(logger.Writer(), fmt.Sprintf("[%s | recovery-final-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) + } + + lggrPrefix := fmt.Sprintf("[%s | recovery-final-ticker]", telemetry.ServiceName) + lggr := log.New(logger.Writer(), lggrPrefix, telemetry.LogPkgStdFlags) + + ticker := tickers.NewTimeTicker(recoveryFinalInterval, recoveryObserver, getterFn, lggr) return ticker } @@ -103,48 +108,3 @@ func (t coordinatedProposalsTick) Value(ctx context.Context) ([]common.UpkeepPay t.logger.Printf("%d payloads built from %d proposals, %d filtered", len(payloads), len(proposals), filtered) return payloads, nil } - -func newRecoveryProposalFlow( - preProcessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload], - runner ocr2keepersv3.Runner, - metadataStore types.MetadataStore, - recoverableProvider common.RecoverableProvider, - recoveryInterval time.Duration, - stateUpdater common.UpkeepStateUpdater, - logger *log.Logger, -) service.Recoverable { - preProcessors = append(preProcessors, preprocessors.NewProposalFilterer(metadataStore, types.LogTrigger)) - postprocessors := postprocessors.NewCombinedPostprocessor( - postprocessors.NewIneligiblePostProcessor(stateUpdater, logger), - postprocessors.NewAddProposalToMetadataStorePostprocessor(metadataStore), - ) - - observer := ocr2keepersv3.NewRunnableObserver( - preProcessors, - postprocessors, - runner, - ObservationProcessLimit, - log.New(logger.Writer(), fmt.Sprintf("[%s | recovery-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), - ) - - return tickers.NewTimeTicker[[]common.UpkeepPayload](recoveryInterval, observer, func(ctx context.Context, _ time.Time) (tickers.Tick[[]common.UpkeepPayload], error) { - return logRecoveryTick{logger: logger, logRecoverer: recoverableProvider}, nil - }, log.New(logger.Writer(), fmt.Sprintf("[%s | recovery-proposal-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) -} - -type logRecoveryTick struct { - logRecoverer common.RecoverableProvider - logger *log.Logger -} - -func (et logRecoveryTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) { - if et.logRecoverer == nil { - return nil, nil - } - - logs, err := et.logRecoverer.GetRecoveryProposals(ctx) - - et.logger.Printf("%d logs returned by log recoverer", len(logs)) - - return logs, err -} diff --git a/pkg/v3/flows/log/recovery_proposal_flow.go b/pkg/v3/flows/log/recovery_proposal_flow.go new file mode 100644 index 00000000..6646b760 --- /dev/null +++ b/pkg/v3/flows/log/recovery_proposal_flow.go @@ -0,0 +1,75 @@ +package log + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + + ocr2keepersv3 
"github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + // This is the ticker interval for recovery proposal flow + recoveryProposalInterval = 1 * time.Second +) + +func NewRecoveryProposalFlow( + preProcessors []ocr2keepersv3.PreProcessor, + runner ocr2keepersv3.Runner, + metadataStore types.MetadataStore, + recoverableProvider common.RecoverableProvider, + stateUpdater common.UpkeepStateUpdater, + logger *log.Logger, +) service.Recoverable { + preProcessors = append(preProcessors, preprocessors.NewProposalFilterer(metadataStore, types.LogTrigger)) + postprocessors := postprocessors.NewCombinedPostprocessor( + postprocessors.NewIneligiblePostProcessor(stateUpdater, logger), + postprocessors.NewAddProposalToMetadataStorePostprocessor(metadataStore), + ) + + observerLggrPrefix := fmt.Sprintf("[%s | recovery-proposal-observer]", telemetry.ServiceName) + observerLggr := log.New(logger.Writer(), observerLggrPrefix, telemetry.LogPkgStdFlags) + + observer := ocr2keepersv3.NewRunnableObserver( + preProcessors, + postprocessors, + runner, + observationProcessLimit, + observerLggr, + ) + + getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) { + return logRecoveryTick{logger: logger, logRecoverer: recoverableProvider}, nil + } + + lggrPrefix := fmt.Sprintf("[%s | recovery-proposal-ticker]", telemetry.ServiceName) + lggr := log.New(logger.Writer(), lggrPrefix, telemetry.LogPkgStdFlags) + + return tickers.NewTimeTicker(recoveryProposalInterval, observer, getterFn, lggr) +} + +type logRecoveryTick struct { + logRecoverer common.RecoverableProvider + logger *log.Logger +} + +func (t logRecoveryTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) { + if t.logRecoverer == nil { + return nil, nil + } + + logs, err := t.logRecoverer.GetRecoveryProposals(ctx) + + t.logger.Printf("%d logs returned by log recoverer", len(logs)) + + return logs, err +} diff --git a/pkg/v3/flows/recovery_test.go b/pkg/v3/flows/log/recovery_proposal_flow_test.go similarity index 97% rename from pkg/v3/flows/recovery_test.go rename to pkg/v3/flows/log/recovery_proposal_flow_test.go index 554c66d4..d683426c 100644 --- a/pkg/v3/flows/recovery_test.go +++ b/pkg/v3/flows/log/recovery_proposal_flow_test.go @@ -1,4 +1,4 @@ -package flows +package log import ( "context" @@ -81,7 +81,7 @@ func TestRecoveryFinalization(t *testing.T) { // set the ticker time lower to reduce the test time recFinalInterval := 50 * time.Millisecond pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} - svc := newFinalRecoveryFlow(pre, rStore, runner, retryQ, recFinalInterval, proposalQ, payloadBuilder, upkeepStateUpdater, logger) + svc := NewFinalRecoveryFlow(pre, rStore, runner, retryQ, recFinalInterval, proposalQ, payloadBuilder, upkeepStateUpdater, logger) var wg sync.WaitGroup wg.Add(1) @@ -183,7 +183,7 @@ func TestRecoveryProposal(t *testing.T) { interval := 50 * time.Millisecond pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} stateUpdater := &mockStateUpdater{} - svc := newRecoveryProposalFlow(pre, runner, mStore, recoverer, interval, stateUpdater, 
logger) + svc := NewRecoveryProposalFlow(pre, runner, mStore, recoverer, interval, stateUpdater, logger) var wg sync.WaitGroup wg.Add(1) diff --git a/pkg/v3/flows/retry.go b/pkg/v3/flows/log/retry_flow.go similarity index 73% rename from pkg/v3/flows/retry.go rename to pkg/v3/flows/log/retry_flow.go index 200b094f..0d3f7ab0 100644 --- a/pkg/v3/flows/retry.go +++ b/pkg/v3/flows/log/retry_flow.go @@ -1,4 +1,4 @@ -package flows +package log import ( "context" @@ -17,13 +17,13 @@ import ( const ( // These are the max number of payloads dequeued on every tick from the retry queue in the retry flow - RetryBatchSize = 10 + retryBatchSize = 10 // This is the ticker interval for retry flow RetryCheckInterval = 5 * time.Second ) func NewRetryFlow( - coord ocr2keepersv3.PreProcessor[common.UpkeepPayload], + coord ocr2keepersv3.PreProcessor, resultStore types.ResultStore, runner ocr2keepersv3.Runner, retryQ types.RetryQueue, @@ -31,24 +31,32 @@ func NewRetryFlow( stateUpdater common.UpkeepStateUpdater, logger *log.Logger, ) service.Recoverable { - preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord} + preprocessors := []ocr2keepersv3.PreProcessor{coord} post := postprocessors.NewCombinedPostprocessor( postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "retry-eligible-postprocessor")), postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "retry-retryable-postprocessor")), postprocessors.NewIneligiblePostProcessor(stateUpdater, telemetry.WrapLogger(logger, "retry-ineligible-postprocessor")), ) + observerLggrPrefix := fmt.Sprintf("[%s | retry-observer]", telemetry.ServiceName) + observerLggr := log.New(logger.Writer(), observerLggrPrefix, telemetry.LogPkgStdFlags) + obs := ocr2keepersv3.NewRunnableObserver( preprocessors, post, runner, - ObservationProcessLimit, - log.New(logger.Writer(), fmt.Sprintf("[%s | retry-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags), + observationProcessLimit, + observerLggr, ) - timeTick := tickers.NewTimeTicker[[]common.UpkeepPayload](retryTickerInterval, obs, func(ctx context.Context, _ time.Time) (tickers.Tick[[]common.UpkeepPayload], error) { - return retryTick{logger: logger, q: retryQ, batchSize: RetryBatchSize}, nil - }, log.New(logger.Writer(), fmt.Sprintf("[%s | retry-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags)) + getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) { + return retryTick{logger: logger, q: retryQ, batchSize: retryBatchSize}, nil + } + + lggrPrefix := fmt.Sprintf("[%s | retry-ticker]", telemetry.ServiceName) + lggr := log.New(logger.Writer(), lggrPrefix, telemetry.LogPkgStdFlags) + + timeTick := tickers.NewTimeTicker(retryTickerInterval, obs, getterFn, lggr) return timeTick } diff --git a/pkg/v3/flows/retry_test.go b/pkg/v3/flows/log/retry_flow_test.go similarity index 99% rename from pkg/v3/flows/retry_test.go rename to pkg/v3/flows/log/retry_flow_test.go index fe5ef3eb..a2f77f72 100644 --- a/pkg/v3/flows/retry_test.go +++ b/pkg/v3/flows/log/retry_flow_test.go @@ -1,4 +1,4 @@ -package flows +package log import ( "context" diff --git a/pkg/v3/observer.go b/pkg/v3/observer.go index 4fc926a8..2940db2c 100644 --- a/pkg/v3/observer.go +++ b/pkg/v3/observer.go @@ -12,15 +12,15 @@ import ( // PreProcessor is the general interface for middleware used to filter, add, or modify upkeep // payloads before checking their eligibility status -type PreProcessor[T any] interface { +type PreProcessor interface { // PreProcess takes a slice of 
payloads and returns a new slice - PreProcess(context.Context, []T) ([]T, error) + PreProcess(context.Context, []ocr2keepers.UpkeepPayload) ([]ocr2keepers.UpkeepPayload, error) } // PostProcessor is the general interface for a processing function after checking eligibility status -type PostProcessor[T any] interface { +type PostProcessor interface { // PostProcess takes a slice of results where eligibility status is known - PostProcess(context.Context, []ocr2keepers.CheckResult, []T) error + PostProcess(context.Context, []ocr2keepers.CheckResult, []ocr2keepers.UpkeepPayload) error } // Runner is the interface for an object that should determine eligibility state @@ -29,12 +29,23 @@ type Runner interface { CheckUpkeeps(context.Context, ...ocr2keepers.UpkeepPayload) ([]ocr2keepers.CheckResult, error) } -type Observer[T any] struct { +type Observer struct { lggr *log.Logger - Preprocessors []PreProcessor[T] - Postprocessor PostProcessor[T] - processFunc func(context.Context, ...T) ([]ocr2keepers.CheckResult, error) + Preprocessors []PreProcessor + Postprocessor PostProcessor + processFunc func(context.Context, ...ocr2keepers.UpkeepPayload) ([]ocr2keepers.CheckResult, error) + + // internal configurations + processTimeLimit time.Duration +} + +type SampleProposalObserverV2 struct { + lggr *log.Logger + + Preprocessors []PreProcessor + Postprocessor PostProcessor + processFunc func(context.Context, ...ocr2keepers.UpkeepPayload) ([]ocr2keepers.CheckResult, error) // internal configurations processTimeLimit time.Duration @@ -42,13 +53,13 @@ type Observer[T any] struct { // NewRunnableObserver creates a new Observer with the given pre-processors, post-processor, and runner func NewRunnableObserver( - preprocessors []PreProcessor[ocr2keepers.UpkeepPayload], - postprocessor PostProcessor[ocr2keepers.UpkeepPayload], + preprocessors []PreProcessor, + postprocessor PostProcessor, runner Runner, processLimit time.Duration, logger *log.Logger, -) *Observer[ocr2keepers.UpkeepPayload] { - return &Observer[ocr2keepers.UpkeepPayload]{ +) *Observer { + return &Observer{ lggr: logger, Preprocessors: preprocessors, Postprocessor: postprocessor, @@ -57,25 +68,33 @@ func NewRunnableObserver( } } -// NewGenericObserver creates a new Observer with the given pre-processors, post-processor, and runner -func NewGenericObserver[T any]( - preprocessors []PreProcessor[T], - postprocessor PostProcessor[T], - processor func(context.Context, ...T) ([]ocr2keepers.CheckResult, error), +// NewRunnableObserver creates a new Observer with the given pre-processors, post-processor, and runner +func NewSampleProposalObserverV2( + preprocessors []PreProcessor, + postprocessor PostProcessor, + runner Runner, processLimit time.Duration, logger *log.Logger, -) *Observer[T] { - return &Observer[T]{ +) *SampleProposalObserverV2 { + return &SampleProposalObserverV2{ lggr: logger, Preprocessors: preprocessors, Postprocessor: postprocessor, - processFunc: processor, + processFunc: runner.CheckUpkeeps, processTimeLimit: processLimit, } } +func (t *SampleProposalObserverV2) Start(pctx context.Context) error { + return nil +} + +func (t *SampleProposalObserverV2) Close() error { + return nil +} + // Process - receives a tick and runs it through the eligibility pipeline. Calls all pre-processors, runs the check pipeline, and calls the post-processor. 
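In condensed form, this change pins the former type parameter to the upkeep payload type; the same specialization is applied to PostProcessor and Observer above and to Tick, observer and timeTicker in the tickers package further below:

// before: parameterized over the payload type
type PreProcessor[T any] interface {
	PreProcess(context.Context, []T) ([]T, error)
}

// after: fixed to the only instantiation these flows use
type PreProcessor interface {
	PreProcess(context.Context, []ocr2keepers.UpkeepPayload) ([]ocr2keepers.UpkeepPayload, error)
}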
-func (o *Observer[T]) Process(ctx context.Context, tick tickers.Tick[[]T]) error { +func (o *Observer) Process(ctx context.Context, tick tickers.Tick) error { pCtx, cancel := context.WithTimeout(ctx, o.processTimeLimit) defer cancel() diff --git a/pkg/v3/plugin/plugin.go b/pkg/v3/plugin/plugin.go index 6ebb74a1..0a901deb 100644 --- a/pkg/v3/plugin/plugin.go +++ b/pkg/v3/plugin/plugin.go @@ -2,6 +2,7 @@ package plugin import ( "fmt" + log3 "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows/log" "log" "github.com/smartcontractkit/chainlink-automation/pkg/v3/config" @@ -59,12 +60,12 @@ func newPlugin( retryQ := stores.NewRetryQueue(logger) - retrySvc := flows.NewRetryFlow(coord, resultStore, runner, retryQ, flows.RetryCheckInterval, upkeepStateUpdater, logger) - + retrySvc := log3.NewRetryFlow(retryQ, coord, resultStore, upkeepStateUpdater, runner, logger) + proposalQ := stores.NewProposalQueue(upkeepTypeGetter) // initialize the log trigger eligibility flow - logTriggerFlows := flows.LogTriggerFlows( + logTriggerFlows := flows.NewLogTriggerFlows( coord, resultStore, metadataStore, @@ -72,9 +73,6 @@ func newPlugin( logProvider, recoverablesProvider, builder, - flows.LogCheckInterval, - flows.RecoveryProposalInterval, - flows.RecoveryFinalInterval, retryQ, proposalQ, upkeepStateUpdater, @@ -84,7 +82,7 @@ func newPlugin( // create service recoverers to provide panic recovery on dependent services allSvcs := append(logTriggerFlows, []service.Recoverable{retrySvc, resultStore, metadataStore, coord, runner}...) - contionalFlows := flows.ConditionalTriggerFlows( + contionalFlows := flows.NewConditionalTriggerFlows( coord, ratio, getter, diff --git a/pkg/v3/preprocessors/proposal_filterer.go b/pkg/v3/preprocessors/proposal_filterer.go index 89472f26..d0354a47 100644 --- a/pkg/v3/preprocessors/proposal_filterer.go +++ b/pkg/v3/preprocessors/proposal_filterer.go @@ -8,7 +8,7 @@ import ( ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) -func NewProposalFilterer(metadata types.MetadataStore, upkeepType types.UpkeepType) ocr2keepersv3.PreProcessor[ocr2keepers.UpkeepPayload] { +func NewProposalFilterer(metadata types.MetadataStore, upkeepType types.UpkeepType) ocr2keepersv3.PreProcessor { return &proposalFilterer{ upkeepType: upkeepType, metadata: metadata, @@ -20,7 +20,7 @@ type proposalFilterer struct { upkeepType types.UpkeepType } -var _ ocr2keepersv3.PreProcessor[ocr2keepers.UpkeepPayload] = (*proposalFilterer)(nil) +var _ ocr2keepersv3.PreProcessor = (*proposalFilterer)(nil) func (p *proposalFilterer) PreProcess(ctx context.Context, payloads []ocr2keepers.UpkeepPayload) ([]ocr2keepers.UpkeepPayload, error) { all := p.metadata.ViewProposals(p.upkeepType) diff --git a/pkg/v3/tickers/tick.go b/pkg/v3/tickers/tick.go index 1ee446e4..1eba5818 100644 --- a/pkg/v3/tickers/tick.go +++ b/pkg/v3/tickers/tick.go @@ -2,10 +2,11 @@ package tickers import ( "context" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" ) // Tick is the container for the individual tick -type Tick[T any] interface { +type Tick interface { // Value provides data scoped to the tick - Value(ctx context.Context) (T, error) + Value(ctx context.Context) ([]common.UpkeepPayload, error) } diff --git a/pkg/v3/tickers/time.go b/pkg/v3/tickers/time.go index a48fa8ad..659631ef 100644 --- a/pkg/v3/tickers/time.go +++ b/pkg/v3/tickers/time.go @@ -9,36 +9,34 @@ import ( "github.com/smartcontractkit/chainlink-automation/internal/util" ) -type observer[T any] interface { - 
Process(context.Context, Tick[T]) error +type observer interface { + Process(context.Context, Tick) error } -type getterFunc[T any] func(context.Context, time.Time) (Tick[T], error) +type getterFunc func(context.Context, time.Time) (Tick, error) -type timeTicker[T any] struct { +type timeTicker struct { closer util.Closer interval time.Duration - observer observer[T] - getterFn getterFunc[T] + observer observer + getterFn getterFunc logger *log.Logger } -func NewTimeTicker[T any](interval time.Duration, observer observer[T], getterFn getterFunc[T], logger *log.Logger) *timeTicker[T] { - t := &timeTicker[T]{ +func NewTimeTicker(interval time.Duration, observer observer, getterFn getterFunc, logger *log.Logger) *timeTicker { + return &timeTicker{ interval: interval, observer: observer, getterFn: getterFn, logger: logger, } - - return t } // Start uses the provided context for each call to the getter function with the // configured interval as a timeout. This function blocks until Close is called // or the parent context is cancelled. -func (t *timeTicker[T]) Start(pctx context.Context) error { +func (t *timeTicker) Start(pctx context.Context) error { ctx, cancel := context.WithCancel(pctx) defer cancel() @@ -65,7 +63,7 @@ func (t *timeTicker[T]) Start(pctx context.Context) error { // observer.Process can be a heavy call taking upto ObservationProcessLimit seconds // so it is run in a separate goroutine to not block further ticks // Exploratory: Add some control to limit the number of goroutines spawned - go func(c context.Context, t Tick[T], o observer[T], l *log.Logger) { + go func(c context.Context, t Tick, o observer, l *log.Logger) { if err := o.Process(c, t); err != nil { l.Printf("error processing observer: %s", err.Error()) } @@ -76,7 +74,7 @@ func (t *timeTicker[T]) Start(pctx context.Context) error { } } -func (t *timeTicker[T]) Close() error { +func (t *timeTicker) Close() error { _ = t.closer.Close() return nil } diff --git a/pkg/v4/workflows/conditional/final.go b/pkg/v4/workflows/conditional/final.go new file mode 100644 index 00000000..60462fe8 --- /dev/null +++ b/pkg/v4/workflows/conditional/final.go @@ -0,0 +1,97 @@ +package conditional + +import ( + "context" + "fmt" + "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows" + "log" + "time" + + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + // This is the ticker interval for final conditional flow + finalConditionalInterval = 1 * time.Second + // These are the maximum number of conditional upkeeps dequeued on every tick from proposal queue in FinalConditionalFlow + // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round + finalConditionalBatchSize = 50 +) + +type conditionalFinalWorkflow struct { + preprocessors []ocr2keepersv3.PreProcessor + postprocessors postprocessors.PostProcessor + runner ocr2keepersv3.Runner + proposalQ types.ProposalQueue + builder common.PayloadBuilder + logger *log.Logger +} + +func NewConditionalFinalWorkflow( + preProcessors []ocr2keepersv3.PreProcessor, + resultStore types.ResultStore, + runner ocr2keepersv3.Runner, + proposalQ 
types.ProposalQueue, + builder common.PayloadBuilder, + retryQ types.RetryQueue, + logger *log.Logger, +) service.Recoverable { + postProcessors := postprocessors.NewCombinedPostprocessor( + postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "conditional-final-eligible-postprocessor")), + postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "conditional-final-retryable-postprocessor")), + ) + + lggr := log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-observe]", telemetry.ServiceName), telemetry.LogPkgStdFlags) + + workflowProvider := &conditionalFinalWorkflow{ + preprocessors: preProcessors, + postprocessors: postProcessors, + runner: runner, + proposalQ: proposalQ, + builder: builder, + logger: lggr, + } + + return workflows.NewPipeline(workflowProvider, finalConditionalInterval, lggr) +} + +func (t *conditionalFinalWorkflow) GetPayloads(ctx context.Context) ([]common.UpkeepPayload, error) { + proposals, err := t.proposalQ.Dequeue(types.ConditionTrigger, finalConditionalBatchSize) + if err != nil { + return nil, fmt.Errorf("failed to dequeue from retry queue: %w", err) + } + t.logger.Printf("%d proposals returned from queue", len(proposals)) + + builtPayloads, err := t.builder.BuildPayloads(ctx, proposals...) + if err != nil { + return nil, fmt.Errorf("failed to build payloads from proposals: %w", err) + } + payloads := []common.UpkeepPayload{} + filtered := 0 + for _, p := range builtPayloads { + if p.IsEmpty() { + filtered++ + continue + } + payloads = append(payloads, p) + } + t.logger.Printf("%d payloads built from %d proposals, %d filtered", len(payloads), len(proposals), filtered) + return payloads, nil +} + +func (t *conditionalFinalWorkflow) GetPreprocessors() []ocr2keepersv3.PreProcessor { + return t.preprocessors +} + +func (t *conditionalFinalWorkflow) GetPostprocessor() postprocessors.PostProcessor { + return t.postprocessors +} + +func (t *conditionalFinalWorkflow) GetRunner() ocr2keepersv3.Runner { + return t.runner +} diff --git a/pkg/v4/workflows/conditional/proposal.go b/pkg/v4/workflows/conditional/proposal.go new file mode 100644 index 00000000..29d075d8 --- /dev/null +++ b/pkg/v4/workflows/conditional/proposal.go @@ -0,0 +1,105 @@ +package conditional + +import ( + "context" + "fmt" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/random" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows" + "log" + "time" + + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + // This is the ticker interval for sampling conditional flow + samplingConditionInterval = 3 * time.Second + // Maximum number of upkeeps to be sampled in every round + maxSampledConditionals = 300 +) + +type shuffler[T any] interface { + Shuffle([]T) []T +} + +type conditionalProposalFlow struct { + preprocessors []ocr2keepersv3.PreProcessor + postprocessors postprocessors.PostProcessor + runner ocr2keepersv3.Runner + upkeepProvider common.ConditionalUpkeepProvider + shuffler shuffler[common.UpkeepPayload] + ratio types.Ratio + logger *log.Logger +} + +func 
NewConditionalProposalFlow( + preProcessors []ocr2keepersv3.PreProcessor, + ratio types.Ratio, + upkeepProvider common.ConditionalUpkeepProvider, + metadataStore types.MetadataStore, + runner ocr2keepersv3.Runner, + logger *log.Logger, +) service.Recoverable { + preProcessors = append(preProcessors, preprocessors.NewProposalFilterer(metadataStore, types.LogTrigger)) + postProcessors := postprocessors.NewAddProposalToMetadataStorePostprocessor(metadataStore) + + lggr := log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags) + + workflowProvider := &conditionalProposalFlow{ + preprocessors: preProcessors, + postprocessors: postProcessors, + runner: runner, + upkeepProvider: upkeepProvider, + ratio: ratio, + shuffler: random.Shuffler[common.UpkeepPayload]{Source: random.NewCryptoRandSource()}, + + logger: lggr, + } + + return workflows.NewPipeline(workflowProvider, samplingConditionInterval, lggr) +} + +func (t *conditionalProposalFlow) GetPayloads(ctx context.Context) ([]common.UpkeepPayload, error) { + upkeeps, err := t.upkeepProvider.GetActiveUpkeeps(ctx) + if err != nil { + return nil, err + } + + if len(upkeeps) == 0 { + return nil, nil + } + + upkeeps = t.shuffler.Shuffle(upkeeps) + size := t.ratio.OfInt(len(upkeeps)) + + if size <= 0 { + return nil, nil + } + if size > maxSampledConditionals { + t.logger.Printf("Required sample size %d exceeds max allowed conditional samples %d, limiting to max", size, maxSampledConditionals) + size = maxSampledConditionals + } + if len(upkeeps) < size { + size = len(upkeeps) + } + t.logger.Printf("sampled %d upkeeps", size) + return upkeeps[:size], nil +} + +func (t *conditionalProposalFlow) GetPreprocessors() []ocr2keepersv3.PreProcessor { + return t.preprocessors +} + +func (t *conditionalProposalFlow) GetPostprocessor() postprocessors.PostProcessor { + return t.postprocessors +} + +func (t *conditionalProposalFlow) GetRunner() ocr2keepersv3.Runner { + return t.runner +} diff --git a/pkg/v4/workflows/log/log_trigger.go b/pkg/v4/workflows/log/log_trigger.go new file mode 100644 index 00000000..75b0559d --- /dev/null +++ b/pkg/v4/workflows/log/log_trigger.go @@ -0,0 +1,81 @@ +package log + +import ( + "context" + "fmt" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows" + "log" + "time" + + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + // This is the ticker interval for log trigger flow + logCheckInterval = 1 * time.Second +) + +type logTriggerFlow struct { + preprocessors []ocr2keepersv3.PreProcessor + postprocessors postprocessors.PostProcessor + runner ocr2keepersv3.Runner + logProvider common.LogEventProvider + + upkeepProvider common.ConditionalUpkeepProvider + ratio types.Ratio + + logger *log.Logger +} + +func NewLogTriggerFlow( + preProcessors []ocr2keepersv3.PreProcessor, + logProvider common.LogEventProvider, + retryQ types.RetryQueue, + resultStore types.ResultStore, + stateUpdater common.UpkeepStateUpdater, + runner ocr2keepersv3.Runner, + logger *log.Logger, +) service.Recoverable { + postProcessors := postprocessors.NewCombinedPostprocessor( + 
postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "log-trigger-eligible-postprocessor")), + postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "log-trigger-retryable-postprocessor")), + postprocessors.NewIneligiblePostProcessor(stateUpdater, telemetry.WrapLogger(logger, "retry-ineligible-postprocessor")), + ) + + lggr := log.New(logger.Writer(), fmt.Sprintf("[%s | log-trigger-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags) + + workflowProvider := &logTriggerFlow{ + preprocessors: preProcessors, + postprocessors: postProcessors, + runner: runner, + logProvider: logProvider, + logger: lggr, + } + + return workflows.NewPipeline(workflowProvider, logCheckInterval, lggr) +} + +func (t *logTriggerFlow) GetPayloads(ctx context.Context) ([]common.UpkeepPayload, error) { + logs, err := t.logProvider.GetLatestPayloads(ctx) + + t.logger.Printf("%d logs returned by log provider", len(logs)) + + return logs, err +} + +func (t *logTriggerFlow) GetPreprocessors() []ocr2keepersv3.PreProcessor { + return t.preprocessors +} + +func (t *logTriggerFlow) GetPostprocessor() postprocessors.PostProcessor { + return t.postprocessors +} + +func (t *logTriggerFlow) GetRunner() ocr2keepersv3.Runner { + return t.runner +} diff --git a/pkg/v4/workflows/log/recovery/final.go b/pkg/v4/workflows/log/recovery/final.go new file mode 100644 index 00000000..b49b014a --- /dev/null +++ b/pkg/v4/workflows/log/recovery/final.go @@ -0,0 +1,100 @@ +package recovery + +import ( + "context" + "fmt" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "log" + "time" +) + +const ( + // This is the ticker interval for recovery final flow + recoveryFinalInterval = 1 * time.Second + // These are the maximum number of log upkeeps dequeued on every tick from proposal queue in FinalRecoveryFlow + // This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round + finalRecoveryBatchSize = 50 +) + +type finalFlow struct { + preprocessors []ocr2keepersv3.PreProcessor + postprocessors postprocessors.PostProcessor + runner ocr2keepersv3.Runner + retryQ types.RetryQueue + proposalQ types.ProposalQueue + builder common.PayloadBuilder + logger *log.Logger +} + +func NewFinalRecoveryFlow( + preProcessors []ocr2keepersv3.PreProcessor, + resultStore types.ResultStore, + stateUpdater common.UpkeepStateUpdater, + retryQ types.RetryQueue, + proposalQ types.ProposalQueue, + builder common.PayloadBuilder, + runner ocr2keepersv3.Runner, + logger *log.Logger, +) service.Recoverable { + postProcessors := postprocessors.NewCombinedPostprocessor( + postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "recovery-final-eligible-postprocessor")), + postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "recovery-final-retryable-postprocessor")), + postprocessors.NewIneligiblePostProcessor(stateUpdater, telemetry.WrapLogger(logger, "retry-ineligible-postprocessor")), + ) + + lggr := log.New(logger.Writer(), fmt.Sprintf("[%s | recovery-final-observer]", 
telemetry.ServiceName), telemetry.LogPkgStdFlags) + + workflowProvider := &finalFlow{ + preprocessors: preProcessors, + postprocessors: postProcessors, + runner: runner, + retryQ: retryQ, + proposalQ: proposalQ, + builder: builder, + logger: lggr, + } + + return workflows.NewPipeline(workflowProvider, recoveryFinalInterval, lggr) +} + +func (t *finalFlow) GetPayloads(ctx context.Context) ([]common.UpkeepPayload, error) { + proposals, err := t.proposalQ.Dequeue(types.LogTrigger, finalRecoveryBatchSize) + if err != nil { + return nil, fmt.Errorf("failed to dequeue from retry queue: %w", err) + } + t.logger.Printf("%d proposals returned from queue", len(proposals)) + + builtPayloads, err := t.builder.BuildPayloads(ctx, proposals...) + if err != nil { + return nil, fmt.Errorf("failed to build payloads from proposals: %w", err) + } + payloads := []common.UpkeepPayload{} + filtered := 0 + for _, p := range builtPayloads { + if p.IsEmpty() { + filtered++ + continue + } + payloads = append(payloads, p) + } + t.logger.Printf("%d payloads built from %d proposals, %d filtered", len(payloads), len(proposals), filtered) + return payloads, nil +} + +func (t *finalFlow) GetPreprocessors() []ocr2keepersv3.PreProcessor { + return t.preprocessors +} + +func (t *finalFlow) GetPostprocessor() postprocessors.PostProcessor { + return t.postprocessors +} + +func (t *finalFlow) GetRunner() ocr2keepersv3.Runner { + return t.runner +} diff --git a/pkg/v4/workflows/log/recovery/proposal.go b/pkg/v4/workflows/log/recovery/proposal.go new file mode 100644 index 00000000..b07dc29a --- /dev/null +++ b/pkg/v4/workflows/log/recovery/proposal.go @@ -0,0 +1,76 @@ +package recovery + +import ( + "context" + "fmt" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "log" + "time" +) + +const ( + // This is the ticker interval for recovery proposal flow + recoveryProposalInterval = 1 * time.Second +) + +type proposalFlow struct { + preprocessors []ocr2keepersv3.PreProcessor + postprocessors postprocessors.PostProcessor + runner ocr2keepersv3.Runner + logRecoverer common.RecoverableProvider + logger *log.Logger +} + +func NewProposalRecoveryFlow( + preProcessors []ocr2keepersv3.PreProcessor, + metadataStore types.MetadataStore, + stateUpdater common.UpkeepStateUpdater, + runner ocr2keepersv3.Runner, + logRecoverer common.RecoverableProvider, + logger *log.Logger, +) service.Recoverable { + preProcessors = append(preProcessors, preprocessors.NewProposalFilterer(metadataStore, types.LogTrigger)) + postProcessors := postprocessors.NewCombinedPostprocessor( + postprocessors.NewIneligiblePostProcessor(stateUpdater, logger), + postprocessors.NewAddProposalToMetadataStorePostprocessor(metadataStore), + ) + + lggr := log.New(logger.Writer(), fmt.Sprintf("[%s | recovery-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags) + + workflowProvider := &proposalFlow{ + preprocessors: preProcessors, + postprocessors: postProcessors, + runner: runner, + logRecoverer: logRecoverer, + logger: lggr, + } + + return 
workflows.NewPipeline(workflowProvider, recoveryProposalInterval, lggr) +} + +func (t *proposalFlow) GetPayloads(ctx context.Context) ([]common.UpkeepPayload, error) { + logs, err := t.logRecoverer.GetRecoveryProposals(ctx) + + t.logger.Printf("%d logs returned by log recoverer", len(logs)) + + return logs, err +} + +func (t *proposalFlow) GetPreprocessors() []ocr2keepersv3.PreProcessor { + return t.preprocessors +} + +func (t *proposalFlow) GetPostprocessor() postprocessors.PostProcessor { + return t.postprocessors +} + +func (t *proposalFlow) GetRunner() ocr2keepersv3.Runner { + return t.runner +} diff --git a/pkg/v4/workflows/log/retry.go b/pkg/v4/workflows/log/retry.go new file mode 100644 index 00000000..c8f83e96 --- /dev/null +++ b/pkg/v4/workflows/log/retry.go @@ -0,0 +1,81 @@ +package log + +import ( + "context" + "fmt" + ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/service" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + "github.com/smartcontractkit/chainlink-automation/pkg/v4/workflows" + common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" + "log" + "time" +) + +const ( + // These are the max number of payloads dequeued on every tick from the retry queue in the retry flow + retryBatchSize = 10 + + // This is the ticker interval for retry flow + retryCheckInterval = 5 * time.Second +) + +type retryFlow struct { + preprocessors []ocr2keepersv3.PreProcessor + postprocessors postprocessors.PostProcessor + runner ocr2keepersv3.Runner + retryQ types.RetryQueue + logger *log.Logger +} + +func NewRetryFlow( + retryQ types.RetryQueue, + coord ocr2keepersv3.PreProcessor, + resultStore types.ResultStore, + stateUpdater common.UpkeepStateUpdater, + runner ocr2keepersv3.Runner, + logger *log.Logger, +) service.Recoverable { + preProcessors := []ocr2keepersv3.PreProcessor{coord} + postProcessors := postprocessors.NewCombinedPostprocessor( + postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "retry-eligible-postprocessor")), + postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "retry-retryable-postprocessor")), + postprocessors.NewIneligiblePostProcessor(stateUpdater, telemetry.WrapLogger(logger, "retry-ineligible-postprocessor")), + ) + + lggr := log.New(logger.Writer(), fmt.Sprintf("[%s | retry-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags) + + workflowProvider := &retryFlow{ + preprocessors: preProcessors, + postprocessors: postProcessors, + runner: runner, + logger: lggr, + retryQ: retryQ, + } + + return workflows.NewPipeline(workflowProvider, retryCheckInterval, lggr) +} + +func (t *retryFlow) GetPayloads(ctx context.Context) ([]common.UpkeepPayload, error) { + payloads, err := t.retryQ.Dequeue(retryBatchSize) + if err != nil { + return nil, fmt.Errorf("failed to dequeue from retry queue: %w", err) + } + t.logger.Printf("%d payloads returned by retry queue", len(payloads)) + + return payloads, err +} + +func (t *retryFlow) GetPreprocessors() []ocr2keepersv3.PreProcessor { + return t.preprocessors +} + +func (t *retryFlow) GetPostprocessor() postprocessors.PostProcessor { + return t.postprocessors +} + +func (t *retryFlow) GetRunner() ocr2keepersv3.Runner { + return t.runner +} diff --git a/pkg/v4/workflows/pipeline.go b/pkg/v4/workflows/pipeline.go 
new file mode 100644 index 00000000..64ba2b38 --- /dev/null +++ b/pkg/v4/workflows/pipeline.go @@ -0,0 +1,122 @@ +package workflows + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/smartcontractkit/chainlink-automation/internal/util" + ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3" + "github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors" + "github.com/smartcontractkit/chainlink-common/pkg/types/automation" +) + +const ( + observationProcessLimit = 20 * time.Second +) + +type WorkflowProvider interface { + GetPayloads(ctx context.Context) ([]automation.UpkeepPayload, error) + GetPreprocessors() []ocr2keepers.PreProcessor + GetPostprocessor() postprocessors.PostProcessor + GetRunner() ocr2keepers.Runner +} + +type Pipeline struct { + closer util.Closer + logger *log.Logger + interval time.Duration + workflowProvider WorkflowProvider +} + +func NewPipeline(provider WorkflowProvider, interval time.Duration, logger *log.Logger) *Pipeline { + return &Pipeline{ + logger: logger, + interval: interval, + workflowProvider: provider, + } +} + +func (p *Pipeline) Start(pctx context.Context) error { + ctx, cancel := context.WithCancel(pctx) + defer cancel() + + if !p.closer.Store(cancel) { + return fmt.Errorf("already running") + } + + p.logger.Printf("starting ticker service") + defer p.logger.Printf("ticker service stopped") + + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + payloads, err := p.workflowProvider.GetPayloads(ctx) + if err != nil { + p.logger.Printf("error fetching payloads: %s", err.Error()) + } + + preprocessor := p.workflowProvider.GetPreprocessors() + + postprocessor := p.workflowProvider.GetPostprocessor() + + runner := p.workflowProvider.GetRunner() + + // Process can be a heavy call taking up to observationProcessLimit seconds + // so it is run in a separate goroutine to not block further ticks + // Exploratory: Add some control to limit the number of goroutines spawned + go func(ctx context.Context, payloads []automation.UpkeepPayload, l *log.Logger) { + if err := p.Process(ctx, p.logger, payloads, preprocessor, postprocessor, runner); err != nil { + l.Printf("error processing observer: %s", err.Error()) + } + }(ctx, payloads, p.logger) + case <-ctx.Done(): + return nil + } + } +} + +func (p *Pipeline) Process(ctx context.Context, logger *log.Logger, payloads []automation.UpkeepPayload, preprocessors []ocr2keepers.PreProcessor, postprocessor postprocessors.PostProcessor, runner ocr2keepers.Runner) error { + pCtx, cancel := context.WithTimeout(ctx, observationProcessLimit) + defer cancel() + + logger.Printf("got %d payloads from ticker", len(payloads)) + + var err error + + // Run pre-processors + for _, preprocessor := range preprocessors { + payloads, err = preprocessor.PreProcess(pCtx, payloads) + if err != nil { + return err + } + } + + logger.Printf("processing %d payloads", len(payloads)) + + // Run check pipeline + results, err := runner.CheckUpkeeps(pCtx, payloads...) + if err != nil { + return err + } + + logger.Printf("post-processing %d results", len(results)) + + // Run post-processor + if err := postprocessor.PostProcess(pCtx, results, payloads); err != nil { + return err + } + + logger.Printf("finished processing of %d results: %+v", len(results), results) + + return nil +} + +func (p *Pipeline) Close() error { + _ = p.closer.Close() + return nil +}
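
The new pkg/v4/workflows/pipeline.go above replaces the per-flow observer and ticker wiring with a single loop: on every tick it asks the WorkflowProvider for payloads, then runs pre-processing, the check pipeline and post-processing in a goroutine bounded by observationProcessLimit, so a slow round never blocks the next tick. The standard-library-only sketch below models that tick/goroutine/timeout shape; the fetch and process functions and the processLimit constant are placeholders for illustration, not the package's real WorkflowProvider, Runner or post-processors.

package main

// Simplified model of Pipeline.Start/Process: a ticker drives the loop, each
// tick fetches work and hands it to a goroutine whose run is capped by a
// per-run timeout. fetch/process/processLimit are stand-ins for this sketch.

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"
)

const processLimit = 2 * time.Second // stands in for observationProcessLimit

func run(ctx context.Context, interval time.Duration, logger *log.Logger) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			payloads := fetch() // GetPayloads equivalent

			// Processing may take up to processLimit, so it runs off the
			// tick loop, mirroring the goroutine in Pipeline.Start.
			go func(payloads []string) {
				pCtx, cancel := context.WithTimeout(ctx, processLimit)
				defer cancel()
				if err := process(pCtx, payloads); err != nil {
					logger.Printf("error processing observer: %s", err)
				}
			}(payloads)
		case <-ctx.Done():
			return
		}
	}
}

func fetch() []string { return []string{"payload-1", "payload-2"} }

func process(ctx context.Context, payloads []string) error {
	select {
	case <-time.After(100 * time.Millisecond): // pretend check-pipeline work
		return nil
	case <-ctx.Done():
		return fmt.Errorf("processing cancelled: %w", ctx.Err())
	}
}

func main() {
	logger := log.New(os.Stdout, "[pipeline-sketch] ", log.LstdFlags)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	run(ctx, time.Second, logger)
}

Keeping the heavy work off the tick loop is the same design choice the diff notes in Pipeline.Start; the per-run timeout context is what gives each flow a bounded observation window regardless of how often it ticks.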
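
The conditional proposal flow earlier in this diff feeds that pipeline by sampling active upkeeps in GetPayloads: shuffle, take ratio-of-N, cap at maxSampledConditionals, and clamp to the number of upkeeps available. The standalone sketch below shows only that selection rule; plain ints stand in for common.UpkeepPayload, a float64 for types.Ratio, and math/rand for the crypto-backed shuffler, so it is an illustration of the arithmetic rather than the package's types.

package main

// Sketch of the conditional sampling rule: shuffle, take ratio-of-N,
// cap at maxSampledConditionals, clamp to the slice length.

import (
	"fmt"
	"math/rand"
)

const maxSampledConditionals = 300

func sample(upkeeps []int, ratio float64, r *rand.Rand) []int {
	if len(upkeeps) == 0 {
		return nil
	}

	// Shuffle so each round samples a different subset of active upkeeps.
	r.Shuffle(len(upkeeps), func(i, j int) { upkeeps[i], upkeeps[j] = upkeeps[j], upkeeps[i] })

	size := int(ratio * float64(len(upkeeps)))
	if size <= 0 {
		return nil
	}
	if size > maxSampledConditionals {
		size = maxSampledConditionals
	}
	if size > len(upkeeps) {
		size = len(upkeeps)
	}
	return upkeeps[:size]
}

func main() {
	upkeeps := make([]int, 1000)
	for i := range upkeeps {
		upkeeps[i] = i
	}
	sampled := sample(upkeeps, 0.5, rand.New(rand.NewSource(1)))
	fmt.Println(len(sampled)) // 300: the ratio gives 500, capped at maxSampledConditionals
}

With 1000 active upkeeps and a 0.5 ratio the raw sample size is 500, so the cap is what actually bounds the per-round conditional work at 300.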