Skip to content

Commit

Permalink
Remove generic types from observer and pre and post processors
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Apr 5, 2024
1 parent 2c683c4 commit e081699
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 126 deletions.
4 changes: 2 additions & 2 deletions pkg/v3/flows/conditional.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
)

func newSampleProposalFlow(
pre []ocr2keepersv3.PreProcessor[common.UpkeepPayload],
pre []ocr2keepersv3.PreProcessor,
ratio types.Ratio,
getter common.ConditionalUpkeepProvider,
ms types.MetadataStore,
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *sampler) Value(ctx context.Context) ([]common.UpkeepPayload, error) {
}

func newFinalConditionalFlow(
preprocessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload],
preprocessors []ocr2keepersv3.PreProcessor,
resultStore types.ResultStore,
runner ocr2keepersv3.Runner,
interval time.Duration,
Expand Down
4 changes: 2 additions & 2 deletions pkg/v3/flows/conditional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestConditionalFinalization(t *testing.T) {
upkeepStateUpdater.On("SetUpkeepState", mock.Anything, mock.Anything, mock.Anything).Return(nil)
// set the ticker time lower to reduce the test time
interval := 50 * time.Millisecond
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
pre := []ocr2keepersv3.PreProcessor{coord}
svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger)

var wg sync.WaitGroup
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestSamplingProposal(t *testing.T) {
}, nil).Times(2)
upkeepProvider.On("GetActiveUpkeeps", mock.Anything).Return([]common.UpkeepPayload{}, nil)
// set the ticker time lower to reduce the test time
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
pre := []ocr2keepersv3.PreProcessor{coord}
svc := newSampleProposalFlow(pre, ratio, upkeepProvider, mStore, runner, time.Millisecond*100, logger)

