Skip to content

Commit

Permalink
feature: log poller for vrf v2/+
Browse files Browse the repository at this point in the history
  • Loading branch information
makramkd committed Nov 11, 2023
1 parent 4a6f2fe commit 654c261
Show file tree
Hide file tree
Showing 16 changed files with 2,133 additions and 1,780 deletions.
57 changes: 32 additions & 25 deletions core/services/vrf/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func (d *Delegate) JobType() job.Type {
return job.VRF
}

func (d *Delegate) BeforeJobCreated(spec job.Job) {}
func (d *Delegate) AfterJobCreated(spec job.Job) {}
func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }
func (d *Delegate) BeforeJobCreated(job.Job) {}
func (d *Delegate) AfterJobCreated(job.Job) {}
func (d *Delegate) BeforeJobDeleted(job.Job) {}
func (d *Delegate) OnDeleteJob(job.Job, pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
Expand Down Expand Up @@ -160,24 +160,28 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
return nil, errors.Wrap(err2, "NewAggregatorV3Interface")
}

return []job.ServiceCtx{v2.New(
chain.Config().EVM(),
chain.Config().EVM().GasEstimator(),
lV2Plus,
chain,
chain.ID(),
d.q,
v2.NewCoordinatorV2_5(coordinatorV2Plus),
batchCoordinatorV2,
vrfOwner,
aggregator,
d.pr,
d.ks.Eth(),
jb,
d.mailMon,
utils.NewHighCapacityMailbox[log.Broadcast](),
func() {},
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil
return []job.ServiceCtx{
v2.New(
chain.Config().EVM(),
chain.Config().EVM().GasEstimator(),
lV2Plus,
chain,
chain.ID(),
d.q,
v2.NewCoordinatorV2_5(coordinatorV2Plus),
batchCoordinatorV2,
vrfOwner,
aggregator,
d.pr,
d.ks.Eth(),
jb,
func() {},
// the lookback in the deduper must be >= the lookback specified for the log poller
// otherwise we will end up re-delivering logs that were already delivered.
vrfcommon.NewInflightCache(int(chain.Config().EVM().FinalityDepth())),
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())),
),
}, nil
}
if _, ok := task.(*pipeline.VRFTaskV2); ok {
if err2 := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil {
Expand Down Expand Up @@ -225,10 +229,13 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
d.pr,
d.ks.Eth(),
jb,
d.mailMon,
utils.NewHighCapacityMailbox[log.Broadcast](),
func() {},
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil
// the lookback in the deduper must be >= the lookback specified for the log poller
// otherwise we will end up re-delivering logs that were already delivered.
vrfcommon.NewInflightCache(int(chain.Config().EVM().FinalityDepth())),
vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())),
),
}, nil
}
if _, ok := task.(*pipeline.VRFTask); ok {
return []job.ServiceCtx{&v1.Listener{
Expand Down
26 changes: 6 additions & 20 deletions core/services/vrf/v2/bhs_feeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrftesthelpers"
"github.com/smartcontractkit/chainlink/v2/core/store/models"

Expand Down Expand Up @@ -63,26 +62,12 @@ func TestStartHeartbeats(t *testing.T) {
heartbeatPeriod := 5 * time.Second

t.Run("bhs_feeder_startheartbeats_happy_path", func(tt *testing.T) {
coordinatorAddress := uni.rootContractAddress
vrfVersion := vrfcommon.V2

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, keys...)
require.NoError(t, app.Start(testutils.Context(t)))

var (
v2CoordinatorAddress string
v2PlusCoordinatorAddress string
)

if vrfVersion == vrfcommon.V2 {
v2CoordinatorAddress = coordinatorAddress.String()
} else if vrfVersion == vrfcommon.V2Plus {
v2PlusCoordinatorAddress = coordinatorAddress.String()
}

_ = vrftesthelpers.CreateAndStartBHSJob(
t, bhsKeyAddresses, app, uni.bhsContractAddress.String(), "",
v2CoordinatorAddress, v2PlusCoordinatorAddress, "", 0, 200, heartbeatPeriod, 100)
uni.rootContractAddress.String(), "", "", 0, 200, heartbeatPeriod, 100)

// Ensure log poller is ready and has all logs.
require.NoError(t, app.GetRelayers().LegacyEVMChains().Slice()[0].LogPoller().Ready())
Expand All @@ -97,9 +82,10 @@ func TestStartHeartbeats(t *testing.T) {
t.Logf("Sleeping %.2f seconds before checking blockhash in BHS added by BHS_Heartbeats_Service\n", diff.Seconds())
time.Sleep(diff)
// storeEarliest in BHS contract stores blocktip - 256 in the Blockhash Store (BHS)
// before the initTxns:260 txns sent by the loop above, 18 txns are sent by
// newVRFCoordinatorV2Universe method. block tip is initTxns + 18
blockTip := initTxns + 18
verifyBlockhashStored(t, uni.coordinatorV2UniverseCommon, uint64(blockTip-256))
tipHeader, err := uni.backend.HeaderByNumber(testutils.Context(t), nil)
require.NoError(t, err)
// the storeEarliest transaction will end up in a new block, hence the + 1 below.
blockNumberStored := tipHeader.Number.Uint64() - 256 + 1
verifyBlockhashStored(t, uni.coordinatorV2UniverseCommon, blockNumberStored)
})
}
37 changes: 37 additions & 0 deletions core/services/vrf/v2/coordinator_v2x_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
type CoordinatorV2_X interface {
Address() common.Address
ParseRandomWordsRequested(log types.Log) (RandomWordsRequested, error)
ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error)
RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error)
AddConsumer(opts *bind.TransactOpts, subID *big.Int, consumer common.Address) (*types.Transaction, error)
CreateSubscription(opts *bind.TransactOpts) (*types.Transaction, error)
Expand All @@ -47,6 +48,10 @@ type CoordinatorV2_X interface {
GetCommitment(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error)
Migrate(opts *bind.TransactOpts, subID *big.Int, newCoordinator common.Address) (*types.Transaction, error)
FundSubscriptionWithNative(opts *bind.TransactOpts, subID *big.Int, amount *big.Int) (*types.Transaction, error)
// RandomWordsRequestedTopic returns the log topic of the RandomWordsRequested log
RandomWordsRequestedTopic() common.Hash
// RandomWordsFulfilledTopic returns the log topic of the RandomWordsFulfilled log
RandomWordsFulfilledTopic() common.Hash
}

type coordinatorV2 struct {
Expand All @@ -61,6 +66,14 @@ func NewCoordinatorV2(c *vrf_coordinator_v2.VRFCoordinatorV2) CoordinatorV2_X {
}
}

func (c *coordinatorV2) RandomWordsRequestedTopic() common.Hash {
return vrf_coordinator_v2.VRFCoordinatorV2RandomWordsRequested{}.Topic()
}

func (c *coordinatorV2) RandomWordsFulfilledTopic() common.Hash {
return vrf_coordinator_v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic()
}

func (c *coordinatorV2) Address() common.Address {
return c.coordinator.Address()
}
Expand All @@ -73,6 +86,14 @@ func (c *coordinatorV2) ParseRandomWordsRequested(log types.Log) (RandomWordsReq
return NewV2RandomWordsRequested(parsed), nil
}

func (c *coordinatorV2) ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error) {
parsed, err := c.coordinator.ParseRandomWordsFulfilled(log)
if err != nil {
return nil, err
}
return NewV2RandomWordsFulfilled(parsed), nil
}

