Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix e2…
Browse files Browse the repository at this point in the history
…e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS

[issue-368] knative integration with DataIndex and JobService: fix e2e test errors and use temp images for DI and JS
  • Loading branch information
jianrongzhang89 committed Aug 6, 2024
1 parent 6bc17ba commit 1946ca7
Show file tree
Hide file tree
Showing 21 changed files with 450 additions and 95 deletions.
4 changes: 2 additions & 2 deletions hack/ci/create-kind-cluster-with-registry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ nodes:
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
system-reserved: memory=1Gi
system-reserved: memory=2Gi
- role: worker
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
system-reserved: memory=2Gi
system-reserved: memory=4Gi
containerdConfigPatches:
- |-
[plugins."io.containerd.grpc.v1.cri".registry]
Expand Down
76 changes: 75 additions & 1 deletion test/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"fmt"
"net/url"
"os/exec"
"regexp"
"strconv"
"strings"
"time"

"github.com/apache/incubator-kie-kogito-serverless-operator/test/utils"

Expand Down Expand Up @@ -139,7 +141,7 @@ func verifyWorkflowIsInRunningState(workflowName string, targetNamespace string)
}

func verifyWorkflowIsAddressable(workflowName string, targetNamespace string) bool {
cmd := exec.Command("kubectl", "get", "workflow", workflowName, "-n", targetNamespace, "-o", "jsonpath={.status.address.url}")
cmd := exec.Command("kubectl", "get", "workflow", workflowName, "-n", targetNamespace, "-ojsonpath={.status.address.url}")
if response, err := utils.Run(cmd); err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to check if greeting workflow is running: %v", err))
return false
Expand All @@ -157,3 +159,75 @@ func verifyWorkflowIsAddressable(workflowName string, targetNamespace string) bo
return false
}
}

func verifySchemaMigration(data, name string) bool {
matched1, err := regexp.MatchString(fmt.Sprintf("Successfully applied \\d migrations to schema \"%s\"", name), data)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("string match error:%v", err))
return false
}
matched2, err := regexp.MatchString("Successfully validated \\d (migration|migrations)", data)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("string match error:%v", err))
return false
}
GinkgoWriter.Println(fmt.Sprintf("verifying schemaMigration, logs=%v", data))
return (matched1 && strings.Contains(data, fmt.Sprintf("Creating schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Migrating schema \"%s\" to version", name))) ||
(matched2 && strings.Contains(data, fmt.Sprintf("Current version of schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name))) ||
(strings.Contains(data, fmt.Sprintf("Creating schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Current version of schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name)))
}

func verifyKSinkInjection(label, ns string) bool {
GinkgoWriter.Println(fmt.Sprintf("failed to get pod for label: %v, ns=%s", label, ns))
cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err))
return false
}
podNames := strings.Fields(string(out))
if len(podNames) == 0 {
GinkgoWriter.Println("no pods found to check K_SINK")
return false // pods haven't created yet
}
GinkgoWriter.Println(fmt.Sprintf("pods found: %s", podNames))
for _, pod := range podNames {
cmd = exec.Command("kubectl", "get", "pod", pod, "-n", ns, "-o", "json")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err))
return false
}
GinkgoWriter.Println(string(out))
if !strings.Contains(string(out), "K_SINK") { // The pod does not have K_SINK injected
GinkgoWriter.Println(fmt.Sprintf("Pod does not have K_SINK injected: %s", string(out)))
return false
}
}
return true
}

