Skip to content

Commit

Permalink
Fix SonataFlowPlatform reconciliation objects update
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Zanini <[email protected]>
  • Loading branch information
ricardozanini committed Sep 20, 2024
1 parent 35e9c80 commit 1d10c78
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 152 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha08/sonataflowplatform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type SonataFlowPlatformStatus struct {
// Version the operator version controlling this Platform
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="version"
Version string `json:"version,omitempty"`
// Info generic information related to the build
// Info generic information related to the Platform
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="info"
Info map[string]string `json:"info,omitempty"`
// ClusterPlatformRef information related to the (optional) active SonataFlowClusterPlatform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ metadata:
capabilities: Basic Install
categories: Application Runtime
containerImage: docker.io/apache/incubator-kie-sonataflow-operator:main
createdAt: "2024-09-19T22:18:35Z"
createdAt: "2024-09-20T21:17:19Z"
description: SonataFlow Kubernetes Operator for deploying workflow applications
based on the CNCF Serverless Workflow specification
operators.operatorframework.io/builder: operator-sdk-v1.35.0
Expand Down Expand Up @@ -278,7 +278,7 @@ spec:
SonataFlowClusterPlatform
displayName: clusterPlatformRef
path: clusterPlatformRef
- description: Info generic information related to the build
- description: Info generic information related to the Platform
displayName: info
path: info
- description: Triggers list of triggers created for the SonataFlowPlatform
Expand Down
2 changes: 1 addition & 1 deletion bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18359,7 +18359,7 @@ spec:
info:
additionalProperties:
type: string
description: Info generic information related to the build
description: Info generic information related to the Platform
type: object
observedGeneration:
description: The generation observed by the deployment controller.
Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18360,7 +18360,7 @@ spec:
info:
additionalProperties:
type: string
description: Info generic information related to the build
description: Info generic information related to the Platform
type: object
observedGeneration:
description: The generation observed by the deployment controller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ spec:
SonataFlowClusterPlatform
displayName: clusterPlatformRef
path: clusterPlatformRef
- description: Info generic information related to the build
- description: Info generic information related to the Platform
displayName: info
path: info
- description: Triggers list of triggers created for the SonataFlowPlatform
Expand Down
148 changes: 136 additions & 12 deletions internal/controller/platform/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ package platform

import (
"context"
"runtime"
"time"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/workflowdef"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -59,25 +67,141 @@ func CreateOrUpdateWithDefaults(ctx context.Context, p *operatorapi.SonataFlowPl
klog.V(log.I).InfoS("Maven Timeout set", "timeout", p.Spec.Build.Config.Timeout.Duration)
}

return createOrUpdatePlatform(ctx, p)
return createPlatformIfNotExists(ctx, p)
}

func setPlatformDefaults(p *operatorapi.SonataFlowPlatform, verbose bool) error {
if p.Spec.Build.Config.BuildStrategyOptions == nil {
klog.V(log.D).InfoS("SonataFlow Platform: setting publish strategy options", "namespace", p.Namespace)
p.Spec.Build.Config.BuildStrategyOptions = map[string]string{}
}

if p.Spec.Build.Config.GetTimeout().Duration != 0 {
d := p.Spec.Build.Config.GetTimeout().Duration.Truncate(time.Second)

if verbose && p.Spec.Build.Config.Timeout.Duration != d {
klog.V(log.I).InfoS("ContainerBuild timeout minimum unit is sec", "configured", p.Spec.Build.Config.GetTimeout().Duration, "truncated", d)
}

klog.V(log.D).InfoS("SonataFlow Platform: setting build timeout", "namespace", p.Namespace)
p.Spec.Build.Config.Timeout = &metav1.Duration{
Duration: d,
}
} else {
klog.V(log.D).InfoS("SonataFlow Platform setting default build timeout to 5 minutes", "namespace", p.Namespace)
p.Spec.Build.Config.Timeout = &metav1.Duration{
Duration: 5 * time.Minute,
}
}

if p.Spec.Build.Config.IsStrategyOptionEnabled(kanikoBuildCacheEnabled) {
p.Spec.Build.Config.BuildStrategyOptions[kanikoPVCName] = p.Name
if len(p.Spec.Build.Config.BaseImage) == 0 {
p.Spec.Build.Config.BaseImage = workflowdef.GetDefaultWorkflowBuilderImageTag()
}
}

if p.Spec.Build.Config.BuildStrategy == operatorapi.OperatorBuildStrategy && !p.Spec.Build.Config.IsStrategyOptionEnabled(kanikoBuildCacheEnabled) {
// Default to disabling Kaniko cache warmer
// Using the cache warmer pod seems unreliable with the current Kaniko version
// and requires relying on a persistent volume.
defaultKanikoBuildCache := "false"
p.Spec.Build.Config.BuildStrategyOptions[kanikoBuildCacheEnabled] = defaultKanikoBuildCache
if verbose {
klog.V(log.I).InfoS("Kaniko cache set", "value", defaultKanikoBuildCache)
}
}

// When dataIndex object set, default to enabled if bool not set
if p.Spec.Services != nil {
var enable = true
if p.Spec.Services.DataIndex != nil && p.Spec.Services.DataIndex.Enabled == nil {
p.Spec.Services.DataIndex.Enabled = &enable
}
// When the JobService field has a value, default to enabled if the `Enabled` field's value is nil
if p.Spec.Services.JobService != nil && p.Spec.Services.JobService.Enabled == nil {
p.Spec.Services.JobService.Enabled = &enable
}
}
setStatusAdditionalInfo(p)

if verbose {
klog.V(log.I).InfoS("baseImage set", "value", p.Spec.Build.Config.BaseImage)
klog.V(log.I).InfoS("Timeout set", "value", p.Spec.Build.Config.GetTimeout())
}
return nil
}

