Skip to content

Commit

Permalink
deployments: keystone: Leverage job-distributor (#15179)
Browse files Browse the repository at this point in the history
* deployment: Support other chain types in CreateCCIPOCRSupportedChains

* nix: Upgrade to postgres 15

* keystone: Migrate from CLO to JD

* CLO compat

* Allow setting labels on nodes

* Rename function

* Tag nodes with p2p_id for easy lookup

* Lookup nodes according to p2p_id

* Implement label & id filtering in the memory job client

* Update the CLO job client as well

* go mod tidy

* Fix DeployCLO

* Fix CLO job client test

* add TODOs

* fix up tests again

* Fix compilation, remove nodeIdToNop indirection

* fix TestDeployCLO

* add clo utils to bridge tooling gap

* add utils to remap clo node id to peer id

* fix p2p filter in clo jd impl; fix mis-matched id vs p2p usage

* Add credential support to jd client

* go mod tidy

* rm pointer lib

---------

Co-authored-by: krehermann <[email protected]>
  • Loading branch information
archseer and krehermann authored Nov 13, 2024
1 parent 973adb5 commit 24c3f90
Show file tree
Hide file tree
Showing 30 changed files with 1,769 additions and 798 deletions.
4 changes: 2 additions & 2 deletions deployment/ccip/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func NewMemoryEnvironment(t *testing.T, lggr logger.Logger, numChains int, numNo
require.GreaterOrEqual(t, numChains, 2, "numChains must be at least 2 for home and feed chains")
require.GreaterOrEqual(t, numNodes, 4, "numNodes must be at least 4")
ctx := testcontext.Get(t)
chains, evmChains := memory.NewMemoryChains(t, numChains)
chains := memory.NewMemoryChains(t, numChains)
homeChainSel, feedSel := allocateCCIPChainSelectors(chains)
replayBlocks, err := LatestBlocksByChain(ctx, chains)
require.NoError(t, err)

ab := deployment.NewMemoryAddressBook()
crConfig := DeployTestContracts(t, lggr, ab, homeChainSel, feedSel, chains)
nodes := memory.NewNodes(t, zapcore.InfoLevel, evmChains, numNodes, 1, crConfig)
nodes := memory.NewNodes(t, zapcore.InfoLevel, chains, numNodes, 1, crConfig)
for _, node := range nodes {
require.NoError(t, node.App.Start(ctx))
t.Cleanup(func() {
Expand Down
137 changes: 0 additions & 137 deletions deployment/environment/clo/env.go

This file was deleted.

98 changes: 75 additions & 23 deletions deployment/environment/clo/offchain_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package clo

import (
"context"
"fmt"
"slices"
"strings"

"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -10,6 +13,7 @@ import (
csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
"github.com/smartcontractkit/chainlink/deployment/environment/clo/models"
)

Expand Down Expand Up @@ -60,39 +64,68 @@ func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts
}

func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) {
//TODO CCIP-3108
var fiterIds map[string]struct{}
include := func(id string) bool {
if in.Filter == nil || len(in.Filter.Ids) == 0 {
include := func(node *nodev1.Node) bool {
if in.Filter == nil {
return true
}
// lazy init
if len(fiterIds) == 0 {
for _, id := range in.Filter.Ids {
fiterIds[id] = struct{}{}
if len(in.Filter.Ids) > 0 {
idx := slices.IndexFunc(in.Filter.Ids, func(id string) bool {
return node.Id == id
})
if idx < 0 {
return false
}
}
_, ok := fiterIds[id]
return ok
for _, selector := range in.Filter.Selectors {
idx := slices.IndexFunc(node.Labels, func(label *ptypes.Label) bool {
return label.Key == selector.Key
})
if idx < 0 {
return false
}
label := node.Labels[idx]

switch selector.Op {
case ptypes.SelectorOp_IN:
values := strings.Split(*selector.Value, ",")
found := slices.Contains(values, *label.Value)
if !found {
return false
}
default:
panic("unimplemented selector")
}
}
return true
}
var nodes []*nodev1.Node
for _, nop := range j.NodeOperators {
for _, n := range nop.Nodes {
if include(n.ID) {
nodes = append(nodes, &nodev1.Node{
Id: n.ID,
Name: n.Name,
PublicKey: *n.PublicKey, // is this the correct val?
IsEnabled: n.Enabled,
IsConnected: n.Connected,
})
p2pId, err := NodeP2PId(n)
if err != nil {
return nil, fmt.Errorf("failed to get p2p id for node %s: %w", n.ID, err)
}
node := &nodev1.Node{
Id: n.ID,
Name: n.Name,
PublicKey: *n.PublicKey,
IsEnabled: n.Enabled,
IsConnected: n.Connected,
Labels: []*ptypes.Label{
{
Key: "p2p_id",
Value: &p2pId, // here n.ID is also peer ID
},
},
}
if include(node) {
nodes = append(nodes, node)
}
}
}
return &nodev1.ListNodesResponse{
Nodes: nodes,
}, nil

}

func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) {
Expand Down Expand Up @@ -160,13 +193,18 @@ type GetNodeOperatorsResponse struct {
NodeOperators []*models.NodeOperator `json:"nodeOperators"`
}

func NewJobClient(lggr logger.Logger, nops []*models.NodeOperator) *JobClient {
type JobClientConfig struct {
Nops []*models.NodeOperator
}

func NewJobClient(lggr logger.Logger, cfg JobClientConfig) *JobClient {

c := &JobClient{
NodeOperators: nops,
NodeOperators: cfg.Nops,
nodesByID: make(map[string]*models.Node),
lggr: lggr,
}
for _, nop := range nops {
for _, nop := range c.NodeOperators {
for _, n := range nop.Nodes {
node := n
c.nodesByID[n.ID] = node // maybe should use the public key instead?
Expand All @@ -184,10 +222,24 @@ func cloNodeToChainConfigs(n *models.Node) []*nodev1.ChainConfig {
}

func cloChainCfgToJDChainCfg(ccfg *models.NodeChainConfig) *nodev1.ChainConfig {
var ctype nodev1.ChainType
switch ccfg.Network.ChainType {
case models.ChainTypeEvm:
ctype = nodev1.ChainType_CHAIN_TYPE_EVM
case models.ChainTypeSolana:
ctype = nodev1.ChainType_CHAIN_TYPE_SOLANA
case models.ChainTypeStarknet:
ctype = nodev1.ChainType_CHAIN_TYPE_STARKNET
case models.ChainTypeAptos:
ctype = nodev1.ChainType_CHAIN_TYPE_APTOS
default:
panic(fmt.Sprintf("Unsupported chain family %v", ccfg.Network.ChainType))
}

return &nodev1.ChainConfig{
Chain: &nodev1.Chain{
Id: ccfg.Network.ChainID,
Type: nodev1.ChainType_CHAIN_TYPE_EVM, // TODO: write conversion func from clo to jd tyes
Type: ctype,
},
AccountAddress: ccfg.AccountAddress,
AdminAddress: ccfg.AdminAddress,
Expand Down
Loading

0 comments on commit 24c3f90

Please sign in to comment.