Skip to content

Commit

Permalink
Add per event polling filter override for Chain Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ilija42 committed May 31, 2024
1 parent 0055f69 commit a1cdcf9
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 49 deletions.
2 changes: 2 additions & 0 deletions core/services/relay/evm/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ type readBinding interface {
GetLatestValue(ctx context.Context, params, returnVal any) error
QueryKey(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error)
Bind(binding commontypes.BoundContract)
Register(ctx context.Context) error
Unregister(ctx context.Context) error
SetCodec(codec commontypes.RemoteCodec)
}
63 changes: 42 additions & 21 deletions core/services/relay/evm/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)

// key is contract name
type bindings map[string]*contractBindings

type FilterRegisterer struct {
pollingFilter logpoller.Filter
filterLock sync.Mutex
isRegistered bool
}

type contractBindings struct {
// contractFilter is used to filter over all events or any subset of events with same filtering parameters.
// if an event is present in the contract filter, it can't define its own filter in the event binding.
contractFilter logpoller.Filter
filterLock sync.Mutex
areEventFiltersRegistered bool
// FilterRegisterer is used to manage polling filter registration.
FilterRegisterer
// key is read name
bindings map[string]readBinding
}
Expand Down Expand Up @@ -53,22 +58,25 @@ func (b bindings) Bind(ctx context.Context, logPoller logpoller.LogPoller, bound
return fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidConfig, bc.Name)
}

rbs.contractFilter.Addresses = append(rbs.contractFilter.Addresses, common.HexToAddress(bc.Address))
rbs.contractFilter.Name = logpoller.FilterName(bc.Name, bc.Address)
rbs.pollingFilter.Addresses = evmtypes.AddressArray{common.HexToAddress(bc.Address)}
rbs.pollingFilter.Name = logpoller.FilterName(bc.Name+"."+uuid.NewString(), bc.Address)