func setStatusAdditionalInfo(platform *operatorapi.SonataFlowPlatform) {
platform.Status.Info = make(map[string]string)

klog.V(log.D).InfoS("SonataFlow setting status info", "namespace", platform.Namespace)
platform.Status.Info["goVersion"] = runtime.Version()
platform.Status.Info["goOS"] = runtime.GOOS
}

func createOrUpdatePlatform(ctx context.Context, p *operatorapi.SonataFlowPlatform) error {
config := operatorapi.SonataFlowPlatform{}
err := utils.GetClient().Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
func configureRegistry(ctx context.Context, p *operatorapi.SonataFlowPlatform, verbose bool) error {
if p.Spec.Build.Config.BuildStrategy == operatorapi.PlatformBuildStrategy && p.Status.Cluster == operatorapi.PlatformClusterOpenShift {
p.Spec.Build.Config.Registry = operatorapi.RegistrySpec{}
klog.V(log.D).InfoS("Platform registry not set and ignored on openshift cluster")
return nil
}

if p.Spec.Build.Config.Registry.Address == "" && p.Status.Cluster == operatorapi.PlatformClusterKubernetes {
// try KEP-1755
address, err := getRegistryAddress(ctx)
if err != nil && verbose {
klog.V(log.E).ErrorS(err, "Cannot find a registry where to push images via KEP-1755")
} else if err == nil && address != nil {
p.Spec.Build.Config.Registry.Address = *address
}
}

klog.V(log.D).InfoS("Final Registry Address", "address", p.Spec.Build.Config.Registry.Address)
return nil
}

// getRegistryAddress KEP-1755
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-cluster-lifecycle/generic/1755-communicating-a-local-registry
func getRegistryAddress(ctx context.Context) (*string, error) {
config := corev1.ConfigMap{}
err := utils.GetClient().Get(ctx, ctrl.ObjectKey{Namespace: "kube-public", Name: "local-registry-hosting"}, &config)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if data, ok := config.Data["localRegistryHosting.v1"]; ok {
result := LocalRegistryHostingV1{}
if err := yaml.Unmarshal([]byte(data), &result); err != nil {
return nil, err
}
return &result.HostFromClusterNetwork, nil
}
return nil, nil
}

func createPlatformIfNotExists(ctx context.Context, p *operatorapi.SonataFlowPlatform) error {
newPlt := operatorapi.SonataFlowPlatform{}
err := utils.GetClient().Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &newPlt)
if errors.IsNotFound(err) {
klog.V(log.D).ErrorS(err, "Platform not found, creating it")
return utils.GetClient().Create(ctx, p)
} else if err != nil {
klog.V(log.E).ErrorS(err, "Error reading the Platform")
}

// FIXME: We should never update the object within methods like this, but let the actual reconciler to do it
// https://github.com/apache/incubator-kie-kogito-serverless-operator/issues/538
if err = SafeUpdatePlatformStatus(ctx, p); err != nil {
klog.V(log.E).ErrorS(err, "Error updating the platform status")
return err
}

config.Spec = p.Spec
config.Status.Cluster = p.Status.Cluster
err = utils.GetClient().Update(ctx, &config)
if err != nil {
klog.V(log.E).ErrorS(err, "Error updating the BuildPlatform")
// FIXME: We should never update the object within methods like this, but let the actual reconciler to do it
// https://github.com/apache/incubator-kie-kogito-serverless-operator/issues/538
if err = SafeUpdatePlatform(ctx, p); err != nil {
klog.V(log.E).ErrorS(err, "Error updating the platform")
return err
}
return err
return nil
}
2 changes: 1 addition & 1 deletion internal/controller/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
}
}

if err := client.Status().Update(ctx, platform); err != nil {
if err := SafeUpdatePlatformStatus(ctx, platform); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 1d10c78

Please sign in to comment.