Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve kube api usage and reduce unnecessary reconciliations #155

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package main

import (
"net/http"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/util/workqueue"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand All @@ -39,15 +42,21 @@ func init() {
}

func main() {
clients := clients.ClientSets{}
stop := make(chan struct{})
defer close(stop)
defer runtime.HandleCrash()

wq := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
defer wq.ShutDown()

// Get the kubeconfig and generate the ClientSets
if err := clients.GenerateClientSetFromKubeConfig(); err != nil {
clientset, err := clients.NewClientSet(stop, 5*time.Minute, wq)
if err != nil {
log.Fatalf("Unable to Get the kubeconfig, err: %v", err)
}

// Trigger the chaos metrics collection
go controller.Exporter(clients)
go controller.Exporter(clientset, wq)

//This section will start the HTTP server and expose metrics on the /metrics endpoint.
http.Handle("/metrics", promhttp.Handler())
Expand Down
27 changes: 13 additions & 14 deletions controller/collect-data.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"context"
"math"
"strconv"
"strings"
Expand All @@ -12,15 +11,15 @@ import (
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientTypes "k8s.io/apimachinery/pkg/types"
)

//go:generate mockgen -destination=mocks/mock_collect-data.go -package=mocks github.com/litmuschaos/chaos-exporter/controller ResultCollector

// ResultCollector is the interface for both functions GetResultList and GetExperimentMetricsFromResult
type ResultCollector interface {
GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error)
GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error)
GetExperimentMetricsFromResult(chaosResult *litmuschaosv1alpha1.ChaosResult, clients clients.ClientSets) (bool, error)
SetResultDetails()
GetResultDetails() ChaosResultDetails
Expand All @@ -30,28 +29,28 @@ type ResultDetails struct {
}

// GetResultList returns the result list corresponding to the monitoring-enabled chaosengine
func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error) {
func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error) {

chaosResultList, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosResults(chaosNamespace).List(context.Background(), metav1.ListOptions{})
chaosResultList, err := clients.ResultInformer.ChaosResults(chaosNamespace).List(labels.Everything())
if err != nil {
return litmuschaosv1alpha1.ChaosResultList{}, err
return nil, err
}
// waiting until any chaosresult found
if len(chaosResultList.Items) == 0 {
if len(chaosResultList) == 0 {
if monitoringEnabled.IsChaosResultsAvailable {
monitoringEnabled.IsChaosResultsAvailable = false
log.Warnf("No chaosresult found!")
log.Info("[Wait]: Waiting for the chaosresult ... ")
}
return litmuschaosv1alpha1.ChaosResultList{}, nil
return nil, nil
}

if !monitoringEnabled.IsChaosResultsAvailable {
log.Info("[Wait]: Cheers! Wait is over, found desired chaosresult")
monitoringEnabled.IsChaosResultsAvailable = true
}

return *chaosResultList, nil
return chaosResultList, nil
}

// GetExperimentMetricsFromResult derives all the metrics data from the chaosresult and sets it into the resultDetails struct
Expand All @@ -61,7 +60,7 @@ func (r *ResultDetails) GetExperimentMetricsFromResult(chaosResult *litmuschaosv
if err != nil {
return false, err
}
engine, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosEngines(chaosResult.Namespace).Get(context.Background(), chaosResult.Spec.EngineName, metav1.GetOptions{})
engine, err := clients.EngineInformer.ChaosEngines(chaosResult.Namespace).Get(chaosResult.Spec.EngineName)
if err != nil {
// k8serrors.IsNotFound(err) checks whether the k8s resource was found;
// this result is skipped if the k8s resource is not found.
Expand Down Expand Up @@ -267,14 +266,14 @@ func getProbeSuccessPercentage(chaosResult *litmuschaosv1alpha1.ChaosResult) (fl
// getEventsForSpecificInvolvedResource derives all the events corresponding to the specific resource
func getEventsForSpecificInvolvedResource(clients clients.ClientSets, resourceUID clientTypes.UID, chaosNamespace string) (corev1.EventList, error) {
finalEventList := corev1.EventList{}
eventsList, err := clients.KubeClient.CoreV1().Events(chaosNamespace).List(context.Background(), metav1.ListOptions{})
eventsList, err := clients.EventsInformer.Events(chaosNamespace).List(labels.Everything())
if err != nil {
return corev1.EventList{}, err
}

for _, event := range eventsList.Items {
if event.InvolvedObject.UID == resourceUID {
finalEventList.Items = append(finalEventList.Items, event)
for _, event := range eventsList {
if event != nil && event.InvolvedObject.UID == resourceUID {
finalEventList.Items = append(finalEventList.Items, *event)
}
}
return finalEventList, nil
Expand Down
7 changes: 6 additions & 1 deletion controller/collect-data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controller_test

import (
"context"
"testing"

"github.com/litmuschaos/chaos-exporter/controller"
"github.com/litmuschaos/chaos-exporter/pkg/clients"
"github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
Expand All @@ -12,7 +14,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"testing"
"k8s.io/client-go/util/workqueue"
)

func TestGetResultList(t *testing.T) {
Expand Down Expand Up @@ -183,5 +185,8 @@ func CreateFakeClient(t *testing.T) clients.ClientSets {
cs := clients.ClientSets{}
cs.KubeClient = fake.NewSimpleClientset([]runtime.Object{}...)
cs.LitmusClient = litmusFakeClientSet.NewSimpleClientset([]runtime.Object{}...)
stopCh := make(chan struct{})
err := cs.SetupInformers(stopCh, cs.KubeClient, cs.LitmusClient, 0, workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()))
require.NoError(t, err)
return cs
}
20 changes: 13 additions & 7 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ limitations under the License.
package controller

import (
"time"

"github.com/litmuschaos/chaos-exporter/pkg/clients"
"github.com/litmuschaos/chaos-exporter/pkg/log"
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/util/workqueue"
)

// Exporter continuously collects the chaos metrics for a given chaosengine
func Exporter(clients clients.ClientSets) {
func Exporter(clientSet clients.ClientSets, wq workqueue.RateLimitingInterface) {
log.Info("Started creating Metrics")
// Register the fixed (count) chaos metrics
log.Info("Registering Fixed Metrics")
Expand All @@ -35,7 +34,7 @@ func Exporter(clients clients.ClientSets) {
ResultCollector: &ResultDetails{},
}
//gaugeMetrics := GaugeMetrics{}
overallChaosResults := litmuschaosv1alpha1.ChaosResultList{}
overallChaosResults := []*litmuschaosv1alpha1.ChaosResult{}

r.GaugeMetrics.InitializeGaugeMetrics().
RegisterFixedMetrics()
Expand All @@ -45,11 +44,18 @@ func Exporter(clients clients.ClientSets) {
IsChaosEnginesAvailable: true,
}

for {
if err := r.GetLitmusChaosMetrics(clients, &overallChaosResults, &monitoringEnabled); err != nil {
// refresh metrics whenever there's a change in chaosengine or chaosresult
// or every informer resync duration, whichever is earlier
for _, done := wq.Get(); !done; _, done = wq.Get() {
needRequeue, err := r.GetLitmusChaosMetrics(clientSet, &overallChaosResults, &monitoringEnabled)
if err != nil {
log.Errorf("err: %v", err)
}
time.Sleep(1000 * time.Millisecond)
wq.Done(clients.ProcessKey)
// re-enqueue the key after the delay requested by the metrics pass, if any
if needRequeue != nil {
wq.AddAfter(clients.ProcessKey, *needRequeue)
}
}
}

Expand Down
37 changes: 22 additions & 15 deletions controller/handle-result-deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"fmt"
"os"
"strconv"
"time"

litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
)

// unsetDeletedChaosResults unsets the metrics corresponding to deleted chaosresults
func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults *litmuschaosv1alpha1.ChaosResultList) {
for _, oldResult := range oldChaosResults.Items {
func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults []*litmuschaosv1alpha1.ChaosResult) {
for _, oldResult := range oldChaosResults {
found := false
for _, newResult := range newChaosResults.Items {
for _, newResult := range newChaosResults {
if oldResult.UID == newResult.UID {
found = true
break
Expand All @@ -22,7 +23,7 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC
if !found {
for _, value := range resultStore[string(oldResult.UID)] {

probeSuccesPercentage, _ := getProbeSuccessPercentage(&oldResult)
probeSuccesPercentage, _ := getProbeSuccessPercentage(oldResult)
resultDetails := initialiseResult().
setName(oldResult.Name).
setNamespace(oldResult.Namespace).
Expand All @@ -46,10 +47,13 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC

// unsetOutdatedMetrics unsets the metrics when the chaosresult verdict changes;
// if the same chaosresult verdict persists for longer than the scrape interval, it sets the metrics value to 0
func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) float64 {
func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) (float64, *time.Duration) {
scrapeTime, _ := strconv.Atoi(getEnv("TSDB_SCRAPE_INTERVAL", "10"))
result, ok := matchVerdict[string(resultDetails.UID)]
reset := false
var needRequeue *time.Duration

scrapeDuration := time.Duration(scrapeTime) * time.Second

switch ok {
case true:
Expand All @@ -58,20 +62,23 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult
case result.Verdict != resultDetails.Verdict:
gaugeMetrics.ResultVerdict.DeleteLabelValues(resultDetails.Namespace, resultDetails.Name, resultDetails.ChaosEngineName,
resultDetails.ChaosEngineContext, result.Verdict, fmt.Sprintf("%f", result.ProbeSuccessPercentage), resultDetails.AppLabel,
resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, result.FaultName)
result.Count = 1
resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, resultDetails.FaultName)
result.Timer = time.Now()
needRequeue = &scrapeDuration
default:
// if the scrape interval has elapsed, reset the value to 0
if result.Count >= scrapeTime {
if time.Since(result.Timer) >= scrapeDuration {
reset = true
} else {
result.Count++
scrapeDuration = scrapeDuration - time.Since(result.Timer)
needRequeue = &scrapeDuration
}
}
default:
result = initialiseResultData().
setCount(1).
setTimer(time.Now()).
setVerdictReset(false)
needRequeue = &scrapeDuration
}

// update the values inside matchVerdict
Expand All @@ -80,9 +87,9 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult
setVerdictReset(reset)

if reset {
return float64(0)
return float64(0), needRequeue
}
return float64(1)
return float64(1), needRequeue
}

// getEnv derives the ENV and uses the default value if the env contains an empty value
Expand All @@ -105,7 +112,7 @@ func (resultDetails *ChaosResultDetails) setResultData() {
setAppLabel(resultDetails.AppLabel).
setVerdict(resultDetails.Verdict).
setFaultName(resultDetails.FaultName).
setCount(0).
setTimer(time.Now()).
setVerdictReset(false).
setProbeSuccesPercentage(resultDetails.ProbeSuccessPercentage)

Expand Down Expand Up @@ -164,8 +171,8 @@ func (resultData *ResultData) setFaultName(fault string) *ResultData {
}

// setTimer sets the timer inside resultData struct
func (resultData *ResultData) setCount(count int) *ResultData {
resultData.Count = count
func (resultData *ResultData) setTimer(timer time.Time) *ResultData {
resultData.Timer = timer
return resultData
}

Expand Down
27 changes: 12 additions & 15 deletions controller/handle-result-deletion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package controller

import (
"errors"
"testing"

"github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)

func Test_unsetDeletedChaosResults(t *testing.T) {
Expand All @@ -15,8 +16,8 @@ func Test_unsetDeletedChaosResults(t *testing.T) {
execFunc func(details *ChaosResultDetails)
isErr bool
resultDetails *ChaosResultDetails
oldChaosResult *v1alpha1.ChaosResultList
newChaosResult *v1alpha1.ChaosResultList
oldChaosResult []*v1alpha1.ChaosResult
newChaosResult []*v1alpha1.ChaosResult
}{
{
name: "success: deleted chaosResult",
Expand All @@ -26,21 +27,17 @@ func Test_unsetDeletedChaosResults(t *testing.T) {
resultDetails: &ChaosResultDetails{
UID: "FAKE-UID-OLD",
},
oldChaosResult: &v1alpha1.ChaosResultList{
Items: []v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-OLD",
},
oldChaosResult: []*v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-OLD",
},
},
},
newChaosResult: &v1alpha1.ChaosResultList{
Items: []v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-NEW",
},
newChaosResult: []*v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-NEW",
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions controller/mocks/mock_collect-data.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading