Skip to content

Commit

Permalink
Upgrade to ocr2keepers latest (#10179)
Browse files Browse the repository at this point in the history
* Upgrade to ocr2keepers latest

* update type

* add todo

* update

* update ocr2keepers

* added payload builder

* get latest block once

* update ocr2keepers

* Hook in payload builder to delegate

* hook in dependencies

* fix conditionals

* add missing file

* use state unknown from ocr2keepers

* update block subscriber to latest

* update

* update ocr2keepers

* remove logs

* small fixes

* fix transmit event provider

* fixes

* logs

* revert block subscriber changes

* fix build

* cleanup

* update block subscriber

* fix close

* add nil protection

* fix integration test

* go mod tidy

* fix tests

---------

Co-authored-by: amirylm <[email protected]>
  • Loading branch information
infiloop2 and amirylm authored Aug 15, 2023
1 parent 2b56c25 commit 1abdb05
Show file tree
Hide file tree
Showing 20 changed files with 129 additions and 46 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20230802221916-2271752fa829
github.com/smartcontractkit/ocr2keepers v0.7.8
github.com/smartcontractkit/ocr2keepers v0.7.9
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb
github.com/spf13/cobra v1.6.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1442,8 +1442,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/smartcontractkit/libocr v0.0.0-20230802221916-2271752fa829 h1:fzefK1SzoRSHzZduOCzIJ2kmkBMPKwIf3FgeBlw7Jjk=
github.com/smartcontractkit/libocr v0.0.0-20230802221916-2271752fa829/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
github.com/smartcontractkit/ocr2keepers v0.7.8 h1:Z7fSsbWj/9K0CNPN057jrfoEaPOi23B76YsOxxxmXjU=
github.com/smartcontractkit/ocr2keepers v0.7.8/go.mod h1:y5QUa8sHmrqz/LYIueB5RL8aeO71kh4F9BfaJ8rZHt0=
github.com/smartcontractkit/ocr2keepers v0.7.9 h1:Jh6LJci3vEnEZWV/T13rJRvT/CLSHX90LRLB8ISFyuQ=
github.com/smartcontractkit/ocr2keepers v0.7.9/go.mod h1:y5QUa8sHmrqz/LYIueB5RL8aeO71kh4F9BfaJ8rZHt0=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM=
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=
Expand Down
22 changes: 14 additions & 8 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (d *Delegate) newServicesOCR2Keepers21(

mc := d.cfg.Mercury().Credentials(credName)

keeperProvider, rgstry, encoder, transmitEventProvider, logProvider, wrappedKey, blockSub, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, d.chainSet, d.pipelineRunner, mc, kb)
keeperProvider, rgstry, encoder, transmitEventProvider, logProvider, wrappedKey, blockSub, payloadBuilder, upkeepStateStore, up, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, d.chainSet, d.pipelineRunner, mc, kb)
if err2 != nil {
return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers")
}
Expand Down Expand Up @@ -911,23 +911,28 @@ func (d *Delegate) newServicesOCR2Keepers21(
ContractTransmitter: evmrelay.NewKeepersOCR3ContractTransmitter(keeperProvider.ContractTransmitter()),
ContractConfigTracker: keeperProvider.ContractConfigTracker(),
KeepersDatabase: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation),
OffchainConfigDigester: keeperProvider.OffchainConfigDigester(),
OffchainKeyring: kb,
OnchainKeyring: wrappedKey,
LocalConfig: lc,
LogProvider: logProvider,
EventProvider: transmitEventProvider,
Encoder: encoder,
Runnable: rgstry,
LogProvider: logProvider,
CacheExpiration: cfg.CacheExpiration.Value(),
CacheEvictionInterval: cfg.CacheEvictionInterval.Value(),
MaxServiceWorkers: cfg.MaxServiceWorkers,
ServiceQueueLength: cfg.ServiceQueueLength,
Encoder: encoder,
BlockSubscriber: blockSub,
RecoverableProvider: new(mockRecoverableProvider),
PayloadBuilder: payloadBuilder,
UpkeepProvider: up,
UpkeepStateUpdater: upkeepStateStore,
UpkeepTypeGetter: ocr2keeper21core.GetUpkeepType,
WorkIDGenerator: ocr2keeper21core.WorkIDGenerator,
// TODO: Clean up the config
CacheExpiration: cfg.CacheExpiration.Value(),
CacheEvictionInterval: cfg.CacheEvictionInterval.Value(),
MaxServiceWorkers: cfg.MaxServiceWorkers,
ServiceQueueLength: cfg.ServiceQueueLength,
}

