Skip to content

Commit

Permalink
[ACM-10812]: fix addon status not reported in hub (stolostron#1420)
Browse files Browse the repository at this point in the history
* init version

Signed-off-by: Thibault Mange <[email protected]>

* fix

Signed-off-by: Thibault Mange <[email protected]>

* env test

Signed-off-by: Thibault Mange <[email protected]>

* change withReload naming

Signed-off-by: Thibault Mange <[email protected]>

---------

Signed-off-by: Thibault Mange <[email protected]>
  • Loading branch information
thibaultmg authored Apr 30, 2024
1 parent b78fc3b commit afc17b4
Show file tree
Hide file tree
Showing 10 changed files with 675 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var (
type ObservabilityAddonReconciler struct {
Client client.Client
Scheme *runtime.Scheme
HubClient client.Client
HubClient *util.ReloadableHubClient
}

// +kubebuilder:rbac:groups=observability.open-cluster-management.io.open-cluster-management.io,resources=observabilityaddons,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -113,18 +113,22 @@ func (r *ObservabilityAddonReconciler) Reconcile(ctx context.Context, req ctrl.R
}

// Fetch the ObservabilityAddon instance in hub cluster
err := r.HubClient.Get(ctx, types.NamespacedName{Name: obAddonName, Namespace: hubNamespace}, hubObsAddon)
if err != nil {
hubClient, obsAddon, err := util.RenewAndRetry(ctx, r.Scheme)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get observabilityaddon: %w", err)
fetchAddon := func() error {
return r.HubClient.Get(ctx, types.NamespacedName{Name: obAddonName, Namespace: hubNamespace}, hubObsAddon)
}
if err := fetchAddon(); err != nil {
if r.HubClient, err = r.HubClient.Reload(); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to reload the hub client: %w", err)
}

// Retry the operation once with the reloaded client
if err := fetchAddon(); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get ObservabilityAddon in hub cluster: %w", err)
}
r.HubClient = hubClient
hubObsAddon = obsAddon
}

// Fetch the ObservabilityAddon instance in local cluster
err = r.Client.Get(ctx, types.NamespacedName{Name: obAddonName, Namespace: namespace}, obsAddon)
err := r.Client.Get(ctx, types.NamespacedName{Name: obAddonName, Namespace: namespace}, obsAddon)
if err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{}, fmt.Errorf("failed to get observabilityaddon: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
hyperv1 "github.com/openshift/hypershift/api/v1alpha1"
promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/hypershift"
"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/util"
oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -67,9 +68,13 @@ func TestIntegrationReconcileHypershift(t *testing.T) {
})
assert.NoError(t, err)

hubClientWithReload, err := util.NewReloadableHubClientWithReloadFunc(func() (client.Client, error) {
return k8sClient, nil
})
assert.NoError(t, err)
reconciler := ObservabilityAddonReconciler{
Client: k8sClient,
HubClient: k8sClient,
HubClient: hubClientWithReload,
}

err = reconciler.SetupWithManager(mgr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/openshift"
Expand Down Expand Up @@ -178,12 +179,17 @@ alertmanager-router-ca: |
}

hubClient := fake.NewClientBuilder().WithRuntimeObjects(hubObjs...).Build()
util.SetHubClient(hubClient)
c := fake.NewClientBuilder().WithRuntimeObjects(objs...).Build()

hubClientWithReload, err := util.NewReloadableHubClientWithReloadFunc(func() (client.Client, error) {
return hubClient, nil
})
if err != nil {
t.Fatalf("Failed to create hub client with reload: %v", err)
}
r := &ObservabilityAddonReconciler{
Client: c,
HubClient: hubClient,
HubClient: hubClientWithReload,
}

// test error in reconcile if missing obervabilityaddon
Expand All @@ -194,7 +200,7 @@ alertmanager-router-ca: |
},
}
ctx := context.TODO()
_, err := r.Reconcile(ctx, req)
_, err = r.Reconcile(ctx, req)
if err == nil {
t.Fatalf("reconcile: miss the error for missing obervabilityaddon")
}
Expand Down
136 changes: 95 additions & 41 deletions operators/endpointmetrics/controllers/status/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,91 +6,104 @@ package status

import (
"context"
"os"
"fmt"
"net"
"reflect"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/go-logr/logr"
"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/util"
oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1"
)

var (
log = ctrl.Log.WithName("controllers").WithName("Status")
)

const (
obAddonName = "observability-addon"
)

var (
namespace = os.Getenv("WATCH_NAMESPACE")
hubNamespace = os.Getenv("HUB_NAMESPACE")
)

// StatusReconciler reconciles status object.
type StatusReconciler struct {
Client client.Client
Scheme *runtime.Scheme
HubClient client.Client
Client client.Client
HubNamespace string
Namespace string
HubClient *util.ReloadableHubClient
ObsAddonName string
Logger logr.Logger
}

// Reconcile reads that state of the cluster for a ObservabilityAddon object and makes changes based on the state read
// and what is in the ObservabilityAddon.Status
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)
log.Info("Reconciling")
r.Logger.WithValues("Request", req.String()).Info("Reconciling")

