Skip to content

Commit

Permalink
kie-kogito-serverless-operator-556: Move the Jobs Service provisionin…
Browse files Browse the repository at this point in the history
…g from k8s Deployment to a StatefulSet

    - Step1
  • Loading branch information
wmedvede committed Oct 22, 2024
1 parent 5eddf46 commit 8c59ca8
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 102 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ test: manifests generate envtest test-api ## Run tests.
@$(MAKE) fmt
@echo "🔍 Running controller tests..."
@KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" \
go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out > /dev/null 2>&1
go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out
@echo "✅ Tests completed successfully. Coverage report generated: cover.out."

.PHONY: test-api
Expand Down
2 changes: 2 additions & 0 deletions config/rbac/builder_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rules:
- secrets
- events
- deployments
- statefulsets
- nodes
verbs:
- create
Expand All @@ -59,6 +60,7 @@ rules:
- secrets
- events
- deployments
- statefulsets
- nodes
verbs:
- create
Expand Down
201 changes: 153 additions & 48 deletions internal/controller/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,66 +85,29 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
}

func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil {
var err error
if err = createOrUpdateConfigMap(ctx, client, platform, psh); err != nil {
return err
}
if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil {
if psh.GetDeploymentType() == constants.Deployment {
err = createOrUpdateDeployment(ctx, client, platform, psh)
} else {
err = createOrUpdateStatefulSet(ctx, client, platform, psh)
}
if err != nil {
return err
}
if err := createOrUpdateService(ctx, client, platform, psh); err != nil {
if err = createOrUpdateService(ctx, client, platform, psh); err != nil {
return err
}
return createOrUpdateKnativeResources(ctx, client, platform, psh)
}

func createOrUpdateDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
readyProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathReady,
Port: variables.DefaultHTTPWorkflowPortIntStr,
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: int32(45),
TimeoutSeconds: int32(10),
PeriodSeconds: int32(30),
SuccessThreshold: int32(1),
FailureThreshold: int32(4),
}
liveProbe := readyProbe.DeepCopy()
liveProbe.ProbeHandler.HTTPGet.Path = constants.QuarkusHealthPathLive
imageTag := psh.GetServiceImageName(constants.PersistenceTypeEphemeral)
serviceContainer := &corev1.Container{
Image: imageTag,
ImagePullPolicy: kubeutil.GetImagePullPolicy(imageTag),
Env: psh.GetEnvironmentVariables(),
Resources: psh.GetPodResourceRequirements(),
ReadinessProbe: readyProbe,
LivenessProbe: liveProbe,
Ports: []corev1.ContainerPort{
{
Name: utils.DefaultServicePortName,
ContainerPort: int32(constants.DefaultHTTPWorkflowPortInt),
Protocol: corev1.ProtocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "application-config",
MountPath: "/home/kogito/config",
},
},
}
serviceContainer = psh.ConfigurePersistence(serviceContainer)
serviceContainer, err := psh.MergeContainerSpec(serviceContainer)
serviceContainer, err := createServiceContainer(platform, psh)
if err != nil {
return err
}

// immutable
serviceContainer.Name = psh.GetContainerName()

replicas := psh.GetReplicaCount()
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
Expand Down Expand Up @@ -196,7 +159,7 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
return err
}

// Create or Update the deployment
// Create or Update the Deployment
if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error {
knative.SaveKnativeData(&serviceDeploymentSpec.Template.Spec, &serviceDeployment.Spec.Template.Spec)
err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec, mergo.WithOverride)
Expand All @@ -212,6 +175,148 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
return nil
}

func createOrUpdateStatefulSet(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
serviceContainer, err := createServiceContainer(platform, psh)
if err != nil {
return err
}
replicas := psh.GetReplicaCount()
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return nil
}
if !kSinkInjected {
replicas = 0 // Wait for K_SINK injection
}
lbl, selectorLbl := getLabels(platform, psh)
serviceStatefulSetSpec := appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: selectorLbl,
},
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: lbl,
},
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{
{
Name: "application-config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: psh.GetServiceCmName(),
},
},
},
},
},
},
},
}

serviceStatefulSetSpec.Template.Spec, err = psh.MergePodSpec(serviceStatefulSetSpec.Template.Spec)
if err != nil {
return err
}
kubeutil.AddOrReplaceContainer(serviceContainer.Name, *serviceContainer, &serviceStatefulSetSpec.Template.Spec)

serviceStatefulSet := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: platform.Namespace,
Name: psh.GetServiceName(),
Labels: lbl,
}}
if err := controllerutil.SetControllerReference(platform, serviceStatefulSet, client.Scheme()); err != nil {
return err
}

// Create or Update the StatefulSet
if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceStatefulSet, func() error {
knative.SaveKnativeData(&serviceStatefulSetSpec.Template.Spec, &serviceStatefulSet.Spec.Template.Spec)
err := mergo.Merge(&(serviceStatefulSet.Spec), serviceStatefulSetSpec, mergo.WithOverride)
if err != nil {
return err
}
return nil
}); err != nil {
return err
} else {
klog.V(log.I).InfoS("StatefulSet successfully reconciled", "operation", op)
}
return nil
}

