From d2945f6b9be35e2227cd98c69320f615a6c0eb3b Mon Sep 17 00:00:00 2001 From: Jianrong Zhang Date: Sat, 27 Jul 2024 18:54:52 -0400 Subject: [PATCH] [issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS --- Makefile | 2 +- controllers/knative/knative.go | 18 ++ controllers/platform/k8s.go | 58 +++-- controllers/platform/services/services.go | 31 ++- .../common/constants/platform_services.go | 1 + .../sonataflowplatform_controller_test.go | 12 +- hack/ci/create-kind-cluster-with-registry.sh | 4 +- test/e2e/helpers.go | 111 +++++++++- test/e2e/platform_test.go | 204 +++++++++++++++++- test/e2e/workflow_test.go | 43 ++-- .../platform-level-broker/01-postgres.yaml | 86 ++++++++ .../02-sonataflow_platform.yaml | 75 +++++++ .../platform-level-broker/broker/broker.yaml} | 2 + .../platform-level-broker/kustomization.yaml} | 22 +- ...4-sonataflow_callbackstatetimeouts.sw.yaml | 81 +++++++ .../service-level-broker/01-postgres.yaml | 86 ++++++++ .../02-sonataflow_platform.yaml | 89 ++++++++ .../broker}/00-broker.yaml | 27 +++ .../service-level-broker/kustomization.yaml} | 22 +- .../by_service/02-sonataflow_platform.yaml | 6 - .../persistence/by_service/kustomization.yaml | 1 - .../02-sonataflow_platform.yaml | 6 - .../kustomization.yaml | 1 - .../02-sonataflow_platform.yaml | 6 - .../kustomization.yaml | 1 - .../02-sonataflow_platform.yaml | 6 - .../kustomization.yaml | 1 - test/yaml.go | 4 + version/version.go | 2 +- 29 files changed, 913 insertions(+), 95 deletions(-) create mode 100644 test/testdata/platform/services/preview/knative/platform-level-broker/01-postgres.yaml create mode 100644 test/testdata/platform/services/preview/knative/platform-level-broker/02-sonataflow_platform.yaml rename test/testdata/{workflow/persistence/by_service/00-broker.yaml => platform/services/preview/knative/platform-level-broker/broker/broker.yaml} (99%) rename test/testdata/{workflow/persistence/from_platform_with_no_persistence_required/00-broker.yaml => platform/services/preview/knative/platform-level-broker/kustomization.yaml} (63%) create mode 100644 test/testdata/platform/services/preview/knative/platform-level-broker/sonataflow/04-sonataflow_callbackstatetimeouts.sw.yaml create mode 100644 test/testdata/platform/services/preview/knative/service-level-broker/01-postgres.yaml create mode 100644 test/testdata/platform/services/preview/knative/service-level-broker/02-sonataflow_platform.yaml rename test/testdata/{workflow/persistence/from_platform_overwritten_by_service => platform/services/preview/knative/service-level-broker/broker}/00-broker.yaml (63%) rename test/testdata/{workflow/persistence/from_platform_without_di_and_js_services/00-broker.yaml => platform/services/preview/knative/service-level-broker/kustomization.yaml} (63%) diff --git a/Makefile b/Makefile index df7614caf..c2f6a30c9 100644 --- a/Makefile +++ b/Makefile @@ -342,7 +342,7 @@ generate-all: generate generate-deploy bundle addheaders vet fmt .PHONY: test-e2e # You will need to have a Minikube/Kind cluster up in running to run this target, and run container-builder before the test test-e2e: - go test ./test/e2e/* -v -ginkgo.v -ginkgo.no-color -ginkgo.junit-report=./e2e-test-report.xml -timeout 75m + go test ./test/e2e/* -v -ginkgo.v -ginkgo.no-color -ginkgo.timeout=90m -ginkgo.junit-report=./e2e-test-report.xml -timeout 90m .PHONY: before-pr before-pr: test generate-all diff --git a/controllers/knative/knative.go b/controllers/knative/knative.go index e0247ad45..c99942cf8 100644 --- a/controllers/knative/knative.go +++ b/controllers/knative/knative.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/rest" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -58,6 +59,7 @@ const ( knativeEventingGroup = "eventing.knative.dev" knativeEventingAPIVersion = "eventing.knative.dev/v1" knativeBrokerKind = "Broker" + knativeSinkProvided = "SinkProvided" ) func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) { @@ -270,3 +272,19 @@ func MapTriggerToPlatformRequests(ctx context.Context, object client.Object) []r } return nil } + +// Does the sinkbinding completed K_SINK injection? +func CheckKSinkInjected(name, namespace string) (bool, error) { + sb := &sourcesv1.SinkBinding{} + if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: fmt.Sprintf("%s-sb", name), Namespace: namespace}, sb); err != nil { + if errors.IsNotFound(err) { + return false, nil // deployment hasn't been created yet + } + return false, err + } + cond := sb.Status.GetCondition(apis.ConditionType(knativeSinkProvided)) + if cond != nil && cond.Status == corev1.ConditionTrue { + return true, nil + } + return false, nil // K_SINK has not been injected yet +} diff --git a/controllers/platform/k8s.go b/controllers/platform/k8s.go index 6edbd9d87..60c622200 100644 --- a/controllers/platform/k8s.go +++ b/controllers/platform/k8s.go @@ -21,6 +21,7 @@ package platform import ( "context" + "fmt" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" @@ -144,6 +145,13 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor serviceContainer.Name = psh.GetContainerName() replicas := psh.GetReplicaCount() + kSinkInjected, err := psh.CheckKSinkInjected() + if err != nil { + return nil + } + if !kSinkInjected { + replicas = 0 // Wait for K_SINK injection + } lbl, selectorLbl := getLabels(platform, psh) serviceDeploymentSpec := appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ @@ -200,7 +208,6 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor } else { klog.V(log.I).InfoS("Deployment successfully reconciled", "operation", op) } - return nil } @@ -305,42 +312,61 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p if err != nil { return err } + // Create or update triggers for _, obj := range objs { - if platform.Namespace == obj.GetNamespace() { - if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil { - return err - } - } if triggerDef, ok := obj.(*eventingv1.Trigger); ok { + if platform.Namespace == obj.GetNamespace() { + if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil { + return err + } + } else { + // This is for Knative trigger in a different namespace + // Set the finalizer for trigger cleanup when the platform is deleted + if err := setSonataFlowPlatformFinalizer(ctx, client, platform); err != nil { + return err + } + } trigger := &eventingv1.Trigger{ ObjectMeta: triggerDef.ObjectMeta, } - op, err := controllerutil.CreateOrUpdate(ctx, client, trigger, func() error { + _, err := controllerutil.CreateOrUpdate(ctx, client, trigger, func() error { trigger.Spec = triggerDef.Spec return nil }) if err != nil { return err } - if op == controllerutil.OperationResultCreated { // trigger has been successfully created - addToSonataFlowPlatformTriggerList(platform, trigger) - if platform.Namespace != obj.GetNamespace() { - // This is for Knative trigger in a different namespace - // Set the finalizer for trigger cleanup when the platform is deleted - return setSonataFlowPlatformFinalizer(ctx, client, platform) - } + addToSonataFlowPlatformTriggerList(platform, trigger) + } + } + + if err := client.Status().Update(ctx, platform); err != nil { + return err + } + + // Create or update sinkbindings + for _, obj := range objs { + if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok { + if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil { + return err } - } else if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok { sinkBinding := &sourcesv1.SinkBinding{ ObjectMeta: sbDef.ObjectMeta, } - _, err := controllerutil.CreateOrUpdate(ctx, client, sinkBinding, func() error { + _, err = controllerutil.CreateOrUpdate(ctx, client, sinkBinding, func() error { sinkBinding.Spec = sbDef.Spec return nil }) if err != nil { return err } + kSinkInjected, err := psh.CheckKSinkInjected() + if err != nil { + return err + } + if !kSinkInjected { + return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName()) + } } } return nil diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index cb727ff30..5df568373 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -49,9 +49,6 @@ import ( const ( quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION" quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START" - pathProcesses string = "/processes" - pathDefinitions string = "/definitions" - pathJobs string = "/jobs" ) type PlatformServiceHandler interface { @@ -105,6 +102,9 @@ type PlatformServiceHandler interface { SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow) GetServiceSource() *duckv1.Destination + + // Check if K_SINK has injected for Job Service. No Op for Data Index + CheckKSinkInjected() (bool, error) } type DataIndexHandler struct { @@ -276,6 +276,10 @@ func (d *DataIndexHandler) GenerateServiceProperties() (*properties.Properties, return props, nil } +func (d *DataIndexHandler) CheckKSinkInjected() (bool, error) { + return true, nil // No op +} + type JobServiceHandler struct { platform *operatorapi.SonataFlowPlatform } @@ -599,13 +603,13 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata } serviceName := d.GetServiceName() return []client.Object{ - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", pathProcesses, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", pathJobs, platform)}, nil + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil } func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { @@ -717,6 +721,13 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat return resultObjs, nil } +func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) { + if j.GetSink() != nil { //job services has sink configured + return knative.CheckKSinkInjected(j.GetServiceName(), j.platform.Namespace) + } + return true, nil +} + func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool { if plf.Spec.Services != nil { if plf.Spec.Services.DataIndex != nil { diff --git a/controllers/profiles/common/constants/platform_services.go b/controllers/profiles/common/constants/platform_services.go index 8a4322844..0dc1fd087 100644 --- a/controllers/profiles/common/constants/platform_services.go +++ b/controllers/profiles/common/constants/platform_services.go @@ -47,6 +47,7 @@ const ( KogitoProcessDefinitionsEventsErrorsEnabled = "kogito.events.processdefinitions.errors.propagate" KogitoProcessDefinitionsEventsPath = "/definitions" KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled" + KogitoJobsPath = "/jobs" // KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part // of its start health check. KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled" diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index 5a68b825e..9f3661a45 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -63,6 +63,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -190,6 +191,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) di := services.NewDataIndexHandler(ksp) @@ -289,6 +291,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -397,6 +400,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -488,6 +492,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -574,6 +579,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) js := services.NewJobServiceHandler(ksp) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -648,6 +654,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + utils.SetClient(cl) di := services.NewDataIndexHandler(ksp) js := services.NewJobServiceHandler(ksp) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. @@ -718,6 +725,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(kscp, ksp, ksp2).WithStatusSubresource(kscp, ksp, ksp2).Build() + utils.SetClient(cl) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -867,7 +875,7 @@ func TestSonataFlowPlatformController(t *testing.T) { }, } _, err := r.Reconcile(context.TODO(), req) - if err != nil { + if err != nil && err.Error() != "waiting for K_SINK injection for sonataflow-platform-jobs-service to complete" { t.Fatalf("reconcile: (%v)", err) } @@ -967,7 +975,7 @@ func TestSonataFlowPlatformController(t *testing.T) { }, } _, err := r.Reconcile(context.TODO(), req) - if err != nil { + if err != nil && err.Error() != "waiting for K_SINK injection for sonataflow-platform-jobs-service to complete" { t.Fatalf("reconcile: (%v)", err) } diff --git a/hack/ci/create-kind-cluster-with-registry.sh b/hack/ci/create-kind-cluster-with-registry.sh index 4cded1a9a..c8c871b22 100755 --- a/hack/ci/create-kind-cluster-with-registry.sh +++ b/hack/ci/create-kind-cluster-with-registry.sh @@ -39,14 +39,14 @@ nodes: kind: InitConfiguration nodeRegistration: kubeletExtraArgs: - system-reserved: memory=1Gi + system-reserved: memory=2Gi - role: worker kubeadmConfigPatches: - | kind: InitConfiguration nodeRegistration: kubeletExtraArgs: - system-reserved: memory=2Gi + system-reserved: memory=4Gi containerdConfigPatches: - |- [plugins."io.containerd.grpc.v1.cri".registry] diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go index 104afa315..a3d727adf 100644 --- a/test/e2e/helpers.go +++ b/test/e2e/helpers.go @@ -19,9 +19,12 @@ import ( "fmt" "net/url" "os/exec" + "regexp" "strconv" "strings" + "time" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/test/utils" //nolint:golint @@ -139,7 +142,7 @@ func verifyWorkflowIsInRunningState(workflowName string, targetNamespace string) } func verifyWorkflowIsAddressable(workflowName string, targetNamespace string) bool { - cmd := exec.Command("kubectl", "get", "workflow", workflowName, "-n", targetNamespace, "-o", "jsonpath={.status.address.url}") + cmd := exec.Command("kubectl", "get", "workflow", workflowName, "-n", targetNamespace, "-ojsonpath={.status.address.url}") if response, err := utils.Run(cmd); err != nil { GinkgoWriter.Println(fmt.Errorf("failed to check if greeting workflow is running: %v", err)) return false @@ -157,3 +160,109 @@ func verifyWorkflowIsAddressable(workflowName string, targetNamespace string) bo return false } } + +func verifySchemaMigration(data, name string) bool { + matched1, err := regexp.MatchString(fmt.Sprintf("Successfully applied \\d migrations to schema \"%s\"", name), data) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("string match error:%v", err)) + return false + } + matched2, err := regexp.MatchString("Successfully validated \\d (migration|migrations)", data) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("string match error:%v", err)) + return false + } + GinkgoWriter.Println(fmt.Sprintf("verifying schemaMigration, logs=%v", data)) + return (matched1 && strings.Contains(data, fmt.Sprintf("Creating schema \"%s\"", name)) && + strings.Contains(data, fmt.Sprintf("Migrating schema \"%s\" to version", name))) || + (matched2 && strings.Contains(data, fmt.Sprintf("Current version of schema \"%s\"", name)) && + strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name))) || + (strings.Contains(data, fmt.Sprintf("Creating schema \"%s\"", name)) && + strings.Contains(data, fmt.Sprintf("Current version of schema \"%s\"", name)) && + strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name))) +} + +func verifyKSinkInjection(label, ns string) bool { + cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}") + out, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err)) + return false + } + podNames := strings.Fields(string(out)) + if len(podNames) == 0 { + GinkgoWriter.Println("no pods found to check K_SINK") + return false // pods haven't created yet + } + GinkgoWriter.Println(fmt.Sprintf("pods found: %s", podNames)) + for _, pod := range podNames { + cmd = exec.Command("kubectl", "get", "pod", pod, "-n", ns, "-o", "json") + out, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err)) + return false + } + GinkgoWriter.Println(string(out)) + if !strings.Contains(string(out), "K_SINK") { // The pod does not have K_SINK injected + GinkgoWriter.Println(fmt.Sprintf("Pod does not have K_SINK injected: %s", string(out))) + return false + } + } + return true +} + +func waitForPodRestartCompletion(label, ns string) { + EventuallyWithOffset(1, func() bool { + cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}") + out, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err)) + return false + } + podNames := strings.Fields(string(out)) + if len(podNames) == 0 { + GinkgoWriter.Println("no pods found") + return false // pods haven't created yet + } else if len(podNames) > 1 { + GinkgoWriter.Println("multiple pods found") + return false // multiple pods found, wait for other pods to terminate + } + return true + }, 1*time.Minute, 5).Should(BeTrue()) +} + +func verifyTrigger(triggers []operatorapi.SonataFlowPlatformTriggerRef, namePrefix, path, ns, broker string) error { + GinkgoWriter.Println("Triggers from platform status:", triggers) + for _, ref := range triggers { + if strings.HasPrefix(ref.Name, namePrefix) && ref.Namespace == ns { + return verifyTriggerData(ref.Name, ns, path, broker) + } + } + return fmt.Errorf("failed to find trigger to verify with prefix: %v, namespace: %v", namePrefix, ns) +} + +func verifyTriggerData(name, ns, path, broker string) error { + cmd := exec.Command("kubectl", "get", "trigger", name, "-n", ns, "-ojsonpath={.spec.broker} {.status.subscriberUri} {.status.conditions[?(@.type=='Ready')].status}") + out, err := utils.Run(cmd) + if err != nil { + return err + } + data := strings.Fields(string(out)) + if len(data) == 3 && broker == data[0] && strings.HasSuffix(data[1], path) && data[2] == "True" { + return nil + } + return fmt.Errorf("failed to verify trigger %v, data=%s", name, string(out)) +} + +func verifySinkBinding(name, ns, broker string) error { + cmd := exec.Command("kubectl", "get", "sinkbinding", name, "-n", ns, "-ojsonpath={.status.sinkUri} {.status.conditions[?(@.type=='Ready')].status}") + out, err := utils.Run(cmd) + if err != nil { + return err + } + data := strings.Fields(string(out)) + if len(data) == 2 && strings.HasSuffix(data[0], broker) && data[1] == "True" { + return nil + } + return fmt.Errorf("failed to verify sinkbinding %v, data=%s", name, string(out)) +} diff --git a/test/e2e/platform_test.go b/test/e2e/platform_test.go index ef4f93139..4650b7ff2 100644 --- a/test/e2e/platform_test.go +++ b/test/e2e/platform_test.go @@ -16,6 +16,7 @@ package e2e import ( "bytes" + "encoding/json" "fmt" "math/rand" "os/exec" @@ -24,6 +25,9 @@ import ( "time" "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/apache/incubator-kie-kogito-serverless-operator/test/utils" @@ -47,8 +51,9 @@ const ( var _ = Describe("Validate the persistence", Ordered, func() { var ( - projectDir string - targetNamespace string + projectDir string + targetNamespace string + targetNamespace2 string ) BeforeEach(func() { @@ -56,13 +61,25 @@ var _ = Describe("Validate the persistence", Ordered, func() { cmd := exec.Command("kubectl", "create", "namespace", targetNamespace) _, err := utils.Run(cmd) Expect(err).NotTo(HaveOccurred()) + + targetNamespace2 = fmt.Sprintf("test-%d", rand.Intn(1024)+1) + cmd = exec.Command("kubectl", "create", "namespace", targetNamespace2) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { // Remove resources in test namespace with no failure - if !CurrentSpecReport().Failed() && len(targetNamespace) > 0 { - cmd := exec.Command("kubectl", "delete", "namespace", targetNamespace, "--wait") - _, err := utils.Run(cmd) - Expect(err).NotTo(HaveOccurred()) + if !CurrentSpecReport().Failed() { + if len(targetNamespace) > 0 { + cmd := exec.Command("kubectl", "delete", "namespace", targetNamespace, "--wait") + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + } + if len(targetNamespace2) > 0 { + cmd := exec.Command("kubectl", "delete", "namespace", targetNamespace2, "--wait") + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + } } }) var _ = Context("with platform services", func() { @@ -83,11 +100,20 @@ var _ = Describe("Validate the persistence", Ordered, func() { Expect(err).NotTo(HaveOccurred()) By("Wait for SonataFlowPlatform CR to complete deployment") // wait for service deployments to be ready - EventuallyWithOffset(1, func() error { + EventuallyWithOffset(1, func() bool { cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s") _, err = utils.Run(cmd) - return err - }, 25*time.Minute, 5).Should(Succeed()) + if err != nil { + return false + } + if profile == metadata.PreviewProfile.String() { + GinkgoWriter.Println("waitForPodRestartCompletion") + waitForPodRestartCompletion("app.kubernetes.io/name=jobs-service", targetNamespace) + GinkgoWriter.Println("waitForPodRestartCompletion done") + return true + } + return true + }, 30*time.Minute, 5).Should(BeTrue()) By("Evaluate status of service's health endpoint") cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}") output, err := utils.Run(cmd) @@ -156,4 +182,164 @@ var _ = Describe("Validate the persistence", Ordered, func() { Entry("and both Job Service and Data Index using the one defined in each service, discarding the one from the platform CR", test.GetSonataFlowE2EPlatformPersistenceSampleDataDirectory("overwritten_by_services")), ) + DescribeTable("when deploying a SonataFlowPlatform CR with brokers", func(testcaseDir string) { + By("Deploy the brokers") + cmd := exec.Command("kubectl", "create", "-n", targetNamespace, "-f", filepath.Join(projectDir, + testcaseDir, "broker")) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("Wait for the brokers to be ready") + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "broker", "-l", "test=test-e2e", "-n", targetNamespace, "--for", "condition=Ready=True", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + + By("Deploy the CR") + var manifests []byte + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", testcaseDir) + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + cmd = exec.Command("kubectl", "create", "-n", targetNamespace, "-f", "-") + cmd.Stdin = bytes.NewBuffer(manifests) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + By("Wait for SonatatFlowPlatform CR to complete deployment") + // wait for service deployments to be ready + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, 10*time.Minute, 5).Should(Succeed()) + + GinkgoWriter.Println("waitForPodRestartCompletion") + waitForPodRestartCompletion("app.kubernetes.io/name=jobs-service", targetNamespace) + GinkgoWriter.Println("waitForPodRestartCompletion done") + + By("Evaluate status of all service's health endpoint") + cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + for _, pn := range strings.Split(string(output), " ") { + verifyHealthStatusInPod(pn, targetNamespace) + } + By("Evaluate triggers and sinkbindings") + cmd = exec.Command("kubectl", "get", "sonataflowplatform", "sonataflow-platform", "-n", targetNamespace, "-ojsonpath={.status.triggers}") + output, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + var triggers []operatorapi.SonataFlowPlatformTriggerRef + err = json.Unmarshal(output, &triggers) + Expect(err).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-jobs-", constants.KogitoJobsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "jobs-service-create-job-", constants.JobServiceJobEventsPath, targetNamespace, "js-source")).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "jobs-service-delete-job-", constants.JobServiceJobEventsPath, targetNamespace, "js-source")).NotTo(HaveOccurred()) + Expect(verifySinkBinding("sonataflow-platform-jobs-service-sb", targetNamespace, "js-sink")).NotTo(HaveOccurred()) + }, + Entry("and both Job Service and Data Index have service level brokers", test.GetSonataFlowE2EPlatformServicesKnativeDirectory("service-level-broker")), + ) + + DescribeTable("when deploying a SonataFlowPlatform CR with platform broker", func(testcaseDir string, brokerInAnotherNamespace bool) { + By("Deploy the broker") + brokerName := "default" + brokerNamespace := targetNamespace + if brokerInAnotherNamespace { + brokerNamespace = targetNamespace2 + } + GinkgoWriter.Println(fmt.Sprintf("testcaseDir=%v, brokerNamespace = %s", testcaseDir, brokerNamespace)) + cmd := exec.Command("kubectl", "create", "-n", brokerNamespace, "-f", filepath.Join(projectDir, + testcaseDir, "broker")) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("Wait for the broker to be ready") + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "broker", brokerName, "-n", brokerNamespace, "--for", "condition=Ready=True", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + + By("Deploy the SonataFlowPlatform CR") + var manifests []byte + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", testcaseDir) + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + manifestsUpdated := strings.ReplaceAll(string(manifests), "${BROKER_NAMESPACE}", brokerNamespace) + cmd = exec.Command("kubectl", "create", "-n", targetNamespace, "-f", "-") + cmd.Stdin = bytes.NewBuffer([]byte(manifestsUpdated)) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + By("Wait for SonatatFlowPlatform CR to complete deployment") + // wait for service deployments to be ready + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, 10*time.Minute, 5).Should(Succeed()) + + GinkgoWriter.Println("waitForPodRestartCompletion") + waitForPodRestartCompletion("app.kubernetes.io/name=jobs-service", targetNamespace) + GinkgoWriter.Println("waitForPodRestartCompletion done") + + By("Evaluate status of all service's health endpoint") + cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + for _, pn := range strings.Split(string(output), " ") { + verifyHealthStatusInPod(pn, targetNamespace) + } + By("Evaluate triggers and sinkbindings for DI and JS") + cmd = exec.Command("kubectl", "get", "sonataflowplatform", "sonataflow-platform", "-n", targetNamespace, "-ojsonpath={.status.triggers}") + output, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + var triggers []operatorapi.SonataFlowPlatformTriggerRef + err = json.Unmarshal(output, &triggers) + Expect(err).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "data-index-jobs-", constants.KogitoJobsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "jobs-service-create-job-", constants.JobServiceJobEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, "jobs-service-delete-job-", constants.JobServiceJobEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifySinkBinding("sonataflow-platform-jobs-service-sb", targetNamespace, brokerName)).NotTo(HaveOccurred()) + + By("Deploy the SonataFlow CR") + cmd = exec.Command("kubectl", "create", "-n", targetNamespace, "-f", filepath.Join(projectDir, + testcaseDir, "sonataflow")) + manifests, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + sfName := "callbackstatetimeouts" + By("Evaluate status of SonataFlow CR") + EventuallyWithOffset(1, func() bool { + return verifyWorkflowIsInRunningStateInNamespace(sfName, targetNamespace) + }, 10*time.Minute, 5).Should(BeTrue()) + + By("Evaluate triggers and sinkbindings for the workflow") + cmd = exec.Command("kubectl", "get", "sonataflow", sfName, "-n", targetNamespace, "-ojsonpath={.status.triggers}") + output, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + err = json.Unmarshal(output, &triggers) + Expect(err).NotTo(HaveOccurred()) + Expect(verifyTrigger(triggers, sfName, "", brokerNamespace, brokerName)).NotTo(HaveOccurred()) + Expect(verifySinkBinding(fmt.Sprintf("%s-sb", sfName), targetNamespace, brokerName)).NotTo(HaveOccurred()) + }, + Entry("and with broker and platform in the same namespace", test.GetSonataFlowE2EPlatformServicesKnativeDirectory("platform-level-broker"), false), + Entry("and with broker and platform in a separate namespace", test.GetSonataFlowE2EPlatformServicesKnativeDirectory("platform-level-broker"), true), + ) }) diff --git a/test/e2e/workflow_test.go b/test/e2e/workflow_test.go index 57f328b3e..98471c8da 100644 --- a/test/e2e/workflow_test.go +++ b/test/e2e/workflow_test.go @@ -40,6 +40,10 @@ import ( . "github.com/onsi/gomega" ) +const ( + workflowAppLabel = "sonataflow.org/workflow-app" +) + var _ = Describe("SonataFlow Operator", Ordered, func() { var targetNamespace string @@ -174,7 +178,7 @@ var _ = Describe("Validate the persistence ", Ordered, func() { }) - DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string, withPersistence bool) { + DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string, withPersistence bool, waitKSinkInjection bool) { By("Deploy the CR") var manifests []byte EventuallyWithOffset(1, func() error { @@ -189,15 +193,24 @@ var _ = Describe("Validate the persistence ", Ordered, func() { Expect(err).NotTo(HaveOccurred()) By("Wait for SonatatFlow CR to complete deployment") // wait for service deployments to be ready - EventuallyWithOffset(1, func() error { - cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", "sonataflow.org/workflow-app", "--for", "condition=Ready", "--timeout=5s") + EventuallyWithOffset(1, func() bool { + cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", workflowAppLabel, "--for", "condition=Ready", "--timeout=5s") out, err := utils.Run(cmd) + if err != nil { + return false + } GinkgoWriter.Printf("%s\n", string(out)) - return err - }, 15*time.Minute, 5).Should(Succeed()) + if !waitKSinkInjection { + return true + } + GinkgoWriter.Println("waitForPodRestartCompletion") + waitForPodRestartCompletion(workflowAppLabel, ns) + GinkgoWriter.Println("waitForPodRestartCompletion done") + return true + }, 25*time.Minute, 5).Should(BeTrue()) By("Evaluate status of the workflow's pod database connection health endpoint") - cmd = exec.Command("kubectl", "get", "pod", "-l", "sonataflow.org/workflow-app", "-n", ns, "-ojsonpath={.items[*].metadata.name}") + cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}") output, err := utils.Run(cmd) Expect(err).NotTo(HaveOccurred()) EventuallyWithOffset(1, func() bool { @@ -232,7 +245,7 @@ var _ = Describe("Validate the persistence ", Ordered, func() { return false }, 1*time.Minute).Should(BeTrue()) // Persistence initialization checks - cmd = exec.Command("kubectl", "get", "pod", "-l", "sonataflow.org/workflow-app", "-n", ns, "-ojsonpath={.items[*].metadata.name}") + cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}") output, err = utils.Run(cmd) Expect(err).NotTo(HaveOccurred()) podName := string(output) @@ -244,9 +257,9 @@ var _ = Describe("Validate the persistence ", Ordered, func() { By("Validate that the workflow persistence was properly initialized") Expect(logs).Should(ContainSubstring("Flyway Community Edition")) Expect(logs).Should(ContainSubstring("Database: jdbc:postgresql://postgres.%s:5432", ns)) - Expect(logs).Should(ContainSubstring("Creating schema \"callbackstatetimeouts\"")) - Expect(logs).Should(ContainSubstring("Migrating schema \"callbackstatetimeouts\" to version")) - Expect(logs).Should(MatchRegexp("Successfully applied \\d migrations to schema \"callbackstatetimeouts\"")) + result := verifySchemaMigration(logs, "callbackstatetimeouts") + GinkgoWriter.Println(fmt.Sprintf("verifySchemaMigration: %v", result)) + Expect(result).Should(BeTrue()) Expect(logs).Should(ContainSubstring("Profile prod activated")) } else { By("Validate that the workflow has no persistence") @@ -255,11 +268,11 @@ var _ = Describe("Validate the persistence ", Ordered, func() { Expect(logs).Should(ContainSubstring("Profile prod activated")) } }, - Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("by_service"), true), - Entry("defined in the workflow and from the sonataflow platform", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_overwritten_by_service"), true), - Entry("defined from the sonataflow platform as reference and with DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_di_and_js_services"), true), - Entry("defined from the sonataflow platform as reference and without DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_without_di_and_js_services"), true), - Entry("defined from the sonataflow platform as reference but not required by the workflow", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_no_persistence_required"), false), + Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("by_service"), true, false), + Entry("defined in the workflow and from the sonataflow platform", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_overwritten_by_service"), true, false), + Entry("defined from the sonataflow platform as reference and with DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_di_and_js_services"), true, true), + Entry("defined from the sonataflow platform as reference and without DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_without_di_and_js_services"), true, false), + Entry("defined from the sonataflow platform as reference but not required by the workflow", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_no_persistence_required"), false, false), ) }) diff --git a/test/testdata/platform/services/preview/knative/platform-level-broker/01-postgres.yaml b/test/testdata/platform/services/preview/knative/platform-level-broker/01-postgres.yaml new file mode 100644 index 000000000..662de4c7b --- /dev/null +++ b/test/testdata/platform/services/preview/knative/platform-level-broker/01-postgres.yaml @@ -0,0 +1,86 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: postgres + template: + metadata: + labels: + app.kubernetes.io/name: postgres + spec: + containers: + - name: postgres + image: postgres:13.2-alpine + imagePullPolicy: 'IfNotPresent' + ports: + - containerPort: 5432 + volumeMounts: + - name: storage + mountPath: /var/lib/postgresql/data + envFrom: + - secretRef: + name: postgres-secrets + readinessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + livenessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + resources: + limits: + memory: "256Mi" + cpu: "500m" + volumes: + - name: storage + persistentVolumeClaim: + claimName: postgres-pvc +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + selector: + app.kubernetes.io/name: postgres + ports: + - port: 5432 diff --git a/test/testdata/platform/services/preview/knative/platform-level-broker/02-sonataflow_platform.yaml b/test/testdata/platform/services/preview/knative/platform-level-broker/02-sonataflow_platform.yaml new file mode 100644 index 000000000..07f37f770 --- /dev/null +++ b/test/testdata/platform/services/preview/knative/platform-level-broker/02-sonataflow_platform.yaml @@ -0,0 +1,75 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + eventing: + broker: + ref: + apiVersion: eventing.knative.dev/v1 + kind: Broker + name: default + namespace: ${BROKER_NAMESPACE} + build: + config: + strategyOptions: + KanikoBuildCacheEnabled: "true" + services: + dataIndex: + enabled: true + persistence: + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-micro:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + container: + env: + - name: MY_CUSTOM_VARIABLE + value: "OKAY" + - name: QUARKUS_DATASOURCE_PASSWORD +# This value should not be used since it's already set by the operator. If used, the test will fail. + value: "SHOULD_NOT_BE_USED" + jobService: + enabled: true + persistence: + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-micro:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + container: + env: + - name: MY_CUSTOM_VARIABLE + value: "OKAY" + - name: QUARKUS_DATASOURCE_PASSWORD +# This value should not be used since it's already set by the operator. If used, the test will fail. + value: "SHOULD_NOT_BE_USED" \ No newline at end of file diff --git a/test/testdata/workflow/persistence/by_service/00-broker.yaml b/test/testdata/platform/services/preview/knative/platform-level-broker/broker/broker.yaml similarity index 99% rename from test/testdata/workflow/persistence/by_service/00-broker.yaml rename to test/testdata/platform/services/preview/knative/platform-level-broker/broker/broker.yaml index 6152f24d7..219c2c1b1 100644 --- a/test/testdata/workflow/persistence/by_service/00-broker.yaml +++ b/test/testdata/platform/services/preview/knative/platform-level-broker/broker/broker.yaml @@ -18,3 +18,5 @@ metadata: name: default spec: {} + + diff --git a/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/00-broker.yaml b/test/testdata/platform/services/preview/knative/platform-level-broker/kustomization.yaml similarity index 63% rename from test/testdata/workflow/persistence/from_platform_with_no_persistence_required/00-broker.yaml rename to test/testdata/platform/services/preview/knative/platform-level-broker/kustomization.yaml index 6152f24d7..d3fd127c7 100644 --- a/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/00-broker.yaml +++ b/test/testdata/platform/services/preview/knative/platform-level-broker/kustomization.yaml @@ -12,9 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -apiVersion: eventing.knative.dev/v1 -kind: Broker -metadata: - name: default -spec: {} +resources: +- 01-postgres.yaml +- 02-sonataflow_platform.yaml + +generatorOptions: + disableNameSuffixHash: true + +secretGenerator: + - name: postgres-secrets + literals: + - POSTGRES_USER=sonataflow + - POSTGRES_PASSWORD=sonataflow + - POSTGRES_DATABASE=sonataflow + - PGDATA=/var/lib/pgsql/data/userdata + +sortOptions: + order: fifo diff --git a/test/testdata/platform/services/preview/knative/platform-level-broker/sonataflow/04-sonataflow_callbackstatetimeouts.sw.yaml b/test/testdata/platform/services/preview/knative/platform-level-broker/sonataflow/04-sonataflow_callbackstatetimeouts.sw.yaml new file mode 100644 index 000000000..a76ac23fb --- /dev/null +++ b/test/testdata/platform/services/preview/knative/platform-level-broker/sonataflow/04-sonataflow_callbackstatetimeouts.sw.yaml @@ -0,0 +1,81 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlow +metadata: + name: callbackstatetimeouts + annotations: + sonataflow.org/description: Callback State Timeouts Example k8s + sonataflow.org/version: 0.0.1 + sonataflow.org/profile: preview +spec: + flow: + start: PrintStartMessage + events: + - name: callbackEvent + source: '' + type: callback_event_type + functions: + - name: systemOut + type: custom + operation: sysout + states: + - name: PrintStartMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has started.\"}" + transition: CallbackState + - name: CallbackState + type: callback + action: + name: callbackAction + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has executed the callbackFunction.\"}" + eventRef: callbackEvent + transition: CheckEventArrival + timeouts: + eventTimeout: PT30S + - name: CheckEventArrival + type: switch + dataConditions: + - condition: "${ .eventData != null }" + transition: EventArrived + defaultCondition: + transition: EventNotArrived + - name: EventArrived + type: inject + data: + exitMessage: "The callback event has arrived." + transition: PrintExitMessage + - name: EventNotArrived + type: inject + data: + exitMessage: "The callback event has not arrived, and the timeout has overdue." + transition: PrintExitMessage + - name: PrintExitMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has finalized. \" + .exitMessage + \" eventData: \" + .eventData}" + end: true diff --git a/test/testdata/platform/services/preview/knative/service-level-broker/01-postgres.yaml b/test/testdata/platform/services/preview/knative/service-level-broker/01-postgres.yaml new file mode 100644 index 000000000..662de4c7b --- /dev/null +++ b/test/testdata/platform/services/preview/knative/service-level-broker/01-postgres.yaml @@ -0,0 +1,86 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: postgres + template: + metadata: + labels: + app.kubernetes.io/name: postgres + spec: + containers: + - name: postgres + image: postgres:13.2-alpine + imagePullPolicy: 'IfNotPresent' + ports: + - containerPort: 5432 + volumeMounts: + - name: storage + mountPath: /var/lib/postgresql/data + envFrom: + - secretRef: + name: postgres-secrets + readinessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + livenessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + resources: + limits: + memory: "256Mi" + cpu: "500m" + volumes: + - name: storage + persistentVolumeClaim: + claimName: postgres-pvc +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + selector: + app.kubernetes.io/name: postgres + ports: + - port: 5432 diff --git a/test/testdata/platform/services/preview/knative/service-level-broker/02-sonataflow_platform.yaml b/test/testdata/platform/services/preview/knative/service-level-broker/02-sonataflow_platform.yaml new file mode 100644 index 000000000..14793f28a --- /dev/null +++ b/test/testdata/platform/services/preview/knative/service-level-broker/02-sonataflow_platform.yaml @@ -0,0 +1,89 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + eventing: + broker: + ref: + apiVersion: eventing.knative.dev/v1 + kind: Broker + name: default + build: + config: + strategyOptions: + KanikoBuildCacheEnabled: "true" + services: + dataIndex: + enabled: true + source: + ref: + apiVersion: eventing.knative.dev/v1 + kind: Broker + name: di-source + persistence: + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-micro:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + container: + env: + - name: MY_CUSTOM_VARIABLE + value: "OKAY" + - name: QUARKUS_DATASOURCE_PASSWORD +# This value should not be used since it's already set by the operator. If used, the test will fail. + value: "SHOULD_NOT_BE_USED" + jobService: + enabled: true + source: + ref: + apiVersion: eventing.knative.dev/v1 + kind: Broker + name: js-source + sink: + ref: + apiVersion: eventing.knative.dev/v1 + kind: Broker + name: js-sink + persistence: + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-micro:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + container: + env: + - name: MY_CUSTOM_VARIABLE + value: "OKAY" + - name: QUARKUS_DATASOURCE_PASSWORD +# This value should not be used since it's already set by the operator. If used, the test will fail. + value: "SHOULD_NOT_BE_USED" \ No newline at end of file diff --git a/test/testdata/workflow/persistence/from_platform_overwritten_by_service/00-broker.yaml b/test/testdata/platform/services/preview/knative/service-level-broker/broker/00-broker.yaml similarity index 63% rename from test/testdata/workflow/persistence/from_platform_overwritten_by_service/00-broker.yaml rename to test/testdata/platform/services/preview/knative/service-level-broker/broker/00-broker.yaml index 6152f24d7..74fb9f790 100644 --- a/test/testdata/workflow/persistence/from_platform_overwritten_by_service/00-broker.yaml +++ b/test/testdata/platform/services/preview/knative/service-level-broker/broker/00-broker.yaml @@ -16,5 +16,32 @@ apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: default + labels: + test: test-e2e spec: {} +--- +apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: di-source + labels: + test: test-e2e +spec: {} +--- +apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: js-sink + labels: + test: test-e2e +spec: {} +--- +apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: js-source + labels: + test: test-e2e +spec: {} + diff --git a/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/00-broker.yaml b/test/testdata/platform/services/preview/knative/service-level-broker/kustomization.yaml similarity index 63% rename from test/testdata/workflow/persistence/from_platform_without_di_and_js_services/00-broker.yaml rename to test/testdata/platform/services/preview/knative/service-level-broker/kustomization.yaml index 6152f24d7..d3fd127c7 100644 --- a/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/00-broker.yaml +++ b/test/testdata/platform/services/preview/knative/service-level-broker/kustomization.yaml @@ -12,9 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -apiVersion: eventing.knative.dev/v1 -kind: Broker -metadata: - name: default -spec: {} +resources: +- 01-postgres.yaml +- 02-sonataflow_platform.yaml + +generatorOptions: + disableNameSuffixHash: true + +secretGenerator: + - name: postgres-secrets + literals: + - POSTGRES_USER=sonataflow + - POSTGRES_PASSWORD=sonataflow + - POSTGRES_DATABASE=sonataflow + - PGDATA=/var/lib/pgsql/data/userdata + +sortOptions: + order: fifo diff --git a/test/testdata/workflow/persistence/by_service/02-sonataflow_platform.yaml b/test/testdata/workflow/persistence/by_service/02-sonataflow_platform.yaml index 360daab2b..082b2ef97 100644 --- a/test/testdata/workflow/persistence/by_service/02-sonataflow_platform.yaml +++ b/test/testdata/workflow/persistence/by_service/02-sonataflow_platform.yaml @@ -17,12 +17,6 @@ kind: SonataFlowPlatform metadata: name: sonataflow-platform spec: - eventing: - broker: - ref: - apiVersion: eventing.knative.dev/v1 - kind: Broker - name: default build: config: strategyOptions: diff --git a/test/testdata/workflow/persistence/by_service/kustomization.yaml b/test/testdata/workflow/persistence/by_service/kustomization.yaml index f494b37c0..2aaac5b14 100644 --- a/test/testdata/workflow/persistence/by_service/kustomization.yaml +++ b/test/testdata/workflow/persistence/by_service/kustomization.yaml @@ -13,7 +13,6 @@ # limitations under the License. resources: -- 00-broker.yaml - 01-postgres.yaml - 02-sonataflow_platform.yaml - 03-configmap_callbackstatetimeouts-props.yaml diff --git a/test/testdata/workflow/persistence/from_platform_overwritten_by_service/02-sonataflow_platform.yaml b/test/testdata/workflow/persistence/from_platform_overwritten_by_service/02-sonataflow_platform.yaml index a3810b3e1..733b0b952 100644 --- a/test/testdata/workflow/persistence/from_platform_overwritten_by_service/02-sonataflow_platform.yaml +++ b/test/testdata/workflow/persistence/from_platform_overwritten_by_service/02-sonataflow_platform.yaml @@ -17,12 +17,6 @@ kind: SonataFlowPlatform metadata: name: sonataflow-platform spec: - eventing: - broker: - ref: - apiVersion: eventing.knative.dev/v1 - kind: Broker - name: default persistence: postgresql: secretRef: diff --git a/test/testdata/workflow/persistence/from_platform_overwritten_by_service/kustomization.yaml b/test/testdata/workflow/persistence/from_platform_overwritten_by_service/kustomization.yaml index 1161b5402..b7f587bcc 100644 --- a/test/testdata/workflow/persistence/from_platform_overwritten_by_service/kustomization.yaml +++ b/test/testdata/workflow/persistence/from_platform_overwritten_by_service/kustomization.yaml @@ -13,7 +13,6 @@ # limitations under the License. resources: -- 00-broker.yaml - 01-postgres.yaml - 02-sonataflow_platform.yaml - 03-sonataflow_callbackstatetimeouts.sw.yaml diff --git a/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/02-sonataflow_platform.yaml b/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/02-sonataflow_platform.yaml index ce2f7f388..817ce0c31 100644 --- a/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/02-sonataflow_platform.yaml +++ b/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/02-sonataflow_platform.yaml @@ -17,12 +17,6 @@ kind: SonataFlowPlatform metadata: name: sonataflow-platform spec: - eventing: - broker: - ref: - apiVersion: eventing.knative.dev/v1 - kind: Broker - name: default persistence: postgresql: secretRef: diff --git a/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/kustomization.yaml b/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/kustomization.yaml index f7b76b2d2..0a410cde1 100644 --- a/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/kustomization.yaml +++ b/test/testdata/workflow/persistence/from_platform_with_no_persistence_required/kustomization.yaml @@ -13,7 +13,6 @@ # limitations under the License. resources: -- 00-broker.yaml - 01-postgres.yaml - 02-sonataflow_platform.yaml - 03-sonataflow_callbackstatetimeouts-no-persistence.sw.yaml diff --git a/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/02-sonataflow_platform.yaml b/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/02-sonataflow_platform.yaml index 3b4f119d0..b5ac4e161 100644 --- a/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/02-sonataflow_platform.yaml +++ b/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/02-sonataflow_platform.yaml @@ -17,12 +17,6 @@ kind: SonataFlowPlatform metadata: name: sonataflow-platform spec: - eventing: - broker: - ref: - apiVersion: eventing.knative.dev/v1 - kind: Broker - name: default persistence: postgresql: secretRef: diff --git a/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/kustomization.yaml b/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/kustomization.yaml index 1161b5402..b7f587bcc 100644 --- a/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/kustomization.yaml +++ b/test/testdata/workflow/persistence/from_platform_without_di_and_js_services/kustomization.yaml @@ -13,7 +13,6 @@ # limitations under the License. resources: -- 00-broker.yaml - 01-postgres.yaml - 02-sonataflow_platform.yaml - 03-sonataflow_callbackstatetimeouts.sw.yaml diff --git a/test/yaml.go b/test/yaml.go index a77f78934..a0fbd8bb6 100644 --- a/test/yaml.go +++ b/test/yaml.go @@ -285,6 +285,10 @@ func GetSonataFlowE2EPlatformServicesDirectory() string { return filepath.Join(getTestDataDir(), "platform", "services") } +func GetSonataFlowE2EPlatformServicesKnativeDirectory(subdir string) string { + return filepath.Join(getTestDataDir(), "platform", "services", "preview", "knative", subdir) +} + func GetSonataFlowE2EPlatformNoServicesDirectory() string { return filepath.Join(getTestDataDir(), "platform", "noservices") } diff --git a/version/version.go b/version/version.go index eb0eb353d..bbc04934a 100644 --- a/version/version.go +++ b/version/version.go @@ -31,7 +31,7 @@ const ( // When released, this version should reflect the `major.minor` version in the registry. // For example, docker.io/apache/incubator-kie-sonataflow-operator:10.0 tagVersion = "main" - serviceTagVersion = "weekly-latest" + serviceTagVersion = "main-20240728" // weekly-latest ) // GetOperatorVersion gets the current binary version of the operator. Do not use it to compose image tags!