func (c *coordinatorV2) RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error) {
return c.coordinator.RequestRandomWords(opts, keyHash, subID.Uint64(), requestConfirmations, callbackGasLimit, numWords)
}
Expand Down Expand Up @@ -187,6 +208,14 @@ func NewCoordinatorV2_5(c vrf_coordinator_v2_5.VRFCoordinatorV25Interface) Coord
}
}

func (c *coordinatorV2_5) RandomWordsRequestedTopic() common.Hash {
return vrf_coordinator_v2plus_interface.IVRFCoordinatorV2PlusInternalRandomWordsRequested{}.Topic()
}

func (c *coordinatorV2_5) RandomWordsFulfilledTopic() common.Hash {
return vrf_coordinator_v2plus_interface.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic()
}

func (c *coordinatorV2_5) Address() common.Address {
return c.coordinator.Address()
}
Expand All @@ -199,6 +228,14 @@ func (c *coordinatorV2_5) ParseRandomWordsRequested(log types.Log) (RandomWordsR
return NewV2_5RandomWordsRequested(parsed), nil
}

func (c *coordinatorV2_5) ParseRandomWordsFulfilled(log types.Log) (RandomWordsFulfilled, error) {
parsed, err := c.coordinator.ParseRandomWordsFulfilled(log)
if err != nil {
return nil, err
}
return NewV2_5RandomWordsFulfilled(parsed), nil
}

