Skip to content

Commit

Permalink
Replace mercury eventbroadcaster with polling (#11707)
Browse files Browse the repository at this point in the history
* POC example code to replace mercury eventbroadcaster with polling

* remove Mercury Notify optimization

* fix bad merge
  • Loading branch information
krehermann authored Jan 9, 2024
1 parent 61f42ce commit 80bc9f2
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 119 deletions.
1 change: 0 additions & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.Re
chain.LogPoller(),
aggregatorAddress,
*relayConfig.FeedID,
eventBroadcaster,
// TODO: Does mercury need to support config contract? DF-19182
)
} else {
Expand Down
60 changes: 3 additions & 57 deletions core/services/relay/evm/mercury/config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,52 +91,37 @@ type ConfigPoller struct {
destChainLogPoller logpoller.LogPoller
addr common.Address
feedId common.Hash
notifyCh chan struct{}
subscription pg.Subscription
}

func FilterName(addr common.Address, feedID common.Hash) string {
return logpoller.FilterName("OCR3 Mercury ConfigPoller", addr.String(), feedID.Hex())
}

// NewConfigPoller creates a new Mercury ConfigPoller
func NewConfigPoller(lggr logger.Logger, destChainPoller logpoller.LogPoller, addr common.Address, feedId common.Hash, eventBroadcaster pg.EventBroadcaster) (*ConfigPoller, error) {
func NewConfigPoller(lggr logger.Logger, destChainPoller logpoller.LogPoller, addr common.Address, feedId common.Hash) (*ConfigPoller, error) {
err := destChainPoller.RegisterFilter(logpoller.Filter{Name: FilterName(addr, feedId), EventSigs: []common.Hash{FeedScopedConfigSet}, Addresses: []common.Address{addr}})
if err != nil {
return nil, err
}

subscription, err := eventBroadcaster.Subscribe(pg.ChannelInsertOnEVMLogs, "")
if err != nil {
return nil, err
}

cp := &ConfigPoller{
lggr: lggr,
destChainLogPoller: destChainPoller,
addr: addr,
feedId: feedId,
notifyCh: make(chan struct{}, 1),
subscription: subscription,
}

return cp, nil
}

// Start the subscription to Postgres' notify events.
func (cp *ConfigPoller) Start() {
go cp.startLogSubscription()
}
func (cp *ConfigPoller) Start() {}

// Close the subscription to Postgres' notify events.
func (cp *ConfigPoller) Close() error {
cp.subscription.Close()
return nil
}

// Notify abstracts the logpoller.LogPoller Notify() implementation
func (cp *ConfigPoller) Notify() <-chan struct{} {
return cp.notifyCh
return nil // rely on libocr's builtin config polling
}

// Replay abstracts the logpoller.LogPoller Replay() implementation
Expand Down Expand Up @@ -190,42 +175,3 @@ func (cp *ConfigPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint
}
return uint64(latest.BlockNumber), nil
}

func (cp *ConfigPoller) startLogSubscription() {
// trim the leading 0x to make it comparable to pg's hex encoding.
addressPgHex := cp.addr.Hex()[2:]
feedIdPgHex := cp.feedId.Hex()[2:]

for {
event, ok := <-cp.subscription.Events()
if !ok {
cp.lggr.Debug("eventBroadcaster subscription closed, exiting notify loop")
return
}

// Event payload should look like: "<address>:<topicVal1>,<topicVal2>"
addressTopicValues := strings.Split(event.Payload, ":")
if len(addressTopicValues) < 2 {
cp.lggr.Warnf("invalid event from %s channel: %s", pg.ChannelInsertOnEVMLogs, event.Payload)
continue
}

address := addressTopicValues[0]
if address != addressPgHex {
continue
}

topicValues := strings.Split(addressTopicValues[1], ",")
if len(topicValues) <= feedIdTopicIndex {
continue
}
if topicValues[feedIdTopicIndex] != feedIdPgHex {
continue
}

select {
case cp.notifyCh <- struct{}{}:
default:
}
}
}
51 changes: 0 additions & 51 deletions core/services/relay/evm/mercury/config_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/onsi/gomega"
"github.com/pkg/errors"
confighelper2 "github.com/smartcontractkit/libocr/offchainreporting2plus/confighelper"
Expand All @@ -18,7 +17,6 @@ import (

evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand All @@ -27,7 +25,6 @@ func TestMercuryConfigPoller(t *testing.T) {
feedIDBytes := [32]byte(feedID)

th := SetupTH(t, feedID)
th.subscription.On("Events").Return(nil)

notify := th.configPoller.Notify()
assert.Empty(t, notify)
Expand Down Expand Up @@ -115,54 +112,6 @@ func TestMercuryConfigPoller(t *testing.T) {
assert.Equal(t, offchainConfig, newConfig.OffchainConfig)
}

func TestNotify(t *testing.T) {
testutils.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/BCF-2746")
feedIDStr := "8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"
feedIDBytes, err := hexutil.Decode("0x" + feedIDStr)
require.NoError(t, err)
feedID := common.BytesToHash(feedIDBytes)

eventCh := make(chan pg.Event)

th := SetupTH(t, feedID)
th.subscription.On("Events").Return((<-chan pg.Event)(eventCh))

addressPgHex := th.verifierAddress.Hex()[2:]

notify := th.configPoller.Notify()
assert.Empty(t, notify)

eventCh <- pg.Event{} // Empty event
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: addressPgHex} // missing topic values
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: addressPgHex + ":val1"} // missing feedId topic value
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: addressPgHex + ":8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1,val2"} // wrong index
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: addressPgHex + ":val1,val2,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // wrong index
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: addressPgHex + ":val1,0x8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // 0x prefix
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: "wrong_address:val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // wrong address
assert.Empty(t, notify)

eventCh <- pg.Event{Payload: addressPgHex + ":val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // expected event to notify on
assert.Eventually(t, func() bool { <-notify; return true }, time.Second, 10*time.Millisecond)

eventCh <- pg.Event{Payload: addressPgHex + ":val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // try second time
assert.Eventually(t, func() bool { <-notify; return true }, time.Second, 10*time.Millisecond)

eventCh <- pg.Event{Payload: addressPgHex + ":val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1:additional"} // additional colon separated parts
assert.Eventually(t, func() bool { <-notify; return true }, time.Second, 10*time.Millisecond)
}

func onchainPublicKeyToAddress(publicKeys []types.OnchainPublicKey) (addresses []common.Address, err error) {
for _, signer := range publicKeys {
if len(signer) != 20 {
Expand Down
12 changes: 2 additions & 10 deletions core/services/relay/evm/mercury/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
Expand All @@ -26,7 +27,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
pgmocks "github.com/smartcontractkit/chainlink/v2/core/services/pg/mocks"
reportcodecv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1/reportcodec"
reportcodecv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2/reportcodec"
reportcodecv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
Expand Down Expand Up @@ -143,8 +143,6 @@ type TestHarness struct {
verifierAddress common.Address
verifierContract *verifier.Verifier
logPoller logpoller.LogPoller
eventBroadcaster *pgmocks.EventBroadcaster
subscription *pgmocks.Subscription
}

func SetupTH(t *testing.T, feedID common.Hash) TestHarness {
Expand All @@ -170,13 +168,9 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness {
lggr := logger.TestLogger(t)
lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg)
lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000)
eventBroadcaster := pgmocks.NewEventBroadcaster(t)
subscription := pgmocks.NewSubscription(t)
servicetest.Run(t, lp)

eventBroadcaster.On("Subscribe", "evm.insert_on_logs", "").Return(subscription, nil)

configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID, eventBroadcaster)
configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID)
require.NoError(t, err)

configPoller.Start()
Expand All @@ -188,7 +182,5 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness {
verifierAddress: verifierAddress,
verifierContract: verifierContract,
logPoller: lp,
eventBroadcaster: eventBroadcaster,
subscription: subscription,
}
}

0 comments on commit 80bc9f2

Please sign in to comment.