From 58c637d5839d5ef32063a3f10b3a00b47e048c8a Mon Sep 17 00:00:00 2001 From: ilija Date: Wed, 29 May 2024 18:19:44 +0200 Subject: [PATCH] Change Chain Reader Lp filters from per event binding to per contract --- core/chains/evm/logpoller/log_poller.go | 18 ++--- core/services/relay/evm/binding.go | 4 +- core/services/relay/evm/bindings.go | 79 ++++++++++++++++---- core/services/relay/evm/chain_reader.go | 44 +++++------ core/services/relay/evm/chain_reader_test.go | 6 +- core/services/relay/evm/event_binding.go | 71 ++++-------------- core/services/relay/evm/method_binding.go | 3 +- core/services/relay/evm/types/types.go | 22 +----- 8 files changed, 112 insertions(+), 135 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 26978b18d48..dc7f989c67b 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -173,15 +173,15 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller } type Filter struct { - Name string // see FilterName(id, args) below - Addresses evmtypes.AddressArray - EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1) - Topic2 evmtypes.HashArray // list of possible values for topic2 - Topic3 evmtypes.HashArray // list of possible values for topic3 - Topic4 evmtypes.HashArray // list of possible values for topic4 - Retention time.Duration // maximum amount of time to retain logs - MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited ) - LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited ) + Name string `json:"name"` // see FilterName(id, args) below + Addresses evmtypes.AddressArray `json:"addresses"` + EventSigs evmtypes.HashArray `json:"eventSigs"` // list of possible values for eventsig (aka topic1) + Topic2 evmtypes.HashArray `json:"topic2"` // list of possible values for topic2 + Topic3 evmtypes.HashArray `json:"topic3"` // list of possible values for topic3 + Topic4 evmtypes.HashArray `json:"topic4"` // list of possible values for topic4 + Retention time.Duration `json:"retention"` // maximum amount of time to retain logs + MaxLogsKept uint64 `json:"maxLogsKept"` // maximum number of logs to retain ( 0 = unlimited ) + LogsPerBlock uint64 `json:"logsPerBlock"` // rate limit ( maximum # of logs per block, 0 = unlimited ) } // FilterName is a suggested convenience function for clients to construct unique filter names diff --git a/core/services/relay/evm/binding.go b/core/services/relay/evm/binding.go index d7a04dcc9b9..273aebed2ef 100644 --- a/core/services/relay/evm/binding.go +++ b/core/services/relay/evm/binding.go @@ -10,8 +10,6 @@ import ( type readBinding interface { GetLatestValue(ctx context.Context, params, returnVal any) error QueryKey(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error) - Bind(ctx context.Context, binding commontypes.BoundContract) error + Bind(binding commontypes.BoundContract) SetCodec(codec commontypes.RemoteCodec) - Register(ctx context.Context) error - Unregister(ctx context.Context) error } diff --git a/core/services/relay/evm/bindings.go b/core/services/relay/evm/bindings.go index bac17d798e1..74915d198b8 100644 --- a/core/services/relay/evm/bindings.go +++ b/core/services/relay/evm/bindings.go @@ -3,23 +3,28 @@ package evm import ( "context" "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" ) // key is contract name -type contractBindings map[string]readBindings +type bindings map[string]*contractBindings -type readBindings struct { +type contractBindings struct { // contractFilter is used to filter over all events or any subset of events with same filtering parameters. // if an event is present in the contract filter, it can't define its own filter in the event binding. - contractFilter logpoller.Filter + contractFilter logpoller.Filter + filterLock sync.Mutex + areEventFiltersRegistered bool // key is read name bindings map[string]readBinding } -func (b contractBindings) GetReadBinding(contractName, readName string) (readBinding, error) { +func (b bindings) GetReadBinding(contractName, readName string) (readBinding, error) { rb, rbExists := b[contractName] if !rbExists { return nil, fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidType, contractName) @@ -32,37 +37,79 @@ func (b contractBindings) GetReadBinding(contractName, readName string) (readBin return reader, nil } -func (b contractBindings) AddReadBinding(contractName, readName string, reader readBinding) { +func (b bindings) AddReadBinding(contractName, readName string, rb readBinding) { rbs, rbsExists := b[contractName] if !rbsExists { - rbs = readBindings{} + rbs = &contractBindings{} b[contractName] = rbs } - rbs.bindings[readName] = reader + rbs.bindings[readName] = rb } -func (b contractBindings) Bind(ctx context.Context, boundContracts []commontypes.BoundContract) error { +func (b bindings) Bind(ctx context.Context, logPoller logpoller.LogPoller, boundContracts []commontypes.BoundContract) error { for _, bc := range boundContracts { rbs, rbsExist := b[bc.Name] if !rbsExist { return fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidConfig, bc.Name) } + + rbs.contractFilter.Addresses = append(rbs.contractFilter.Addresses, common.HexToAddress(bc.Address)) + rbs.contractFilter.Name = logpoller.FilterName(bc.Name, bc.Address) + + if err := rbs.UnregisterEventFilters(ctx, logPoller); err != nil { + return err + } + + // if contract event filter isn't already registered then it will be by chain reader on startup + // if it is already registered then we are overriding filters registered on startup + if rbs.areEventFiltersRegistered { + return rbs.RegisterEventFilters(ctx, logPoller) + } + for _, r := range rbs.bindings { - if err := r.Bind(ctx, bc); err != nil { - return err - } + r.Bind(bc) } } return nil } -func (b contractBindings) ForEach(ctx context.Context, fn func(readBinding, context.Context) error) error { +func (b bindings) ForEach(ctx context.Context, fn func(context.Context, *contractBindings) error) error { for _, rbs := range b { - for _, rb := range rbs.bindings { - if err := fn(rb, ctx); err != nil { - return err - } + if err := fn(ctx, rbs); err != nil { + return err } } return nil } + +func (rb *contractBindings) RegisterEventFilters(ctx context.Context, logPoller logpoller.LogPoller) error { + rb.filterLock.Lock() + defer rb.filterLock.Unlock() + + rb.areEventFiltersRegistered = true + + if logPoller.HasFilter(rb.contractFilter.Name) { + return nil + } + + if err := logPoller.RegisterFilter(ctx, rb.contractFilter); err != nil { + return fmt.Errorf("%w: %w", commontypes.ErrInternal, err) + } + + return nil + +} + +func (rb *contractBindings) UnregisterEventFilters(ctx context.Context, logPoller logpoller.LogPoller) error { + rb.filterLock.Lock() + defer rb.filterLock.Unlock() + + if !logPoller.HasFilter(rb.contractFilter.Name) { + return nil + } + + if err := logPoller.UnregisterFilter(ctx, rb.contractFilter.Name); err != nil { + return fmt.Errorf("%w: %w", commontypes.ErrInternal, err) + } + return nil +} diff --git a/core/services/relay/evm/chain_reader.go b/core/services/relay/evm/chain_reader.go index 213816aeacd..dda0da7a781 100644 --- a/core/services/relay/evm/chain_reader.go +++ b/core/services/relay/evm/chain_reader.go @@ -8,17 +8,14 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/google/uuid" "github.com/smartcontractkit/chainlink-common/pkg/codec" - "github.com/smartcontractkit/chainlink-common/pkg/types/query" - commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" @@ -33,7 +30,7 @@ type chainReader struct { lggr logger.Logger lp logpoller.LogPoller client evmclient.Client - contractBindings contractBindings + contractBindings bindings parsed *parsedTypes codec commontypes.RemoteCodec commonservices.StateMachine @@ -48,7 +45,7 @@ func NewChainReaderService(ctx context.Context, lggr logger.Logger, lp logpoller lggr: lggr.Named("ChainReader"), lp: lp, client: client, - contractBindings: contractBindings{}, + contractBindings: bindings{}, parsed: &parsedTypes{encoderDefs: map[string]types.CodecEntry{}, decoderDefs: map[string]types.CodecEntry{}}, } @@ -61,8 +58,10 @@ func NewChainReaderService(ctx context.Context, lggr logger.Logger, lp logpoller return nil, err } - err = cr.contractBindings.ForEach(ctx, func(b readBinding, c context.Context) error { - b.SetCodec(cr.codec) + err = cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error { + for _, rb := range rbs.bindings { + rb.SetCodec(cr.codec) + } return nil }) @@ -83,7 +82,7 @@ func (cr *chainReader) GetLatestValue(ctx context.Context, contractName, method } func (cr *chainReader) Bind(ctx context.Context, bindings []commontypes.BoundContract) error { - return cr.contractBindings.Bind(ctx, bindings) + return cr.contractBindings.Bind(ctx, cr.lp, bindings) } func (cr *chainReader) QueryKey(ctx context.Context, contractName string, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error) { @@ -119,13 +118,17 @@ func (cr *chainReader) init(chainContractReaders map[string]types.ChainContractR return err } } + + cr.contractBindings[contractName].contractFilter = chainContractReader.LogPollerFilter } return nil } func (cr *chainReader) Start(ctx context.Context) error { return cr.StartOnce("ChainReader", func() error { - return cr.contractBindings.ForEach(ctx, readBinding.Register) + return cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error { + return rbs.RegisterEventFilters(ctx, cr.lp) + }) }) } @@ -133,7 +136,9 @@ func (cr *chainReader) Close() error { return cr.StopOnce("ChainReader", func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - return cr.contractBindings.ForEach(ctx, readBinding.Unregister) + return cr.contractBindings.ForEach(ctx, func(c context.Context, rbs *contractBindings) error { + return rbs.UnregisterEventFilters(ctx, cr.lp) + }) }) } @@ -209,27 +214,16 @@ func (cr *chainReader) addEvent(contractName, eventName string, a abi.ABI, chain } eventDefinitions := chainReaderDefinition.EventDefinitions - pollingFilter := eventDefinitions.PollingFilter eb := &eventBinding{ - contractName: contractName, - eventName: eventName, - lp: cr.lp, - logPollerFilter: logpoller.Filter{ - EventSigs: evmtypes.HashArray{event.ID}, - Topic2: pollingFilter.Topic2, - Topic3: pollingFilter.Topic3, - Topic4: pollingFilter.Topic4, - Retention: pollingFilter.Retention.Duration(), - MaxLogsKept: pollingFilter.MaxLogsKept, - LogsPerBlock: pollingFilter.LogsPerBlock, - }, + contractName: contractName, + eventName: eventName, + lp: cr.lp, hash: event.ID, inputInfo: inputInfo, inputModifier: inputModifier, codecTopicInfo: codecTopicInfo, topics: make(map[string]topicDetail), eventDataWords: eventDefinitions.GenericDataWordNames, - id: wrapItemType(contractName, eventName, false) + uuid.NewString(), } cr.contractBindings.AddReadBinding(contractName, eventName, eb) diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index fc62022f8b5..163fb2bd573 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -196,12 +196,12 @@ func (it *chainReaderInterfaceTester) Setup(t *testing.T) { EventWithFilterName: { ChainSpecificName: "Triggered", ReadType: types.Event, - EventInputFields: []string{"Field"}, + EventDefinitions: &types.EventDefinitions{InputFields: []string{"Field"}}, }, triggerWithDynamicTopic: { ChainSpecificName: triggerWithDynamicTopic, ReadType: types.Event, - EventInputFields: []string{"fieldHash"}, + EventDefinitions: &types.EventDefinitions{InputFields: []string{"fieldHash"}}, InputModifications: codec.ModifiersConfig{ &codec.RenameModifierConfig{Fields: map[string]string{"FieldHash": "Field"}}, }, @@ -209,7 +209,7 @@ func (it *chainReaderInterfaceTester) Setup(t *testing.T) { triggerWithAllTopics: { ChainSpecificName: triggerWithAllTopics, ReadType: types.Event, - EventInputFields: []string{"Field1", "Field2", "Field3"}, + EventDefinitions: &types.EventDefinitions{InputFields: []string{"Field1", "Field2", "Field3"}}, }, MethodReturningSeenStruct: { ChainSpecificName: "returnSeen", diff --git a/core/services/relay/evm/event_binding.go b/core/services/relay/evm/event_binding.go index c94fff29fc3..8bbf75a0332 100644 --- a/core/services/relay/evm/event_binding.go +++ b/core/services/relay/evm/event_binding.go @@ -5,7 +5,6 @@ import ( "fmt" "reflect" "strings" - "sync" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -21,27 +20,23 @@ import ( ) type eventBinding struct { - address common.Address - contractName string - eventName string - logPollerFilter logpoller.Filter - lp logpoller.LogPoller - hash common.Hash - codec commontypes.RemoteCodec - pending bool - bound bool - registerCalled bool - lock sync.Mutex - inputInfo types.CodecEntry - inputModifier codec.Modifier - codecTopicInfo types.CodecEntry + address common.Address + contractName string + eventName string + lp logpoller.LogPoller + hash common.Hash + codec commontypes.RemoteCodec + pending bool + bound bool + inputInfo types.CodecEntry + inputModifier codec.Modifier + codecTopicInfo types.CodecEntry // topics maps a generic topic name (key) to topic data topics map[string]topicDetail // eventDataWords maps a generic name to a word index // key is a predefined generic name for evm log event data word // for eg. first evm data word(32bytes) of USDC log event is value so the key can be called value eventDataWords map[string]uint8 - id string } type topicDetail struct { @@ -55,40 +50,12 @@ func (e *eventBinding) SetCodec(codec commontypes.RemoteCodec) { e.codec = codec } -func (e *eventBinding) Register(ctx context.Context) error { - e.lock.Lock() - defer e.lock.Unlock() - - e.registerCalled = true - if !e.bound || e.lp.HasFilter(e.id) { - return nil - } - - if err := e.lp.RegisterFilter(ctx, e.logPollerFilter); err != nil { - return fmt.Errorf("%w: %w", commontypes.ErrInternal, err) - } - return nil -} - -func (e *eventBinding) Unregister(ctx context.Context) error { - e.lock.Lock() - defer e.lock.Unlock() - - if !e.lp.HasFilter(e.id) { - return nil - } - - if err := e.lp.UnregisterFilter(ctx, e.id); err != nil { - return fmt.Errorf("%w: %w", commontypes.ErrInternal, err) - } - return nil -} - func (e *eventBinding) GetLatestValue(ctx context.Context, params, into any) error { if !e.bound { return fmt.Errorf("%w: event not bound", commontypes.ErrInvalidType) } + // TODO BCF-3247 change GetLatestValue to use chain agnostic confidence levels confs := evmtypes.Finalized if e.pending { confs = evmtypes.Unconfirmed @@ -131,20 +98,10 @@ func (e *eventBinding) QueryKey(ctx context.Context, filter query.KeyFilter, lim return e.decodeLogsIntoSequences(ctx, logs, sequenceDataType) } -func (e *eventBinding) Bind(ctx context.Context, binding commontypes.BoundContract) error { - if err := e.Unregister(ctx); err != nil { - return err - } - +func (e *eventBinding) Bind(binding commontypes.BoundContract) { e.address = common.HexToAddress(binding.Address) - e.logPollerFilter.Name = logpoller.FilterName(e.id, e.address) - e.logPollerFilter.Addresses = evmtypes.AddressArray{e.address} + e.pending = binding.Pending e.bound = true - - if e.registerCalled { - return e.Register(ctx) - } - return nil } func (e *eventBinding) getLatestValueWithoutFilters(ctx context.Context, confs evmtypes.Confirmations, into any) error { diff --git a/core/services/relay/evm/method_binding.go b/core/services/relay/evm/method_binding.go index 7484d17c3ef..ed6c204315a 100644 --- a/core/services/relay/evm/method_binding.go +++ b/core/services/relay/evm/method_binding.go @@ -68,8 +68,7 @@ func (m *methodBinding) QueryKey(_ context.Context, _ query.KeyFilter, _ query.L return nil, nil } -func (m *methodBinding) Bind(_ context.Context, binding commontypes.BoundContract) error { +func (m *methodBinding) Bind(binding commontypes.BoundContract) { m.address = common.HexToAddress(binding.Address) m.bound = true - return nil } diff --git a/core/services/relay/evm/types/types.go b/core/services/relay/evm/types/types.go index 0d9402241cc..d9cf51a317a 100644 --- a/core/services/relay/evm/types/types.go +++ b/core/services/relay/evm/types/types.go @@ -16,9 +16,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/store/models" - + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" ) @@ -40,7 +38,7 @@ type ChainCodecConfig struct { type ChainContractReader struct { ContractABI string `json:"contractABI" toml:"contractABI"` - ContractPollingFilter ContractPollingFilter `json:"contractPollingFilter"` + LogPollerFilter logpoller.Filter `json:"logPollerFilter,omitempty"` // key is genericName from config Configs map[string]*ChainReaderDefinition `json:"configs" toml:"configs"` } @@ -48,7 +46,6 @@ type ChainContractReader struct { type ChainReaderDefinition chainReaderDefinitionFields type EventDefinitions struct { - PollingFilter PollingFilter `json:"pollingFilter"` // GenericTopicNames helps QueryingKeys not rely on EVM specific topic names. Key is chain specific name, value is generic name. // This helps us translate chain agnostic querying key "transfer-value" to EVM specific "evmTransferEvent-weiAmountTopic". GenericTopicNames map[string]string `json:"genericTopicNames,omitempty"` @@ -119,21 +116,6 @@ func (r *ReadType) UnmarshalText(text []byte) error { return fmt.Errorf("unrecognized ReadType: %s", string(text)) } -type PollingFilter struct { - Topic2 evmtypes.HashArray `json:"topic2"` // list of possible values for topic2 - Topic3 evmtypes.HashArray `json:"topic3"` // list of possible values for topic3 - Topic4 evmtypes.HashArray `json:"topic4"` // list of possible values for topic4 - Retention models.Interval `json:"retention"` // maximum amount of time to retain logs - MaxLogsKept uint64 `json:"maxLogsKept"` // maximum number of logs to retain ( 0 = unlimited ) - LogsPerBlock uint64 `json:"logsPerBlock"` // rate limit ( maximum # of logs per block, 0 = unlimited ) -} - -type ContractPollingFilter struct { - EventKeys []string `json:"eventKeys"` // list of possible values for eventsig (aka topic1) - // contract wide polling filter - PollingFilter PollingFilter -} - type RelayConfig struct { ChainID *big.Big `json:"chainID"` FromBlock uint64 `json:"fromBlock"`