var wg sync.WaitGroup
Expand Down
8 changes: 4 additions & 4 deletions pkg/v3/flows/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func ConditionalTriggerFlows(
coord ocr2keepersv3.PreProcessor[common.UpkeepPayload],
coord ocr2keepersv3.PreProcessor,
ratio types.Ratio,
getter common.ConditionalUpkeepProvider,
subscriber common.BlockSubscriber,
Expand All @@ -24,7 +24,7 @@ func ConditionalTriggerFlows(
stateUpdater common.UpkeepStateUpdater,
logger *log.Logger,
) []service.Recoverable {
preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
preprocessors := []ocr2keepersv3.PreProcessor{coord}

// runs full check pipeline on a coordinated block with coordinated upkeeps
conditionalFinal := newFinalConditionalFlow(preprocessors, resultStore, runner, FinalConditionalInterval, proposalQ, builder, retryQ, stateUpdater, logger)
Expand All @@ -37,7 +37,7 @@ func ConditionalTriggerFlows(
}

func LogTriggerFlows(
coord ocr2keepersv3.PreProcessor[common.UpkeepPayload],
coord ocr2keepersv3.PreProcessor,
resultStore types.ResultStore,
metadataStore types.MetadataStore,
runner ocr2keepersv3.Runner,
Expand All @@ -54,7 +54,7 @@ func LogTriggerFlows(
) []service.Recoverable {
// all flows use the same preprocessor based on the coordinator
// each flow can add preprocessors to this provided slice
preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
preprocessors := []ocr2keepersv3.PreProcessor{coord}

// the recovery proposal flow is for nodes to surface payloads that should
// be recovered. these values are passed to the network and the network
Expand Down
2 changes: 1 addition & 1 deletion pkg/v3/flows/logtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (

// log trigger flow is the happy path entry point for log triggered upkeeps
func newLogTriggerFlow(
preprocessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload],
preprocessors []ocr2keepersv3.PreProcessor,
rs types.ResultStore,
rn ocr2keepersv3.Runner,
logProvider common.LogEventProvider,
Expand Down
2 changes: 1 addition & 1 deletion pkg/v3/flows/logtrigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestLogTriggerFlow(t *testing.T) {
// set the ticker time lower to reduce the test time
logInterval := 50 * time.Millisecond

svc := newLogTriggerFlow([]ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord},
svc := newLogTriggerFlow([]ocr2keepersv3.PreProcessor{coord},
rStore, runner, lp, logInterval, retryQ, upkeepStateUpdater, logger)

var wg sync.WaitGroup
Expand Down
4 changes: 2 additions & 2 deletions pkg/v3/flows/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
)

func newFinalRecoveryFlow(
preprocessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload],
preprocessors []ocr2keepersv3.PreProcessor,
resultStore types.ResultStore,
runner ocr2keepersv3.Runner,
retryQ types.RetryQueue,
Expand Down Expand Up @@ -105,7 +105,7 @@ func (t coordinatedProposalsTick) Value(ctx context.Context) ([]common.UpkeepPay
}

func newRecoveryProposalFlow(
preProcessors []ocr2keepersv3.PreProcessor[common.UpkeepPayload],
preProcessors []ocr2keepersv3.PreProcessor,
runner ocr2keepersv3.Runner,
metadataStore types.MetadataStore,
recoverableProvider common.RecoverableProvider,
Expand Down
4 changes: 2 additions & 2 deletions pkg/v3/flows/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestRecoveryFinalization(t *testing.T) {
upkeepStateUpdater.On("SetUpkeepState", mock.Anything, mock.Anything, mock.Anything).Return(nil)
// set the ticker time lower to reduce the test time
recFinalInterval := 50 * time.Millisecond
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
pre := []ocr2keepersv3.PreProcessor{coord}
svc := newFinalRecoveryFlow(pre, rStore, runner, retryQ, recFinalInterval, proposalQ, payloadBuilder, upkeepStateUpdater, logger)

var wg sync.WaitGroup
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestRecoveryProposal(t *testing.T) {
}, nil).Times(1)
// set the ticker time lower to reduce the test time
interval := 50 * time.Millisecond
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
pre := []ocr2keepersv3.PreProcessor{coord}
stateUpdater := &mockStateUpdater{}
svc := newRecoveryProposalFlow(pre, runner, mStore, recoverer, interval, stateUpdater, logger)

Expand Down
4 changes: 2 additions & 2 deletions pkg/v3/flows/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ const (
)

func NewRetryFlow(
coord ocr2keepersv3.PreProcessor[common.UpkeepPayload],
coord ocr2keepersv3.PreProcessor,
resultStore types.ResultStore,
runner ocr2keepersv3.Runner,
retryQ types.RetryQueue,
retryTickerInterval time.Duration,
stateUpdater common.UpkeepStateUpdater,
logger *log.Logger,
) service.Recoverable {
preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
preprocessors := []ocr2keepersv3.PreProcessor{coord}
post := postprocessors.NewCombinedPostprocessor(
postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "retry-eligible-postprocessor")),
postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "retry-retryable-postprocessor")),
Expand Down
43 changes: 13 additions & 30 deletions pkg/v3/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (

// PreProcessor is the general interface for middleware used to filter, add, or modify upkeep
// payloads before checking their eligibility status
type PreProcessor[T any] interface {
type PreProcessor interface {
// PreProcess takes a slice of payloads and returns a new slice
PreProcess(context.Context, []T) ([]T, error)
PreProcess(context.Context, []ocr2keepers.UpkeepPayload) ([]ocr2keepers.UpkeepPayload, error)
}

// PostProcessor is the general interface for a processing function after checking eligibility status
type PostProcessor[T any] interface {
type PostProcessor interface {
// PostProcess takes a slice of results where eligibility status is known
PostProcess(context.Context, []ocr2keepers.CheckResult, []T) error
PostProcess(context.Context, []ocr2keepers.CheckResult, []ocr2keepers.UpkeepPayload) error
}

// Runner is the interface for an object that should determine eligibility state
Expand All @@ -29,26 +29,26 @@ type Runner interface {
CheckUpkeeps(context.Context, ...ocr2keepers.UpkeepPayload) ([]ocr2keepers.CheckResult, error)
}

type Observer[T any] struct {
type Observer struct {
lggr *log.Logger

Preprocessors []PreProcessor[T]
Postprocessor PostProcessor[T]
processFunc func(context.Context, ...T) ([]ocr2keepers.CheckResult, error)
Preprocessors []PreProcessor
Postprocessor PostProcessor
processFunc func(context.Context, ...ocr2keepers.UpkeepPayload) ([]ocr2keepers.CheckResult, error)

// internal configurations
processTimeLimit time.Duration
}

// NewRunnableObserver creates a new Observer with the given pre-processors, post-processor, and runner
func NewRunnableObserver(
preprocessors []PreProcessor[ocr2keepers.UpkeepPayload],
postprocessor PostProcessor[ocr2keepers.UpkeepPayload],
preprocessors []PreProcessor,
postprocessor PostProcessor,
runner Runner,
processLimit time.Duration,
logger *log.Logger,
) *Observer[ocr2keepers.UpkeepPayload] {
return &Observer[ocr2keepers.UpkeepPayload]{
) *Observer {
return &Observer{
lggr: logger,
Preprocessors: preprocessors,
Postprocessor: postprocessor,
Expand All @@ -57,25 +57,8 @@ func NewRunnableObserver(
}
}

// NewGenericObserver creates a new Observer with the given pre-processors, post-processor, and runner
func NewGenericObserver[T any](
preprocessors []PreProcessor[T],
postprocessor PostProcessor[T],
processor func(context.Context, ...T) ([]ocr2keepers.CheckResult, error),
processLimit time.Duration,
logger *log.Logger,
) *Observer[T] {
return &Observer[T]{
lggr: logger,
Preprocessors: preprocessors,
Postprocessor: postprocessor,
processFunc: processor,
processTimeLimit: processLimit,
}
}

// Process - receives a tick and runs it through the eligibility pipeline. Calls all pre-processors, runs the check pipeline, and calls the post-processor.
func (o *Observer[T]) Process(ctx context.Context, tick tickers.Tick[[]T]) error {
func (o *Observer) Process(ctx context.Context, tick tickers.Tick[[]ocr2keepers.UpkeepPayload]) error {
pCtx, cancel := context.WithTimeout(ctx, o.processTimeLimit)

defer cancel()
Expand Down
Loading

0 comments on commit e081699

Please sign in to comment.