func waitForPodRestartCompletion(label, ns string) {
EventuallyWithOffset(1, func() bool {
GinkgoWriter.Println(fmt.Sprintf("failed to get pod for label: %v, ns=%s", label, ns))
cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err))
return false
}
podNames := strings.Fields(string(out))
if len(podNames) == 0 {
GinkgoWriter.Println("no pods found")
return false // pods haven't created yet
} else if len(podNames) > 1 {
GinkgoWriter.Println("multiple pods found")
return false // multiple pods found, wait for other pods to terminate
}
return true
}, 1*time.Minute, 5).Should(BeTrue())
}
50 changes: 47 additions & 3 deletions test/e2e/platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,20 @@ var _ = Describe("Validate the persistence", Ordered, func() {
Expect(err).NotTo(HaveOccurred())
By("Wait for SonataFlowPlatform CR to complete deployment")
// wait for service deployments to be ready
EventuallyWithOffset(1, func() error {
EventuallyWithOffset(1, func() bool {
cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s")
_, err = utils.Run(cmd)
return err
}, 25*time.Minute, 5).Should(Succeed())
if err != nil {
return false
}
if profile == metadata.PreviewProfile.String() {
GinkgoWriter.Println("waitForPodRestartCompletion")
waitForPodRestartCompletion("app.kubernetes.io/name=jobs-service", targetNamespace)
GinkgoWriter.Println("waitForPodRestartCompletion done")
return true
}
return true
}, 25*time.Minute, 5).Should(BeTrue())
By("Evaluate status of service's health endpoint")
cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}")
output, err := utils.Run(cmd)
Expand Down Expand Up @@ -156,4 +165,39 @@ var _ = Describe("Validate the persistence", Ordered, func() {
Entry("and both Job Service and Data Index using the one defined in each service, discarding the one from the platform CR", test.GetSonataFlowE2EPlatformPersistenceSampleDataDirectory("overwritten_by_services")),
)

DescribeTable("when deploying a SonataFlowPlatform CR with brokers", func(testcaseDir string) {
By("Deploy the CR")
var manifests []byte
EventuallyWithOffset(1, func() error {
var err error
cmd := exec.Command("kubectl", "kustomize", testcaseDir)
manifests, err = utils.Run(cmd)
return err
}, time.Minute, time.Second).Should(Succeed())
cmd := exec.Command("kubectl", "create", "-n", targetNamespace, "-f", "-")
cmd.Stdin = bytes.NewBuffer(manifests)
_, err := utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())
By("Wait for SonatatFlowPlatform CR to complete deployment")
// wait for service deployments to be ready
EventuallyWithOffset(1, func() error {
cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s")
_, err = utils.Run(cmd)
return err
}, 10*time.Minute, 5).Should(Succeed())

GinkgoWriter.Println("waitForPodRestartCompletion")
waitForPodRestartCompletion("app.kubernetes.io/name=jobs-service", targetNamespace)
GinkgoWriter.Println("waitForPodRestartCompletion done")

By("Evaluate status of all service's health endpoint")
cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}")
output, err := utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())
for _, pn := range strings.Split(string(output), " ") {
verifyHealthStatusInPod(pn, targetNamespace)
}
},
Entry("and both Job Service and Data Index have service level brokers", test.GetSonataFlowE2EPlatformServicesKnativeDirectory("service-level-broker")),
)
})
43 changes: 28 additions & 15 deletions test/e2e/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ import (
. "github.com/onsi/gomega"
)

const (
workflowAppLabel = "sonataflow.org/workflow-app"
)

