Skip to content

Commit

Permalink
Feat/ccip 2466 HomeChain config syncer (#978)
Browse files Browse the repository at this point in the history
## Motivation

Plugins need to refresh HomeChain config regularly to know which oracles
support which chains and the latest F for each chain.

## Solution

* Add HomeChainConfigPoller which polls chain configs and maps them to internal configs
* Commit plugin uses HomeChainConfigPoller to access required data
  • Loading branch information
asoliman92 authored Jun 24, 2024
1 parent 8d34b38 commit a9370e1
Show file tree
Hide file tree
Showing 11 changed files with 822 additions and 212 deletions.
17 changes: 11 additions & 6 deletions core/services/ocr3/plugins/ccip/commit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (

"github.com/smartcontractkit/ccipocr3/internal/reader"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"google.golang.org/grpc"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
libocrtypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

// PluginFactoryConstructor implements common OCR3ReportingPluginClient and is used for initializing a plugin factory
Expand Down Expand Up @@ -48,22 +51,24 @@ func NewPluginFactory() *PluginFactory {

func (p PluginFactory) NewReportingPlugin(config ocr3types.ReportingPluginConfig,
) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
var oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID // TODO: Get this from ocr config, it's the mapping of the oracleId index in the DON
onChainTokenPricesReader := reader.NewOnchainTokenPricesReader(
reader.TokenPriceConfig{ // TODO: Inject config
StaticPrices: map[ocr2types.Account]big.Int{},
},
nil, // TODO: Inject this
)

return NewPlugin(
context.Background(),
config.OracleID,
oracleIDToP2pID,
cciptypes.CommitPluginConfig{},
nil,
nil, //ccipReader
onChainTokenPricesReader,
nil,
nil,
nil,
nil, //reportCodec
nil, //msgHasher
nil, // lggr
nil, //homeChainPoller
), ocr3types.ReportingPluginInfo{}, nil
}

Expand Down
120 changes: 79 additions & 41 deletions core/services/ocr3/plugins/ccip/commit/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (
"time"

mapset "github.com/deckarep/golang-set/v2"

"github.com/smartcontractkit/ccipocr3/internal/reader"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
libocrtypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/ccipocr3/internal/libs/slicelib"

Expand All @@ -23,53 +27,39 @@ import (
// NOTE: If you are changing core plugin logic, you should also update the commit plugin python spec.
type Plugin struct {
nodeID commontypes.OracleID
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID
cfg cciptypes.CommitPluginConfig
ccipReader cciptypes.CCIPReader
tokenPricesReader cciptypes.TokenPricesReader
reportCodec cciptypes.CommitPluginCodec
msgHasher cciptypes.MessageHasher
lggr logger.Logger

// readableChains is the set of chains that the plugin can read from.
readableChains mapset.Set[cciptypes.ChainSelector]
// knownSourceChains is the set of chains that the plugin knows about.
knownSourceChains mapset.Set[cciptypes.ChainSelector]
homeChainPoller reader.HomeChainPoller
}

// TODO: background service for home chain config polling

func NewPlugin(
_ context.Context,
nodeID commontypes.OracleID,
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID,
cfg cciptypes.CommitPluginConfig,
ccipReader cciptypes.CCIPReader,
tokenPricesReader cciptypes.TokenPricesReader,
reportCodec cciptypes.CommitPluginCodec,
msgHasher cciptypes.MessageHasher,
lggr logger.Logger,
homeChainPoller reader.HomeChainPoller,
) *Plugin {
knownSourceChains := mapset.NewSet[cciptypes.ChainSelector]()
for _, inf := range cfg.ObserverInfo {
var sources []cciptypes.ChainSelector
for _, chain := range inf.Reads {
if chain != cfg.DestChain {
sources = append(sources, chain)
}
}
knownSourceChains = knownSourceChains.Union(mapset.NewSet(sources...))
}

return &Plugin{
nodeID: nodeID,
oracleIDToP2pID: oracleIDToP2pID,
cfg: cfg,
ccipReader: ccipReader,
tokenPricesReader: tokenPricesReader,
reportCodec: reportCodec,
msgHasher: msgHasher,
lggr: lggr,

readableChains: mapset.NewSet(cfg.ObserverInfo[nodeID].Reads...),
knownSourceChains: knownSourceChains,
homeChainPoller: homeChainPoller,
}
}

Expand Down Expand Up @@ -102,8 +92,13 @@ func (p *Plugin) Query(_ context.Context, _ ocr3types.OutcomeContext) (types.Que
// We discover the token prices only for the tokens that are used to pay for ccip fees.
// The fee tokens are configured in the plugin config.
func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query) (types.Observation, error) {
supportedChains, err := p.supportedChains()
if err != nil {
return types.Observation{}, fmt.Errorf("error finding supported chains by node: %w", err)
}

msgBaseDetails := make([]cciptypes.CCIPMsgBaseDetails, 0)
latestCommittedSeqNumsObservation, err := observeLatestCommittedSeqNums(ctx, p.lggr, p.ccipReader, p.readableChains, p.cfg.DestChain, p.knownSourceChains.ToSlice())
latestCommittedSeqNumsObservation, err := observeLatestCommittedSeqNums(ctx, p.lggr, p.ccipReader, supportedChains, p.cfg.DestChain, p.knownSourceChainsSlice())
if err != nil {
return types.Observation{}, fmt.Errorf("observe latest committed sequence numbers: %w", err)
}
Expand All @@ -126,11 +121,17 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex
if err != nil {
return types.Observation{}, fmt.Errorf("observe gas prices: %w", err)
}

fChain, err := p.homeChainPoller.GetFChain()
if err != nil {
return types.Observation{}, fmt.Errorf("get f chain: %w", err)
}

// If there's no previous outcome (first round ever), we only observe the latest committed sequence numbers.
// and on the next round we use those to look for messages.
if outctx.PreviousOutcome == nil {
p.lggr.Debugw("first round ever, can't observe new messages yet")
return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, latestCommittedSeqNumsObservation, p.cfg).Encode()
return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, latestCommittedSeqNumsObservation, fChain).Encode()
}

prevOutcome, err := cciptypes.DecodeCommitPluginOutcome(outctx.PreviousOutcome)
Expand All @@ -145,7 +146,7 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex
p.lggr,
p.ccipReader,
p.msgHasher,
p.readableChains,
supportedChains,
prevOutcome.MaxSeqNums, // TODO: Chainlink common PR to rename.
p.cfg.NewMsgScanBatchSize,
)
Expand All @@ -158,12 +159,14 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex
"gasPrices", len(gasPrices),
"tokenPrices", len(tokenPrices),
"latestCommittedSeqNums", latestCommittedSeqNumsObservation,
"observerInfo", p.cfg.ObserverInfo)
"fChain", fChain)