if err := rbs.UnregisterEventFilters(ctx, logPoller); err != nil {
// we are changing contract address reference, so we need to unregister old filters
if err := rbs.Unregister(ctx, logPoller); err != nil {
return err
}

// if contract event filter isn't already registered then it will be by chain reader on startup
// if it is already registered then we are overriding filters registered on startup
if rbs.areEventFiltersRegistered {
return rbs.RegisterEventFilters(ctx, logPoller)
}

for _, r := range rbs.bindings {
r.Bind(bc)
}

// if contract event filters aren't already registered then they will on startup
// if they are already registered then we are overriding them because contract binding (address) has changed
if rbs.isRegistered {
if err := rbs.Register(ctx, logPoller); err != nil {
return err
}
}
}
return nil
}
Expand All @@ -82,33 +90,46 @@ func (b bindings) ForEach(ctx context.Context, fn func(context.Context, *contrac
return nil
}

func (rb *contractBindings) RegisterEventFilters(ctx context.Context, logPoller logpoller.LogPoller) error {
// Register registers polling filters.
func (rb *contractBindings) Register(ctx context.Context, logPoller logpoller.LogPoller) error {
rb.filterLock.Lock()
defer rb.filterLock.Unlock()

rb.areEventFiltersRegistered = true
rb.isRegistered = true

if logPoller.HasFilter(rb.contractFilter.Name) {
if logPoller.HasFilter(rb.pollingFilter.Name) {
return nil
}

if err := logPoller.RegisterFilter(ctx, rb.contractFilter); err != nil {
if err := logPoller.RegisterFilter(ctx, rb.pollingFilter); err != nil {
return fmt.Errorf("%w: %w", commontypes.ErrInternal, err)
}

for _, binding := range rb.bindings {
if err := binding.Register(ctx); err != nil {
return err
}
}
return nil
}

func (rb *contractBindings) UnregisterEventFilters(ctx context.Context, logPoller logpoller.LogPoller) error {
// Unregister unregisters polling filters.
func (rb *contractBindings) Unregister(ctx context.Context, logPoller logpoller.LogPoller) error {
rb.filterLock.Lock()
defer rb.filterLock.Unlock()

if !logPoller.HasFilter(rb.contractFilter.Name) {
if !logPoller.HasFilter(rb.pollingFilter.Name) {
return nil
}

if err := logPoller.UnregisterFilter(ctx, rb.contractFilter.Name); err != nil {
if err := logPoller.UnregisterFilter(ctx, rb.pollingFilter.Name); err != nil {
return fmt.Errorf("%w: %w", commontypes.ErrInternal, err)
}

for _, binding := range rb.bindings {
if err := binding.Unregister(ctx); err != nil {
return err
}
}
return nil
}
40 changes: 24 additions & 16 deletions core/services/relay/evm/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,27 @@ func (cr *chainReader) init(chainContractReaders map[string]types.ChainContractR
return err
}

var eventSigsForContractFilter evmtypes.HashArray
var eventSigsForPollingFilter evmtypes.HashArray
for typeName, chainReaderDefinition := range chainContractReader.Configs {
switch chainReaderDefinition.ReadType {
case types.Method:
err = cr.addMethod(contractName, typeName, contractAbi, *chainReaderDefinition)
case types.Event:
if chainReaderDefinition.HasPollingFilter() {
if slices.Contains(chainContractReader.GenericEventNames, typeName) {
return fmt.Errorf(
"%w: invalid chain reader polling filter definition, "+
"can't have polling filter defined both on contract and event level: %s",
commontypes.ErrInvalidConfig,
chainReaderDefinition.ReadType)
}
} else {
eventSig := contractAbi.Events[chainReaderDefinition.ChainSpecificName]
eventSigsForContractFilter = append(eventSigsForContractFilter, eventSig.ID)
partOfContractPollingFilter := slices.Contains(chainContractReader.GenericEventNames, typeName)
hasContractFilterOverride := chainReaderDefinition.HasPollingFilter()
if !partOfContractPollingFilter && !chainReaderDefinition.HasPollingFilter() {
return fmt.Errorf(
"%w: chain reader has no polling filter defined for contract: %s event: %s",
commontypes.ErrInvalidConfig, contractName, typeName)
}
if hasContractFilterOverride && partOfContractPollingFilter {
return fmt.Errorf(
"%w: conflicting chain reader polling filter definitions for contract: %s event: %s, can't have polling filter defined both on contract and event level",
commontypes.ErrInvalidConfig, contractName, typeName)
}

if !hasContractFilterOverride {
eventSigsForPollingFilter = append(eventSigsForPollingFilter, contractAbi.Events[chainReaderDefinition.ChainSpecificName].ID)
}

err = cr.addEvent(contractName, typeName, contractAbi, *chainReaderDefinition)
Expand All @@ -132,20 +136,19 @@ func (cr *chainReader) init(chainContractReaders map[string]types.ChainContractR
commontypes.ErrInvalidConfig,
chainReaderDefinition.ReadType)
}

if err != nil {
return err
}
}
cr.contractBindings[contractName].contractFilter = chainContractReader.PollingFilter.ToLPFilter(eventSigsForContractFilter)
cr.contractBindings[contractName].pollingFilter = chainContractReader.PollingFilter.ToLPFilter(eventSigsForPollingFilter)
}
return nil
}

func (cr *chainReader) Start(ctx context.Context) error {
return cr.StartOnce("ChainReader", func() error {
return cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error {
return rbs.RegisterEventFilters(ctx, cr.lp)
return rbs.Register(ctx, cr.lp)
})
})
}
Expand All @@ -155,7 +158,7 @@ func (cr *chainReader) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error {
return rbs.UnregisterEventFilters(ctx, cr.lp)
return rbs.Unregister(ctx, cr.lp)
})
})
}
Expand Down Expand Up @@ -212,6 +215,10 @@ func (cr *chainReader) addEvent(contractName, eventName string, a abi.ABI, chain
return fmt.Errorf("%w: event %s doesn't exist", commontypes.ErrInvalidConfig, chainReaderDefinition.ChainSpecificName)
}

if chainReaderDefinition.EventDefinitions == nil {
return fmt.Errorf("%w: event %s doesn't have event definitions set", commontypes.ErrInvalidConfig, chainReaderDefinition.ChainSpecificName)
}

filterArgs, codecTopicInfo, indexArgNames := setupEventInput(event, chainReaderDefinition)
if err := verifyEventInputsUsed(chainReaderDefinition, indexArgNames); err != nil {
return err
Expand Down Expand Up @@ -240,6 +247,7 @@ func (cr *chainReader) addEvent(contractName, eventName string, a abi.ABI, chain
eb := &eventBinding{
contractName: contractName,
eventName: eventName,
logPollerFilter: eventDefinitions.PollingFilter.ToLPFilter(evmtypes.HashArray{a.Events[event.Name].ID}),
lp: cr.lp,
hash: event.ID,
inputInfo: inputInfo,
Expand Down
59 changes: 48 additions & 11 deletions core/services/relay/evm/event_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"

"github.com/smartcontractkit/chainlink-common/pkg/codec"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
Expand All @@ -20,17 +21,19 @@ import (
)

type eventBinding struct {
address common.Address
contractName string
eventName string
lp logpoller.LogPoller
hash common.Hash
codec commontypes.RemoteCodec
pending bool
bound bool
inputInfo types.CodecEntry
inputModifier codec.Modifier
codecTopicInfo types.CodecEntry
FilterRegisterer
address common.Address
contractName string
eventName string
lp logpoller.LogPoller
logPollerFilter logpoller.Filter
hash common.Hash
codec commontypes.RemoteCodec
pending bool
bound bool
inputInfo types.CodecEntry
inputModifier codec.Modifier
codecTopicInfo types.CodecEntry
// topics maps a generic topic name (key) to topic data
topics map[string]topicDetail
// eventDataWords maps a generic name to a word index
Expand All @@ -51,6 +54,35 @@ func (e *eventBinding) SetCodec(codec commontypes.RemoteCodec) {
e.codec = codec
}

func (e *eventBinding) Register(ctx context.Context) error {
e.filterLock.Lock()
defer e.filterLock.Unlock()

e.isRegistered = true
if !e.bound || e.lp.HasFilter(e.pollingFilter.Name) {
return nil
}

if err := e.lp.RegisterFilter(ctx, e.logPollerFilter); err != nil {
return fmt.Errorf("%w: %w", commontypes.ErrInternal, err)
}
return nil
}

func (e *eventBinding) Unregister(ctx context.Context) error {
e.filterLock.Lock()
defer e.filterLock.Unlock()

if !e.lp.HasFilter(e.pollingFilter.Name) {
return nil
}

if err := e.lp.UnregisterFilter(ctx, e.pollingFilter.Name); err != nil {
return fmt.Errorf("%w: %w", commontypes.ErrInternal, err)
}
return nil
}

func (e *eventBinding) GetLatestValue(ctx context.Context, params, into any) error {
if !e.bound {
return fmt.Errorf("%w: event not bound", commontypes.ErrInvalidType)
Expand Down Expand Up @@ -102,6 +134,11 @@ func (e *eventBinding) QueryKey(ctx context.Context, filter query.KeyFilter, lim
func (e *eventBinding) Bind(binding commontypes.BoundContract) {
e.address = common.HexToAddress(binding.Address)
e.pending = binding.Pending

id := fmt.Sprintf("%s,%s,%s", e.contractName, e.eventName, uuid.NewString())
e.logPollerFilter.Name = logpoller.FilterName(id, e.address)
e.logPollerFilter.Addresses = evmtypes.AddressArray{e.address}

e.bound = true
}

Expand Down
5 changes: 4 additions & 1 deletion core/services/relay/evm/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ type chainReaderDefinitionFields struct {
}

func (d *ChainReaderDefinition) HasPollingFilter() bool {
return d.EventDefinitions == nil && d.EventDefinitions.PollingFilter == nil
if d.EventDefinitions == nil && d.EventDefinitions.PollingFilter == nil {
return false
}
return true
}

func (d *ChainReaderDefinition) MarshalText() ([]byte, error) {
Expand Down

0 comments on commit a1cdcf9

Please sign in to comment.