var _ = Describe("SonataFlow Operator", Ordered, func() {

var targetNamespace string
Expand Down Expand Up @@ -174,7 +178,7 @@ var _ = Describe("Validate the persistence ", Ordered, func() {

})

DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string, withPersistence bool) {
DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string, withPersistence bool, waitKSinkInjection bool) {
By("Deploy the CR")
var manifests []byte
EventuallyWithOffset(1, func() error {
Expand All @@ -189,15 +193,24 @@ var _ = Describe("Validate the persistence ", Ordered, func() {
Expect(err).NotTo(HaveOccurred())
By("Wait for SonatatFlow CR to complete deployment")
// wait for service deployments to be ready
EventuallyWithOffset(1, func() error {
cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", "sonataflow.org/workflow-app", "--for", "condition=Ready", "--timeout=5s")
EventuallyWithOffset(1, func() bool {
cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", workflowAppLabel, "--for", "condition=Ready", "--timeout=5s")
out, err := utils.Run(cmd)
if err != nil {
return false
}
GinkgoWriter.Printf("%s\n", string(out))
return err
}, 15*time.Minute, 5).Should(Succeed())
if !waitKSinkInjection {
return true
}
GinkgoWriter.Println("waitForPodRestartCompletion")
waitForPodRestartCompletion(workflowAppLabel, ns)
GinkgoWriter.Println("waitForPodRestartCompletion done")
return true
}, 25*time.Minute, 5).Should(BeTrue())

By("Evaluate status of the workflow's pod database connection health endpoint")
cmd = exec.Command("kubectl", "get", "pod", "-l", "sonataflow.org/workflow-app", "-n", ns, "-ojsonpath={.items[*].metadata.name}")
cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}")
output, err := utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())
EventuallyWithOffset(1, func() bool {
Expand Down Expand Up @@ -232,7 +245,7 @@ var _ = Describe("Validate the persistence ", Ordered, func() {
return false
}, 1*time.Minute).Should(BeTrue())
// Persistence initialization checks
cmd = exec.Command("kubectl", "get", "pod", "-l", "sonataflow.org/workflow-app", "-n", ns, "-ojsonpath={.items[*].metadata.name}")
cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}")
output, err = utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())
podName := string(output)
Expand All @@ -244,9 +257,9 @@ var _ = Describe("Validate the persistence ", Ordered, func() {
By("Validate that the workflow persistence was properly initialized")
Expect(logs).Should(ContainSubstring("Flyway Community Edition"))
Expect(logs).Should(ContainSubstring("Database: jdbc:postgresql://postgres.%s:5432", ns))
Expect(logs).Should(ContainSubstring("Creating schema \"callbackstatetimeouts\""))
Expect(logs).Should(ContainSubstring("Migrating schema \"callbackstatetimeouts\" to version"))
Expect(logs).Should(MatchRegexp("Successfully applied \\d migrations to schema \"callbackstatetimeouts\""))
result := verifySchemaMigration(logs, "callbackstatetimeouts")
GinkgoWriter.Println(fmt.Sprintf("verifySchemaMigration: %v", result))
Expect(result).Should(BeTrue())
Expect(logs).Should(ContainSubstring("Profile prod activated"))
} else {
By("Validate that the workflow has no persistence")
Expand All @@ -255,11 +268,11 @@ var _ = Describe("Validate the persistence ", Ordered, func() {
Expect(logs).Should(ContainSubstring("Profile prod activated"))
}
},
Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("by_service"), true),
Entry("defined in the workflow and from the sonataflow platform", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_overwritten_by_service"), true),
Entry("defined from the sonataflow platform as reference and with DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_di_and_js_services"), true),
Entry("defined from the sonataflow platform as reference and without DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_without_di_and_js_services"), true),
Entry("defined from the sonataflow platform as reference but not required by the workflow", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_no_persistence_required"), false),
Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("by_service"), true, false),
Entry("defined in the workflow and from the sonataflow platform", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_overwritten_by_service"), true, false),
Entry("defined from the sonataflow platform as reference and with DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_di_and_js_services"), true, true),
Entry("defined from the sonataflow platform as reference and without DI and JS", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_without_di_and_js_services"), true, false),
Entry("defined from the sonataflow platform as reference but not required by the workflow", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("from_platform_with_no_persistence_required"), false, false),
)

})
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,23 @@ kind: Broker
metadata:
name: default
spec: {}
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: di-source
spec: {}
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: js-sink
spec: {}
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: js-source
spec: {}


Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2024 Apache Software Foundation (ASF)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
app.kubernetes.io/name: postgres
name: postgres-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app.kubernetes.io/name: postgres
name: postgres
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: postgres
template:
metadata:
labels:
app.kubernetes.io/name: postgres
spec:
containers:
- name: postgres
image: postgres:13.2-alpine
imagePullPolicy: 'IfNotPresent'
ports:
- containerPort: 5432
volumeMounts:
- name: storage
mountPath: /var/lib/postgresql/data
envFrom:
- secretRef:
name: postgres-secrets
readinessProbe:
exec:
command: ["pg_isready"]
initialDelaySeconds: 15
timeoutSeconds: 2
livenessProbe:
exec:
command: ["pg_isready"]
initialDelaySeconds: 15
timeoutSeconds: 2
resources:
limits:
memory: "256Mi"
cpu: "500m"
volumes:
- name: storage
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/name: postgres
name: postgres
spec:
selector:
app.kubernetes.io/name: postgres
ports:
- port: 5432
Loading

0 comments on commit 1946ca7

Please sign in to comment.