From 8c59ca8e3105844cdfd45004a50b668d87872b08 Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Tue, 22 Oct 2024 16:43:11 +0200 Subject: [PATCH] kie-kogito-serverless-operator-556: Move the Jobs Service provisioning from k8s Deployment to a StatefulSet - Step1 --- Makefile | 2 +- config/rbac/builder_role.yaml | 2 + internal/controller/platform/k8s.go | 201 +++++++++++++----- .../controller/platform/services/services.go | 12 +- .../common/constants/platform_services.go | 13 ++ .../profiles/common/persistence/postgresql.go | 2 +- .../sonataflowplatform_controller_test.go | 104 ++++----- operator.yaml | 2 + 8 files changed, 236 insertions(+), 102 deletions(-) diff --git a/Makefile b/Makefile index 81ec5a13c..d09fbe8af 100644 --- a/Makefile +++ b/Makefile @@ -124,7 +124,7 @@ test: manifests generate envtest test-api ## Run tests. @$(MAKE) fmt @echo "🔍 Running controller tests..." @KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" \ - go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out > /dev/null 2>&1 + go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out @echo "✅ Tests completed successfully. Coverage report generated: cover.out." .PHONY: test-api diff --git a/config/rbac/builder_role.yaml b/config/rbac/builder_role.yaml index da397b80e..33a66cabe 100644 --- a/config/rbac/builder_role.yaml +++ b/config/rbac/builder_role.yaml @@ -35,6 +35,7 @@ rules: - secrets - events - deployments + - statefulsets - nodes verbs: - create @@ -59,6 +60,7 @@ rules: - secrets - events - deployments + - statefulsets - nodes verbs: - create diff --git a/internal/controller/platform/k8s.go b/internal/controller/platform/k8s.go index 9433f5a85..af0e7f7b0 100644 --- a/internal/controller/platform/k8s.go +++ b/internal/controller/platform/k8s.go @@ -85,66 +85,29 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S } func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { - if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil { + var err error + if err = createOrUpdateConfigMap(ctx, client, platform, psh); err != nil { return err } - if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil { + if psh.GetDeploymentType() == constants.Deployment { + err = createOrUpdateDeployment(ctx, client, platform, psh) + } else { + err = createOrUpdateStatefulSet(ctx, client, platform, psh) + } + if err != nil { return err } - if err := createOrUpdateService(ctx, client, platform, psh); err != nil { + if err = createOrUpdateService(ctx, client, platform, psh); err != nil { return err } return createOrUpdateKnativeResources(ctx, client, platform, psh) } func createOrUpdateDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { - readyProbe := &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: constants.QuarkusHealthPathReady, - Port: variables.DefaultHTTPWorkflowPortIntStr, - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: int32(45), - TimeoutSeconds: int32(10), - PeriodSeconds: int32(30), - SuccessThreshold: int32(1), - FailureThreshold: int32(4), - } - liveProbe := readyProbe.DeepCopy() - liveProbe.ProbeHandler.HTTPGet.Path = constants.QuarkusHealthPathLive - imageTag := psh.GetServiceImageName(constants.PersistenceTypeEphemeral) - serviceContainer := &corev1.Container{ - Image: imageTag, - ImagePullPolicy: kubeutil.GetImagePullPolicy(imageTag), - Env: psh.GetEnvironmentVariables(), - Resources: psh.GetPodResourceRequirements(), - ReadinessProbe: readyProbe, - LivenessProbe: liveProbe, - Ports: []corev1.ContainerPort{ - { - Name: utils.DefaultServicePortName, - ContainerPort: int32(constants.DefaultHTTPWorkflowPortInt), - Protocol: corev1.ProtocolTCP, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "application-config", - MountPath: "/home/kogito/config", - }, - }, - } - serviceContainer = psh.ConfigurePersistence(serviceContainer) - serviceContainer, err := psh.MergeContainerSpec(serviceContainer) + serviceContainer, err := createServiceContainer(platform, psh) if err != nil { return err } - - // immutable - serviceContainer.Name = psh.GetContainerName() - replicas := psh.GetReplicaCount() kSinkInjected, err := psh.CheckKSinkInjected() if err != nil { @@ -196,7 +159,7 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor return err } - // Create or Update the deployment + // Create or Update the Deployment if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error { knative.SaveKnativeData(&serviceDeploymentSpec.Template.Spec, &serviceDeployment.Spec.Template.Spec) err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec, mergo.WithOverride) @@ -212,6 +175,148 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor return nil } +func createOrUpdateStatefulSet(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { + serviceContainer, err := createServiceContainer(platform, psh) + if err != nil { + return err + } + replicas := psh.GetReplicaCount() + kSinkInjected, err := psh.CheckKSinkInjected() + if err != nil { + return nil + } + if !kSinkInjected { + replicas = 0 // Wait for K_SINK injection + } + lbl, selectorLbl := getLabels(platform, psh) + serviceStatefulSetSpec := appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: selectorLbl, + }, + Replicas: &replicas, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: lbl, + }, + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ + { + Name: "application-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: psh.GetServiceCmName(), + }, + }, + }, + }, + }, + }, + }, + } + + serviceStatefulSetSpec.Template.Spec, err = psh.MergePodSpec(serviceStatefulSetSpec.Template.Spec) + if err != nil { + return err + } + kubeutil.AddOrReplaceContainer(serviceContainer.Name, *serviceContainer, &serviceStatefulSetSpec.Template.Spec) + + serviceStatefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: platform.Namespace, + Name: psh.GetServiceName(), + Labels: lbl, + }} + if err := controllerutil.SetControllerReference(platform, serviceStatefulSet, client.Scheme()); err != nil { + return err + } + + // Create or Update the StatefulSet + if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceStatefulSet, func() error { + knative.SaveKnativeData(&serviceStatefulSetSpec.Template.Spec, &serviceStatefulSet.Spec.Template.Spec) + err := mergo.Merge(&(serviceStatefulSet.Spec), serviceStatefulSetSpec, mergo.WithOverride) + if err != nil { + return err + } + return nil + }); err != nil { + return err + } else { + klog.V(log.I).InfoS("StatefulSet successfully reconciled", "operation", op) + } + return nil +} + +func createReadinessProbe() *corev1.Probe { + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: constants.QuarkusHealthPathReady, + Port: variables.DefaultHTTPWorkflowPortIntStr, + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: int32(constants.ServiceProbeInitialDelaySeconds), + TimeoutSeconds: int32(constants.ServiceProbeTimeoutSeconds), + PeriodSeconds: int32(constants.ServiceProbePeriodSeconds), + SuccessThreshold: int32(constants.ServiceProbeSuccessThreshold), + FailureThreshold: int32(constants.ServiceProbeFailureThreshold), + } +} + +func createLivenessProbe() *corev1.Probe { + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: constants.QuarkusHealthPathLive, + Port: variables.DefaultHTTPWorkflowPortIntStr, + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: int32(constants.ServiceProbeInitialDelaySeconds), + TimeoutSeconds: int32(constants.ServiceProbeTimeoutSeconds), + PeriodSeconds: int32(constants.ServiceProbePeriodSeconds), + SuccessThreshold: int32(constants.ServiceProbeSuccessThreshold), + FailureThreshold: int32(constants.ServiceProbeFailureThreshold), + } +} + +func createServiceContainer(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Container, error) { + readyProbe := createReadinessProbe() + liveProbe := createLivenessProbe() + imageTag := psh.GetServiceImageName(constants.PersistenceTypeEphemeral) + serviceContainer := &corev1.Container{ + Image: imageTag, + ImagePullPolicy: kubeutil.GetImagePullPolicy(imageTag), + Env: psh.GetEnvironmentVariables(), + Resources: psh.GetPodResourceRequirements(), + ReadinessProbe: readyProbe, + LivenessProbe: liveProbe, + Ports: []corev1.ContainerPort{ + { + Name: utils.DefaultServicePortName, + ContainerPort: int32(constants.DefaultHTTPWorkflowPortInt), + Protocol: corev1.ProtocolTCP, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "application-config", + MountPath: "/home/kogito/config", + }, + }, + } + serviceContainer = psh.ConfigurePersistence(serviceContainer) + serviceContainer, err := psh.MergeContainerSpec(serviceContainer) + if err != nil { + return nil, err + } + + // immutable + serviceContainer.Name = psh.GetContainerName() + return serviceContainer, nil +} + func createOrUpdateService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { lbl, selectorLbl := getLabels(platform, psh) dataSvcSpec := corev1.ServiceSpec{ diff --git a/internal/controller/platform/services/services.go b/internal/controller/platform/services/services.go index 20673b75b..3457f57a8 100644 --- a/internal/controller/platform/services/services.go +++ b/internal/controller/platform/services/services.go @@ -69,6 +69,8 @@ type PlatformServiceHandler interface { GetPodResourceRequirements() corev1.ResourceRequirements // GetReplicaCount Returns the default pod replica count for the given service GetReplicaCount() int32 + // GetDeploymentType Returns the deployment type required for the service. + GetDeploymentType() constants.DeploymentType // MergeContainerSpec performs a merge with override using the containerSpec argument and the expected values based on the service's pod template specifications. The returning // object is the merged result @@ -254,6 +256,10 @@ func (d *DataIndexHandler) GetReplicaCount() int32 { return 1 } +func (d *DataIndexHandler) GetDeploymentType() constants.DeploymentType { + return constants.Deployment +} + func (d *DataIndexHandler) GetServiceCmName() string { return fmt.Sprintf("%s-props", d.GetServiceName()) } @@ -385,6 +391,10 @@ func (j *JobServiceHandler) GetReplicaCount() int32 { return 1 } +func (j *JobServiceHandler) GetDeploymentType() constants.DeploymentType { + return constants.StatefulSet +} + func (j JobServiceHandler) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) { return mergeContainerSpec(containerSpec, &j.platform.Spec.Services.JobService.PodTemplate.Container) } @@ -705,7 +715,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat Name: j.GetServiceName(), Namespace: platform.Namespace, APIVersion: "apps/v1", - Kind: "Deployment", + Kind: string(j.GetDeploymentType()), }, }, }, diff --git a/internal/controller/profiles/common/constants/platform_services.go b/internal/controller/profiles/common/constants/platform_services.go index 4416ed7d3..5a83cee74 100644 --- a/internal/controller/profiles/common/constants/platform_services.go +++ b/internal/controller/profiles/common/constants/platform_services.go @@ -77,6 +77,12 @@ const ( DefaultDatabaseName string = "sonataflow" DefaultPostgreSQLPort int = 5432 + + ServiceProbeInitialDelaySeconds int = 45 + ServiceProbeTimeoutSeconds int = 10 + ServiceProbePeriodSeconds int = 30 + ServiceProbeSuccessThreshold int = 1 + ServiceProbeFailureThreshold int = 4 ) type PersistenceType string @@ -89,3 +95,10 @@ const ( func (p PersistenceType) String() string { return string(p) } + +type DeploymentType string + +const ( + Deployment DeploymentType = "Deployment" + StatefulSet DeploymentType = "StatefulSet" +) diff --git a/internal/controller/profiles/common/persistence/postgresql.go b/internal/controller/profiles/common/persistence/postgresql.go index 85253be98..935eae77c 100644 --- a/internal/controller/profiles/common/persistence/postgresql.go +++ b/internal/controller/profiles/common/persistence/postgresql.go @@ -34,7 +34,7 @@ const ( defaultDatabaseName = "sonataflow" ) -// ConfigurePostgreSQLEnv returns the common env variables required for the DataIndex or JobsService when postresql persistence is used. +// ConfigurePostgreSQLEnv returns the common env variables required for the DataIndex or JobsService when postgresql persistence is used. func ConfigurePostgreSQLEnv(postgresql *operatorapi.PersistencePostgreSQL, databaseSchema, databaseNamespace string) []corev1.EnvVar { dataSourcePort := constants.DefaultPostgreSQLPort databaseName := defaultDatabaseName diff --git a/internal/controller/sonataflowplatform_controller_test.go b/internal/controller/sonataflowplatform_controller_test.go index db567769b..ca345031e 100644 --- a/internal/controller/sonataflowplatform_controller_test.go +++ b/internal/controller/sonataflowplatform_controller_test.go @@ -362,14 +362,15 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbPassword) assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbSourceDIURL) + stfSet := &appsv1.StatefulSet{} js := services.NewJobServiceHandler(ksp) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 1) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), dep.Spec.Template.Spec.Containers[0].Image) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbSourceKind) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbUsername) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbPassword) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbSourceJSURL) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 1) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), stfSet.Spec.Template.Spec.Containers[0].Image) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbSourceKind) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbUsername) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbPassword) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbSourceJSURL) }) t.Run("verify that persistence options are correctly reconciled when defined in the platform and overwriten in the services spec", func(t *testing.T) { @@ -483,14 +484,15 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbDIPassword) assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "QUARKUS_DATASOURCE_JDBC_URL", Value: urlDI}) + stfSet := &appsv1.StatefulSet{} js := services.NewJobServiceHandler(ksp) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 1) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), dep.Spec.Template.Spec.Containers[0].Image) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbSourceKind) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbJSUsername) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, dbJSPassword) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "QUARKUS_DATASOURCE_JDBC_URL", Value: urlJS}) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 1) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), stfSet.Spec.Template.Spec.Containers[0].Image) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbSourceKind) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbJSUsername) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, dbJSPassword) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "QUARKUS_DATASOURCE_JDBC_URL", Value: urlJS}) }) // Job Service tests @@ -535,15 +537,15 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Equal(t, "", ksp.Status.GetTopLevelCondition().Reason) - // Check data index deployment - dep := &appsv1.Deployment{} + // Check jobs service deployment + stfSet := &appsv1.StatefulSet{} js := services.NewJobServiceHandler(ksp) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 1) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypeEphemeral), dep.Spec.Template.Spec.Containers[0].Image) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDBKind) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDataIndex) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 1) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypeEphemeral), stfSet.Spec.Template.Spec.Containers[0].Image) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDBKind) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDataIndex) // Check with persistence set ksp.Spec.Services.JobService.Persistence = &v1alpha08.PersistenceOptionsSpec{PostgreSQL: &v1alpha08.PersistencePostgreSQL{ @@ -560,15 +562,15 @@ func TestSonataFlowPlatformController(t *testing.T) { t.Fatalf("reconcile: (%v)", err) } - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 2) - assert.Equal(t, constants.JobServiceName+"2", dep.Spec.Template.Spec.Containers[0].Name) - assert.Equal(t, "testing", dep.Spec.Template.Spec.Containers[0].TerminationMessagePath) - assert.Equal(t, constants.JobServiceName, dep.Spec.Template.Spec.Containers[1].Name) - assert.Equal(t, "testing", dep.Spec.Template.Spec.Containers[1].TerminationMessagePath) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), dep.Spec.Template.Spec.Containers[1].Image) - assert.Contains(t, dep.Spec.Template.Spec.Containers[1].Env, envDBKind) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[1].Env, envDataIndex) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 2) + assert.Equal(t, constants.JobServiceName+"2", stfSet.Spec.Template.Spec.Containers[0].Name) + assert.Equal(t, "testing", stfSet.Spec.Template.Spec.Containers[0].TerminationMessagePath) + assert.Equal(t, constants.JobServiceName, stfSet.Spec.Template.Spec.Containers[1].Name) + assert.Equal(t, "testing", stfSet.Spec.Template.Spec.Containers[1].TerminationMessagePath) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), stfSet.Spec.Template.Spec.Containers[1].Image) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[1].Env, envDBKind) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[1].Env, envDataIndex) }) t.Run("verify that a basic reconcile with job service & jdbcUrl is performed without error", func(t *testing.T) { @@ -622,14 +624,14 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Equal(t, "", ksp.Status.GetTopLevelCondition().Reason) - // Check job service deployment - dep := &appsv1.Deployment{} - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) + // Check job service statefulset + stfSet := &appsv1.StatefulSet{} + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 1) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypeEphemeral), dep.Spec.Template.Spec.Containers[0].Image) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDBKind) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDataIndex) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 1) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypeEphemeral), stfSet.Spec.Template.Spec.Containers[0].Image) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDBKind) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDataIndex) // Check with persistence set url := "jdbc:postgresql://host:1234/database?currentSchema=data-index-service" @@ -646,13 +648,13 @@ func TestSonataFlowPlatformController(t *testing.T) { t.Fatalf("reconcile: (%v)", err) } - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 1) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), dep.Spec.Template.Spec.Containers[0].Image) - assert.Equal(t, int32(1), *dep.Spec.Replicas) - assert.Equal(t, []string{"test:latest"}, dep.Spec.Template.Spec.Containers[0].Command) - assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, envDBKind) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDataIndex) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 1) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypePostgreSQL), stfSet.Spec.Template.Spec.Containers[0].Image) + assert.Equal(t, int32(1), *stfSet.Spec.Replicas) + assert.Equal(t, []string{"test:latest"}, stfSet.Spec.Template.Spec.Containers[0].Command) + assert.Contains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDBKind) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDataIndex) }) t.Run("verify that a default deployment of a job and data index service will is performed without error", func(t *testing.T) { @@ -710,14 +712,14 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDBKind) assert.Contains(t, dep.Spec.Template.Spec.Containers[0].Env, envDataIndex) - // Check job service deployment - dep = &appsv1.Deployment{} - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) + // Check job service statefulset + stfSet := &appsv1.StatefulSet{} + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, stfSet)) - assert.Len(t, dep.Spec.Template.Spec.Containers, 1) - assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypeEphemeral), dep.Spec.Template.Spec.Containers[0].Image) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDBKind) - assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDataIndex) + assert.Len(t, stfSet.Spec.Template.Spec.Containers, 1) + assert.Equal(t, js.GetServiceImageName(constants.PersistenceTypeEphemeral), stfSet.Spec.Template.Spec.Containers[0].Image) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDBKind) + assert.NotContains(t, stfSet.Spec.Template.Spec.Containers[0].Env, envDataIndex) }) t.Run("verify that a basic reconcile of a cluster platform is performed without error", func(t *testing.T) { diff --git a/operator.yaml b/operator.yaml index 98f7959ac..97983376b 100644 --- a/operator.yaml +++ b/operator.yaml @@ -27602,6 +27602,7 @@ rules: - secrets - events - deployments + - statefulsets - nodes verbs: - create @@ -27626,6 +27627,7 @@ rules: - secrets - events - deployments + - statefulsets - nodes verbs: - create