// Fetch the ObservabilityAddon instance in hub cluster
hubObsAddon := &oav1beta1.ObservabilityAddon{}
err := r.HubClient.Get(ctx, types.NamespacedName{Name: obAddonName, Namespace: hubNamespace}, hubObsAddon)
err := r.HubClient.Get(ctx, types.NamespacedName{Name: r.ObsAddonName, Namespace: r.HubNamespace}, hubObsAddon)
if err != nil {
hubClient, obsAddon, err := util.RenewAndRetry(ctx, r.Scheme)
if err != nil {
return ctrl.Result{}, err
if isAuthOrConnectionErr(err) {
// Try reloading the kubeconfig for the hub cluster
var reloadErr error
if r.HubClient, reloadErr = r.HubClient.Reload(); reloadErr != nil {
return ctrl.Result{}, fmt.Errorf("failed to reload the hub client: %w", reloadErr)
}
r.Logger.Info("Failed to get ObservabilityAddon in hub cluster, reloaded hub, requeue with delay", "error", err)
return ctrl.Result{Requeue: true}, nil
}

if isTransientErr(err) {
r.Logger.Info("Failed to get ObservabilityAddon in hub cluster, requeue with delay", "error", err)
return requeueWithOptionalDelay(err), nil
}
r.HubClient = hubClient
hubObsAddon = obsAddon
}

// Fetch the ObservabilityAddon instance in local cluster
obsAddon := &oav1beta1.ObservabilityAddon{}
err = r.Client.Get(ctx, types.NamespacedName{Name: obAddonName, Namespace: namespace}, obsAddon)
if err != nil {
log.Error(err, "Failed to get observabilityaddon", "namespace", namespace)
return ctrl.Result{}, err
}

hubObsAddon.Status = obsAddon.Status
// Retry on conflict as operation happens in other cluster
// on a shared resource that can be updated by multiple controllers.
retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Fetch the ObservabilityAddon instance in local cluster
obsAddon := &oav1beta1.ObservabilityAddon{}
if err != r.Client.Get(ctx, types.NamespacedName{Name: r.ObsAddonName, Namespace: r.Namespace}, obsAddon) {
return err
}

err = r.HubClient.Status().Update(ctx, hubObsAddon)
if err != nil {
log.Error(err, "Failed to update status for observabilityaddon in hub cluster", "namespace", hubNamespace)
// Only update the status in hub cluster if needed
if reflect.DeepEqual(hubObsAddon.Status, obsAddon.Status) {
return nil
}

updatedAddon := hubObsAddon.DeepCopy()
updatedAddon.Status = obsAddon.Status

// Update the status in hub cluster
return r.HubClient.Status().Update(ctx, updatedAddon)
})
if retryErr != nil {
if isTransientErr(retryErr) || errors.IsConflict(retryErr) {
r.Logger.Info("Retryable error while updating status, request will be retried.", "error", retryErr)
return requeueWithOptionalDelay(retryErr), nil
}

return ctrl.Result{}, fmt.Errorf("failed to update status in hub cluster: %w", retryErr)
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StatusReconciler) SetupWithManager(mgr ctrl.Manager) error {
if os.Getenv("NAMESPACE") != "" {
namespace = os.Getenv("NAMESPACE")
}

pred := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectNew.GetNamespace() == namespace &&
if e.ObjectNew.GetNamespace() == r.Namespace &&
!reflect.DeepEqual(e.ObjectNew.(*oav1beta1.ObservabilityAddon).Status,
e.ObjectOld.(*oav1beta1.ObservabilityAddon).Status) {
return true
Expand All @@ -106,3 +119,44 @@ func (r *StatusReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&oav1beta1.ObservabilityAddon{}, builder.WithPredicates(pred)).
Complete(r)
}

// isTransientErr checks if the error is a transient error
// This suggests that a retry (without any change) might be successful
func isTransientErr(err error) bool {
if _, ok := err.(net.Error); ok {
return true
}

if statusErr, ok := err.(*errors.StatusError); ok {
code := statusErr.Status().Code
if code >= 500 && code < 600 && code != 501 {
return true
}
}

return errors.IsTimeout(err) || errors.IsServerTimeout(err) || errors.IsTooManyRequests(err)
}

// isAuthOrConnectionErr checks if the error is an authentication error or a connection error
// This suggests an issue with the client configuration and a reload might be needed
func isAuthOrConnectionErr(err error) bool {
if errors.IsUnauthorized(err) || errors.IsForbidden(err) || errors.IsTimeout(err) {
return true
}

if _, ok := err.(net.Error); ok {
return true
}

return false
}

// requeueWithOptionalDelay requeues the request with a delay if suggested by the error
// Otherwise, it requeues the request without a delay
func requeueWithOptionalDelay(err error) ctrl.Result {
if delay, ok := errors.SuggestsClientDelay(err); ok {
return ctrl.Result{RequeueAfter: time.Duration(delay) * time.Second}
}

return ctrl.Result{Requeue: true}
}
Loading

0 comments on commit afc17b4

Please sign in to comment.