Skip to content

Commit

Permalink
Change Chain Reader Lp filters from per event binding to per contract
Browse files Browse the repository at this point in the history
  • Loading branch information
ilija42 committed May 29, 2024
1 parent 816ef3c commit 58c637d
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 135 deletions.
18 changes: 9 additions & 9 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller
}

type Filter struct {
Name string // see FilterName(id, args) below
Addresses evmtypes.AddressArray
EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1)
Topic2 evmtypes.HashArray // list of possible values for topic2
Topic3 evmtypes.HashArray // list of possible values for topic3
Topic4 evmtypes.HashArray // list of possible values for topic4
Retention time.Duration // maximum amount of time to retain logs
MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited )
LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited )
Name string `json:"name"` // see FilterName(id, args) below
Addresses evmtypes.AddressArray `json:"addresses"`
EventSigs evmtypes.HashArray `json:"eventSigs"` // list of possible values for eventsig (aka topic1)
Topic2 evmtypes.HashArray `json:"topic2"` // list of possible values for topic2
Topic3 evmtypes.HashArray `json:"topic3"` // list of possible values for topic3
Topic4 evmtypes.HashArray `json:"topic4"` // list of possible values for topic4
Retention time.Duration `json:"retention"` // maximum amount of time to retain logs
MaxLogsKept uint64 `json:"maxLogsKept"` // maximum number of logs to retain ( 0 = unlimited )
LogsPerBlock uint64 `json:"logsPerBlock"` // rate limit ( maximum # of logs per block, 0 = unlimited )
}

