Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automation v2.1: fixes and leftovers #10599

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
blockHistorySize: blockHistorySize,
blockSize: lookbackDepth,
latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{},
lggr: lggr.Named("BlockSubscriber"),
lggr: lggr.Named(BlockSubscriberServiceName),
amirylm marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type logEventBuffer struct {

func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
lggr: lggr.Named("LogEventBuffer"),
amirylm marked this conversation as resolved.
Show resolved Hide resolved
size: int32(size),
blocks: make([]fetchedBlock, size),
maxBlockLogs: maxBlockLogs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
TriggerConfig: cfg,
UpdateBlock: bn.Uint64() - 1,
})
require.Error(t, err)
require.NoError(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like the opposite assert now. not sure why this needs to change with the naming changes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will log (debug) rather than returning an error, therefore the error check is irrelevant

// new block
b, err = ethClient.BlockByHash(ctx, backend.Commit())
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type logEventProvider struct {
func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
lggr: lggr.Named(LogProviderServiceName),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
poller: poller,
Expand All @@ -121,13 +121,14 @@ func (p *logEventProvider) Start(context.Context) error {
p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads)

for i := 0; i < readerThreads; i++ {
tid := i + 1
p.threadCtrl.Go(func(ctx context.Context) {
p.startReader(ctx, readQ)
p.startReader(ctx, readQ, tid)
})
}

p.threadCtrl.Go(func(ctx context.Context) {
lggr := p.lggr.With("where", "scheduler")
lggr := p.lggr.Named("Scheduler")

p.scheduleReadJobs(ctx, func(ids []*big.Int) {
select {
Expand All @@ -151,7 +152,7 @@ func (p *logEventProvider) Close() error {
}

func (p *logEventProvider) HealthReport() map[string]error {
return map[string]error{LogProviderServiceName: p.Healthy()}
return map[string]error{p.lggr.Name(): p.Healthy()}
}

func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) {
Expand Down Expand Up @@ -251,11 +252,11 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([
}

// startReader starts a reader that reads logs from the ids coming from readQ.
func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int) {
func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int, tid int) {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

lggr := p.lggr.With("where", "reader")
lggr := p.lggr.Named(fmt.Sprintf("ReaderThread-%d", tid))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably tid is thread ID here? Could we perhaps rename the function to startReaderThread?


for {
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (p *logEventProvider) RegisterFilter(ctx context.Context, opts FilterOption
if currentFilter != nil {
if currentFilter.configUpdateBlock > opts.UpdateBlock {
// already registered with a config from a higher block number
return fmt.Errorf("filter for upkeep with id %s already registered with newer config", upkeepID.String())
p.lggr.Debugf("filter for upkeep with id %s already registered with newer config", upkeepID.String())
return nil
} else if currentFilter.configUpdateBlock == opts.UpdateBlock {
// already registered with the same config
p.lggr.Debugf("filter for upkeep with id %s already registered with the same config", upkeepID.String())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
},
{
"existing config with old block",
true,
false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect you to change this, but maybe in future as a team we should strive to use field names against these values, just for more context on what they represent if anything, especially in code reviews

big.NewInt(111),
LogTriggerConfig{
ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (r *logRecoverer) Close() error {
}

func (r *logRecoverer) HealthReport() map[string]error {
return map[string]error{LogRecovererServiceName: r.Healthy()}
return map[string]error{r.lggr.Name(): r.Healthy()}
}

func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) {
Expand Down Expand Up @@ -572,7 +572,7 @@ func (r *logRecoverer) clean(ctx context.Context) {
}
}
r.lock.RUnlock()
lggr := r.lggr.With("where", "clean")
lggr := r.lggr.Named("Cleaner")
if len(expired) == 0 {
lggr.Debug("no expired upkeeps")
return
Expand All @@ -595,7 +595,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
if err != nil {
return fmt.Errorf("failed to get states: %w", err)
}
lggr := r.lggr.With("where", "clean")
lggr := r.lggr.Named("TryExpire")
amirylm marked this conversation as resolved.
Show resolved Hide resolved
start, _ := r.getRecoveryWindow(latestBlock)
r.lock.Lock()
defer r.lock.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
)

var (
RegistryServiceName = "AutomationRegistry"
RegistryServiceName = "EvmRegistry"
amirylm marked this conversation as resolved.
Show resolved Hide resolved

ErrLogReadFailure = fmt.Errorf("failure reading logs")
ErrHeadNotAvailable = fmt.Errorf("head not available")
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewEvmRegistry(
return &EvmRegistry{
ctx: context.Background(),
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("EvmRegistry"),
lggr: lggr.Named(RegistryServiceName),
poller: client.LogPoller(),
addr: addr,
client: client.Client(),
Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
}

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "upkeeps_referesh")
lggr := r.lggr.Named("UpkeepRefreshThread")
err := r.refreshActiveUpkeeps()
if err != nil {
lggr.Errorf("failed to initialize upkeeps", err)
Expand All @@ -188,7 +188,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "logs_polling")
lggr := r.lggr.Named("LogPollingThread")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Expand All @@ -206,7 +206,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "logs_processing")
lggr := r.lggr.Named("LogProcessingThread")
ch := r.chLog

for {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (r *EvmRegistry) Close() error {
}

func (r *EvmRegistry) HealthReport() map[string]error {
return map[string]error{RegistryServiceName: r.Healthy()}
return map[string]error{r.lggr.Name(): r.Healthy()}
}

func (r *EvmRegistry) refreshActiveUpkeeps() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type UpkeepPrivilegeConfig struct {

// streamsLookup looks through check upkeep results looking for any that need off chain lookup
func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult {
lggr := r.lggr.With("where", "StreamsLookup")
lggr := r.lggr.Named("StreamsLookup")
lookups := map[int]*StreamsLookup{}
for i, res := range checkResults {
if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (u *upkeepStateStore) Close() error {
}

func (u *upkeepStateStore) HealthReport() map[string]error {
return map[string]error{UpkeepStateStoreServiceName: u.Healthy()}
return map[string]error{u.lggr.Name(): u.Healthy()}
}

// SelectByWorkIDs returns the current state of the upkeep for the provided ids.
Expand Down
Loading