func createReadinessProbe() *corev1.Probe {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathReady,
Port: variables.DefaultHTTPWorkflowPortIntStr,
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: int32(constants.ServiceProbeInitialDelaySeconds),
TimeoutSeconds: int32(constants.ServiceProbeTimeoutSeconds),
PeriodSeconds: int32(constants.ServiceProbePeriodSeconds),
SuccessThreshold: int32(constants.ServiceProbeSuccessThreshold),
FailureThreshold: int32(constants.ServiceProbeFailureThreshold),
}
}

func createLivenessProbe() *corev1.Probe {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathLive,
Port: variables.DefaultHTTPWorkflowPortIntStr,
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: int32(constants.ServiceProbeInitialDelaySeconds),
TimeoutSeconds: int32(constants.ServiceProbeTimeoutSeconds),
PeriodSeconds: int32(constants.ServiceProbePeriodSeconds),
SuccessThreshold: int32(constants.ServiceProbeSuccessThreshold),
FailureThreshold: int32(constants.ServiceProbeFailureThreshold),
}
}

func createServiceContainer(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Container, error) {
readyProbe := createReadinessProbe()
liveProbe := createLivenessProbe()
imageTag := psh.GetServiceImageName(constants.PersistenceTypeEphemeral)
serviceContainer := &corev1.Container{
Image: imageTag,
ImagePullPolicy: kubeutil.GetImagePullPolicy(imageTag),
Env: psh.GetEnvironmentVariables(),
Resources: psh.GetPodResourceRequirements(),
ReadinessProbe: readyProbe,
LivenessProbe: liveProbe,
Ports: []corev1.ContainerPort{
{
Name: utils.DefaultServicePortName,
ContainerPort: int32(constants.DefaultHTTPWorkflowPortInt),
Protocol: corev1.ProtocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "application-config",
MountPath: "/home/kogito/config",
},
},
}
serviceContainer = psh.ConfigurePersistence(serviceContainer)
serviceContainer, err := psh.MergeContainerSpec(serviceContainer)
if err != nil {
return nil, err
}

// immutable
serviceContainer.Name = psh.GetContainerName()
return serviceContainer, nil
}

func createOrUpdateService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
lbl, selectorLbl := getLabels(platform, psh)
dataSvcSpec := corev1.ServiceSpec{
Expand Down
12 changes: 11 additions & 1 deletion internal/controller/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type PlatformServiceHandler interface {
GetPodResourceRequirements() corev1.ResourceRequirements
// GetReplicaCount Returns the default pod replica count for the given service
GetReplicaCount() int32
// GetDeploymentType Returns the deployment type required for the service.
GetDeploymentType() constants.DeploymentType

// MergeContainerSpec performs a merge with override using the containerSpec argument and the expected values based on the service's pod template specifications. The returning
// object is the merged result
Expand Down Expand Up @@ -254,6 +256,10 @@ func (d *DataIndexHandler) GetReplicaCount() int32 {
return 1
}

func (d *DataIndexHandler) GetDeploymentType() constants.DeploymentType {
return constants.Deployment
}

func (d *DataIndexHandler) GetServiceCmName() string {
return fmt.Sprintf("%s-props", d.GetServiceName())
}
Expand Down Expand Up @@ -385,6 +391,10 @@ func (j *JobServiceHandler) GetReplicaCount() int32 {
return 1
}

func (j *JobServiceHandler) GetDeploymentType() constants.DeploymentType {
return constants.StatefulSet
}

func (j JobServiceHandler) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) {
return mergeContainerSpec(containerSpec, &j.platform.Spec.Services.JobService.PodTemplate.Container)
}
Expand Down Expand Up @@ -705,7 +715,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
Name: j.GetServiceName(),
Namespace: platform.Namespace,
APIVersion: "apps/v1",
Kind: "Deployment",
Kind: string(j.GetDeploymentType()),
},
},
},
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ const (

DefaultDatabaseName string = "sonataflow"
DefaultPostgreSQLPort int = 5432

ServiceProbeInitialDelaySeconds int = 45
ServiceProbeTimeoutSeconds int = 10
ServiceProbePeriodSeconds int = 30
ServiceProbeSuccessThreshold int = 1
ServiceProbeFailureThreshold int = 4
)

type PersistenceType string
Expand All @@ -89,3 +95,10 @@ const (
func (p PersistenceType) String() string {
return string(p)
}

type DeploymentType string

const (
Deployment DeploymentType = "Deployment"
StatefulSet DeploymentType = "StatefulSet"
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
defaultDatabaseName = "sonataflow"
)

// ConfigurePostgreSQLEnv returns the common env variables required for the DataIndex or JobsService when postresql persistence is used.
// ConfigurePostgreSQLEnv returns the common env variables required for the DataIndex or JobsService when postgresql persistence is used.
func ConfigurePostgreSQLEnv(postgresql *operatorapi.PersistencePostgreSQL, databaseSchema, databaseNamespace string) []corev1.EnvVar {
dataSourcePort := constants.DefaultPostgreSQLPort
databaseName := defaultDatabaseName
Expand Down
Loading

0 comments on commit 8c59ca8

Please sign in to comment.