Skip to content

Commit

Permalink
chore: reduce duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
makramkd committed Nov 11, 2023
1 parent 654c261 commit 95d59f0
Showing 1 changed file with 34 additions and 99 deletions.
133 changes: 34 additions & 99 deletions core/services/vrf/v2/listener_v2_log_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -96,6 +97,9 @@ func (lsn *listenerV2) initializeLastProcessedBlock() (lastProcessedBlock int64,
return 0, errors.Wrap(err, "LogPoller.LatestBlock()")
}

ll := lsn.l.With("latestFinalizedBlock", latestBlock.FinalizedBlockNumber, "latestBlock", latestBlock.BlockNumber)
ll.Infow("Initializing last processed block")

fromTimestamp := time.Now().UTC().Add(-lsn.job.VRFSpec.RequestTimeout)
// get randomness requested logs with the appropriate keyhash
// keyhash is specified in topic1
Expand All @@ -122,45 +126,7 @@ func (lsn *listenerV2) initializeLastProcessedBlock() (lastProcessedBlock int64,
return 0, errors.Wrap(err, "LogPoller.LogsCreatedAfter RandomWordsFulfilled logs")
}

var (
fulfilled = make(map[string]RandomWordsFulfilled)
requested = make(map[string]RandomWordsRequested)
)
var errs error
for _, l := range requests {
if l.EventSig == lsn.coordinator.RandomWordsRequestedTopic() {
parsed, err2 := lsn.coordinator.ParseRandomWordsRequested(l.ToGethLog())
if err2 != nil {
errs = multierr.Append(errs, err2)
continue
}
requested[parsed.RequestID().String()] = parsed
} else {
errs = multierr.Append(errs, errors.Errorf("unexpected event sig %s", l.EventSig))
}
}
for _, l := range fulfillments {
if l.EventSig == lsn.coordinator.RandomWordsFulfilledTopic() {
parsed, err2 := lsn.coordinator.ParseRandomWordsFulfilled(l.ToGethLog())
if err2 != nil {
errs = multierr.Append(errs, err2)
continue
}
fulfilled[parsed.RequestID().String()] = parsed
} else {
errs = multierr.Append(errs, errors.Errorf("unexpected event sig %s", l.EventSig))
}
}
if err != nil {
return 0, errors.Wrap(err, "LogPoller.LogsCreatedAfter")
}
// figure out the earliest block number with an unfulfilled request
var unfulfilled []RandomWordsRequested
for reqID, req := range requested {
if _, ok := fulfilled[reqID]; !ok {
unfulfilled = append(unfulfilled, req)
}
}
unfulfilled, _, _ := lsn.getUnfulfilled(append(requests, fulfillments...), ll)
var earliestUnfulfilledBlock int64 = math.MaxInt64
for _, req := range unfulfilled {
if req.Raw().BlockNumber < uint64(earliestUnfulfilledBlock) {
Expand All @@ -179,6 +145,7 @@ func (lsn *listenerV2) initializeLastProcessedBlock() (lastProcessedBlock int64,

func (lsn *listenerV2) updateLastProcessedBlock(ctx context.Context, currLastProcessedBlock int64) (lastProcessedBlock int64, err error) {
lp := lsn.chain.LogPoller()
ll := lsn.l.With("currLastProcessedBlock", currLastProcessedBlock)

latestBlock, err := lp.LatestBlock(pg.WithParentCtx(ctx))
if err != nil {
Expand All @@ -197,58 +164,21 @@ func (lsn *listenerV2) updateLastProcessedBlock(ctx context.Context, currLastPro
return currLastProcessedBlock, errors.Wrap(err, "LogPoller.LogsWithSigs")
}

var (
fulfilled = make(map[string]RandomWordsFulfilled)
requested = make(map[string]RandomWordsRequested)
requestedLP = make(map[string]logpoller.Log)
errs error
)
for _, l := range logs {
if l.EventSig == lsn.coordinator.RandomWordsFulfilledTopic() {
parsed, err2 := lsn.coordinator.ParseRandomWordsFulfilled(l.ToGethLog())
if err2 != nil {
errs = multierr.Append(errs, err2)
continue
}
fulfilled[parsed.RequestID().String()] = parsed
} else if l.EventSig == lsn.coordinator.RandomWordsRequestedTopic() {
parsed, err2 := lsn.coordinator.ParseRandomWordsRequested(l.ToGethLog())
if err2 != nil {
errs = multierr.Append(errs, err2)
continue
}
keyHash := parsed.KeyHash()
expectedKeyHash := lsn.job.VRFSpec.PublicKey.MustHash()
if !bytes.Equal(keyHash[:], expectedKeyHash[:]) {
// wrong keyhash, can ignore
continue
}
requested[parsed.RequestID().String()] = parsed
requestedLP[parsed.RequestID().String()] = l
}
}
// should never happen, unsure if recoverable
// may be worth a panic
if errs != nil {
lsn.l.Errorw("encountered parse errors", "err", err)
}

unfulfilled, _, _ := lsn.getUnfulfilled(logs, ll)
// find request block of earliest unfulfilled request
var earliestUnprocessedRequestBlock int64 = math.MaxInt64
for reqID, req := range requested {
if _, isFulfilled := fulfilled[reqID]; !isFulfilled {
if req.Raw().BlockNumber < uint64(earliestUnprocessedRequestBlock) {
earliestUnprocessedRequestBlock = int64(req.Raw().BlockNumber)
}
for _, req := range unfulfilled {
if req.Raw().BlockNumber < uint64(earliestUnprocessedRequestBlock) {
earliestUnprocessedRequestBlock = int64(req.Raw().BlockNumber)
}
}

// cases:
// 1. pending requests exist, earliestUnprocessedRequestBlock < latestBlock.FinalizedBlockNumber
// 2. pending requests exist, earliestUnprocessedRequestBlock > latestBlock.FinalizedBlockNumber
// 3. no pending requests, earliestUnprocessedRequestBlock == 0
// 3. no pending requests, earliestUnprocessedRequestBlock == math.MaxInt64
// case 2 or 3
if earliestUnprocessedRequestBlock == math.MaxInt64 || earliestUnprocessedRequestBlock > int64(latestBlock.FinalizedBlockNumber) {
if earliestUnprocessedRequestBlock == math.MaxInt64 || earliestUnprocessedRequestBlock > latestBlock.FinalizedBlockNumber {
return latestBlock.FinalizedBlockNumber, nil
}
// case 1
Expand Down Expand Up @@ -282,28 +212,43 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces
return nil, errors.Wrap(err, "LogPoller.LogsWithSigs")
}

unfulfilled, unfulfilledLP, fulfilled := lsn.getUnfulfilled(logs, ll)
if len(unfulfilled) > 0 {
ll.Debugw("found unfulfilled logs", "unfulfilled", len(unfulfilled))
} else {
ll.Debugw("no unfulfilled logs found")
}

lsn.handleFulfilled(fulfilled)

return lsn.handleRequested(unfulfilled, unfulfilledLP, minConfs), nil
}

func (lsn *listenerV2) getUnfulfilled(logs []logpoller.Log, ll logger.Logger) (unfulfilled []RandomWordsRequested, unfulfilledLP []logpoller.Log, fulfilled map[string]RandomWordsFulfilled) {
var (
fulfilled = make(map[string]RandomWordsFulfilled)
requested = make(map[string]RandomWordsRequested)
requestedLP = make(map[string]logpoller.Log)
errs error
requested = make(map[string]RandomWordsRequested)
requestedLP = make(map[string]logpoller.Log)
errs error
expectedKeyHash = lsn.job.VRFSpec.PublicKey.MustHash()
)
fulfilled = make(map[string]RandomWordsFulfilled)
for _, l := range logs {
if l.EventSig == lsn.coordinator.RandomWordsFulfilledTopic() {
parsed, err2 := lsn.coordinator.ParseRandomWordsFulfilled(l.ToGethLog())
if err2 != nil {
// should never happen
errs = multierr.Append(errs, err2)
continue
}
fulfilled[parsed.RequestID().String()] = parsed
} else if l.EventSig == lsn.coordinator.RandomWordsRequestedTopic() {
parsed, err2 := lsn.coordinator.ParseRandomWordsRequested(l.ToGethLog())
if err2 != nil {
// should never happen
errs = multierr.Append(errs, err2)
continue
}
keyHash := parsed.KeyHash()
expectedKeyHash := lsn.job.VRFSpec.PublicKey.MustHash()
if !bytes.Equal(keyHash[:], expectedKeyHash[:]) {
// wrong keyhash, can ignore
continue
Expand All @@ -315,7 +260,7 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces
// should never happen, unsure if recoverable
// may be worth a panic
if errs != nil {
ll.Errorw("encountered parse errors", "err", err)
ll.Errorw("encountered parse errors", "err", errs)
}

if len(fulfilled) > 0 || len(requested) > 0 {
Expand All @@ -325,24 +270,14 @@ func (lsn *listenerV2) pollLogs(ctx context.Context, minConfs uint32, lastProces
}

// find unfulfilled requests by comparing requested events with the fulfilled events
var unfulfilled []RandomWordsRequested
var unfulfilledLP []logpoller.Log
for reqID, req := range requested {
if _, isFulfilled := fulfilled[reqID]; !isFulfilled {
unfulfilled = append(unfulfilled, req)
unfulfilledLP = append(unfulfilledLP, requestedLP[reqID])
}
}

if len(unfulfilled) > 0 {
ll.Debugw("found unfulfilled logs", "unfulfilled", len(unfulfilled))
} else {
ll.Debugw("no unfulfilled logs found")
}

lsn.handleFulfilled(fulfilled)

return lsn.handleRequested(unfulfilled, unfulfilledLP, minConfs), nil
return unfulfilled, unfulfilledLP, fulfilled
}

func (lsn *listenerV2) getConfirmedAt(req RandomWordsRequested, nodeMinConfs uint32) uint64 {
Expand Down

0 comments on commit 95d59f0

Please sign in to comment.