pluginService, err := ocr2keepers21.NewDelegate(dConf)
Expand All @@ -950,6 +955,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
runResultSaver,
keeperProvider,
rgstry,
blockSub,
transmitEventProvider,
pluginService,
}, nil
Expand Down
13 changes: 8 additions & 5 deletions core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (bs *BlockSubscriber) buildHistory(block int64) ocr2keepers.BlockHistory {
Hash: common.HexToHash(h),
})
} else {
bs.lggr.Infof("block %d is missing", block-i)
bs.lggr.Debugf("block %d is missing", block-i)
}
}
}
Expand All @@ -117,7 +117,7 @@ func (bs *BlockSubscriber) cleanup() {
bs.mu.Lock()
defer bs.mu.Unlock()

bs.lggr.Infof("start clearing blocks from %d to %d", bs.lastClearedBlock+1, bs.lastSentBlock-bs.blockSize)
bs.lggr.Debugf("start clearing blocks from %d to %d", bs.lastClearedBlock+1, bs.lastSentBlock-bs.blockSize)
for i := bs.lastClearedBlock + 1; i <= bs.lastSentBlock-bs.blockSize; i++ {
delete(bs.blocks, i)
}
Expand All @@ -142,15 +142,17 @@ func (bs *BlockSubscriber) Start(ctx context.Context) error {
bs.lggr.Errorf("failed to get log poller blocks", err)
}

_, bs.unsubscribe = bs.hb.Subscribe(&headWrapper{headC: bs.headC})
_, bs.unsubscribe = bs.hb.Subscribe(&headWrapper{headC: bs.headC, lggr: bs.lggr})

// poll from head broadcaster channel and push to subscribers
{
go func(ctx context.Context) {
for {
select {
case h := <-bs.headC:
bs.processHead(h)
if h != nil {
bs.processHead(h)
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -184,7 +186,6 @@ func (bs *BlockSubscriber) Close() error {
bs.mu.Lock()
defer bs.mu.Unlock()

close(bs.headC)
bs.cancel()
bs.unsubscribe()
return nil
Expand Down Expand Up @@ -263,13 +264,15 @@ func (bs *BlockSubscriber) queryBlocksMap(bn int64) (string, bool) {

type headWrapper struct {
headC chan *evmtypes.Head
lggr logger.Logger
}

func (w *headWrapper) OnNewLongestChain(_ context.Context, head *evmtypes.Head) {
if head != nil {
select {
case w.headC <- head:
default:
w.lggr.Debugf("head channel is full, discarding head %+v", head)
}
}
}
12 changes: 12 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/evm21/core/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package core

import (
"context"

ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
)

// UpkeepStateReader is the interface for reading the current state of upkeeps.
type UpkeepStateReader interface {
SelectByWorkIDsInRange(ctx context.Context, start, end int64, workIDs ...string) ([]ocr2keepers.UpkeepState, error)
}
7 changes: 7 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/evm21/core/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ func UpkeepWorkID(id *big.Int, trigger ocr2keepers.Trigger) (string, error) {
return hex.EncodeToString(hash[:]), nil
}

func WorkIDGenerator(u ocr2keepers.UpkeepIdentifier, trigger ocr2keepers.Trigger) string {
// Error should not happen here since we pass in a valid upkeepID
// TODO: Clean this up
id, _ := UpkeepWorkID(u.BigInt(), trigger)
return id
}

func NewUpkeepPayload(id *big.Int, trigger ocr2keepers.Trigger, checkData []byte) (ocr2keepers.UpkeepPayload, error) {
uid := &ocr2keepers.UpkeepIdentifier{}
ok := uid.FromBigInt(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *logEventProvider) Close() error {
return nil
}

func (p *logEventProvider) BuildPayloads(ctx context.Context, proposals ...ocr2keepers.CoordinatedProposal) ([]ocr2keepers.UpkeepPayload, error) {
func (p *logEventProvider) BuildPayloads(ctx context.Context, proposals ...ocr2keepers.CoordinatedBlockProposal) ([]ocr2keepers.UpkeepPayload, error) {
// TODO: implement
return []ocr2keepers.UpkeepPayload{}, nil
}
Expand Down
48 changes: 48 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/evm21/payload_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package evm

import (
"context"

ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core"
)

type payloadBuilder struct {
lggr logger.Logger
}

var _ ocr2keepers.PayloadBuilder = &payloadBuilder{}

func NewPayloadBuilder(lggr logger.Logger) *payloadBuilder {
return &payloadBuilder{
lggr: lggr,
}
}

func (b *payloadBuilder) BuildPayloads(ctx context.Context, proposals ...ocr2keepers.CoordinatedBlockProposal) ([]ocr2keepers.UpkeepPayload, error) {
payloads := make([]ocr2keepers.UpkeepPayload, len(proposals))
for i, p := range proposals {
b.lggr.Debugf("building payload for coordinated block proposal %+v", p)
var checkData []byte
switch core.GetUpkeepType(p.UpkeepID) {
case ocr2keepers.LogTrigger:
checkData = []byte{} // TODO: call recoverer
case ocr2keepers.ConditionTrigger:
// Trigger.BlockNumber and Trigger.BlockHash are already coordinated
checkData = []byte{} // CheckData derived on chain for conditionals
// TODO: check for upkeepID being active upkeep here
default:
}
payload, err := core.NewUpkeepPayload(p.UpkeepID.BigInt(), p.Trigger, checkData)
if err != nil {
b.lggr.Warnw("failed to build payload", "err", err, "upkeepID", p.UpkeepID)
payloads[i] = ocr2keepers.UpkeepPayload{}
continue
}
payloads[i] = payload
}

return payloads, nil
}
5 changes: 4 additions & 1 deletion core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (r *EvmRegistry) GetActiveUpkeepIDsByType(ctx context.Context, triggers ...
}

func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.UpkeepPayload) ([]ocr2keepers.CheckResult, error) {
r.lggr.Debugw("Checking upkeeps", "upkeeps", keys)
chResult := make(chan checkResult, 1)
go r.doCheck(ctx, keys, chResult)

Expand Down Expand Up @@ -704,7 +705,9 @@ func (r *EvmRegistry) checkUpkeeps(ctx context.Context, payloads []ocr2keepers.U
return nil, err
}
default:
payload, err = r.abi.Pack("checkUpkeep", upkeepId)
// checkUpkeep is overloaded on the contract for conditionals and log upkeeps
// Need to use the first function (checkUpkeep0) for conditionals
payload, err = r.abi.Pack("checkUpkeep0", upkeepId)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (c *TransmitEventProvider) convertToTransmitEvents(logs []transmitEventLog,
TransactionHash: l.TxHash,
WorkID: workID,
UpkeepID: *upkeepId,
CheckBlock: trigger.BlockNumber,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestTransmitEventProvider_ConvertToTransmitEvents(t *testing.T) {
{
Type: ocr2keepers.PerformEvent,
UpkeepID: id,
CheckBlock: ocr2keepers.BlockNumber(0), // empty for log triggers
CheckBlock: ocr2keepers.BlockNumber(1), // empty for log triggers
},
},
false,
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestTransmitEventLog(t *testing.T) {
Trigger: []byte{1, 2, 3, 4, 5, 6, 7, 8},
},
},
ocr2keepers.TransmitEventType(3),
ocr2keepers.InsufficientFundsReportEvent,
},
{
"reorged",
Expand All @@ -220,7 +220,7 @@ func TestTransmitEventLog(t *testing.T) {
Trigger: []byte{1, 2, 3, 4, 5, 6, 7, 8},
},
},
ocr2keepers.TransmitEventType(2),
ocr2keepers.ReorgReportEvent,
},
{
"empty",
Expand All @@ -230,7 +230,7 @@ func TestTransmitEventLog(t *testing.T) {
BlockHash: common.HexToHash("0x010203040"),
},
},
ocr2keepers.TransmitEventType(0),
ocr2keepers.UnknownEvent,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ var (
CacheExpiration = 24 * time.Hour
// GCInterval is the amount of time between cache cleanups.
GCInterval = 2 * time.Hour
// TODO: use sentinel value from ocr2keepers
StateUnknown = ocr2keepers.UpkeepState(100)
)

// UpkeepStateReader is the interface for reading the current state of upkeeps.
Expand Down Expand Up @@ -194,7 +192,7 @@ func (u *upkeepStateStore) selectFromCache(workIDs ...string) ([]ocr2keepers.Upk
states[i] = state.state
} else {
hasMisses = true
states[i] = StateUnknown
states[i] = ocr2keepers.UnknownState
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestUpkeepStateStore(t *testing.T) {
workIDsSelect: []string{"0x1", "0x2"},
expected: []ocr2keepers.UpkeepState{
ocr2keepers.Ineligible,
StateUnknown,
ocr2keepers.UnknownState,
},
},
{
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestUpkeepStateStore(t *testing.T) {
workIDsSelect: []string{"0x2"},
workIDsFromScanner: []string{},
expected: []ocr2keepers.UpkeepState{
StateUnknown,
ocr2keepers.UnknownState,
},
},
{
Expand Down
5 changes: 1 addition & 4 deletions core/services/ocr2/plugins/ocr2keeper/integration_21_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func TestFilterNamesFromSpec21(t *testing.T) {
}

func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) {
t.Skip() // TODO: unskip when conditional flow is ready

g := gomega.NewWithT(t)
lggr := logger.TestLogger(t)

Expand Down Expand Up @@ -173,7 +171,6 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) {
}

func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) {
t.Skip() // TODO: Auto-4282, Run this in CI once the tests properly waits instead of timers
g := gomega.NewWithT(t)

// setup blockchain
Expand Down Expand Up @@ -394,7 +391,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK
5*time.Second, // deltaProgress time.Duration,
10*time.Second, // deltaResend time.Duration,
100*time.Millisecond, // deltaInitial time.Duration,
2500*time.Millisecond, // deltaRound time.Duration,
1000*time.Millisecond, // deltaRound time.Duration,
40*time.Millisecond, // deltaGrace time.Duration,
200*time.Millisecond, // deltaRequestCertifiedCommit time.Duration,
30*time.Second, // deltaStage time.Duration,
Expand Down
Loading

0 comments on commit 1abdb05

Please sign in to comment.