From 5153141133cf4ddaa64ca3393a316423cd49ddf5 Mon Sep 17 00:00:00 2001 From: jianrongzhang89 Date: Tue, 21 May 2024 19:52:14 -0400 Subject: [PATCH] [issue-368] knative integration with DataIndex and JobService: update unit test cases --- controllers/knative/knative.go | 76 ++++++- controllers/platform/k8s.go | 4 +- controllers/platform/services/services.go | 6 +- controllers/profiles/common/ensurer.go | 3 + .../profiles/common/knative_eventing.go | 58 ------ .../profiles/common/mutate_visitors.go | 24 ++- .../profiles/common/object_creators_test.go | 155 ++++++++++++-- .../common/properties/managed_test.go | 25 ++- controllers/profiles/dev/profile_dev_test.go | 16 +- .../profiles/gitops/profile_gitops_test.go | 11 +- .../preview/deployment_handler_test.go | 5 + .../profiles/preview/profile_preview_test.go | 14 +- .../sonataflowplatform_controller_test.go | 191 ++++++++++++++---- ...ow.org_v1alpha08_sonataflow_vet_event.yaml | 13 ++ ...alpha08_sonataflowplatform_withBroker.yaml | 48 +++++ test/yaml.go | 23 +++ utils/kubernetes/volumes.go | 25 ++- 17 files changed, 560 insertions(+), 137 deletions(-) create mode 100644 test/testdata/sonataflow.org_v1alpha08_sonataflowplatform_withBroker.yaml diff --git a/controllers/knative/knative.go b/controllers/knative/knative.go index 208ac86b7..d340e922e 100644 --- a/controllers/knative/knative.go +++ b/controllers/knative/knative.go @@ -25,6 +25,9 @@ import ( operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" @@ -35,6 +38,7 @@ import ( var servingClient clientservingv1.ServingV1Interface var eventingClient clienteventingv1.EventingV1Interface +var discoveryClient discovery.DiscoveryInterface type Availability struct { Eventing bool @@ -42,7 +46,9 @@ type Availability struct { } const ( - KSink = "K_SINK" + KSink = "K_SINK" + KnativeBundleVolume = "kne-bundle-volume" + KCeOverRides = "K_CE_OVERRIDES" ) func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) { @@ -75,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config) (*clienteventingv1.EventingV1Cli return clienteventingv1.NewForConfig(cfg) } +func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, error) { + if discoveryClient == nil { + if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil { + return nil, err + } else { + discoveryClient = cli + } + } + return discoveryClient, nil +} + +func SetDisvoveryClient(cli discovery.DiscoveryInterface) { + discoveryClient = cli +} + func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) { - if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil { + if cli, err := GetDisvoveryClient(cfg); err != nil { return nil, err } else { apiList, err := cli.ServerGroups() @@ -122,3 +143,54 @@ func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlo func IsKnativeBroker(kRef *duckv1.KReference) bool { return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == "Broker" } + +func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) { + for _, volume := range source.Volumes { + if volume.Name == KnativeBundleVolume { + kubeutil.AddOrReplaceVolume(dest, volume) + break + } + } + for _, container := range source.Containers { + for ind, destContainer := range dest.Containers { + if destContainer.Name == container.Name { + for _, mount := range container.VolumeMounts { + if mount.Name == KnativeBundleVolume { + kubeutil.AddOrReplaceVolumeMount(ind, dest, mount) + break + } + } + for _, env := range container.Env { + if env.Name == KSink || env.Name == KCeOverRides { + kubeutil.AddOrReplaceEnvVar(ind, dest, env) + } + } + } + } + } +} + +func moveKnativeVolumeToEnd(vols []corev1.Volume) { + for i := 0; i < len(vols)-1; i++ { + if vols[i].Name == KnativeBundleVolume { + vols[i], vols[i+1] = vols[i+1], vols[i] + } + } +} + +func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) { + for i := 0; i < len(mounts)-1; i++ { + if mounts[i].Name == KnativeBundleVolume { + mounts[i], mounts[i+1] = mounts[i+1], mounts[i] + } + } +} + +// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The volume and volume mount +// must be in the end of the array to avoid repeadly restarting of the workflow pod +func RestoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) { + moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes) + for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ { + moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts) + } +} diff --git a/controllers/platform/k8s.go b/controllers/platform/k8s.go index 4a1b291d6..8c2db430f 100644 --- a/controllers/platform/k8s.go +++ b/controllers/platform/k8s.go @@ -24,6 +24,7 @@ import ( operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/variables" @@ -186,7 +187,8 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor // Create or Update the deployment if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error { - err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec) + knative.SaveKnativeData(&serviceDeploymentSpec.Template.Spec, &serviceDeployment.Spec.Template.Spec) + err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec, mergo.WithOverride) if err != nil { return err } diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index 1ee7c007a..5fb5145c0 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -623,7 +623,7 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { func (d JobServiceHandler) GetSink() *duckv1.Destination { if d.platform.Spec.Services.JobService.Sink != nil { - return d.platform.Spec.Services.JobService.Source + return d.platform.Spec.Services.JobService.Sink } return GetPlatformBroker(d.platform) } @@ -638,7 +638,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat brokerName := broker.Ref.Name jobCreateTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-job-service-create-job-trigger", platform.Name), + Name: fmt.Sprintf("%s-jobs-service-create-job-trigger", platform.Name), Namespace: namespace, Labels: lbl, }, @@ -665,7 +665,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat resultObjs = append(resultObjs, jobCreateTrigger) jobDeleteTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-job-service-delete-job-trigger", platform.Name), + Name: fmt.Sprintf("%s-jobs-service-delete-job-trigger", platform.Name), Namespace: namespace, Labels: lbl, }, diff --git a/controllers/profiles/common/ensurer.go b/controllers/profiles/common/ensurer.go index 0428d639d..eeac823b3 100644 --- a/controllers/profiles/common/ensurer.go +++ b/controllers/profiles/common/ensurer.go @@ -98,6 +98,9 @@ func (d *defaultObjectEnsurerWithPlatform) Ensure(ctx context.Context, workflow if err != nil { return nil, result, err } + if object == nil { + return nil, result, nil + } if result, err = controllerutil.CreateOrPatch(ctx, d.c, object, func() error { for _, v := range visitors { diff --git a/controllers/profiles/common/knative_eventing.go b/controllers/profiles/common/knative_eventing.go index 34684796e..eb80884cc 100644 --- a/controllers/profiles/common/knative_eventing.go +++ b/controllers/profiles/common/knative_eventing.go @@ -20,16 +20,10 @@ import ( operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/log" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) -const ( - KnativeBundleVolume = "kne-bundle-volume" -) - var _ KnativeEventingHandler = &knativeObjectManager{} type knativeObjectManager struct { @@ -81,55 +75,3 @@ func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi. } return objs, nil } - -func moveKnativeVolumeToEnd(vols []corev1.Volume) { - for i := 0; i < len(vols)-1; i++ { - if vols[i].Name == KnativeBundleVolume { - vols[i], vols[i+1] = vols[i+1], vols[i] - } - } -} - -func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) { - for i := 0; i < len(mounts)-1; i++ { - if mounts[i].Name == KnativeBundleVolume { - mounts[i], mounts[i+1] = mounts[i+1], mounts[i] - } - } -} - -// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The volume and volume mount -// must be in the end of the array to avoid repeadly restarting of the workflow pod -func restoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) { - moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes) - for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ { - moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts) - } -} - -func preserveKnativeVolumeMount(object *appsv1.Deployment) { - var kneVol *corev1.Volume = nil - for _, v := range object.Spec.Template.Spec.Volumes { - if v.Name == KnativeBundleVolume { - kneVol = &v - } - } - if kneVol != nil { - object.Spec.Template.Spec.Volumes = []corev1.Volume{*kneVol} - } else { - object.Spec.Template.Spec.Volumes = nil - } - for i := range object.Spec.Template.Spec.Containers { - var kneVolMount *corev1.VolumeMount = nil - for _, mount := range object.Spec.Template.Spec.Containers[i].VolumeMounts { - if mount.Name == KnativeBundleVolume { - kneVolMount = &mount - } - } - if kneVolMount == nil { - object.Spec.Template.Spec.Containers[i].VolumeMounts = nil - } else { - object.Spec.Template.Spec.Containers[i].VolumeMounts = []corev1.VolumeMount{*kneVolMount} - } - } -} diff --git a/controllers/profiles/common/mutate_visitors.go b/controllers/profiles/common/mutate_visitors.go index f16ff5678..e76600a46 100644 --- a/controllers/profiles/common/mutate_visitors.go +++ b/controllers/profiles/common/mutate_visitors.go @@ -22,18 +22,18 @@ package common import ( "context" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/properties" + kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" + "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" "github.com/imdario/mergo" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" servingv1 "knative.dev/serving/pkg/apis/serving/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" - "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" ) // ImageDeploymentMutateVisitor creates a visitor that mutates a vanilla Kubernetes Deployment to apply the given image in the DefaultContainerName container @@ -99,10 +99,16 @@ func EnsureDeployment(original *appsv1.Deployment, object *appsv1.Deployment) er object.Finalizers = original.Finalizers // Clean up the volumes, they are inherited from original, additional are added by other visitors - // However, the knative mount path must be preserved - preserveKnativeVolumeMount(object) + // However, the knative data (voulmes, volumes mounts) must be preserved + knative.SaveKnativeData(&original.Spec.Template.Spec, &object.Spec.Template.Spec) + // Clean up volumes and volume mounts + object.Spec.Template.Spec.Volumes = nil + for i := range object.Spec.Template.Spec.Containers { + object.Spec.Template.Spec.Containers[i].VolumeMounts = nil + } + // we do a merge to not keep changing the spec since k8s will set default values to the podSpec - return mergo.Merge(&object.Spec, original.Spec) + return mergo.Merge(&object.Spec.Template.Spec, original.Spec.Template.Spec, mergo.WithOverride) } // KServiceMutateVisitor guarantees the state of the default Knative Service object @@ -132,7 +138,7 @@ func EnsureKService(original *servingv1.Service, object *servingv1.Service) erro } // we do a merge to not keep changing the spec since k8s will set default values to the podSpec - return mergo.Merge(&object.Spec.Template.Spec.PodSpec, original.Spec.Template.Spec.PodSpec, mergo.WithOverride) + return mergo.Merge(&object.Spec.Template.Spec.PodSpec, original.Spec.Template.Spec.PodSpec /*, mergo.WithOverride*/) } func ServiceMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor { @@ -188,7 +194,7 @@ func RolloutDeploymentIfCMChangedMutateVisitor(workflow *operatorapi.SonataFlow, return func(object client.Object) controllerutil.MutateFn { return func() error { deployment := object.(*appsv1.Deployment) - restoreKnativeVolumeAndVolumeMount(deployment) + knative.RestoreKnativeVolumeAndVolumeMount(deployment) err := kubeutil.AnnotateDeploymentConfigChecksum(workflow, deployment, userPropsCM, managedPropsCM) return err } diff --git a/controllers/profiles/common/object_creators_test.go b/controllers/profiles/common/object_creators_test.go index 1969720bb..f1f13d241 100644 --- a/controllers/profiles/common/object_creators_test.go +++ b/controllers/profiles/common/object_creators_test.go @@ -23,13 +23,14 @@ import ( "context" "testing" - sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" - "github.com/magiconair/properties" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" @@ -75,6 +76,8 @@ func Test_ensureWorkflowPropertiesConfigMapMutator(t *testing.T) { func Test_ensureWorkflowPropertiesConfigMapMutator_DollarReplacement(t *testing.T) { workflow := test.GetBaseSonataFlowWithDevProfile(t.Name()) platform := test.GetBasePlatformInReadyPhase(workflow.Namespace) + //knative.SetDisvoveryClient(createFakeDiscoveryClient()) + managedProps, _ := ManagedPropsConfigMapCreator(workflow, platform) managedProps.SetName(workflow.Name) managedProps.SetNamespace(workflow.Namespace) @@ -145,7 +148,7 @@ func TestMergePodSpec(t *testing.T) { assert.Len(t, flowContainer.VolumeMounts, 1) } -func TestMergePodSpec_OverrideContainers(t *testing.T) { +func TestMergePodSpecOverrideContainers(t *testing.T) { workflow := test.GetBaseSonataFlow(t.Name()) workflow.Spec.PodTemplate = v1alpha08.FlowPodTemplateSpec{ PodSpec: v1alpha08.PodSpec{ @@ -177,11 +180,13 @@ func TestMergePodSpec_OverrideContainers(t *testing.T) { assert.Empty(t, flowContainer.Env) } -func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) { +func TestEnsureWorkflowSinkBindingWithWorkflowSinkIsCreated(t *testing.T) { workflow := test.GetVetEventSonataFlow(t.Name()) plf := test.GetBasePlatform() //On Kubernetes we want the service exposed in Dev with NodePort - sinkBinding, _ := SinkBindingCreator(workflow, plf) + sinkBinding, err := SinkBindingCreator(workflow, plf) + assert.NoError(t, err) + assert.NotNil(t, sinkBinding) sinkBinding.SetUID("1") sinkBinding.SetResourceVersion("1") @@ -191,23 +196,143 @@ func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) { assert.NotNil(t, reflectSinkBinding.Spec) assert.NotEmpty(t, reflectSinkBinding.Spec.Sink) assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Kind, "Broker") + assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Name, "default") assert.NotNil(t, reflectSinkBinding.GetLabels()) - assert.Equal(t, reflectSinkBinding.ObjectMeta.Labels, map[string]string{"app": "vet", "sonataflow.org/workflow-app": "vet"}) + assert.Equal(t, reflectSinkBinding.ObjectMeta.Labels, map[string]string{"app": "vet", + "sonataflow.org/workflow-app": "vet", + "sonataflow.org/workflow-namespace": workflow.Namespace}) } -func Test_ensureWorkflowTriggersAreCreated(t *testing.T) { +func TestEnsureWorkflowSinkBindingWithPlatformBrokerIsCreated(t *testing.T) { workflow := test.GetVetEventSonataFlow(t.Name()) - plf := test.GetBasePlatform() - //On Kubernetes we want the service exposed in Dev with NodePort - triggers, _ := TriggersCreator(workflow, plf) + workflow.Spec.Sink = nil + workflow.Spec.Sources = nil + plf := test.GetBasePlatformWithBroker() + sinkBinding, err := SinkBindingCreator(workflow, plf) + assert.NoError(t, err) + assert.NotNil(t, sinkBinding) + sinkBinding.SetUID("1") + sinkBinding.SetResourceVersion("1") + + reflectSinkBinding := sinkBinding.(*sourcesv1.SinkBinding) + + assert.NotNil(t, reflectSinkBinding) + assert.NotNil(t, reflectSinkBinding.Spec) + assert.NotEmpty(t, reflectSinkBinding.Spec.Sink) + assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Kind, "Broker") + assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Name, "default") + assert.NotNil(t, reflectSinkBinding.GetLabels()) + assert.Equal(t, reflectSinkBinding.ObjectMeta.Labels, map[string]string{"app": "vet", + "sonataflow.org/workflow-app": "vet", + "sonataflow.org/workflow-namespace": workflow.Namespace}) +} + +func TestEnsureWorkflowSinkBindingWithoutBrokerAreNotCreated(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + workflow.Spec.Sink = nil + workflow.Spec.Sources = nil + plf := test.GetBasePlatformWithBroker() + plf.Spec.Eventing = nil // No broker configured in the platform, but data index and jobs service are enabled + sinkBinding, err := SinkBindingCreator(workflow, plf) + assert.Error(t, err) + assert.Contains(t, err.Error(), "a sink in the SonataFlow vet or broker in the SonataFlowPlatform sonataflow-platform should be configured when DataIndex or JobService is enabled") + assert.Nil(t, sinkBinding) +} + +func getTrigger(name string, objs []client.Object) *eventingv1.Trigger { + for _, obj := range objs { + if trigger, ok := obj.(*eventingv1.Trigger); ok { + if trigger.Name == name { + return trigger + } + } + } + return nil +} +func TestEnsureWorkflowTriggersWithPlatformBrokerAreCreated(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + workflow.Spec.Sink = nil + workflow.Spec.Sources = nil + plf := test.GetBasePlatformWithBroker() + plf.Namespace = "platform-namespace" + plf.Spec.Eventing.Broker.Ref.Namespace = plf.Namespace + + triggers, err := TriggersCreator(workflow, plf) + assert.NoError(t, err) assert.NotEmpty(t, triggers) assert.Len(t, triggers, 2) - for _, trigger := range triggers { - assert.Contains(t, []string{"vet-vetappointmentrequestreceived-trigger", "vet-vetappointmentinfo-trigger"}, trigger.GetName()) - assert.NotNil(t, trigger.GetLabels()) - assert.Equal(t, trigger.GetLabels(), map[string]string{"app": "vet", "sonataflow.org/workflow-app": "vet"}) - } + //Check the 1st trigger + trigger := getTrigger("vet-vetappointmentrequestreceived-trigger", triggers) + assert.NotNil(t, trigger) + assert.NotNil(t, trigger.GetLabels()) + assert.Equal(t, trigger.GetLabels(), map[string]string{"app": "vet", + "sonataflow.org/workflow-app": "vet", + "sonataflow.org/workflow-namespace": workflow.Namespace}) + assert.Equal(t, trigger.Namespace, plf.Namespace) //trigger should be in the platform namespace + assert.Equal(t, trigger.Spec.Broker, "default") + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, trigger.Spec.Filter.Attributes["type"], "events.vet.appointments.request") + //Check the 2nd trigger + trigger = getTrigger("vet-vetappointmentinfo-trigger", triggers) + assert.NotNil(t, trigger) + assert.NotNil(t, trigger.GetLabels()) + assert.Equal(t, trigger.GetLabels(), map[string]string{"app": "vet", + "sonataflow.org/workflow-app": "vet", + "sonataflow.org/workflow-namespace": workflow.Namespace}) + assert.Equal(t, trigger.Namespace, plf.Namespace) //trigger should be in the platform namespace + assert.Equal(t, trigger.Spec.Broker, "default") + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, trigger.Spec.Filter.Attributes["type"], "events.vet.appointments") +} + +func TestEnsureWorkflowTriggersWithWorkflowBrokerAreCreated(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + workflow.Spec.Sources[0].Destination.Ref.Namespace = workflow.Namespace + workflow.Spec.Sources[1].Destination.Ref.Namespace = workflow.Namespace + plf := test.GetBasePlatform() // No broker defined in the platform + triggers, err := TriggersCreator(workflow, plf) + assert.NoError(t, err) + assert.NotEmpty(t, triggers) + assert.Len(t, triggers, 2) + //Check the 1st trigger + trigger := getTrigger("vet-vetappointmentrequestreceived-trigger", triggers) + assert.NotNil(t, trigger) + assert.NotNil(t, trigger.GetLabels()) + assert.Equal(t, trigger.GetLabels(), map[string]string{"app": "vet", + "sonataflow.org/workflow-app": "vet", + "sonataflow.org/workflow-namespace": workflow.Namespace}) + assert.Equal(t, trigger.Namespace, workflow.Namespace) //trigger should be in the workflow namespace + assert.Equal(t, trigger.Spec.Broker, "broker-appointments-request") + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, trigger.Spec.Filter.Attributes["type"], "events.vet.appointments.request") + //Check the 2nd trigger + trigger = getTrigger("vet-vetappointmentinfo-trigger", triggers) + assert.NotNil(t, trigger) + assert.NotNil(t, trigger.GetLabels()) + assert.Equal(t, trigger.GetLabels(), map[string]string{"app": "vet", + "sonataflow.org/workflow-app": "vet", + "sonataflow.org/workflow-namespace": workflow.Namespace}) + assert.Equal(t, trigger.Namespace, workflow.Namespace) //trigger should be in the workflow namespace + assert.Equal(t, trigger.Spec.Broker, "broker-appointments") + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, trigger.Spec.Filter.Attributes["type"], "events.vet.appointments") +} + +func TestEnsureWorkflowTriggersWithoutBrokerAreNotCreated(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + workflow.Spec.Sink = nil + workflow.Spec.Sources = nil + plf := test.GetBasePlatform() + + triggers, err := TriggersCreator(workflow, plf) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no broker configured for eventType events.vet.appointments in SonataFlow vet") + assert.Nil(t, triggers) } func TestMergePodSpec_WithPostgreSQL_and_JDBC_URL_field(t *testing.T) { diff --git a/controllers/profiles/common/properties/managed_test.go b/controllers/profiles/common/properties/managed_test.go index 757041bf3..6e9ee05e7 100644 --- a/controllers/profiles/common/properties/managed_test.go +++ b/controllers/profiles/common/properties/managed_test.go @@ -111,7 +111,8 @@ func (c *mockCatalogService) Query(ctx context.Context, uri discovery.ResourceUr func Test_appPropertyHandler_WithKogitoServiceUrl(t *testing.T) { workflow := test.GetBaseSonataFlow("default") - props, err := ApplicationManagedProperties(workflow, nil) + platform := test.GetBasePlatform() + props, err := ApplicationManagedProperties(workflow, platform) assert.NoError(t, err) assert.Contains(t, props, constants.KogitoServiceURLProperty) assert.Contains(t, props, "http://"+workflow.Name+"."+workflow.Namespace) @@ -121,11 +122,12 @@ func Test_appPropertyHandler_WithUserPropertiesWithNoUserOverrides(t *testing.T) //just add some user provided properties, no overrides. userProperties := "property1=value1\nproperty2=value2" workflow := test.GetBaseSonataFlow("default") - props, err := NewManagedPropertyHandler(workflow, nil) + platform := test.GetBasePlatform() + props, err := NewManagedPropertyHandler(workflow, platform) assert.NoError(t, err) generatedProps, propsErr := properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 7, len(generatedProps.Keys())) + assert.Equal(t, 12, len(generatedProps.Keys())) assert.NotContains(t, "property1", generatedProps.Keys()) assert.NotContains(t, "property2", generatedProps.Keys()) assert.Equal(t, "http://greeting.default", generatedProps.GetString("kogito.service.url", "")) @@ -134,6 +136,11 @@ func Test_appPropertyHandler_WithUserPropertiesWithNoUserOverrides(t *testing.T) assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", "")) assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessInstancesEventsEnabled, "")) + assert.Equal(t, "quarkus-http", generatedProps.GetString("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "")) + assert.Equal(t, "http://localhost/v2/jobs/events", generatedProps.GetString("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "")) + assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", "")) } func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T) { @@ -149,7 +156,8 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T userProperties = userProperties + "broker2=${knative:brokers.v1.eventing.knative.dev/my-kn-broker2}\n" workflow := test.GetBaseSonataFlow(defaultNamespace) - props, err := NewManagedPropertyHandler(workflow, nil) + platform := test.GetBasePlatform() + props, err := NewManagedPropertyHandler(workflow, platform) assert.NoError(t, err) generatedProps, propsErr := properties.LoadString(props. WithUserProperties(userProperties). @@ -157,7 +165,7 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T Build()) generatedProps.DisableExpansion = true assert.NoError(t, propsErr) - assert.Equal(t, 21, len(generatedProps.Keys())) + assert.Equal(t, 26, len(generatedProps.Keys())) assert.NotContains(t, "property1", generatedProps.Keys()) assert.NotContains(t, "property2", generatedProps.Keys()) assertHasProperty(t, generatedProps, "service1", myService1Address) @@ -184,6 +192,13 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false") assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false") assertHasProperty(t, generatedProps, constants.KogitoUserTasksEventsEnabled, "false") + + assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false") + assertHasProperty(t, generatedProps, "kogito.events.processdefinitions.enabled", "false") + assertHasProperty(t, generatedProps, "kogito.events.processinstances.enabled", "false") + assertHasProperty(t, generatedProps, "kogito.events.usertasks.enabled", "false") + assertHasProperty(t, generatedProps, "mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + assertHasProperty(t, generatedProps, "mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") } func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { diff --git a/controllers/profiles/dev/profile_dev_test.go b/controllers/profiles/dev/profile_dev_test.go index c5671faff..b470ec206 100644 --- a/controllers/profiles/dev/profile_dev_test.go +++ b/controllers/profiles/dev/profile_dev_test.go @@ -51,6 +51,7 @@ import ( clientruntime "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/test" ) @@ -59,6 +60,8 @@ func Test_OverrideStartupProbe(t *testing.T) { client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) @@ -85,7 +88,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) { workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "") client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) reconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) // we are in failed state and have no objects @@ -126,6 +129,7 @@ func Test_newDevProfile(t *testing.T) { workflow := test.GetBaseSonataFlow(t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -208,6 +212,8 @@ func Test_newDevProfile(t *testing.T) { func Test_devProfileImageDefaultsNoPlatform(t *testing.T) { workflow := test.GetBaseSonataFlowWithDevProfile(t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) @@ -225,6 +231,8 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t *testing.T) { platform := test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) @@ -242,6 +250,8 @@ func Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing. platform := test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) @@ -259,6 +269,8 @@ func Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin platform := test.GetBasePlatformInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) @@ -277,6 +289,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T) { operatorapi.ConfigMapWorkflowResource{ConfigMap: corev1.LocalObjectReference{Name: configmapName}, WorkflowPath: "routes"}) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -391,6 +404,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) { workflow := test.GetSonataFlow(test.SonataFlowGreetingsWithStaticResourcesCR, t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, configMap).WithStatusSubresource(workflow, configMap).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) diff --git a/controllers/profiles/gitops/profile_gitops_test.go b/controllers/profiles/gitops/profile_gitops_test.go index 051ea5772..274bc39a9 100644 --- a/controllers/profiles/gitops/profile_gitops_test.go +++ b/controllers/profiles/gitops/profile_gitops_test.go @@ -20,6 +20,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" @@ -38,6 +39,9 @@ func Test_Reconciler_ProdOps(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow). WithStatusSubresource(workflow, &operatorapi.SonataFlowBuild{}).Build() + + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + result, err := NewProfileForOpsReconciler(client, &rest.Config{}, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -64,5 +68,10 @@ func Test_Reconciler_ProdOps(t *testing.T) { assert.NotNil(t, deployment.ObjectMeta) assert.NotNil(t, deployment.ObjectMeta.Labels) - assert.Equal(t, deployment.ObjectMeta.Labels, map[string]string{"test": "test", "app": "simple", "sonataflow.org/workflow-app": "simple"}) + assert.Equal(t, deployment.ObjectMeta.Labels, map[string]string{ + "test": "test", + "app": "simple", + "sonataflow.org/workflow-app": "simple", + "sonataflow.org/workflow-namespace": workflow.Namespace, + }) } diff --git a/controllers/profiles/preview/deployment_handler_test.go b/controllers/profiles/preview/deployment_handler_test.go index 5faf98bb9..df26d67cf 100644 --- a/controllers/profiles/preview/deployment_handler_test.go +++ b/controllers/profiles/preview/deployment_handler_test.go @@ -20,6 +20,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" "github.com/magiconair/properties" @@ -44,6 +45,7 @@ func Test_CheckDeploymentModelIsKnative(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(cli) + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.ensureObjects(context.TODO(), workflow, "") @@ -70,6 +72,7 @@ func Test_CheckPodTemplateChangesReflectDeployment(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.Reconcile(context.TODO(), workflow) @@ -105,6 +108,7 @@ func Test_CheckDeploymentRolloutAfterCMChange(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.Reconcile(context.TODO(), workflow) @@ -167,6 +171,7 @@ func Test_CheckDeploymentUnchangedAfterCMChangeOtherKeys(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.Reconcile(context.TODO(), workflow) diff --git a/controllers/profiles/preview/profile_preview_test.go b/controllers/profiles/preview/profile_preview_test.go index 22146ccf6..322899822 100644 --- a/controllers/profiles/preview/profile_preview_test.go +++ b/controllers/profiles/preview/profile_preview_test.go @@ -26,6 +26,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/stretchr/testify/assert" @@ -49,6 +50,7 @@ func Test_Reconciler_ProdCustomPod(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, build, platform). WithStatusSubresource(workflow, build, platform).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) _, err := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -63,7 +65,12 @@ func Test_Reconciler_ProdCustomPod(t *testing.T) { assert.Len(t, deployment.Spec.Template.Spec.Containers[0].VolumeMounts, 1) assert.NotNil(t, deployment.ObjectMeta) assert.NotNil(t, deployment.ObjectMeta.Labels) - assert.Equal(t, deployment.ObjectMeta.Labels, map[string]string{"test": "test", "app": "greeting", "sonataflow.org/workflow-app": "greeting"}) + assert.Equal(t, deployment.ObjectMeta.Labels, map[string]string{ + "test": "test", + "app": "greeting", + "sonataflow.org/workflow-app": "greeting", + "sonataflow.org/workflow-namespace": workflow.Namespace, + }) } func Test_reconcilerProdBuildConditions(t *testing.T) { @@ -72,7 +79,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, platform). WithStatusSubresource(workflow, platform, &operatorapi.SonataFlowBuild{}).Build() - + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) result, err := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -134,6 +141,7 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) { WithRuntimeObjects(workflow, platform, build). WithStatusSubresource(workflow, platform, build). Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) handler := &deployWithBuildWorkflowState{ StateSupport: fakeReconcilerSupport(client), ensurers: NewObjectEnsurers(&common.StateSupport{C: client}), @@ -162,7 +170,7 @@ func Test_GenerationAnnotationCheck(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, platform). WithStatusSubresource(workflow, platform, &operatorapi.SonataFlowBuild{}).Build() - + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) handler := &deployWithBuildWorkflowState{ StateSupport: fakeReconcilerSupport(client), ensurers: NewObjectEnsurers(&common.StateSupport{C: client}), diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index f1cec70c6..9848990bb 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -23,23 +23,22 @@ import ( "context" "testing" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" - v1 "knative.dev/pkg/apis/duck/v1" - "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -842,43 +841,20 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.NotNil(t, ksp2.Status.ClusterPlatformRef) assert.Nil(t, ksp2.Status.ClusterPlatformRef.Services) }) - t.Run("verify that knative resources creation for job service and data index service is performed without error", func(t *testing.T) { + t.Run("verify that knative resources creation for job service and data index service with platform level broker is performed without error", func(t *testing.T) { namespace := t.Name() // Create a SonataFlowPlatform object with metadata and spec. - ksp := test.GetBasePlatformInReadyPhase(namespace) + ksp := test.GetBasePlatformWithBrokerInReadyPhase(namespace) brokerName := "default" - enabled := true broker := &eventingv1.Broker{ ObjectMeta: metav1.ObjectMeta{ Name: brokerName, Namespace: namespace, }, } - ksp.Spec.Eventing = &v1alpha08.PlatformEventingSpec{ - Broker: &v1.Destination{ - Ref: &v1.KReference{ - APIVersion: v1.SchemeGroupVersion.Version, - Kind: "Broker", - Namespace: namespace, - Name: brokerName, - }, - }, - } - ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.DataIndexServiceSpec{ - ServiceSpec: v1alpha08.ServiceSpec{ - Enabled: &enabled, - }, - }, - JobService: &v1alpha08.JobServiceServiceSpec{ - ServiceSpec: v1alpha08.ServiceSpec{ - Enabled: &enabled, - }, - }, - } - // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp, broker).WithStatusSubresource(ksp, broker).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -901,6 +877,10 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Equal(t, "quay.io/kiegroup", ksp.Spec.Build.Config.Registry.Address) assert.Equal(t, "regcred", ksp.Spec.Build.Config.Registry.Secret) assert.Equal(t, v1alpha08.OperatorBuildStrategy, ksp.Spec.Build.Config.BuildStrategy) + assert.NotNil(t, ksp.Spec.Eventing) + assert.NotNil(t, ksp.Spec.Eventing.Broker) + assert.NotNil(t, ksp.Spec.Eventing.Broker.Ref) + assert.Equal(t, ksp.Spec.Eventing.Broker.Ref.Name, brokerName) assert.NotNil(t, ksp.Spec.Services.DataIndex) assert.NotNil(t, ksp.Spec.Services.DataIndex.Enabled) assert.Equal(t, true, *ksp.Spec.Services.DataIndex.Enabled) @@ -913,15 +893,152 @@ func TestSonataFlowPlatformController(t *testing.T) { // Check Triggers trigger := &eventingv1.Trigger{} - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-create-job-trigger", Namespace: ksp.Namespace}, trigger)) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-delete-job-trigger", Namespace: ksp.Namespace}, trigger)) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "data-index-service-jobs-trigger", Namespace: ksp.Namespace}, trigger)) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "data-index-service-processes-definition-trigger", Namespace: ksp.Namespace}, trigger)) - assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "data-index-service-processes-instance-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-jobs-service-create-job-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-jobs-service-delete-job-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-jobs-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-definition-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-error-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-node-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-sla-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-state-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-variable-trigger", Namespace: ksp.Namespace}, trigger)) // Check SinkBinding sinkBinding := &sourcesv1.SinkBinding{} assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-sb", Namespace: ksp.Namespace}, sinkBinding)) }) + + t.Run("verify that knative resources creation for job service and data index service with services level brokers is performed without error", func(t *testing.T) { + namespace := t.Name() + // Create a SonataFlowPlatform object with metadata and spec. + ksp := test.GetBasePlatformWithBrokerInReadyPhase(namespace) + brokerName := "default" + brokerNameDataIndexSource := "broker-di-source" + brokerNameJobsServiceSource := "broker-jobs-source" + brokerNameJobsServiceSink := "broker-jobs-sink" + + broker := &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: brokerName, + Namespace: namespace, + }, + } + brokerDataIndexSource := &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: brokerNameDataIndexSource, + Namespace: namespace, + }, + } + brokerJobsServiceSource := &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: brokerNameJobsServiceSource, + Namespace: namespace, + }, + } + brokerJobsServiceSink := &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: brokerNameJobsServiceSink, + Namespace: namespace, + }, + } + ksp.Spec.Services.DataIndex.Source = &duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: brokerNameDataIndexSource, + Namespace: namespace, + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + } + ksp.Spec.Services.JobService.Sink = &duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: brokerNameJobsServiceSink, + Namespace: namespace, + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + } + ksp.Spec.Services.JobService.Source = &duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: brokerNameJobsServiceSource, + Namespace: namespace, + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + } + + // Create a fake client to mock API calls. + cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp, broker, brokerDataIndexSource, brokerJobsServiceSource, brokerJobsServiceSink).WithStatusSubresource(ksp, broker, brokerDataIndexSource, brokerJobsServiceSource, brokerJobsServiceSink).Build() + knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient()) + // Create a SonataFlowPlatformReconciler object with the scheme and fake client. + r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} + + // Mock request to simulate Reconcile() being called on an event for a + // watched resource . + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: ksp.Name, + Namespace: ksp.Namespace, + }, + } + _, err := r.Reconcile(context.TODO(), req) + if err != nil { + t.Fatalf("reconcile: (%v)", err) + } + + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: ksp.Name, Namespace: ksp.Namespace}, ksp)) + + // Perform some checks on the created CR + assert.Equal(t, "quay.io/kiegroup", ksp.Spec.Build.Config.Registry.Address) + assert.Equal(t, "regcred", ksp.Spec.Build.Config.Registry.Secret) + assert.Equal(t, v1alpha08.OperatorBuildStrategy, ksp.Spec.Build.Config.BuildStrategy) + assert.NotNil(t, ksp.Spec.Eventing) + assert.NotNil(t, ksp.Spec.Eventing.Broker) + assert.NotNil(t, ksp.Spec.Eventing.Broker.Ref) + assert.Equal(t, ksp.Spec.Eventing.Broker.Ref.Name, brokerName) + assert.NotNil(t, ksp.Spec.Services.DataIndex) + assert.NotNil(t, ksp.Spec.Services.DataIndex.Enabled) + assert.Equal(t, true, *ksp.Spec.Services.DataIndex.Enabled) + assert.NotNil(t, ksp.Spec.Services.DataIndex.Source) + assert.NotNil(t, ksp.Spec.Services.DataIndex.Source.Ref) + assert.Equal(t, ksp.Spec.Services.DataIndex.Source.Ref.Name, brokerNameDataIndexSource) + assert.NotNil(t, ksp.Spec.Services.JobService) + assert.NotNil(t, ksp.Spec.Services.JobService.Enabled) + assert.Equal(t, true, *ksp.Spec.Services.JobService.Enabled) + assert.NotNil(t, ksp.Spec.Services.JobService.Source) + assert.NotNil(t, ksp.Spec.Services.JobService.Source.Ref) + assert.Equal(t, ksp.Spec.Services.JobService.Source.Ref.Name, brokerNameJobsServiceSource) + assert.NotNil(t, ksp.Spec.Services.JobService.Sink) + assert.NotNil(t, ksp.Spec.Services.JobService.Sink.Ref) + assert.Equal(t, ksp.Spec.Services.JobService.Sink.Ref.Name, brokerNameJobsServiceSink) + assert.Equal(t, v1alpha08.PlatformClusterKubernetes, ksp.Status.Cluster) + assert.Equal(t, "", ksp.Status.GetTopLevelCondition().Reason) + + // Check Triggers to have the service level source used + trigger := &eventingv1.Trigger{} + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-jobs-service-create-job-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameJobsServiceSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-jobs-service-delete-job-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameJobsServiceSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-jobs-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-definition-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-error-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-node-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-sla-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-state-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "sonataflow-platform-data-index-process-variable-trigger", Namespace: ksp.Namespace}, trigger)) + assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) + + // Check SinkBinding to have the sink level source used + sinkBinding := &sourcesv1.SinkBinding{} + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-sb", Namespace: ksp.Namespace}, sinkBinding)) + assert.NotNil(t, sinkBinding.Spec.Sink) + assert.NotNil(t, sinkBinding.Spec.Sink.Ref) + assert.Equal(t, sinkBinding.Spec.Sink.Ref.Name, brokerNameJobsServiceSink) + }) } diff --git a/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml index 05e3a5b42..8636feb0e 100644 --- a/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml +++ b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml @@ -29,6 +29,19 @@ spec: namespace: default apiVersion: eventing.knative.dev/v1 kind: Broker + sources: + - eventType: events.vet.appointments + ref: + name: broker-appointments + namespace: default + apiVersion: eventing.knative.dev/v1 + kind: Broker + - eventType: events.vet.appointments.request + ref: + name: broker-appointments-request + namespace: default + apiVersion: eventing.knative.dev/v1 + kind: Broker flow: events: - name: MakeVetAppointment diff --git a/test/testdata/sonataflow.org_v1alpha08_sonataflowplatform_withBroker.yaml b/test/testdata/sonataflow.org_v1alpha08_sonataflowplatform_withBroker.yaml new file mode 100644 index 000000000..d554d60b6 --- /dev/null +++ b/test/testdata/sonataflow.org_v1alpha08_sonataflowplatform_withBroker.yaml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + properties: + flow: + - name: quarkus.log.level + value: INFO + build: + config: + registry: + address: quay.io/kiegroup + secret: regcred + eventing: + broker: + ref: + name: default + apiVersion: eventing.knative.dev/v1 + kind: Broker + services: + dataIndex: + enabled: true + podTemplate: + container: + resources: {} + jobService: + enabled: true + podTemplate: + container: + resources: {} diff --git a/test/yaml.go b/test/yaml.go index 368ac6b79..47b25aa69 100644 --- a/test/yaml.go +++ b/test/yaml.go @@ -34,6 +34,9 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/discovery" + discfake "k8s.io/client-go/discovery/fake" + clienttesting "k8s.io/client-go/testing" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -48,6 +51,7 @@ const ( SonataFlowGreetingsDataInputSchemaConfig = "v1_configmap_greetings_datainput.yaml" SonataFlowGreetingsStaticFilesConfig = "v1_configmap_greetings_staticfiles.yaml" sonataFlowPlatformYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform.yaml" + sonataFlowPlatformWithBrokerYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform_withBroker.yaml" sonataFlowPlatformWithCacheMinikubeYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform_withCache_minikube.yaml" sonataFlowPlatformForOpenshift = "sonataflow.org_v1alpha08_sonataflowplatform_openshift.yaml" sonataFlowClusterPlatformYamlCR = "sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml" @@ -256,6 +260,14 @@ func GetBasePlatform() *operatorapi.SonataFlowPlatform { return getSonataFlowPlatform(sonataFlowPlatformYamlCR) } +func GetBasePlatformWithBroker() *operatorapi.SonataFlowPlatform { + return getSonataFlowPlatform(sonataFlowPlatformWithBrokerYamlCR) +} + +func GetBasePlatformWithBrokerInReadyPhase(namespace string) *operatorapi.SonataFlowPlatform { + return GetSonataFlowPlatformInReadyPhase(sonataFlowPlatformWithBrokerYamlCR, namespace) +} + func GetPlatformMinikubeE2eTest() string { return e2eSamples + sonataFlowPlatformWithCacheMinikubeYamlCR } @@ -317,3 +329,14 @@ func getProjectDir() string { return projectDir } + +func CreateFakeKnativeDiscoveryClient() discovery.DiscoveryInterface { + return &discfake.FakeDiscovery{ + Fake: &clienttesting.Fake{ + Resources: []*metav1.APIResourceList{ + {GroupVersion: "serving.knative.dev/v1"}, + {GroupVersion: "eventing.knative.dev/v1"}, + }, + }, + } +} diff --git a/utils/kubernetes/volumes.go b/utils/kubernetes/volumes.go index 71b56d620..5aba3dbcc 100644 --- a/utils/kubernetes/volumes.go +++ b/utils/kubernetes/volumes.go @@ -147,7 +147,28 @@ func AddOrReplaceVolumeMount(containerIndex int, podSpec *corev1.PodSpec, mounts mountsToAdd = append(mountsToAdd, mount) } } - for _, mount := range mountsToAdd { - container.VolumeMounts = append(container.VolumeMounts, mount) + container.VolumeMounts = append(container.VolumeMounts, mountsToAdd...) +} + +// AddOrReplaceEnvVar adds or removes the given env variables to the PodSpec. +// If there's already an env variable with the same name, it's replaced. +func AddOrReplaceEnvVar(containerIndex int, podSpec *corev1.PodSpec, envs ...corev1.EnvVar) { + envVarsToAdd := make([]corev1.EnvVar, 0) + wasAdded := false + container := &podSpec.Containers[containerIndex] + for _, envVar := range envs { + wasAdded = false + for i := 0; !wasAdded && i < len(container.Env); i++ { + if envVar.Name == container.Env[i].Name { + // replace existing + container.Env[i] = envVar + wasAdded = true + } + } + if !wasAdded { + // remember to add it later in order + envVarsToAdd = append(envVarsToAdd, envVar) + } } + container.Env = append(container.Env, envVarsToAdd...) }