Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

Commit

Permalink
manage stateful sets
Browse files Browse the repository at this point in the history
  • Loading branch information
lostbean committed Oct 1, 2024
1 parent 11d6212 commit 71ca109
Showing 1 changed file with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package cluster_manager
import (
"context"
"encoding/json"
"k8s.io/apimachinery/pkg/labels"
"strings"

"k8s.io/apimachinery/pkg/labels"

"github.com/kurtosis-tech/kardinal/libs/manager-kontrol-api/api/golang/types"
"github.com/kurtosis-tech/stacktrace"
"github.com/samber/lo"
Expand Down Expand Up @@ -204,6 +205,7 @@ func (manager *ClusterManager) ApplyClusterResources(ctx context.Context, cluste
allNSs := [][]string{
lo.Uniq(lo.Map(*clusterResources.Services, func(item corev1.Service, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.Deployments, func(item appsv1.Deployment, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.StatefulSets, func(item appsv1.StatefulSet, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.VirtualServices, func(item v1alpha3.VirtualService, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.DestinationRules, func(item v1alpha3.DestinationRule, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.Gateways, func(item gateway.Gateway, _ int) string { return item.Namespace })),
Expand Down Expand Up @@ -240,6 +242,12 @@ func (manager *ClusterManager) ApplyClusterResources(ctx context.Context, cluste
}
}

for _, statefulSet := range *clusterResources.StatefulSets {
if err := manager.createOrUpdateStatefulSet(ctx, &statefulSet); err != nil {
return stacktrace.Propagate(err, "An error occurred while creating or updating statefulSet '%s'", statefulSet.GetName())
}
}

for _, virtualService := range *clusterResources.VirtualServices {
if err := manager.createOrUpdateVirtualService(ctx, &virtualService); err != nil {
return stacktrace.Propagate(err, "An error occurred while creating or updating virtual service '%s'", virtualService.GetName())
Expand Down Expand Up @@ -392,6 +400,14 @@ func (manager *ClusterManager) CleanUpClusterResources(ctx context.Context, clus
}
}

// Clean up deployments
statefulSetsByNS := lo.GroupBy(*clusterResources.StatefulSets, func(item appsv1.StatefulSet) string { return item.Namespace })
for namespace, statefulSets := range statefulSetsByNS {
if err := manager.cleanUpStatefulSetsInNamespace(ctx, namespace, statefulSets); err != nil {
return stacktrace.Propagate(err, "An error occurred cleaning up statefulSets '%+v' in namespace '%s'", statefulSets, namespace)
}
}

// Cleanup authorization policies
if clusterResources.AuthorizationPolicies != nil {
authorizationPoliciesByNS := lo.GroupBy(*clusterResources.AuthorizationPolicies, func(item securityv1beta1.AuthorizationPolicy) string {
Expand Down Expand Up @@ -472,7 +488,6 @@ func (manager *ClusterManager) removeNamespace(ctx context.Context, namespace *c
}

func (manager *ClusterManager) ensureNamespace(ctx context.Context, name string) error {

if name == istioSystemNamespace {
// Some resources might be under the istio system namespace but we don't want to alter
// this namespace because it is managed by Istio
Expand Down Expand Up @@ -554,6 +569,27 @@ func (manager *ClusterManager) createOrUpdateDeployment(ctx context.Context, dep
return nil
}

func (manager *ClusterManager) createOrUpdateStatefulSet(ctx context.Context, statefulSet *appsv1.StatefulSet) error {
statefulSetClient := manager.kubernetesClient.clientSet.AppsV1().StatefulSets(statefulSet.Namespace)
existingStatefulSet, err := statefulSetClient.Get(ctx, statefulSet.Name, metav1.GetOptions{})
if err != nil {
_, err = statefulSetClient.Create(ctx, statefulSet, globalCreateOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to create statefulSet: %s", statefulSet.GetName())
}
} else {
if !deepCheckEqual(existingStatefulSet.Spec, statefulSet.Spec) {
updateStatefulSetWithRelevantValuesFromCurrentDeployment(statefulSet, existingStatefulSet)
_, err = statefulSetClient.Update(ctx, statefulSet, globalUpdateOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to update statefulSet: %s", statefulSet.GetName())
}
}
}

return nil
}

func updateDeploymentWithRelevantValuesFromCurrentDeployment(newDeployment *appsv1.Deployment, currentDeployment *appsv1.Deployment) {
newDeployment.ResourceVersion = currentDeployment.ResourceVersion
// merge annotations
Expand All @@ -570,6 +606,22 @@ func updateDeploymentWithRelevantValuesFromCurrentDeployment(newDeployment *apps
newDeployment.Spec.Template.Annotations = newAnnotations
}

func updateStatefulSetWithRelevantValuesFromCurrentDeployment(newStatefulSet *appsv1.StatefulSet, currentStatefulSet *appsv1.StatefulSet) {
newStatefulSet.ResourceVersion = currentStatefulSet.ResourceVersion
// merge annotations
newAnnotations := newStatefulSet.Spec.Template.GetAnnotations()
currentAnnotations := currentStatefulSet.Spec.Template.GetAnnotations()

for annotationKey, annotationValue := range currentAnnotations {
if annotationKey == telepresenceRestartedAtAnnotation {
// This key is necessary for Kardinal/Telepresence (https://www.telepresence.io/) integration
// keeping this annotation because otherwise the telepresence traffic-agent container will be removed from the pod
newAnnotations[annotationKey] = annotationValue
}
}
newStatefulSet.Spec.Template.Annotations = newAnnotations
}

func (manager *ClusterManager) createOrUpdateVirtualService(ctx context.Context, virtualService *v1alpha3.VirtualService) error {
virtServiceClient := manager.istioClient.clientSet.NetworkingV1alpha3().VirtualServices(virtualService.GetNamespace())

Expand Down Expand Up @@ -763,6 +815,24 @@ func (manager *ClusterManager) cleanUpDeploymentsInNamespace(ctx context.Context
return nil
}

func (manager *ClusterManager) cleanUpStatefulSetsInNamespace(ctx context.Context, namespace string, statefulSetsToKeep []appsv1.StatefulSet) error {
statefulSetClient := manager.kubernetesClient.clientSet.AppsV1().StatefulSets(namespace)
allstatefulSets, err := statefulSetClient.List(ctx, globalListOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to list statefulSets in namespace %s", namespace)
}
for _, statefulSet := range allstatefulSets.Items {
_, exists := lo.Find(statefulSetsToKeep, func(item appsv1.StatefulSet) bool { return item.Name == statefulSet.Name })
if !exists {
err = statefulSetClient.Delete(ctx, statefulSet.Name, globalDeleteOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to delete statefulSet %s", statefulSet.GetName())
}
}
}
return nil
}

func (manager *ClusterManager) cleanUpVirtualServicesInNamespace(ctx context.Context, namespace string, virtualServicesToKeep []v1alpha3.VirtualService) error {
virtServiceClient := manager.istioClient.clientSet.NetworkingV1alpha3().VirtualServices(namespace)
allVirtServices, err := virtServiceClient.List(ctx, globalListOptions)
Expand Down

0 comments on commit 71ca109

Please sign in to comment.