Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

feat!: add support to StatefulSet #59

Merged
merged 17 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
linters:
enable:
- exhaustruct
- exportloopref
- gomnd
- staticcheck
- exhaustive
max-issues-per-linter: 0
sort-results: true
8 changes: 4 additions & 4 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
gomod2nix.url = "github:nix-community/gomod2nix";
gomod2nix.inputs.nixpkgs.follows = "nixpkgs";
gomod2nix.inputs.flake-utils.follows = "flake-utils";
kardinal.url = "github:kurtosis-tech/kardinal/b5a991623aeb";
kardinal.url = "github:kurtosis-tech/kardinal/a1632d5aecd8";
};
outputs = {
self,
Expand Down
113 changes: 84 additions & 29 deletions kontrol-service/api/server.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions kontrol-service/database/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type Tenant struct {
TenantId string `gorm:"uniqueIndex"`
BaseClusterTopology datatypes.JSON
ServiceConfigs datatypes.JSON
DeploymentConfigs datatypes.JSON
StatefulSetConfigs datatypes.JSON
IngressConfigs datatypes.JSON
GatewayConfigs datatypes.JSON
RouteConfigs datatypes.JSON
Expand Down
146 changes: 122 additions & 24 deletions kontrol-service/engine/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,36 @@ package engine

import (
"fmt"
"strings"

apitypes "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types"
"github.com/kurtosis-tech/stacktrace"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"

corev1 "k8s.io/api/core/v1"
net "k8s.io/api/networking/v1"
gateway "sigs.k8s.io/gateway-api/apis/v1"
"strings"

"kardinal.kontrol-service/engine/flow"
"kardinal.kontrol-service/plugins"
"kardinal.kontrol-service/types/cluster_topology/resolved"
"kardinal.kontrol-service/types/flow_spec"
kardinal "kardinal.kontrol-service/types/kardinal"
)

func GenerateProdOnlyCluster(
flowID string,
serviceConfigs []apitypes.ServiceConfig,
deploymentConfigs []apitypes.DeploymentConfig,
statefulSetConfigs []apitypes.StatefulSetConfig,
ingressConfigs []apitypes.IngressConfig,
gatewayConfigs []apitypes.GatewayConfig,
routeConfigs []apitypes.RouteConfig,
namespace string,
) (*resolved.ClusterTopology, error) {
clusterTopology, err := generateClusterTopology(serviceConfigs, ingressConfigs, gatewayConfigs, routeConfigs, namespace, flowID)
clusterTopology, err := generateClusterTopology(serviceConfigs, deploymentConfigs, statefulSetConfigs, ingressConfigs, gatewayConfigs, routeConfigs, namespace, flowID)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred generating the cluster topology from the service configs")
}
Expand All @@ -42,18 +47,16 @@ func GenerateProdDevCluster(baseClusterTopologyMaybeWithTemplateOverrides *resol
if err != nil {
return nil, stacktrace.Propagate(err, "Service with UUID %s not found", devServiceName)
}
if devService.DeploymentSpec == nil {
return nil, stacktrace.NewError("Service with UUID %s has no DeploymentSpec", devServiceName)
}

deploymentSpec := flow.DeepCopyDeploymentSpec(devService.DeploymentSpec)
workloadSpec := devService.WorkloadSpec
clonedWorkloadSpec := workloadSpec.DeepCopy()

// TODO: find a better way to update deploymentSpec, this assumes there is only container in the pod
deploymentSpec.Template.Spec.Containers[0].Image = item.Image
clonedWorkloadSpec.GetTemplateSpec().Containers[0].Image = item.Image

patches = append(patches, flow_spec.ServicePatch{
Service: devServiceName,
DeploymentSpec: deploymentSpec,
Service: devServiceName,
WorkloadSpec: clonedWorkloadSpec,
})
}

Expand All @@ -72,6 +75,8 @@ func GenerateProdDevCluster(baseClusterTopologyMaybeWithTemplateOverrides *resol

func generateClusterTopology(
serviceConfigs []apitypes.ServiceConfig,
deploymentConfigs []apitypes.DeploymentConfig,
statefulSetConfig []apitypes.StatefulSetConfig,
ingressConfigs []apitypes.IngressConfig,
gatewayConfigs []apitypes.GatewayConfig,
routeConfigs []apitypes.RouteConfig,
Expand All @@ -80,7 +85,11 @@ func generateClusterTopology(
) (*resolved.ClusterTopology, error) {
clusterTopologyGatewayAndRoutes := processGatewayAndRouteConfigs(gatewayConfigs, routeConfigs, version)
clusterTopologyIngress := processIngressConfigs(ingressConfigs, version)
clusterTopologyServices, clusterTopologyServiceDependencies, err := processServiceConfigs(serviceConfigs, version)
clusterTopologyServices, clusterTopologyServiceDependencies, err := processServiceConfigs(
serviceConfigs,
deploymentConfigs,
statefulSetConfig,
version)
if err != nil {
return nil, stacktrace.NewError("an error occurred processing the service configs")
}
Expand Down Expand Up @@ -140,7 +149,58 @@ func processGatewayAndRouteConfigs(gatewayConfigs []apitypes.GatewayConfig, rout
return gatewayAndRoutes
}

func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version string) ([]*resolved.Service, []resolved.ServiceDependency, error) {
func getDeploymentForService(
serviceConfig apitypes.ServiceConfig,
deploymentConfigs []apitypes.DeploymentConfig,
) *apitypes.DeploymentConfig {
service := serviceConfig.Service
workload, foundworkload := lo.Find(deploymentConfigs, func(workloadConfig apitypes.DeploymentConfig) bool {
deploymentLabels := workloadConfig.Deployment.GetLabels()
for key, value := range service.Spec.Selector {
label, found := deploymentLabels[key]
if !found || value != label {
return false
}
}
return true
})

if foundworkload {
return &workload
}

return nil
}

func getSatefulSetForService(
serviceConfig apitypes.ServiceConfig,
statefulSetConfigs []apitypes.StatefulSetConfig,
) *apitypes.StatefulSetConfig {
service := serviceConfig.Service
workload, foundworkload := lo.Find(statefulSetConfigs, func(workloadConfig apitypes.StatefulSetConfig) bool {
workloadLabel := workloadConfig.StatefulSet.GetLabels()
for key, value := range service.Spec.Selector {
label, found := workloadLabel[key]
if !found || value != label {
return false
}
}
return true
})

if foundworkload {
return &workload
}

return nil
}

func processServiceConfigs(
serviceConfigs []apitypes.ServiceConfig,
deploymentConfigs []apitypes.DeploymentConfig,
statefulSetConfigs []apitypes.StatefulSetConfig,
version string,
) ([]*resolved.Service, []resolved.ServiceDependency, error) {
clusterTopologyServices := []*resolved.Service{}
clusterTopologyServiceDependencies := []resolved.ServiceDependency{}
externalServicesDependencies := []resolved.ServiceDependency{}
Expand All @@ -157,7 +217,13 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri

// 1- Service
logrus.Infof("Processing service: %v", service.GetObjectMeta().GetName())
clusterTopologyService := newClusterTopologyServiceFromServiceConfig(serviceConfig, version)

deploymentConfig := getDeploymentForService(serviceConfig, deploymentConfigs)
statefulSetConfig := getSatefulSetForService(serviceConfig, statefulSetConfigs)
clusterTopologyService, error := newClusterTopologyServiceFromConfigs(serviceConfig, deploymentConfig, statefulSetConfig, version)
if error != nil {
return nil, nil, stacktrace.Propagate(error, "An error occurred creating new cluster topology service from service config '%s'", service.Name)
}

// 2- Service plugins
serviceStatefulPlugins, externalServices, newExternalServicesDependencies, err := newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService)
Expand Down Expand Up @@ -234,11 +300,11 @@ func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apityp
serviceName = fmt.Sprintf("%v:%v", serviceID, "external")
}
externalService := resolved.Service{
ServiceID: serviceName,
Version: version,
ServiceSpec: nil, // leave empty for now
DeploymentSpec: nil, // leave empty for now
IsExternal: true,
ServiceID: serviceName,
Version: version,
ServiceSpec: nil, // leave empty for now
WorkloadSpec: nil, // leave empty for now
IsExternal: true,
// external services can definitely be stateful but for now treat external and stateful services as mutually exclusive to make plugin logic easier to handle
IsStateful: false,
}
Expand All @@ -259,21 +325,53 @@ func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apityp
return serviceStatefulPlugins, externalServices, externalServiceDependencies, nil
}

func newClusterTopologyServiceFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string) resolved.Service {
func newClusterTopologyServiceFromConfigs(
serviceConfig apitypes.ServiceConfig,
deploymentConfig *apitypes.DeploymentConfig,
statefulSetConfig *apitypes.StatefulSetConfig,
version string,
) (resolved.Service, error) {
service := serviceConfig.Service
deployment := serviceConfig.Deployment
serviceName := service.GetObjectMeta().GetName()
serviceAnnotations := service.GetObjectMeta().GetAnnotations()

clusterTopologyService := resolved.Service{
ServiceID: service.GetObjectMeta().GetName(),
Version: version,
ServiceSpec: &service.Spec,
DeploymentSpec: &deployment.Spec,
ServiceID: service.GetObjectMeta().GetName(),
Version: version,
ServiceSpec: &service.Spec,
}

if deploymentConfig == nil && statefulSetConfig == nil {
lostbean marked this conversation as resolved.
Show resolved Hide resolved
logrus.Warnf("Service %s has no workload", serviceName)
}
if deploymentConfig != nil && statefulSetConfig != nil {
workloads := []string{
deploymentConfig.Deployment.GetObjectMeta().GetName(),
statefulSetConfig.StatefulSet.GetObjectMeta().GetName(),
}
logrus.Error("Service %s is associated with more than one workload: %v", serviceName, workloads)
lostbean marked this conversation as resolved.
Show resolved Hide resolved
}
if deploymentConfig != nil {
workload := kardinal.NewDeploymentWorkloadSpec(deploymentConfig.Deployment.Spec)
clusterTopologyService.WorkloadSpec = &workload
}
if statefulSetConfig != nil {
workload := kardinal.NewStatefulSetWorkloadSpec(statefulSetConfig.StatefulSet.Spec)
clusterTopologyService.WorkloadSpec = &workload
}

// Set default for IsStateful to true if the workload is a StatefulSet, otherwise false
clusterTopologyService.IsStateful = clusterTopologyService.WorkloadSpec.IsStatefulSet()

// Override the IsStateful value by manual annotations
isStateful, ok := serviceAnnotations["kardinal.dev.service/stateful"]
if ok && isStateful == "true" {
clusterTopologyService.IsStateful = true
}
if ok && isStateful == "false" {
clusterTopologyService.IsStateful = false
}

isExternal, ok := serviceAnnotations["kardinal.dev.service/external"]
if ok && isExternal == "true" {
clusterTopologyService.IsExternal = true
Expand All @@ -283,7 +381,7 @@ func newClusterTopologyServiceFromServiceConfig(serviceConfig apitypes.ServiceCo
if ok && isShared == "true" {
clusterTopologyService.IsShared = true
}
return clusterTopologyService
return clusterTopologyService, nil
}

func getServiceAndPortFromClusterTopologyServices(serviceName string, servicePortName string, clusterTopologyServices []*resolved.Service) (*resolved.Service, *corev1.ServicePort, error) {
Expand Down
14 changes: 8 additions & 6 deletions kontrol-service/engine/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
)

func TestServiceConfigsToClusterTopology(t *testing.T) {
testServiceConfigs := test.GetServiceConfigs()
testServiceConfigs, testDeploymentConfigs := test.GetServiceConfigs()
testVersion := "prod"
testNamespace := "prod"

testIngressConfigs := test.GetIngressConfigs()
testGatewayConfigs := []apitypes.GatewayConfig{}
testRouteConfigs := []apitypes.RouteConfig{}
cluster, err := generateClusterTopology(testServiceConfigs, testIngressConfigs, testGatewayConfigs, testRouteConfigs, testVersion, testNamespace)
testStatefulSetConfigs := []apitypes.StatefulSetConfig{}
cluster, err := generateClusterTopology(testServiceConfigs, testDeploymentConfigs, testStatefulSetConfigs, testIngressConfigs, testGatewayConfigs, testRouteConfigs, testVersion, testNamespace)
if err != nil {
t.Errorf("Error generating cluster: %s", err)
}
Expand All @@ -29,14 +30,14 @@ func TestServiceConfigsToClusterTopology(t *testing.T) {
statefulPlugin := redisProdService.StatefulPlugins[0]
require.Equal(t, statefulPlugin.Name, "github.com/kardinaldev/redis-db-sidecar-plugin:36ed9a4")
require.Equal(t, *redisProdService.ServiceSpec, testServiceConfigs[0].Service.Spec)
require.Equal(t, *redisProdService.DeploymentSpec, testServiceConfigs[0].Deployment.Spec)
require.Equal(t, *redisProdService.WorkloadSpec.GetTemplateSpec(), testDeploymentConfigs[0].Deployment.Spec.Template.Spec)

votingAppUIService := cluster.Services[1]
require.Equal(t, votingAppUIService.ServiceID, "voting-app-ui")
require.Equal(t, votingAppUIService.IsExternal, false)
require.Equal(t, votingAppUIService.IsStateful, false)
require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[1].Service.Spec)
require.Equal(t, *votingAppUIService.DeploymentSpec, testServiceConfigs[1].Deployment.Spec)
require.Equal(t, *votingAppUIService.WorkloadSpec.GetTemplateSpec(), testDeploymentConfigs[1].Deployment.Spec.Template.Spec)

dependency := cluster.ServiceDependencies[0]
require.Equal(t, dependency.Service, votingAppUIService)
Expand All @@ -48,17 +49,18 @@ func TestServiceConfigsToClusterTopology(t *testing.T) {
}

func TestIngressConfigsTakePrecedenceOverK8sServicesActingAsIngresses(t *testing.T) {
testServiceConfigs := test.GetServiceConfigs()
testServiceConfigs, testDeploymentConfigs := test.GetServiceConfigs()

// use an Ingress Config
// this should take precedence over any Ingress defined elsewhere in the k8s manifest
testIngressConfigs := test.GetIngressConfigs()
testGatewayConfigs := []apitypes.GatewayConfig{}
testRouteConfigs := []apitypes.RouteConfig{}
testStatefulSetConfigs := []apitypes.StatefulSetConfig{}
testVersion := "prod"
testNamespace := "prod"

cluster, err := generateClusterTopology(testServiceConfigs, testIngressConfigs, testGatewayConfigs, testRouteConfigs, testVersion, testNamespace)
cluster, err := generateClusterTopology(testServiceConfigs, testDeploymentConfigs, testStatefulSetConfigs, testIngressConfigs, testGatewayConfigs, testRouteConfigs, testVersion, testNamespace)
if err != nil {
t.Errorf("Error generating cluster: %s", err)
}
Expand Down
Loading
Loading