Skip to content

Commit

Permalink
deploy new status lib
Browse files Browse the repository at this point in the history
Signed-off-by: Thibault Mange <[email protected]>
  • Loading branch information
thibaultmg committed Jul 24, 2024
1 parent abe8e00 commit d3fa29e
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 269 deletions.
45 changes: 18 additions & 27 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/metricsclient"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/simulator"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/status"
statuslib "github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/status"
)

const (
failedStatusReportMsg = "Failed to report status"
uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092"
)

type RuleMatcher interface {
Expand Down Expand Up @@ -297,7 +299,9 @@ func New(cfg Config) (*Worker, error) {
}
w.recordingRules = recordingRules

s, err := status.New(logger)
standalone := os.Getenv("STANDALONE") == "true"
isUwl := strings.Contains(os.Getenv("FROM"), uwlPromURL)
s, err := status.New(logger, standalone, isUwl)
if err != nil {
return nil, fmt.Errorf("unable to create StatusReport: %w", err)
}
Expand Down Expand Up @@ -366,6 +370,12 @@ func (w *Worker) forward(ctx context.Context) error {
w.lock.Lock()
defer w.lock.Unlock()

updateStatus := func(reason statuslib.Reason, message string) {
if err := w.status.UpdateStatus(ctx, reason, message); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
}

var families []*clientmodel.MetricFamily
var err error
if w.simulatedTimeseriesFile != "" {
Expand All @@ -378,19 +388,13 @@ func (w *Worker) forward(ctx context.Context) error {
} else {
families, err = w.getFederateMetrics(ctx)
if err != nil {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to retrieve metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to retrieve metrics")
return err
}

rfamilies, err := w.getRecordingMetrics(ctx)
if err != nil && len(rfamilies) == 0 {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to retrieve recording metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to retrieve recording metrics")
return err
} else {
families = append(families, rfamilies...)
Expand All @@ -399,10 +403,7 @@ func (w *Worker) forward(ctx context.Context) error {

before := metricfamily.MetricsCount(families)
if err := metricfamily.Filter(families, w.transformer); err != nil {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to filter metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to filter metrics")
return err
}

Expand All @@ -416,34 +417,24 @@ func (w *Worker) forward(ctx context.Context) error {

if len(families) == 0 {
rlogger.Log(w.logger, rlogger.Warn, "msg", "no metrics to send, doing nothing")
statusErr := w.status.UpdateStatus(ctx, "Available", "No metrics to send")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardSuccessful, "No metrics to send")
return nil
}

if w.to == nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", "to is nil, doing nothing")
statusErr := w.status.UpdateStatus(ctx, "Available", "Metrics is not required to send")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardSuccessful, "Metrics is not required to send")
return nil
}

req := &http.Request{Method: "POST", URL: w.to}
if err := w.toClient.RemoteWrite(ctx, req, families, w.interval); err != nil {
if err := w.status.UpdateStatus(ctx, "Degraded", "Failed to send metrics"); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
updateStatus(statuslib.ForwardFailed, "Failed to send metrics")
return err
}

if w.simulatedTimeseriesFile == "" {
if err := w.status.UpdateStatus(ctx, "Available", "Cluster metrics sent successfully"); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
updateStatus(statuslib.ForwardSuccessful, "Cluster metrics sent successfully")
} else {
rlogger.Log(w.logger, rlogger.Warn, "msg", "Simulated metrics sent successfully")
}
Expand Down
141 changes: 25 additions & 116 deletions collectors/metrics/pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,34 @@ package status
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"slices"
"sort"
"strings"
"time"

"github.com/go-kit/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/status"
oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1"
)

const (
name = "observability-addon"
namespace = "open-cluster-management-addon-observability"
uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092"
addonName = "observability-addon"
addonNamespace = "open-cluster-management-addon-observability"
)

type StatusReport struct {
statusClient client.Client
logger log.Logger
statusClient client.Client
standalone bool
isUwl bool
statusReporter status.Status
}

func New(logger log.Logger) (*StatusReport, error) {
func New(logger log.Logger, standalone, isUwl bool) (*StatusReport, error) {
testMode := os.Getenv("UNIT_TEST") != ""
standaloneMode := os.Getenv("STANDALONE") == "true"
var kubeClient client.Client
if testMode {
s := scheme.Scheme
Expand All @@ -50,8 +45,6 @@ func New(logger log.Logger) (*StatusReport, error) {
WithScheme(s).
WithStatusSubresource(&oav1beta1.ObservabilityAddon{}).
Build()
} else if standaloneMode {
kubeClient = nil
} else {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
Expand All @@ -67,114 +60,30 @@ func New(logger log.Logger) (*StatusReport, error) {
}
}

statusLogger := logr.FromSlogHandler(slog.New(slog.NewTextHandler(os.Stdout, nil)).With("component", "statusclient").Handler())
return &StatusReport{
statusClient: kubeClient,
logger: log.With(logger, "component", "statusclient"),
statusClient: kubeClient,
standalone: standalone,
isUwl: isUwl,
statusReporter: status.NewStatus(kubeClient, addonName, addonNamespace, statusLogger),
}, nil
}

func (s *StatusReport) UpdateStatus(ctx context.Context, t string, m string) error {
// statusClient is nil when running on the hub.
if s.statusClient == nil {
func (s *StatusReport) UpdateStatus(ctx context.Context, reason status.Reason, message string) error {
// Standalone mode is set when running on the hub cluster
// In this case, we do not need to update the status of the ObservabilityAddon
if s.standalone {
return nil
}

isUwl := false
if strings.Contains(os.Getenv("FROM"), uwlPromURL) {
isUwl = true
component := status.MetricsCollector
if s.isUwl {
component = status.UwlMetricsCollector
}

retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
addon := &oav1beta1.ObservabilityAddon{}
err := s.statusClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, addon)
if err != nil {
return fmt.Errorf("failed to get ObservabilityAddon %s/%s: %w", namespace, name, err)
}

// Sort the conditions by rising LastTransitionTime
sort.Slice(addon.Status.Conditions, func(i, j int) bool {
return addon.Status.Conditions[i].LastTransitionTime.Before(&addon.Status.Conditions[j].LastTransitionTime)
})

currentCondition := addon.Status.Conditions[len(addon.Status.Conditions)-1]
newCondition := mergeCondtion(isUwl, m, currentCondition)

// If the current condition is the same, do not update
if currentCondition.Type == newCondition.Type && currentCondition.Reason == newCondition.Reason && currentCondition.Message == newCondition.Message && currentCondition.Status == newCondition.Status {
return nil
}

s.logger.Log("msg", fmt.Sprintf("Updating status of ObservabilityAddon %s/%s", namespace, name), "type", newCondition.Type, "status", newCondition.Status, "reason", newCondition.Reason)

// Reset the status of other main conditions
for i := range addon.Status.Conditions {
if slices.Contains([]string{"Available", "Degraded", "Progressing"}, addon.Status.Conditions[i].Type) {
addon.Status.Conditions[i].Status = metav1.ConditionFalse
}
}

// Set the new condition
addon.Status.Conditions = mutateOrAppend(addon.Status.Conditions, newCondition)

if err := s.statusClient.Status().Update(ctx, addon); err != nil {
return fmt.Errorf("failed to update ObservabilityAddon %s/%s: %w", namespace, name, err)
}

return nil
})
if retryErr != nil {
return retryErr
}
return nil
}

func mergeCondtion(isUwl bool, m string, condition oav1beta1.StatusCondition) oav1beta1.StatusCondition {
messages := strings.Split(condition.Message, " ; ")
if len(messages) == 1 {
messages = append(messages, "")
}
if isUwl {
messages[1] = fmt.Sprintf("User Workload: %s", m)
} else {
messages[0] = m
}
message := messages[0]
if messages[1] != "" {
message = strings.Join(messages, " ; ")
}
conditionType := "Available"
reason := "Available"
if strings.Contains(message, "Failed") {
conditionType = "Degraded"
reason = "Degraded"
}
return oav1beta1.StatusCondition{
Type: conditionType,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
LastTransitionTime: metav1.NewTime(time.Now()),
}
}

// mutateOrAppend updates the status conditions with the new condition.
// If the condition already exists, it updates it with the new condition.
// If the condition does not exist, it appends the new condition to the status conditions.
func mutateOrAppend(conditions []oav1beta1.StatusCondition, newCondition oav1beta1.StatusCondition) []oav1beta1.StatusCondition {
if len(conditions) == 0 {
return []oav1beta1.StatusCondition{newCondition}
if err := s.statusReporter.UpdateComponentCondition(ctx, component, reason, message); err != nil {
return err
}

for i, condition := range conditions {
if condition.Type == newCondition.Type {
// Update the existing condition
conditions[i] = newCondition
return conditions
}
}
// If the condition type does not exist, append the new condition
return append(conditions, newCondition)
return nil
}
Loading

0 comments on commit d3fa29e

Please sign in to comment.