// FilterName is a suggested convenience function for clients to construct unique filter names
Expand Down
4 changes: 1 addition & 3 deletions core/services/relay/evm/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
type readBinding interface {
GetLatestValue(ctx context.Context, params, returnVal any) error
QueryKey(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error)
Bind(ctx context.Context, binding commontypes.BoundContract) error
Bind(binding commontypes.BoundContract)
SetCodec(codec commontypes.RemoteCodec)
Register(ctx context.Context) error
Unregister(ctx context.Context) error
}
79 changes: 63 additions & 16 deletions core/services/relay/evm/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@ package evm
import (
"context"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// key is contract name
type contractBindings map[string]readBindings
type bindings map[string]*contractBindings

type readBindings struct {
type contractBindings struct {
// contractFilter is used to filter over all events or any subset of events with same filtering parameters.
// if an event is present in the contract filter, it can't define its own filter in the event binding.
contractFilter logpoller.Filter
contractFilter logpoller.Filter
filterLock sync.Mutex
areEventFiltersRegistered bool
// key is read name
bindings map[string]readBinding
}

func (b contractBindings) GetReadBinding(contractName, readName string) (readBinding, error) {
func (b bindings) GetReadBinding(contractName, readName string) (readBinding, error) {
rb, rbExists := b[contractName]
if !rbExists {
return nil, fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidType, contractName)
Expand All @@ -32,37 +37,79 @@ func (b contractBindings) GetReadBinding(contractName, readName string) (readBin
return reader, nil
}

func (b contractBindings) AddReadBinding(contractName, readName string, reader readBinding) {
func (b bindings) AddReadBinding(contractName, readName string, rb readBinding) {
rbs, rbsExists := b[contractName]
if !rbsExists {
rbs = readBindings{}
rbs = &contractBindings{}
b[contractName] = rbs
}
rbs.bindings[readName] = reader
rbs.bindings[readName] = rb
}

func (b contractBindings) Bind(ctx context.Context, boundContracts []commontypes.BoundContract) error {
func (b bindings) Bind(ctx context.Context, logPoller logpoller.LogPoller, boundContracts []commontypes.BoundContract) error {
for _, bc := range boundContracts {
rbs, rbsExist := b[bc.Name]
if !rbsExist {
return fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidConfig, bc.Name)
}

rbs.contractFilter.Addresses = append(rbs.contractFilter.Addresses, common.HexToAddress(bc.Address))
rbs.contractFilter.Name = logpoller.FilterName(bc.Name, bc.Address)

if err := rbs.UnregisterEventFilters(ctx, logPoller); err != nil {
return err
}

// if contract event filter isn't already registered then it will be by chain reader on startup
// if it is already registered then we are overriding filters registered on startup
if rbs.areEventFiltersRegistered {
return rbs.RegisterEventFilters(ctx, logPoller)
}

for _, r := range rbs.bindings {
if err := r.Bind(ctx, bc); err != nil {
return err
}
r.Bind(bc)
}
}
return nil
}

func (b contractBindings) ForEach(ctx context.Context, fn func(readBinding, context.Context) error) error {
func (b bindings) ForEach(ctx context.Context, fn func(context.Context, *contractBindings) error) error {
for _, rbs := range b {
for _, rb := range rbs.bindings {
if err := fn(rb, ctx); err != nil {
return err
}
if err := fn(ctx, rbs); err != nil {
return err
}
}
return nil
}

func (rb *contractBindings) RegisterEventFilters(ctx context.Context, logPoller logpoller.LogPoller) error {
rb.filterLock.Lock()
defer rb.filterLock.Unlock()

rb.areEventFiltersRegistered = true

if logPoller.HasFilter(rb.contractFilter.Name) {
return nil
}

if err := logPoller.RegisterFilter(ctx, rb.contractFilter); err != nil {
return fmt.Errorf("%w: %w", commontypes.ErrInternal, err)
}

return nil

}

func (rb *contractBindings) UnregisterEventFilters(ctx context.Context, logPoller logpoller.LogPoller) error {
rb.filterLock.Lock()
defer rb.filterLock.Unlock()

if !logPoller.HasFilter(rb.contractFilter.Name) {
return nil
}

if err := logPoller.UnregisterFilter(ctx, rb.contractFilter.Name); err != nil {
return fmt.Errorf("%w: %w", commontypes.ErrInternal, err)
}
return nil
}
44 changes: 19 additions & 25 deletions core/services/relay/evm/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import (
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/google/uuid"

"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"

commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
Expand All @@ -33,7 +30,7 @@ type chainReader struct {
lggr logger.Logger
lp logpoller.LogPoller
client evmclient.Client
contractBindings contractBindings
contractBindings bindings
parsed *parsedTypes
codec commontypes.RemoteCodec
commonservices.StateMachine
Expand All @@ -48,7 +45,7 @@ func NewChainReaderService(ctx context.Context, lggr logger.Logger, lp logpoller
lggr: lggr.Named("ChainReader"),
lp: lp,
client: client,
contractBindings: contractBindings{},
contractBindings: bindings{},
parsed: &parsedTypes{encoderDefs: map[string]types.CodecEntry{}, decoderDefs: map[string]types.CodecEntry{}},
}

Expand All @@ -61,8 +58,10 @@ func NewChainReaderService(ctx context.Context, lggr logger.Logger, lp logpoller
return nil, err
}

err = cr.contractBindings.ForEach(ctx, func(b readBinding, c context.Context) error {
b.SetCodec(cr.codec)
err = cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error {
for _, rb := range rbs.bindings {
rb.SetCodec(cr.codec)
}
return nil
})

Expand All @@ -83,7 +82,7 @@ func (cr *chainReader) GetLatestValue(ctx context.Context, contractName, method
}

func (cr *chainReader) Bind(ctx context.Context, bindings []commontypes.BoundContract) error {
return cr.contractBindings.Bind(ctx, bindings)
return cr.contractBindings.Bind(ctx, cr.lp, bindings)
}

func (cr *chainReader) QueryKey(ctx context.Context, contractName string, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error) {
Expand Down Expand Up @@ -119,21 +118,27 @@ func (cr *chainReader) init(chainContractReaders map[string]types.ChainContractR
return err
}
}

cr.contractBindings[contractName].contractFilter = chainContractReader.LogPollerFilter
}
return nil
}

func (cr *chainReader) Start(ctx context.Context) error {
return cr.StartOnce("ChainReader", func() error {
return cr.contractBindings.ForEach(ctx, readBinding.Register)
return cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error {
return rbs.RegisterEventFilters(ctx, cr.lp)
})
})
}

func (cr *chainReader) Close() error {
return cr.StopOnce("ChainReader", func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return cr.contractBindings.ForEach(ctx, readBinding.Unregister)
return cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error {
return rbs.UnregisterEventFilters(ctx, cr.lp)
})
})
}

Expand Down Expand Up @@ -209,27 +214,16 @@ func (cr *chainReader) addEvent(contractName, eventName string, a abi.ABI, chain
}

eventDefinitions := chainReaderDefinition.EventDefinitions
pollingFilter := eventDefinitions.PollingFilter
eb := &eventBinding{
contractName: contractName,
eventName: eventName,
lp: cr.lp,
logPollerFilter: logpoller.Filter{
EventSigs: evmtypes.HashArray{event.ID},
Topic2: pollingFilter.Topic2,
Topic3: pollingFilter.Topic3,
Topic4: pollingFilter.Topic4,
Retention: pollingFilter.Retention.Duration(),
MaxLogsKept: pollingFilter.MaxLogsKept,
LogsPerBlock: pollingFilter.LogsPerBlock,
},
contractName: contractName,
eventName: eventName,
lp: cr.lp,
hash: event.ID,
inputInfo: inputInfo,
inputModifier: inputModifier,
codecTopicInfo: codecTopicInfo,
topics: make(map[string]topicDetail),
eventDataWords: eventDefinitions.GenericDataWordNames,
id: wrapItemType(contractName, eventName, false) + uuid.NewString(),
}

cr.contractBindings.AddReadBinding(contractName, eventName, eb)
Expand Down
6 changes: 3 additions & 3 deletions core/services/relay/evm/chain_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,20 @@ func (it *chainReaderInterfaceTester) Setup(t *testing.T) {
EventWithFilterName: {
ChainSpecificName: "Triggered",
ReadType: types.Event,
EventInputFields: []string{"Field"},
EventDefinitions: &types.EventDefinitions{InputFields: []string{"Field"}},
},
triggerWithDynamicTopic: {
ChainSpecificName: triggerWithDynamicTopic,
ReadType: types.Event,
EventInputFields: []string{"fieldHash"},
EventDefinitions: &types.EventDefinitions{InputFields: []string{"fieldHash"}},
InputModifications: codec.ModifiersConfig{
&codec.RenameModifierConfig{Fields: map[string]string{"FieldHash": "Field"}},
},
},
triggerWithAllTopics: {
ChainSpecificName: triggerWithAllTopics,
ReadType: types.Event,
EventInputFields: []string{"Field1", "Field2", "Field3"},
EventDefinitions: &types.EventDefinitions{InputFields: []string{"Field1", "Field2", "Field3"}},
},
MethodReturningSeenStruct: {
ChainSpecificName: "returnSeen",
Expand Down
Loading

0 comments on commit 58c637d

Please sign in to comment.