diff --git a/internal/controller/platform/action.go b/internal/controller/platform/action.go index 58a21ddb2..0dc082edf 100644 --- a/internal/controller/platform/action.go +++ b/internal/controller/platform/action.go @@ -22,9 +22,9 @@ package platform import ( "context" - "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" - v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + corev1 "k8s.io/api/core/v1" ) // Action --. @@ -38,7 +38,7 @@ type Action interface { CanHandle(platform *v08.SonataFlowPlatform) bool // executes the handling function - Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) + Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) } type baseAction struct { diff --git a/internal/controller/platform/create.go b/internal/controller/platform/create.go index f3dbbe487..ef825a3b9 100644 --- a/internal/controller/platform/create.go +++ b/internal/controller/platform/create.go @@ -22,6 +22,8 @@ package platform import ( "context" + corev1 "k8s.io/api/core/v1" + "github.com/apache/incubator-kie-kogito-serverless-operator/api" v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" ) @@ -43,9 +45,9 @@ func (action *createAction) CanHandle(platform *v08.SonataFlowPlatform) bool { return platform.Status.IsCreating() } -func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) { +func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) { //TODO: Perform the actions needed for the Platform creation platform.Status.Manager().MarkTrue(api.SucceedConditionType) - return platform, nil + return platform, nil, nil } diff --git a/internal/controller/platform/initialize.go b/internal/controller/platform/initialize.go index 9c486a12d..c46bd93b7 100644 --- a/internal/controller/platform/initialize.go +++ b/internal/controller/platform/initialize.go @@ -61,24 +61,24 @@ func (action *initializeAction) CanHandle(platform *operatorapi.SonataFlowPlatfo return platform.Status.GetTopLevelCondition().IsUnknown() || platform.Status.IsDuplicated() } -func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) { +func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) { duplicate, err := action.isPrimaryDuplicate(ctx, platform) if err != nil { - return nil, err + return nil, nil, err } if duplicate { // another platform already present in the namespace if !platform.Status.IsDuplicated() { plat := platform.DeepCopy() plat.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "") - return plat, nil + return plat, nil, nil } - return nil, nil + return nil, nil, nil } if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil { - return nil, err + return nil, nil, err } // nolint: staticcheck if platform.Spec.Build.Config.BuildStrategy == operatorapi.OperatorBuildStrategy { @@ -88,13 +88,13 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap klog.V(log.I).InfoS("Create persistent volume claim") err := createPersistentVolumeClaim(ctx, action.client, platform) if err != nil { - return nil, err + return nil, nil, err } // Create the Kaniko warmer pod that caches the base image into the SonataFlow builder volume klog.V(log.I).InfoS("Create Kaniko cache warmer pod") err = createKanikoCacheWarmerPod(ctx, action.client, platform) if err != nil { - return nil, err + return nil, nil, err } platform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformWarmingReason, "") } else { @@ -106,7 +106,7 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap } platform.Status.Version = metadata.SpecVersion - return platform, nil + return platform, nil, nil } // TODO: move this to Kaniko packages based on the platform context diff --git a/internal/controller/platform/k8s.go b/internal/controller/platform/k8s.go index 9433f5a85..3eb55c508 100644 --- a/internal/controller/platform/k8s.go +++ b/internal/controller/platform/k8s.go @@ -61,38 +61,38 @@ func (action *serviceAction) CanHandle(platform *operatorapi.SonataFlowPlatform) return platform.Status.IsReady() } -func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) { +func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) { // Refresh applied configuration if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil { - return nil, err + return nil, nil, err } psDI := services.NewDataIndexHandler(platform) if psDI.IsServiceSetInSpec() { - if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil { - return nil, err + if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil { + return nil, event, err } } psJS := services.NewJobServiceHandler(platform) if psJS.IsServiceSetInSpec() { - if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil { - return nil, err + if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil { + return nil, event, err } } - return platform, nil + return platform, nil, nil } -func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { +func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) { if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil { - return err + return nil, err } if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil { - return err + return nil, err } if err := createOrUpdateService(ctx, client, platform, psh); err != nil { - return err + return nil, err } return createOrUpdateKnativeResources(ctx, client, platform, psh) } @@ -307,24 +307,24 @@ func setSonataFlowPlatformFinalizer(ctx context.Context, c client.Client, platfo return nil } -func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { +func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) { lbl, _ := getLabels(platform, psh) - objs, err := psh.GenerateKnativeResources(platform, lbl) + objs, event, err := psh.GenerateKnativeResources(platform, lbl) if err != nil { - return err + return event, err } // Create or update triggers for _, obj := range objs { if triggerDef, ok := obj.(*eventingv1.Trigger); ok { if platform.Namespace == obj.GetNamespace() { if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil { - return err + return nil, err } } else { // This is for Knative trigger in a different namespace // Set the finalizer for trigger cleanup when the platform is deleted if err := setSonataFlowPlatformFinalizer(ctx, client, platform); err != nil { - return err + return nil, err } } trigger := &eventingv1.Trigger{ @@ -335,21 +335,21 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p return nil }) if err != nil { - return err + return nil, err } addToSonataFlowPlatformTriggerList(platform, trigger) } } if err := SafeUpdatePlatformStatus(ctx, platform); err != nil { - return err + return nil, err } // Create or update sinkbindings for _, obj := range objs { if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok { if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil { - return err + return nil, err } sinkBinding := &sourcesv1.SinkBinding{ ObjectMeta: sbDef.ObjectMeta, @@ -359,18 +359,24 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p return nil }) if err != nil { - return err + return nil, err } kSinkInjected, err := psh.CheckKSinkInjected() if err != nil { - return err + return nil, err } if !kSinkInjected { - return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName()) + msg := fmt.Sprintf("waiting for K_SINK injection for %s to complete", psh.GetServiceName()) + event := &corev1.Event{ + Type: corev1.EventTypeWarning, + Reason: services.WaitingKnativeEventing, + Message: msg, + } + return event, fmt.Errorf(msg) } } } - return nil + return nil, nil } func addToSonataFlowPlatformTriggerList(platform *operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) { diff --git a/internal/controller/platform/monitor.go b/internal/controller/platform/monitor.go index ed10326ab..d3f5c1cdd 100644 --- a/internal/controller/platform/monitor.go +++ b/internal/controller/platform/monitor.go @@ -22,6 +22,7 @@ package platform import ( "context" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" @@ -46,7 +47,7 @@ func (action *monitorAction) CanHandle(platform *operatorapi.SonataFlowPlatform) return platform.Status.IsReady() } -func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) { +func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) { // Just track the version of the operator in the platform resource if platform.Status.Version != metadata.SpecVersion { platform.Status.Version = metadata.SpecVersion @@ -55,8 +56,8 @@ func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.S // Refresh applied configuration if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil { - return nil, err + return nil, nil, err } - return platform, nil + return platform, nil, nil } diff --git a/internal/controller/platform/services/services.go b/internal/controller/platform/services/services.go index 20673b75b..5bff23f97 100644 --- a/internal/controller/platform/services/services.go +++ b/internal/controller/platform/services/services.go @@ -49,6 +49,7 @@ import ( const ( quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION" quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START" + WaitingKnativeEventing = "WaitingKnativeEventing" ) type PlatformServiceHandler interface { @@ -83,7 +84,7 @@ type PlatformServiceHandler interface { // GenerateServiceProperties returns a property object that contains the application properties required by the service deployment GenerateServiceProperties() (*properties.Properties, error) // GenerateKnativeResources returns knative resources that bridge between workflow deploys and the service - GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) + GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) // IsServiceSetInSpec returns true if the service is set in the spec. IsServiceSetInSpec() bool @@ -582,10 +583,10 @@ func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, name }, } } -func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) { +func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) { broker := d.GetSourceBroker() if broker == nil || len(broker.Ref.Name) == 0 { - return nil, nil // Nothing to do + return nil, nil, nil // Nothing to do } brokerName := broker.Ref.Name namespace := broker.Ref.Namespace @@ -593,7 +594,12 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata namespace = platform.Namespace } if err := knative.ValidateBroker(brokerName, namespace); err != nil { - return nil, err + event := &corev1.Event{ + Type: corev1.EventTypeWarning, + Reason: WaitingKnativeEventing, + Message: err.Error(), + } + return nil, event, err } serviceName := d.GetServiceName() return []client.Object{ @@ -603,7 +609,7 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil + d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil } func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { @@ -620,7 +626,7 @@ func (d JobServiceHandler) GetSink() *duckv1.Destination { return GetPlatformBroker(d.platform) } -func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) { +func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) { broker := j.GetSourceBroker() sink := j.GetSink() resultObjs := []client.Object{} @@ -632,7 +638,12 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat namespace = platform.Namespace } if err := knative.ValidateBroker(brokerName, namespace); err != nil { - return nil, err + event := &corev1.Event{ + Type: corev1.EventTypeWarning, + Reason: WaitingKnativeEventing, + Message: err.Error(), + } + return nil, event, err } jobCreateTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ @@ -712,7 +723,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat } resultObjs = append(resultObjs, sinkBinding) } - return resultObjs, nil + return resultObjs, nil, nil } func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) { diff --git a/internal/controller/platform/warm.go b/internal/controller/platform/warm.go index b76ff5f6a..b95155076 100644 --- a/internal/controller/platform/warm.go +++ b/internal/controller/platform/warm.go @@ -54,7 +54,7 @@ func (action *warmAction) CanHandle(platform *operatorapi.SonataFlowPlatform) bo return platform.Status.IsWarming() } -func (action *warmAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) { +func (action *warmAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) { // Check Kaniko warmer pod status pod := corev1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -69,19 +69,19 @@ func (action *warmAction) Handle(ctx context.Context, platform *operatorapi.Sona err := action.reader.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, &pod) if err != nil { - return nil, err + return nil, nil, err } switch pod.Status.Phase { case corev1.PodSucceeded: klog.V(log.D).InfoS("Kaniko cache successfully warmed up") platform.Status.Manager().MarkTrueWithReason(api.SucceedConditionType, operatorapi.PlatformWarmingReason, "Kaniko cache successfully warmed up") - return platform, nil + return platform, nil, nil case corev1.PodFailed: - return nil, errors.New("failed to warm up Kaniko cache") + return nil, nil, errors.New("failed to warm up Kaniko cache") default: klog.V(log.I).InfoS("Waiting for Kaniko cache to warm up...") // Requeue - return nil, nil + return nil, nil, nil } } diff --git a/internal/controller/sonataflowplatform_controller.go b/internal/controller/sonataflowplatform_controller.go index 75f3d1c5e..3c9f8abfe 100644 --- a/internal/controller/sonataflowplatform_controller.go +++ b/internal/controller/sonataflowplatform_controller.go @@ -27,8 +27,6 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" - "k8s.io/klog/v2" - "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" @@ -45,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ctrlrun "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -144,7 +143,10 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc klog.V(log.I).InfoS("Invoking action", "Name", a.Name()) - target, err = a.Handle(ctx, target) + target, event, err := a.Handle(ctx, target) + if event != nil { + r.Recorder.Event(&instance, event.Type, event.Reason, event.Message) + } if err != nil { if target != nil { target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error())