Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Keystone][Deployments] Make adding NOPs and DONs idempotent #15163

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deployment/keystone/changeset/internal/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func deployCapReg(t *testing.T, lggr logger.Logger, chain deployment.Chain) *kcr
}

func addNops(t *testing.T, lggr logger.Logger, chain deployment.Chain, registry *kcr.CapabilitiesRegistry, nops []kcr.CapabilitiesRegistryNodeOperator) *kslib.RegisterNOPSResponse {
resp, err := kslib.RegisterNOPS(context.TODO(), kslib.RegisterNOPSRequest{
resp, err := kslib.RegisterNOPS(context.TODO(), lggr, kslib.RegisterNOPSRequest{
Chain: chain,
Registry: registry,
Nops: nops,
Expand Down
110 changes: 84 additions & 26 deletions deployment/keystone/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func ConfigureRegistry(ctx context.Context, lggr logger.Logger, req ConfigureCon
for _, nop := range nodeIdToNop {
nops = append(nops, nop)
}
nopsResp, err := RegisterNOPS(ctx, RegisterNOPSRequest{
nopsResp, err := RegisterNOPS(ctx, lggr, RegisterNOPSRequest{
Chain: registryChain,
Registry: registry,
Nops: nops,
Expand Down Expand Up @@ -231,7 +231,7 @@ func ConfigureRegistry(ctx context.Context, lggr logger.Logger, req ConfigureCon
if err != nil {
return nil, fmt.Errorf("failed to register DONS: %w", err)
}
lggr.Infow("registered DONS", "dons", len(donsResp.donInfos))
lggr.Infow("registered DONs", "dons", len(donsResp.donInfos))

return &ConfigureContractsResponse{
Changeset: &deployment.ChangesetOutput{
Expand Down Expand Up @@ -371,6 +371,7 @@ func registerCapabilities(lggr logger.Logger, req registerCapabilitiesRequest) (
if len(req.donToCapabilities) == 0 {
return nil, fmt.Errorf("no capabilities to register")
}
lggr.Infow("registering capabilities...", "len", len(req.donToCapabilities))
resp := &registerCapabilitiesResponse{
donToCapabilities: make(map[string][]RegisteredCapability),
}
Expand Down Expand Up @@ -421,8 +422,37 @@ type RegisterNOPSResponse struct {
Nops []*kcr.CapabilitiesRegistryNodeOperatorAdded
}

func RegisterNOPS(ctx context.Context, req RegisterNOPSRequest) (*RegisterNOPSResponse, error) {
nops := req.Nops
func RegisterNOPS(ctx context.Context, lggr logger.Logger, req RegisterNOPSRequest) (*RegisterNOPSResponse, error) {
lggr.Infow("registering node operators...", "len", len(req.Nops))
existingNops, err := req.Registry.GetNodeOperators(&bind.CallOpts{})
if err != nil {
return nil, err
}
existingNopsAddrToID := make(map[capabilities_registry.CapabilitiesRegistryNodeOperator]uint32)
for id, nop := range existingNops {
existingNopsAddrToID[nop] = uint32(id)
}
lggr.Infow("fetched existing node operators", "len", len(existingNopsAddrToID))
resp := &RegisterNOPSResponse{
Nops: []*kcr.CapabilitiesRegistryNodeOperatorAdded{},
}
nops := []kcr.CapabilitiesRegistryNodeOperator{}
for _, nop := range req.Nops {
if id, ok := existingNopsAddrToID[nop]; !ok {
nops = append(nops, nop)
} else {
lggr.Debugw("node operator already exists", "name", nop.Name, "admin", nop.Admin.String(), "id", id)
resp.Nops = append(resp.Nops, &kcr.CapabilitiesRegistryNodeOperatorAdded{
NodeOperatorId: id,
Name: nop.Name,
Admin: nop.Admin,
})
}
}
if len(nops) == 0 {
lggr.Debug("no new node operators to register")
return resp, nil
}
tx, err := req.Registry.AddNodeOperators(req.Chain.DeployerKey, nops)
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
Expand All @@ -442,15 +472,12 @@ func RegisterNOPS(ctx context.Context, req RegisterNOPSRequest) (*RegisterNOPSRe
if len(receipt.Logs) != len(nops) {
return nil, fmt.Errorf("expected %d log entries for AddNodeOperators, got %d", len(nops), len(receipt.Logs))
}
resp := &RegisterNOPSResponse{
Nops: make([]*kcr.CapabilitiesRegistryNodeOperatorAdded, len(receipt.Logs)),
}
for i, log := range receipt.Logs {
o, err := req.Registry.ParseNodeOperatorAdded(*log)
if err != nil {
return nil, fmt.Errorf("failed to parse log %d for operator added: %w", i, err)
}
resp.Nops[i] = o
resp.Nops = append(resp.Nops, o)
}

return resp, nil
Expand Down Expand Up @@ -531,6 +558,7 @@ type registerNodesResponse struct {
// can sign the transactions update the contract state
// TODO: 467 refactor to support MCMS. Specifically need to separate the call data generation from the actual contract call
func registerNodes(lggr logger.Logger, req *registerNodesRequest) (*registerNodesResponse, error) {
lggr.Infow("registering nodes...", "len", len(req.nodeIdToNop))
nopToNodeIDs := make(map[kcr.CapabilitiesRegistryNodeOperator][]string)
for nodeID, nop := range req.nodeIdToNop {
if _, ok := nopToNodeIDs[nop]; !ok {
Expand Down Expand Up @@ -623,7 +651,7 @@ func registerNodes(lggr logger.Logger, req *registerNodesRequest) (*registerNode
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
if strings.Contains(err.Error(), "NodeAlreadyExists") {
lggr.Warnw("node already exists, skipping", "p2pid", singleNodeParams.P2pId)
lggr.Warnw("node already exists, skipping", "p2pid", hex.EncodeToString(singleNodeParams.P2pId[:]))
continue
}
return nil, fmt.Errorf("failed to call AddNode for node with p2pid %v: %w", singleNodeParams.P2pId, err)
Expand Down Expand Up @@ -672,13 +700,22 @@ func sortedHash(p2pids [][32]byte) string {
}

func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsResponse, error) {
resp := registerDonsResponse{
donInfos: make(map[string]kcr.CapabilitiesRegistryDONInfo),
}
lggr.Infow("registering DONs...", "len", len(req.donToOcr2Nodes))
// track hash of sorted p2pids to don name because the registry return value does not include the don name
// and we need to map it back to the don name to access the other mapping data such as the don's capabilities & nodes
p2pIdsToDon := make(map[string]string)
var registeredDons = 0
var addedDons = 0

donInfos, err := req.registry.GetDONs(&bind.CallOpts{})
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
return nil, fmt.Errorf("failed to call GetDONs: %w", err)
}
existingDONs := make(map[string]struct{})
for _, donInfo := range donInfos {
existingDONs[sortedHash(donInfo.NodeP2PIds)] = struct{}{}
}
lggr.Infow("fetched existing DONs...", "len", len(donInfos), "lenByNodesHash", len(existingDONs))

for don, ocr2nodes := range req.donToOcr2Nodes {
var p2pIds [][32]byte
Expand All @@ -695,6 +732,12 @@ func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsRes

p2pSortedHash := sortedHash(p2pIds)
p2pIdsToDon[p2pSortedHash] = don

if _, ok := existingDONs[p2pSortedHash]; ok {
lggr.Debugw("don already exists, ignoring", "don", don, "p2p sorted hash", p2pSortedHash)
continue
}

caps, ok := req.donToCapabilities[don]
if !ok {
return nil, fmt.Errorf("capabilities not found for node operator %s", don)
Expand Down Expand Up @@ -728,44 +771,59 @@ func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsRes
return nil, fmt.Errorf("failed to confirm AddDON transaction %s for don %s: %w", tx.Hash().String(), don, err)
}
lggr.Debugw("registered DON", "don", don, "p2p sorted hash", p2pSortedHash, "cgs", cfgs, "wfSupported", wfSupported, "f", f)
registeredDons++
addedDons++
}
lggr.Debugf("Registered all DONS %d, waiting for registry to update", registeredDons)
lggr.Debugf("Registered all DONs (new=%d), waiting for registry to update", addedDons)

// occasionally the registry does not return the expected number of DONS immediately after the txns above
// so we retry a few times. while crude, it is effective
var donInfos []capabilities_registry.CapabilitiesRegistryDONInfo
var err error
foundAll := false
for i := 0; i < 10; i++ {
lggr.Debug("attempting to get DONS from registry", i)
lggr.Debugw("attempting to get DONs from registry", "attempt#", i)
donInfos, err = req.registry.GetDONs(&bind.CallOpts{})
if len(donInfos) != registeredDons {
lggr.Debugw("expected dons not registered", "expected", registeredDons, "got", len(donInfos))
if !containsAllDONs(donInfos, p2pIdsToDon) {
lggr.Debugw("some expected dons not registered yet, re-checking after a delay ...")
time.Sleep(2 * time.Second)
} else {
foundAll = true
break
}
}
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
return nil, fmt.Errorf("failed to call GetDONs: %w", err)
}
if !foundAll {
return nil, fmt.Errorf("did not find all desired DONS")
}

resp := registerDonsResponse{
donInfos: make(map[string]kcr.CapabilitiesRegistryDONInfo),
}
for i, donInfo := range donInfos {
donName, ok := p2pIdsToDon[sortedHash(donInfo.NodeP2PIds)]
if !ok {
return nil, fmt.Errorf("don not found for p2pids %s in %v", sortedHash(donInfo.NodeP2PIds), p2pIdsToDon)
lggr.Debugw("irrelevant DON found in the registry, ignoring", "p2p sorted hash", sortedHash(donInfo.NodeP2PIds))
continue
}
lggr.Debugw("adding don info", "don", donName, "cnt", i)
lggr.Debugw("adding don info to the reponse (keyed by DON name)", "don", donName)
resp.donInfos[donName] = donInfos[i]
}
lggr.Debugw("found registered DONs", "count", len(resp.donInfos))
if len(resp.donInfos) != registeredDons {
return nil, fmt.Errorf("expected %d dons, got %d", registeredDons, len(resp.donInfos))
}
return &resp, nil
}

// are all DONs from p2pIdsToDon in donInfos
func containsAllDONs(donInfos []kcr.CapabilitiesRegistryDONInfo, p2pIdsToDon map[string]string) bool {
found := make(map[string]struct{})
for _, donInfo := range donInfos {
hash := sortedHash(donInfo.NodeP2PIds)
if _, ok := p2pIdsToDon[hash]; ok {
found[hash] = struct{}{}
}
}
return len(found) == len(p2pIdsToDon)
}

// configureForwarder sets the config for the forwarder contract on the chain for all Dons that accept workflows
// dons that don't accept workflows are not registered with the forwarder
func configureForwarder(lggr logger.Logger, chain deployment.Chain, fwdr *kf.KeystoneForwarder, dons []RegisteredDon) error {
Expand Down
8 changes: 4 additions & 4 deletions deployment/keystone/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ type DonCapabilities struct {
}

// map the node id to the NOP
func (dc DonCapabilities) nodeIdToNop(cs uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
func (dc DonCapabilities) nopsByNodeID(chainSelector uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
out := make(map[string]capabilities_registry.CapabilitiesRegistryNodeOperator)
for _, nop := range dc.Nops {
for _, node := range nop.Nodes {
a, err := AdminAddress(node, cs)
a, err := AdminAddress(node, chainSelector)
if err != nil {
return nil, fmt.Errorf("failed to get admin address for node %s: %w", node.ID, err)
}
out[node.ID] = NodeOperator(dc.Name, a)
out[node.ID] = NodeOperator(nop.Name, a)

}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func AdminAddress(n *models.Node, chainSel uint64) (string, error) {
func nodesToNops(dons []DonCapabilities, chainSel uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
out := make(map[string]capabilities_registry.CapabilitiesRegistryNodeOperator)
for _, don := range dons {
nops, err := don.nodeIdToNop(chainSel)
nops, err := don.nopsByNodeID(chainSel)
if err != nil {
return nil, fmt.Errorf("failed to get registry NOPs for don %s: %w", don.Name, err)
}
Expand Down
Loading