Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spike: v4 workflow refactor #318

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 0 additions & 146 deletions pkg/v3/flows/conditional.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package flows
package conditional

import (
"context"
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestConditionalFinalization(t *testing.T) {
// set the ticker time lower to reduce the test time
interval := 50 * time.Millisecond
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger)
svc := NewFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger)

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestSamplingProposal(t *testing.T) {
upkeepProvider.On("GetActiveUpkeeps", mock.Anything).Return([]common.UpkeepPayload{}, nil)
// set the ticker time lower to reduce the test time
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
svc := newSampleProposalFlow(pre, ratio, upkeepProvider, mStore, runner, time.Millisecond*100, logger)
svc := NewSampleProposalFlow(pre, ratio, upkeepProvider, mStore, runner, time.Millisecond*100, logger)

var wg sync.WaitGroup
wg.Add(1)
Expand Down
103 changes: 103 additions & 0 deletions pkg/v3/flows/conditional/final_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package conditional

import (
"context"
"fmt"
"log"
"time"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

const (
// This is the ticker interval for final conditional flow
finalConditionalInterval = 1 * time.Second
// These are the maximum number of conditional upkeeps dequeued on every tick from proposal queue in FinalConditionalFlow
// This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round
finalConditionalBatchSize = 50

observationProcessLimit = 20 * time.Second
)

func NewFinalConditionalFlow(
preprocessors []ocr2keepersv3.PreProcessor,
resultStore types.ResultStore,
runner ocr2keepersv3.Runner,
proposalQ types.ProposalQueue,
builder common.PayloadBuilder,
retryQ types.RetryQueue,
stateUpdater common.UpkeepStateUpdater,
logger *log.Logger,
) service.Recoverable {
post := postprocessors.NewCombinedPostprocessor(
postprocessors.NewEligiblePostProcessor(resultStore, telemetry.WrapLogger(logger, "conditional-final-eligible-postprocessor")),
postprocessors.NewRetryablePostProcessor(retryQ, telemetry.WrapLogger(logger, "conditional-final-retryable-postprocessor")),
)
// create observer that only pushes results to result stores. everything at
// this point can be dropped. this process is only responsible for running
// conditional proposals that originate from network agreements
observer := ocr2keepersv3.NewRunnableObserver(
preprocessors,
post,
runner,
observationProcessLimit,
log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags),
)

getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) {
return coordinatedProposalsTick{
logger: logger,
builder: builder,
q: proposalQ,
utype: types.ConditionTrigger,
batchSize: finalConditionalBatchSize,
}, nil
}

ticker := tickers.NewTimeTicker(finalConditionalInterval, observer, getterFn, log.New(logger.Writer(), fmt.Sprintf("[%s | conditional-final-ticker]", telemetry.ServiceName), telemetry.LogPkgStdFlags))

return ticker
}

// coordinatedProposalsTick is used to push proposals from the proposal queue to some observer
type coordinatedProposalsTick struct {
logger *log.Logger
builder common.PayloadBuilder
q types.ProposalQueue
utype types.UpkeepType
batchSize int
}

func (t coordinatedProposalsTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) {
if t.q == nil {
return nil, nil
}

proposals, err := t.q.Dequeue(t.utype, t.batchSize)
if err != nil {
return nil, fmt.Errorf("failed to dequeue from retry queue: %w", err)
}
t.logger.Printf("%d proposals returned from queue", len(proposals))

builtPayloads, err := t.builder.BuildPayloads(ctx, proposals...)
if err != nil {
return nil, fmt.Errorf("failed to build payloads from proposals: %w", err)
}
payloads := []common.UpkeepPayload{}
filtered := 0
for _, p := range builtPayloads {
if p.IsEmpty() {
filtered++
continue
}
payloads = append(payloads, p)
}
t.logger.Printf("%d payloads built from %d proposals, %d filtered", len(payloads), len(proposals), filtered)
return payloads, nil
}
97 changes: 97 additions & 0 deletions pkg/v3/flows/conditional/proposal_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package conditional

import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/random"
"log"
"time"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

const (
// This is the ticker interval for sampling conditional flow
samplingConditionInterval = 3 * time.Second
// Maximum number of upkeeps to be sampled in every round
maxSampledConditionals = 300
)

type shuffler[T any] interface {
Shuffle([]T) []T
}

type sampler struct {
logger *log.Logger

ratio types.Ratio
getter common.ConditionalUpkeepProvider
shuffler shuffler[common.UpkeepPayload]
}

func NewSampleProposalFlow(
pre []ocr2keepersv3.PreProcessor,
ratio types.Ratio,
getter common.ConditionalUpkeepProvider,
metadataStore types.MetadataStore,
runner ocr2keepersv3.Runner,
logger *log.Logger,
) service.Recoverable {
pre = append(pre, preprocessors.NewProposalFilterer(metadataStore, types.LogTrigger))
postprocessors := postprocessors.NewAddProposalToMetadataStorePostprocessor(metadataStore)

observer := ocr2keepersv3.NewRunnableObserver(
pre,
postprocessors,
runner,
observationProcessLimit,
log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags),
)

getterFn := func(ctx context.Context, _ time.Time) (tickers.Tick, error) {
return &sampler{
logger: logger,
getter: getter,
ratio: ratio,
shuffler: random.Shuffler[common.UpkeepPayload]{Source: random.NewCryptoRandSource()},
}, nil
}

lggrPrefix := fmt.Sprintf("[%s | sample-proposal-ticker]", telemetry.ServiceName)
lggr := log.New(logger.Writer(), lggrPrefix, telemetry.LogPkgStdFlags)

return tickers.NewTimeTicker(samplingConditionInterval, observer, getterFn, lggr)
}

func (s *sampler) Value(ctx context.Context) ([]common.UpkeepPayload, error) {
upkeeps, err := s.getter.GetActiveUpkeeps(ctx)
if err != nil {
return nil, err
}
if len(upkeeps) == 0 {
return nil, nil
}

upkeeps = s.shuffler.Shuffle(upkeeps)
size := s.ratio.OfInt(len(upkeeps))

if size <= 0 {
return nil, nil
}
if size > maxSampledConditionals {
s.logger.Printf("Required sample size %d exceeds max allowed conditional samples %d, limiting to max", size, maxSampledConditionals)
size = maxSampledConditionals
}
if len(upkeeps) < size {
size = len(upkeeps)
}
s.logger.Printf("sampled %d upkeeps", size)
return upkeeps[:size], nil
}
Loading
Loading