From 95d59f07339777b4fc6fd536254907ca3975cf2f Mon Sep 17 00:00:00 2001 From: Makram Kamaleddine Date: Sat, 11 Nov 2023 20:56:03 +0200 Subject: [PATCH] chore: reduce duplication --- .../vrf/v2/listener_v2_log_listener.go | 133 +++++------------- 1 file changed, 34 insertions(+), 99 deletions(-) diff --git a/core/services/vrf/v2/listener_v2_log_listener.go b/core/services/vrf/v2/listener_v2_log_listener.go index 7e11553f96e..37e233810e9 100644 --- a/core/services/vrf/v2/listener_v2_log_listener.go +++ b/core/services/vrf/v2/listener_v2_log_listener.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -96,6 +97,9 @@ func (lsn *listenerV2) initializeLastProcessedBlock() (lastProcessedBlock int64, return 0, errors.Wrap(err, "LogPoller.LatestBlock()") } + ll := lsn.l.With("latestFinalizedBlock", latestBlock.FinalizedBlockNumber, "latestBlock", latestBlock.BlockNumber) + ll.Infow("Initializing last processed block") + fromTimestamp := time.Now().UTC().Add(-lsn.job.VRFSpec.RequestTimeout) // get randomness requested logs with the appropriate keyhash // keyhash is specified in topic1 @@ -122,45 +126,7 @@ func (lsn *listenerV2) initializeLastProcessedBlock() (lastProcessedBlock int64, return 0, errors.Wrap(err, "LogPoller.LogsCreatedAfter RandomWordsFulfilled logs") } - var ( - fulfilled = make(map[string]RandomWordsFulfilled) - requested = make(map[string]RandomWordsRequested) - ) - var errs error - for _, l := range requests { - if l.EventSig == lsn.coordinator.RandomWordsRequestedTopic() { - parsed, err2 := lsn.coordinator.ParseRandomWordsRequested(l.ToGethLog()) - if err2 != nil { - errs = multierr.Append(errs, err2) - continue - } - requested[parsed.RequestID().String()] = parsed - } else { - errs = multierr.Append(errs, errors.Errorf("unexpected event sig %s", l.EventSig)) - } - } - for _, l := range fulfillments { - if l.EventSig == lsn.coordinator.RandomWordsFulfilledTopic() { - parsed, err2 := lsn.coordinator.ParseRandomWordsFulfilled(l.ToGethLog()) - if err2 != nil { - errs = multierr.Append(errs, err2) - continue - } - fulfilled[parsed.RequestID().String()] = parsed - } else { - errs = multierr.Append(errs, errors.Errorf("unexpected event sig %s", l.EventSig)) - } - } - if err != nil { - return 0, errors.Wrap(err, "LogPoller.LogsCreatedAfter") - } - // figure out the earliest block number with an unfulfilled request - var unfulfilled []RandomWordsRequested - for reqID, req := range requested { - if _, ok := fulfilled[reqID]; !ok { - unfulfilled = append(unfulfilled, req) - } - } + unfulfilled, _, _ := lsn.getUnfulfilled(append(requests, fulfillments...), ll) var earliestUnfulfilledBlock int64 = math.MaxInt64 for _, req := range unfulfilled { if req.Raw().BlockNumber < uint64(earliestUnfulfilledBlock) { @@ -179,6 +145,7 @@ func (lsn *listenerV2) initializeLastProcessedBlock() (lastProcessedBlock int64, func (lsn *listenerV2) updateLastProcessedBlock(ctx context.Context, currLastProcessedBlock int64) (lastProcessedBlock int64, err error) { lp := lsn.chain.LogPoller() + ll := lsn.l.With("currLastProcessedBlock", currLastProcessedBlock) latestBlock, err := lp.LatestBlock(pg.WithParentCtx(ctx)) if err != nil { @@ -197,58 +164,21 @@ func (lsn *listenerV2) updateLastProcessedBlock(ctx context.Context, currLastPro return currLastProcessedBlock, errors.Wrap(err, "LogPoller.LogsWithSigs") } - var ( - fulfilled = make(map[string]RandomWordsFulfilled) - requested = make(map[string]RandomWordsRequested) - requestedLP = make(map[string]logpoller.Log) - errs error - ) - for _, l := range logs { - if l.EventSig == lsn.coordinator.RandomWordsFulfilledTopic() { - parsed, err2 := lsn.coordinator.ParseRandomWordsFulfilled(l.ToGethLog()) - if err2 != nil { - errs = multierr.Append(errs, err2) - continue - } - fulfilled[parsed.RequestID().String()] = parsed - } else if l.EventSig == lsn.coordinator.RandomWordsRequestedTopic() { - parsed, err2 := lsn.coordinator.ParseRandomWordsRequested(l.ToGethLog()) - if err2 != nil { - errs = multierr.Append(errs, err2) - continue - } - keyHash := parsed.KeyHash() - expectedKeyHash := lsn.job.VRFSpec.PublicKey.MustHash() - if !bytes.Equal(keyHash[:], expectedKeyHash[:]) { - // wrong keyhash, can ignore - continue - } - requested[parsed.RequestID().String()] = parsed - requestedLP[parsed.RequestID().String()] = l - } - } - // should never happen, unsure if recoverable - // may be worth a panic - if errs != nil { - lsn.l.Errorw("encountered parse errors", "err", err) - } - + unfulfilled, _, _ := lsn.getUnfulfilled(logs, ll) // find request block of earliest unfulfilled request var earliestUnprocessedRequestBlock int64 = math.MaxInt64 - for reqID, req := range requested { - if _, isFulfilled := fulfilled[reqID]; !isFulfilled { - if req.Raw().BlockNumber < uint64(earliestUnprocessedRequestBlock) { - earliestUnprocessedRequestBlock = int64(req.Raw().BlockNumber) - } + for _, req := range unfulfilled { + if req.Raw().BlockNumber < uint64(earliestUnprocessedRequestBlock) { + earliestUnprocessedRequestBlock = int64(req.Raw().BlockNumber) } } // cases: // 1. pending requests exist, earliestUnprocessedRequestBlock < latestBlock.FinalizedBlockNumber // 2. pending requests exist, earliestUnprocessedRequestBlock > latestBlock.FinalizedBlockNumber - // 3. no pending requests, earliestUnprocessedRequestBlock == 0 + // 3. no pending requests, earliestUnprocessedRequestBlock == math.MaxInt64 // case 2 or 3 - if earliestUnprocessedRequestBlock == math.MaxInt64 || earliestUnprocessedRequestBlock > int64(latestBlock.FinalizedBlockNumber) { + if earliestUnprocessedRequestBlock == math.MaxInt64 || earliestUnprocessedRequestBlock > latestBlock.FinalizedBlockNumber { return latestBlock.FinalizedBlockNumber, nil } // case 1 @@ -282,16 +212,31 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces return nil, errors.Wrap(err, "LogPoller.LogsWithSigs") } + unfulfilled, unfulfilledLP, fulfilled := lsn.getUnfulfilled(logs, ll) + if len(unfulfilled) > 0 { + ll.Debugw("found unfulfilled logs", "unfulfilled", len(unfulfilled)) + } else { + ll.Debugw("no unfulfilled logs found") + } + + lsn.handleFulfilled(fulfilled) + + return lsn.handleRequested(unfulfilled, unfulfilledLP, minConfs), nil +} + +func (lsn *listenerV2) getUnfulfilled(logs []logpoller.Log, ll logger.Logger) (unfulfilled []RandomWordsRequested, unfulfilledLP []logpoller.Log, fulfilled map[string]RandomWordsFulfilled) { var ( - fulfilled = make(map[string]RandomWordsFulfilled) - requested = make(map[string]RandomWordsRequested) - requestedLP = make(map[string]logpoller.Log) - errs error + requested = make(map[string]RandomWordsRequested) + requestedLP = make(map[string]logpoller.Log) + errs error + expectedKeyHash = lsn.job.VRFSpec.PublicKey.MustHash() ) + fulfilled = make(map[string]RandomWordsFulfilled) for _, l := range logs { if l.EventSig == lsn.coordinator.RandomWordsFulfilledTopic() { parsed, err2 := lsn.coordinator.ParseRandomWordsFulfilled(l.ToGethLog()) if err2 != nil { + // should never happen errs = multierr.Append(errs, err2) continue } @@ -299,11 +244,11 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces } else if l.EventSig == lsn.coordinator.RandomWordsRequestedTopic() { parsed, err2 := lsn.coordinator.ParseRandomWordsRequested(l.ToGethLog()) if err2 != nil { + // should never happen errs = multierr.Append(errs, err2) continue } keyHash := parsed.KeyHash() - expectedKeyHash := lsn.job.VRFSpec.PublicKey.MustHash() if !bytes.Equal(keyHash[:], expectedKeyHash[:]) { // wrong keyhash, can ignore continue @@ -315,7 +260,7 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces // should never happen, unsure if recoverable // may be worth a panic if errs != nil { - ll.Errorw("encountered parse errors", "err", err) + ll.Errorw("encountered parse errors", "err", errs) } if len(fulfilled) > 0 || len(requested) > 0 { @@ -325,8 +270,6 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces } // find unfulfilled requests by comparing requested events with the fulfilled events - var unfulfilled []RandomWordsRequested - var unfulfilledLP []logpoller.Log for reqID, req := range requested { if _, isFulfilled := fulfilled[reqID]; !isFulfilled { unfulfilled = append(unfulfilled, req) @@ -334,15 +277,7 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces } } - if len(unfulfilled) > 0 { - ll.Debugw("found unfulfilled logs", "unfulfilled", len(unfulfilled)) - } else { - ll.Debugw("no unfulfilled logs found") - } - - lsn.handleFulfilled(fulfilled) - - return lsn.handleRequested(unfulfilled, unfulfilledLP, minConfs), nil + return unfulfilled, unfulfilledLP, fulfilled } func (lsn *listenerV2) getConfirmedAt(req RandomWordsRequested, nodeMinConfs uint32) uint64 {