Skip to content

Commit

Permalink
Merge branch 'ccip-develop' of github.com:smartcontractkit/ccip into …
Browse files Browse the repository at this point in the history
…ccip-2645
  • Loading branch information
b-gopalswami committed Jul 8, 2024
2 parents 4e437e8 + 5db8286 commit 5a3c6cd
Show file tree
Hide file tree
Showing 13 changed files with 521 additions and 182 deletions.
31 changes: 9 additions & 22 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"time"

chainselectors "github.com/smartcontractkit/chain-selectors"
"gopkg.in/guregu/null.v4"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
Expand All @@ -18,12 +18,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
chainselectors "github.com/smartcontractkit/chain-selectors"
"github.com/smartcontractkit/libocr/commontypes"
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"google.golang.org/grpc"
"gopkg.in/guregu/null.v4"

ocr2keepers20 "github.com/smartcontractkit/chainlink-automation/pkg/v2"
ocr2keepers20config "github.com/smartcontractkit/chainlink-automation/pkg/v2/config"
Expand All @@ -32,7 +32,6 @@ import (
ocr2keepers20runner "github.com/smartcontractkit/chainlink-automation/pkg/v2/runner"
ocr2keepers21config "github.com/smartcontractkit/chainlink-automation/pkg/v3/config"
ocr2keepers21 "github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin"

commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink-vrf/altbn_128"
dkgpkg "github.com/smartcontractkit/chainlink-vrf/dkg"
"github.com/smartcontractkit/chainlink-vrf/ocr2vrf"
Expand Down Expand Up @@ -279,12 +277,12 @@ func (d *Delegate) JobType() job.Type {
return job.OffchainReporting2
}

func (d *Delegate) BeforeJobCreated(spec job.Job) {
func (d *Delegate) BeforeJobCreated(_ job.Job) {
// This is only called first time the job is created
d.isNewlyCreatedJob = true
}
func (d *Delegate) AfterJobCreated(spec job.Job) {}
func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) AfterJobCreated(_ job.Job) {}
func (d *Delegate) BeforeJobDeleted(_ job.Job) {}
func (d *Delegate) OnDeleteJob(ctx context.Context, jb job.Job) error {
// If the job spec is malformed in any way, we report the error but return nil so that
// the job deletion itself isn't blocked.
Expand Down Expand Up @@ -1287,10 +1285,10 @@ func (d *Delegate) newServicesOCR2VRF(
lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error")
})
dkgReportingPluginFactoryDecorator := func(wrapped ocrtypes.ReportingPluginFactory) ocrtypes.ReportingPluginFactory {
return promwrapper.NewPromFactory(wrapped, "DKG", string(relay.NetworkEVM), chain.ID())
return promwrapper.NewPromFactory(wrapped, "DKG", relay.NetworkEVM, chain.ID())
}
vrfReportingPluginFactoryDecorator := func(wrapped ocrtypes.ReportingPluginFactory) ocrtypes.ReportingPluginFactory {
return promwrapper.NewPromFactory(wrapped, "OCR2VRF", string(relay.NetworkEVM), chain.ID())
return promwrapper.NewPromFactory(wrapped, "OCR2VRF", relay.NetworkEVM, chain.ID())
}
noopMonitoringEndpoint := telemetry.NoopAgent{}
oracles, err2 := ocr2vrf.NewOCR2VRF(ocr2vrf.DKGVRFArgs{
Expand Down Expand Up @@ -1815,11 +1813,6 @@ func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.Sugare
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}

dstChain, err := d.legacyChains.Get(dstRid.ChainID)
if err != nil {
return nil, fmt.Errorf("ccip services; failed to get chain %s: %w", dstRid.ChainID, err)
}

logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(context.Background(), jb.ID, msg), "unable to record error")
}
Expand All @@ -1846,12 +1839,6 @@ func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.Sugare
return nil, err
}

srcChainIDstr := strconv.FormatUint(srcChainID, 10)
srcChain, err := d.legacyChains.Get(srcChainIDstr)
if err != nil {
return nil, fmt.Errorf("open source chain: %w", err)
}

oracleArgsNoPlugin := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Expand All @@ -1871,7 +1858,7 @@ func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.Sugare
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer),
}

return ccipcommit.NewCommitServices(ctx, d.ds, srcProvider, dstProvider, srcChain, dstChain, d.legacyChains, jb, lggr, d.pipelineRunner, oracleArgsNoPlugin, d.isNewlyCreatedJob, int64(srcChainID), dstChainID, logError)
return ccipcommit.NewCommitServices(ctx, d.ds, srcProvider, dstProvider, d.legacyChains, jb, lggr, d.pipelineRunner, oracleArgsNoPlugin, d.isNewlyCreatedJob, int64(srcChainID), dstChainID, logError)
}