for _, msg := range newMsgs {
msgBaseDetails = append(msgBaseDetails, msg.CCIPMsgBaseDetails)
}
return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, latestCommittedSeqNumsObservation, p.cfg).Encode()

return cciptypes.NewCommitPluginObservation(msgBaseDetails, gasPrices, tokenPrices, latestCommittedSeqNumsObservation, fChain).Encode()

}

func (p *Plugin) ValidateObservation(_ ocr3types.OutcomeContext, _ types.Query, ao types.AttributedObservation) error {
Expand All @@ -176,7 +179,12 @@ func (p *Plugin) ValidateObservation(_ ocr3types.OutcomeContext, _ types.Query,
return fmt.Errorf("validate sequence numbers: %w", err)
}

if err := validateObserverReadingEligibility(ao.Observer, obs.NewMsgs, obs.MaxSeqNums, p.cfg.ObserverInfo); err != nil {
destSupportedChains, err := p.supportedChains()
if err != nil {
return fmt.Errorf("error finding supported chains by node: %w", err)
}

if err := validateObserverReadingEligibility(obs.NewMsgs, obs.MaxSeqNums, destSupportedChains, p.cfg.DestChain); err != nil {
return fmt.Errorf("validate observer %d reading eligibility: %w", ao.Observer, err)
}

Expand All @@ -188,10 +196,6 @@ func (p *Plugin) ValidateObservation(_ ocr3types.OutcomeContext, _ types.Query,
return fmt.Errorf("validate gas prices: %w", err)
}

if err := obs.PluginConfig.Validate(); err != nil {
return fmt.Errorf("validate plugin config: %w", err)
}

return nil
}

Expand All @@ -216,22 +220,17 @@ func (p *Plugin) Outcome(_ ocr3types.OutcomeContext, _ types.Query, aos []types.
decodedObservations = append(decodedObservations, obs)
}

cfg := pluginConfigConsensus(p.cfg, decodedObservations)
p.lggr.Debugw("plugin config follower state", "pluginConfig", p.cfg)
p.lggr.Debugw("plugin config after consensus", "pluginConfig", cfg)
if err := cfg.Validate(); err != nil {
return ocr3types.Outcome{}, fmt.Errorf("no consensus on plugin config: %w", err)
}
fChains := fChainConsensus(decodedObservations)

fChainDest, ok := cfg.FChain[cfg.DestChain]
fChainDest, ok := fChains[p.cfg.DestChain]
if !ok {
return ocr3types.Outcome{}, fmt.Errorf("missing destination chain %d in fChain config", p.cfg.DestChain)
}

maxSeqNums := maxSeqNumsConsensus(p.lggr, fChainDest, decodedObservations)
p.lggr.Debugw("max sequence numbers consensus", "maxSeqNumsConsensus", maxSeqNums)

merkleRoots, err := newMsgsConsensus(p.lggr, maxSeqNums, decodedObservations, cfg.FChain)
merkleRoots, err := newMsgsConsensus(p.lggr, maxSeqNums, decodedObservations, fChains)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("new messages consensus: %w", err)
}
Expand All @@ -242,7 +241,7 @@ func (p *Plugin) Outcome(_ ocr3types.OutcomeContext, _ types.Query, aos []types.
return ocr3types.Outcome{}, fmt.Errorf("token prices consensus: %w", err)
}

