Skip to content

Commit

Permalink
Merge branch 'develop' into node_migration_version_issues
Browse files Browse the repository at this point in the history
  • Loading branch information
tateexon authored Apr 25, 2024
2 parents 3b7f1ec + aab1519 commit 933a464
Show file tree
Hide file tree
Showing 17 changed files with 602 additions and 98 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/solidity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,7 @@ jobs:
uses: smartcontractkit/.github/actions/ci-publish-npm@4b0ab756abcb1760cb82e1e87b94ff431905bffc # [email protected]
with:
npm-token: ${{ secrets.NPM_TOKEN }}
github-token: ${{ secrets.GITHUB_TOKEN }}
github-release-tag-name: ${{ github.ref_name }}
github-release-changelog-path: "contracts/CHANGELOG.md"
create-github-release: true
create-github-release: false
publish-command: "pnpm publish-prod --no-git-checks"
package-json-directory: contracts

Expand Down
9 changes: 5 additions & 4 deletions core/capabilities/remote/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -12,7 +13,7 @@ import (
// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages
type remoteTargetCaller struct {
capInfo commoncap.CapabilityInfo
donInfo *types.DON
donInfo *capabilities.DON
dispatcher types.Dispatcher
lggr logger.Logger
}
Expand All @@ -22,14 +23,14 @@ var _ types.Receiver = &remoteTargetCaller{}

type remoteTargetReceiver struct {
capInfo commoncap.CapabilityInfo
donInfo *types.DON
donInfo *capabilities.DON
dispatcher types.Dispatcher
lggr logger.Logger
}

var _ types.Receiver = &remoteTargetReceiver{}

func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller {
func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller {
return &remoteTargetCaller{
capInfo: capInfo,
donInfo: donInfo,
Expand Down Expand Up @@ -72,7 +73,7 @@ func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload)
}

func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver {
func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver {
return &remoteTargetReceiver{
capInfo: capInfo,
donInfo: donInfo,
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -18,7 +18,7 @@ import (
func TestTarget_Placeholder(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
donInfo := &types.DON{
donInfo := &capabilities.DON{
Members: []p2ptypes.PeerID{{}},
}
dispatcher := remoteMocks.NewDispatcher(t)
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type triggerPublisher struct {
config types.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo types.DON
workflowDONs map[string]types.DON
capDonInfo commoncap.DON
workflowDONs map[string]commoncap.DON
dispatcher types.Dispatcher
messageCache *messageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
Expand All @@ -48,7 +48,7 @@ type pubRegState struct {
var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}

func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, workflowDONs map[string]types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
config.ApplyDefaults()
return &triggerPublisher{
config: config,
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func TestTriggerPublisher_Register(t *testing.T) {
require.NoError(t, p1.UnmarshalText([]byte(peerID1)))
p2 := p2ptypes.PeerID{}
require.NoError(t, p2.UnmarshalText([]byte(peerID2)))
capDonInfo := remotetypes.DON{
capDonInfo := commoncap.DON{
ID: "capability-don",
Members: []p2ptypes.PeerID{p1},
F: 0,
}
workflowDonInfo := remotetypes.DON{
workflowDonInfo := commoncap.DON{
ID: "workflow-don",
Members: []p2ptypes.PeerID{p2},
F: 0,
Expand All @@ -47,7 +47,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
MinResponsesToAggregate: 1,
MessageExpiryMs: 100_000,
}
workflowDONs := map[string]remotetypes.DON{
workflowDONs := map[string]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
}
underlying := &testTrigger{
Expand Down
8 changes: 4 additions & 4 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
sync "sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -24,9 +25,9 @@ import (
type triggerSubscriber struct {
config types.RemoteTriggerConfig
capInfo commoncap.CapabilityInfo
capDonInfo types.DON
capDonInfo capabilities.DON
capDonMembers map[p2ptypes.PeerID]struct{}
localDonInfo types.DON
localDonInfo capabilities.DON
dispatcher types.Dispatcher
aggregator types.Aggregator
messageCache *messageCache[triggerEventKey, p2ptypes.PeerID]
Expand Down Expand Up @@ -54,8 +55,7 @@ var _ services.Service = &triggerSubscriber{}
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON,
dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
require.NoError(t, p1.UnmarshalText([]byte(peerID1)))
p2 := p2ptypes.PeerID{}
require.NoError(t, p2.UnmarshalText([]byte(peerID2)))
capDonInfo := remotetypes.DON{
capDonInfo := commoncap.DON{
ID: "capability-don",
Members: []p2ptypes.PeerID{p1},
F: 0,
}
workflowDonInfo := remotetypes.DON{
workflowDonInfo := commoncap.DON{
ID: "workflow-don",
Members: []p2ptypes.PeerID{p2},
F: 0,
Expand Down
10 changes: 6 additions & 4 deletions core/capabilities/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/mercury"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *registrySyncer) Start(ctx context.Context) error {
"12D3KooWN2hztiXNNS1jMQTTvvPRYcarK1C7T3Mdqk4x4gwyo5WS",
}
allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig)
addPeersToDONInfo := func(peers []string, donInfo *remotetypes.DON) error {
addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error {
for _, peerID := range peers {
var p ragetypes.PeerID
err := p.UnmarshalText([]byte(peerID))
Expand All @@ -82,11 +83,11 @@ func (s *registrySyncer) Start(ctx context.Context) error {
}
return nil
}
workflowDonInfo := remotetypes.DON{ID: "workflowDon1", F: 1}
workflowDonInfo := capabilities.DON{ID: "workflowDon1", F: 1}
if err := addPeersToDONInfo(workflowDONPeers, &workflowDonInfo); err != nil {
return err
}
triggerCapabilityDonInfo := remotetypes.DON{ID: "capabilityDon1", F: 1}
triggerCapabilityDonInfo := capabilities.DON{ID: "capabilityDon1", F: 1}
if err := addPeersToDONInfo(triggerDONPeers, &triggerCapabilityDonInfo); err != nil {
return err
}
Expand All @@ -101,6 +102,7 @@ func (s *registrySyncer) Start(ctx context.Context) error {
CapabilityType: commoncap.CapabilityTypeTrigger,
Description: "Remote Trigger",
Version: "0.0.1",
DON: &triggerCapabilityDonInfo,
}
myId := s.peerWrapper.GetPeer().ID().String()
config := remotetypes.RemoteTriggerConfig{
Expand All @@ -125,7 +127,7 @@ func (s *registrySyncer) Start(ctx context.Context) error {
}
if slices.Contains(triggerDONPeers, myId) {
s.lggr.Info("member of a capability DON - starting remote publishers")
workflowDONs := map[string]remotetypes.DON{
workflowDONs := map[string]capabilities.DON{
workflowDonInfo.ID: workflowDonInfo,
}
underlying := triggers.NewMercuryTriggerService(1000, s.lggr)
Expand Down
28 changes: 21 additions & 7 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
Expand Down Expand Up @@ -189,9 +190,12 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient
registry := capabilities.NewRegistry(globalLogger)

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
externalPeerWrapper := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger)
signer := externalPeerWrapper
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer

srvcs = append(srvcs, externalPeerWrapper)

// NOTE: RegistrySyncer will depend on a Relayer when fully implemented
Expand Down Expand Up @@ -367,16 +371,26 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
globalLogger,
streamRegistry,
pipelineRunner,
cfg.JobPipeline()),
job.Workflow: workflows.NewDelegate(
globalLogger,
registry,
legacyEVMChains,
cfg.JobPipeline(),
),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)

delegates[job.Workflow] = workflows.NewDelegate(
globalLogger,
registry,
legacyEVMChains,
func() *p2ptypes.PeerID {
if externalPeerWrapper == nil {
return nil
}

peerID := externalPeerWrapper.GetPeer().ID()
return &peerID
},
)

// Flux monitor requires ethereum just to boot, silence errors with a null delegate
if !cfg.EVMRPCEnabled() {
delegates[job.FluxMonitor] = &job.NullDelegate{Type: job.FluxMonitor}
Expand Down
52 changes: 50 additions & 2 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ package workflows

import (
"context"
"encoding/hex"
"fmt"

"github.com/google/uuid"
"github.com/pelletier/go-toml"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/targets"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

type Delegate struct {
registry core.CapabilitiesRegistry
logger logger.Logger
legacyEVMChains legacyevm.LegacyChainContainer
peerID func() *p2ptypes.PeerID
}

var _ job.Delegate = (*Delegate)(nil)
Expand All @@ -42,11 +46,18 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
d.logger.Errorw("could not initialize writes", err)
}

dinfo, err := initializeDONInfo(d.logger)
if err != nil {
d.logger.Errorw("could not add initialize don info", err)
}

cfg := Config{
Lggr: d.logger,
Spec: spec.WorkflowSpec.Workflow,
WorkflowID: spec.WorkflowSpec.WorkflowID,
Registry: d.registry,
DONInfo: dinfo,
PeerID: d.peerID,
}
engine, err := NewEngine(cfg)
if err != nil {
Expand All @@ -55,8 +66,45 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return []job.ServiceCtx{engine}, nil
}

func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer) *Delegate {
return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains}
func initializeDONInfo(lggr logger.Logger) (*capabilities.DON, error) {
var key [16]byte

// TODO: fetch the key and DONInfo from the registry
keyString := "44fb5c1ee8ee48846c808a383da3aba3"
k, err := hex.DecodeString(keyString)
if err != nil {
lggr.Errorf("could not decode key %s: %w", keyString, err)
}
key = [16]byte(k)

p2pStrings := []string{
"12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N",
"12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG",
"12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv",
"12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4",
}

p2pIDs := []p2ptypes.PeerID{}
for _, p := range p2pStrings {
pid := p2ptypes.PeerID{}
err := pid.UnmarshalText([]byte(p))
if err != nil {
return nil, err
}

p2pIDs = append(p2pIDs, pid)
}

return &capabilities.DON{
Members: p2pIDs,
Config: capabilities.DONConfig{
SharedSecret: key,
},
}, nil
}

func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, peerID func() *p2ptypes.PeerID) *Delegate {
return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains, peerID: peerID}
}

func ValidatedWorkflowSpec(tomlString string) (job.Job, error) {
Expand Down
Loading

0 comments on commit 933a464

Please sign in to comment.