func newCCIPCommitPluginBytes(isSourceProvider bool, sourceStartBlock uint64, destStartBlock uint64) config.CommitPluginConfig {
Expand Down Expand Up @@ -1985,7 +1972,7 @@ func (d *Delegate) ccipCommitGetSrcProvider(ctx context.Context, jb job.Job, plu
func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
if spec.Relay != relay.NetworkEVM {
return nil, fmt.Errorf("Non evm chains are not supported for CCIP execution")
return nil, fmt.Errorf("non evm chains are not supported for CCIP execution")
}
dstRid, err := spec.RelayID()

Expand Down
1 change: 0 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type CommitReportingPluginFactory struct {
}

// NewCommitReportingPluginFactory return a new CommitReportingPluginFactory.
// TODO: wire through a GasEstimatorProvider here to solve two relayer one provider problem for CommitStoreReader
func NewCommitReportingPluginFactory(config CommitPluginStaticConfig) *CommitReportingPluginFactory {
return &CommitReportingPluginFactory{
config: config,
Expand Down
16 changes: 5 additions & 11 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, srcChain legacyevm.Chain, dstChain legacyevm.Chain, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec

var pluginConfig ccipconfig.CommitPluginJobSpecConfig
Expand All @@ -52,25 +52,19 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c
return nil, err
}

// TODO CCIP-2493 EVM family specific behavior leaked for CommitStore, which requires access to two relayers
versionFinder := factory.NewEvmVersionFinder()
commitStoreAddress := common.HexToAddress(spec.ContractID)
sourceMaxGasPrice := srcChain.Config().EVM().GasEstimator().PriceMax().ToInt()
commitStoreReader, err := ccip.NewCommitStoreReader(lggr, versionFinder, ccipcalc.EvmAddrToGeneric(commitStoreAddress), dstChain.Client(), dstChain.LogPoller())
if err != nil {
return nil, err
}

err = commitStoreReader.SetGasEstimator(ctx, srcChain.GasEstimator())
srcCommitStore, err := srcProvider.NewCommitStoreReader(ctx, ccipcalc.EvmAddrToGeneric(commitStoreAddress))
if err != nil {
return nil, err
}

err = commitStoreReader.SetSourceMaxGasPrice(ctx, sourceMaxGasPrice)
dstCommitStore, err := dstProvider.NewCommitStoreReader(ctx, ccipcalc.EvmAddrToGeneric(commitStoreAddress))
if err != nil {
return nil, err
}

var commitStoreReader ccipdata.CommitStoreReader
commitStoreReader = ccip.NewProviderProxyCommitStoreReader(srcCommitStore, dstCommitStore)
commitLggr := lggr.Named("CCIPCommit").With("sourceChain", sourceChainID, "destChain", destChainID)

var priceGetter pricegetter.PriceGetter
Expand Down
16 changes: 16 additions & 0 deletions core/services/ocr2/plugins/ccip/exportinternal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ccip
import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"

Expand All @@ -17,6 +18,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/batchreader"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/factory"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib"
)
Expand Down Expand Up @@ -101,3 +103,17 @@ func (c *ChainAgnosticPriceRegistry) NewPriceRegistryReader(ctx context.Context,
func NewChainAgnosticPriceRegistry(provider ChainAgnosticPriceRegistryFactory) *ChainAgnosticPriceRegistry {
return &ChainAgnosticPriceRegistry{provider}
}

type JSONCommitOffchainConfigV1_2_0 = v1_2_0.JSONCommitOffchainConfig
type CommitOnchainConfig = ccipdata.CommitOnchainConfig

func NewCommitOffchainConfig(
gasPriceDeviationPPB uint32,
gasPriceHeartBeat time.Duration,
tokenPriceDeviationPPB uint32,
tokenPriceHeartBeat time.Duration,
inflightCacheExpiry time.Duration,
priceReportingDisabled bool,
) ccip.CommitOffchainConfig {
return ccipdata.NewCommitOffchainConfig(gasPriceDeviationPPB, gasPriceHeartBeat, tokenPriceDeviationPPB, tokenPriceHeartBeat, inflightCacheExpiry, priceReportingDisabled)
}
61 changes: 32 additions & 29 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,38 +131,41 @@ func (s *commitRootsCache) Snooze(merkleRoot [32]byte) {
}

func (s *commitRootsCache) OldestRootTimestamp() time.Time {
messageVisibilityInterval := time.Now().Add(-s.messageVisibilityInterval)
timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)

if ok {
return timestamp
}

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()

// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
return messageVisibilityInterval
return time.Now().Add(-s.messageVisibilityInterval)
// TODO we can't rely on block timestamps, because in case of re-org they can change and therefore affect
// the logic in the case. In the meantime, always fallback to the default behaviour and use permissionlessThresholdWindow
//timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)
//
//if ok {
// return timestamp
//}
//
//s.rootsQueueMu.Lock()
//defer s.rootsQueueMu.Unlock()
//
//// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
//// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
//s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
//return messageVisibilityInterval
}

func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) {
s.rootsQueueMu.RLock()
defer s.rootsQueueMu.RUnlock()

// If there are no roots in the queue, we can return the messageVisibilityInterval
if s.oldestRootTimestamp.IsZero() {
return messageVisibilityInterval, true
}
//func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) {
// s.rootsQueueMu.RLock()
// defer s.rootsQueueMu.RUnlock()
//
// // If there are no roots in the queue, we can return the messageVisibilityInterval
// if s.oldestRootTimestamp.IsZero() {
// return messageVisibilityInterval, true
// }
//
// if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// // Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// // so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
// return s.oldestRootTimestamp.Add(-time.Second), true
// }
// return time.Time{}, false
//}

if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
return s.oldestRootTimestamp.Add(-time.Second), true
}
return time.Time{}, false
}
func (s *commitRootsCache) AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) {
prettyMerkleRoot := merkleRootToString(merkleRoot)

Expand Down
Loading

0 comments on commit 5a3c6cd

Please sign in to comment.