Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Commit

Permalink
Skip duplicated nodes by hostname (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmxs authored Mar 2, 2022
1 parent d3ee027 commit a724564
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
29 changes: 17 additions & 12 deletions internal/controllers/sync-node/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

stosv1 "github.com/storageos/api-manager/api/v1"
"github.com/storageos/api-manager/internal/pkg/storageos"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

const (
Expand All @@ -65,7 +66,7 @@ type Reconciler struct {
apiReset chan<- struct{}
apiPollInterval time.Duration
nodeHashes map[string][]byte
existingNodes map[string]client.ObjectKey
existingNodes map[string]string
recorder record.EventRecorder
}

Expand All @@ -91,7 +92,7 @@ func NewReconciler(
apiReset: apiReset,
apiPollInterval: apiPollInterval,
nodeHashes: map[string][]byte{},
existingNodes: map[string]client.ObjectKey{},
existingNodes: map[string]string{},
recorder: recorder,
}
}
Expand Down Expand Up @@ -121,7 +122,7 @@ func (r *Reconciler) Start(ctx context.Context) error {
return err
}
for _, node := range listNodes.Items {
r.existingNodes[node.Status.NodeID] = client.ObjectKey{Name: node.Name}
r.existingNodes[node.Status.NodeID] = node.Name
}
r.log.V(5).Info("existing nodes: %v", r.existingNodes)

Expand Down Expand Up @@ -152,20 +153,21 @@ func (r *Reconciler) sync(ctx context.Context) {
r.apiReset <- struct{}{}
continue
}

span.SetAttributes(label.Int("nodes", len(nodes)))

// Current nodes for easy lookup later.
currentNodes := map[string]client.ObjectKey{}
currentNodes := map[string]bool{}

for key, obj := range nodes {
r.existingNodes[obj.GetID()] = key
currentNodes[obj.GetID()] = key
r.existingNodes[obj.GetID()] = key.Name
currentNodes[obj.GetID()] = true

r.save(ctx, obj)
}

// Delete orphan nodes.
orphanNodes := map[string]client.ObjectKey{}
orphanNodes := map[string]string{}
for nodeID, nodeKey := range r.existingNodes {
if _, ok := currentNodes[nodeID]; !ok {
orphanNodes[nodeID] = nodeKey
Expand Down Expand Up @@ -252,8 +254,8 @@ func (r *Reconciler) save(ctx context.Context, obj storageos.Object) {
}

// delete receives Nodes to delete it.
func (r *Reconciler) delete(ctx context.Context, nodeKey client.ObjectKey) {
log := r.log.WithValues("name", nodeKey.Name)
func (r *Reconciler) delete(ctx context.Context, nodeName string) {
log := r.log.WithValues("name", nodeName)
log.Info("delete node")

// Record reconcile duration.
Expand All @@ -263,16 +265,19 @@ func (r *Reconciler) delete(ctx context.Context, nodeKey client.ObjectKey) {
tr := otel.Tracer("delete-node")
ctx, span := tr.Start(ctx, "delete node")
defer span.End()
span.SetAttributes(label.String("name", nodeKey.Name))
span.SetAttributes(label.String("name", nodeName))

observeErr := func(err error, msg string) {
span.RecordError(errors.Wrap(err, msg))
log.Error(err, msg)
}

node := &stosv1.Node{}
if err := r.Client.Get(ctx, nodeKey, node); err != nil {
observeErr(err, "unable to fetch node")
if err := r.Client.Get(ctx, client.ObjectKey{Name: nodeName}, node); err != nil {
if !apierrors.IsNotFound(err) {
observeErr(err, "unable to fetch node")
}

return
}

Expand Down
7 changes: 7 additions & 0 deletions internal/pkg/storageos/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"net/http"
"sort"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -48,6 +49,12 @@ func (c *Client) NodeObjects(ctx context.Context) (map[client.ObjectKey]Object,
if err != nil {
return nil, observeErr(stosapierrors.MapAPIError(err, resp))
}

// Guarantee uniqueness of hostnames, latest wins
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].CreatedAt.Before(nodes[j].CreatedAt)
})

objects := make(map[client.ObjectKey]Object)
for _, node := range nodes {
nodeWrapper := model.StosObject{
Expand Down

0 comments on commit a724564

Please sign in to comment.