func (c *coordinatorV2_5) RequestRandomWords(opts *bind.TransactOpts, keyHash [32]byte, subID *big.Int, requestConfirmations uint16, callbackGasLimit uint32, numWords uint32, payInEth bool) (*types.Transaction, error) {
extraArgs, err := extraargs.ExtraArgsV1(payInEth)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions core/services/vrf/v2/integration_helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2_test

import (
"fmt"
"math/big"
"strings"
"testing"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/google/uuid"
"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -72,6 +75,7 @@ func testSingleConsumerHappyPath(
GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1, key2)

Expand Down Expand Up @@ -355,6 +359,7 @@ func testMultipleConsumersNeedTrustedBHS(
c.Feature.LogPoller = ptr(true)
c.EVM[0].FinalityDepth = ptr[uint32](2)
c.EVM[0].LogPollInterval = models.MustNewDuration(time.Second)
c.Feature.LogPoller = ptr(true)
})
keys = append(keys, ownerKey, vrfKey)
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, keys...)
Expand Down Expand Up @@ -539,6 +544,7 @@ func testSingleConsumerHappyPathBatchFulfillment(
c.EVM[0].GasEstimator.LimitDefault = ptr[uint32](5_000_000)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.EVM[0].ChainID = (*utils.Big)(testutils.SimulatedChainID)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)

Expand Down Expand Up @@ -641,6 +647,7 @@ func testSingleConsumerNeedsTopUp(
GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key)

Expand Down Expand Up @@ -904,6 +911,7 @@ func testSingleConsumerForcedFulfillment(
GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1, key2)

Expand Down Expand Up @@ -1068,6 +1076,7 @@ func testSingleConsumerEIP150(
})(c, s)
c.EVM[0].GasEstimator.LimitDefault = ptr(uint32(gasLimit))
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)
consumer := uni.vrfConsumers[0]
Expand Down Expand Up @@ -1136,6 +1145,7 @@ func testSingleConsumerEIP150Revert(
})(c, s)
c.EVM[0].GasEstimator.LimitDefault = ptr(uint32(gasLimit))
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)
consumer := uni.vrfConsumers[0]
Expand Down Expand Up @@ -1199,6 +1209,7 @@ func testSingleConsumerBigGasCallbackSandwich(
})(c, s)
c.EVM[0].GasEstimator.LimitDefault = ptr[uint32](5_000_000)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1)
consumer := uni.vrfConsumers[0]
Expand Down Expand Up @@ -1319,6 +1330,7 @@ func testSingleConsumerMultipleGasLanes(
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.EVM[0].GasEstimator.LimitDefault = ptr[uint32](5_000_000)
c.Feature.LogPoller = ptr(true)
})

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, cheapKey, expensiveKey)
Expand Down Expand Up @@ -1434,6 +1446,7 @@ func testSingleConsumerAlwaysRevertingCallbackStillFulfilled(
GasEstimator: v2.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key)
consumer := uni.reverter
Expand Down Expand Up @@ -1505,6 +1518,7 @@ func testConsumerProxyHappyPath(
GasEstimator: v2.KeySpecificGasEstimator{PriceMax: gasLanePriceWei},
})(c, s)
c.EVM[0].MinIncomingConfirmations = ptr[uint32](2)
c.Feature.LogPoller = ptr(true)
})
app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, uni.backend, ownerKey, key1, key2)
consumerOwner := uni.neil
Expand Down Expand Up @@ -1629,6 +1643,7 @@ func testMaliciousConsumer(
c.EVM[0].GasEstimator.PriceDefault = assets.GWei(1)
c.EVM[0].GasEstimator.FeeCapDefault = assets.GWei(1)
c.EVM[0].ChainID = (*utils.Big)(testutils.SimulatedChainID)
c.Feature.LogPoller = ptr(true)
})
carol := uni.vrfConsumers[0]

Expand Down Expand Up @@ -1720,3 +1735,42 @@ func testMaliciousConsumer(
}
require.Equal(t, 1, len(requests))
}

func getMetricInt64(c prometheus.Collector) (int64, error) {
var (
m prometheus.Metric
mCount int
mChan = make(chan prometheus.Metric)
done = make(chan struct{})
)

go func() {
for m = range mChan {
mCount++
}
close(done)
}()

c.Collect(mChan)
close(mChan)
<-done

if mCount != 1 {
return 0.0, fmt.Errorf("collected %d metrics instead of exactly 1", mCount)
}

pb := &dto.Metric{}
if err := m.Write(pb); err != nil {
panic(fmt.Errorf("error happened while collecting metrics: %w", err))
}
if pb.Gauge != nil {
return int64(pb.Gauge.GetValue()), nil
}
if pb.Counter != nil {
return int64(pb.Counter.GetValue()), nil
}
if pb.Untyped != nil {
return int64(pb.Untyped.GetValue()), nil
}
panic(fmt.Errorf("collected a non-gauge/counter/untyped metric: %s", pb))
}
Loading

0 comments on commit 654c261

Please sign in to comment.