gasPrices := gasPricesConsensus(p.lggr, decodedObservations, cfg.FChain[cfg.DestChain])
gasPrices := gasPricesConsensus(p.lggr, decodedObservations, fChainDest)
p.lggr.Debugw("gas prices consensus", "gasPrices", gasPrices)

outcome := cciptypes.NewCommitPluginOutcome(maxSeqNums, merkleRoots, tokenPrices, gasPrices)
Expand Down Expand Up @@ -293,7 +292,11 @@ func (p *Plugin) ShouldAcceptAttestedReport(ctx context.Context, u uint64, r ocr
}

func (p *Plugin) ShouldTransmitAcceptedReport(ctx context.Context, u uint64, r ocr3types.ReportWithInfo[[]byte]) (bool, error) {
if !p.cfg.ObserverInfo[p.nodeID].Writer {
isWriter, err := p.supportsDestChain()
if err != nil {
return false, fmt.Errorf("can't know if it's a writer: %w", err)
}
if !isWriter {
p.lggr.Debugw("not a writer, skipping report transmission")
return false, nil
}
Expand Down Expand Up @@ -325,10 +328,45 @@ func (p *Plugin) Close() error {
}

func (p *Plugin) knownSourceChainsSlice() []cciptypes.ChainSelector {
knownSourceChainsSlice := p.knownSourceChains.ToSlice()
knownSourceChains, err := p.homeChainPoller.GetKnownCCIPChains()
if err != nil {
p.lggr.Errorw("error getting known chains", "err", err)
return nil
}
knownSourceChainsSlice := knownSourceChains.ToSlice()
sort.Slice(knownSourceChainsSlice, func(i, j int) bool { return knownSourceChainsSlice[i] < knownSourceChainsSlice[j] })
return slicelib.Filter(knownSourceChainsSlice, func(ch cciptypes.ChainSelector) bool { return ch != p.cfg.DestChain })
}

func (p *Plugin) supportedChains() (mapset.Set[cciptypes.ChainSelector], error) {
p2pID, exists := p.oracleIDToP2pID[p.nodeID]
if !exists {
return nil, fmt.Errorf("oracle ID %d not found in oracleIDToP2pID", p.nodeID)
}
supportedChains, err := p.homeChainPoller.GetSupportedChainsForPeer(p2pID)
if err != nil {
p.lggr.Warnw("error getting supported chains", err)
return mapset.NewSet[cciptypes.ChainSelector](), fmt.Errorf("error getting supported chains: %w", err)
}

return supportedChains, nil
}

func (p *Plugin) getDestChainConfig() (reader.ChainConfig, error) {
cfg, err := p.homeChainPoller.GetChainConfig(p.cfg.DestChain)
if err != nil {
return reader.ChainConfig{}, fmt.Errorf("get chain config: %w", err)
}
return cfg, nil
}

func (p *Plugin) supportsDestChain() (bool, error) {
destChainConfig, err := p.getDestChainConfig()
if err != nil {
return false, fmt.Errorf("error getting chain config: %w", err)
}
return destChainConfig.SupportedNodes.Contains(p.oracleIDToP2pID[p.nodeID]), nil
}

// Interface compatibility checks.
var _ ocr3types.ReportingPlugin[[]byte] = &Plugin{}
Loading

0 comments on commit a9370e1

Please sign in to comment.