Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix e2…
Browse files Browse the repository at this point in the history
…e test errors and use temp images for DI and JS
  • Loading branch information
jianrongzhang89 committed Aug 13, 2024
1 parent 6bc17ba commit d2945f6
Show file tree
Hide file tree
Showing 29 changed files with 913 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ generate-all: generate generate-deploy bundle addheaders vet fmt

.PHONY: test-e2e # You will need to have a Minikube/Kind cluster up in running to run this target, and run container-builder before the test
test-e2e:
go test ./test/e2e/* -v -ginkgo.v -ginkgo.no-color -ginkgo.junit-report=./e2e-test-report.xml -timeout 75m
go test ./test/e2e/* -v -ginkgo.v -ginkgo.no-color -ginkgo.timeout=90m -ginkgo.junit-report=./e2e-test-report.xml -timeout 90m

.PHONY: before-pr
before-pr: test generate-all
Expand Down
18 changes: 18 additions & 0 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -58,6 +59,7 @@ const (
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
)

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
Expand Down Expand Up @@ -270,3 +272,19 @@ func MapTriggerToPlatformRequests(ctx context.Context, object client.Object) []r
}
return nil
}

// Does the sinkbinding completed K_SINK injection?
func CheckKSinkInjected(name, namespace string) (bool, error) {
sb := &sourcesv1.SinkBinding{}
if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: fmt.Sprintf("%s-sb", name), Namespace: namespace}, sb); err != nil {
if errors.IsNotFound(err) {
return false, nil // deployment hasn't been created yet
}
return false, err
}
cond := sb.Status.GetCondition(apis.ConditionType(knativeSinkProvided))
if cond != nil && cond.Status == corev1.ConditionTrue {
return true, nil
}
return false, nil // K_SINK has not been injected yet
}
58 changes: 42 additions & 16 deletions controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package platform

import (
"context"
"fmt"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
Expand Down Expand Up @@ -144,6 +145,13 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
serviceContainer.Name = psh.GetContainerName()

replicas := psh.GetReplicaCount()
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return nil
}
if !kSinkInjected {
replicas = 0 // Wait for K_SINK injection
}
lbl, selectorLbl := getLabels(platform, psh)
serviceDeploymentSpec := appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
Expand Down Expand Up @@ -200,7 +208,6 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
} else {
klog.V(log.I).InfoS("Deployment successfully reconciled", "operation", op)
}

return nil
}

Expand Down Expand Up @@ -305,42 +312,61 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
if err != nil {
return err
}
// Create or update triggers
for _, obj := range objs {
if platform.Namespace == obj.GetNamespace() {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
}
}
if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
if platform.Namespace == obj.GetNamespace() {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
}
} else {
// This is for Knative trigger in a different namespace
// Set the finalizer for trigger cleanup when the platform is deleted
if err := setSonataFlowPlatformFinalizer(ctx, client, platform); err != nil {
return err
}
}
trigger := &eventingv1.Trigger{
ObjectMeta: triggerDef.ObjectMeta,
}
op, err := controllerutil.CreateOrUpdate(ctx, client, trigger, func() error {
_, err := controllerutil.CreateOrUpdate(ctx, client, trigger, func() error {
trigger.Spec = triggerDef.Spec
return nil
})
if err != nil {
return err
}
if op == controllerutil.OperationResultCreated { // trigger has been successfully created
addToSonataFlowPlatformTriggerList(platform, trigger)
if platform.Namespace != obj.GetNamespace() {
// This is for Knative trigger in a different namespace
// Set the finalizer for trigger cleanup when the platform is deleted
return setSonataFlowPlatformFinalizer(ctx, client, platform)
}
addToSonataFlowPlatformTriggerList(platform, trigger)
}
}

if err := client.Status().Update(ctx, platform); err != nil {
return err
}

// Create or update sinkbindings
for _, obj := range objs {
if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
}
} else if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: sbDef.ObjectMeta,
}
_, err := controllerutil.CreateOrUpdate(ctx, client, sinkBinding, func() error {
_, err = controllerutil.CreateOrUpdate(ctx, client, sinkBinding, func() error {
sinkBinding.Spec = sbDef.Spec
return nil
})
if err != nil {
return err
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return err
}
if !kSinkInjected {
return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName())
}
}
}
return nil
Expand Down
31 changes: 21 additions & 10 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ import (
const (
quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START"
pathProcesses string = "/processes"
pathDefinitions string = "/definitions"
pathJobs string = "/jobs"
)

type PlatformServiceHandler interface {
Expand Down Expand Up @@ -105,6 +102,9 @@ type PlatformServiceHandler interface {
SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow)

GetServiceSource() *duckv1.Destination

// Check if K_SINK has injected for Job Service. No Op for Data Index
CheckKSinkInjected() (bool, error)
}

type DataIndexHandler struct {
Expand Down Expand Up @@ -276,6 +276,10 @@ func (d *DataIndexHandler) GenerateServiceProperties() (*properties.Properties,
return props, nil
}

func (d *DataIndexHandler) CheckKSinkInjected() (bool, error) {
return true, nil // No op
}

type JobServiceHandler struct {
platform *operatorapi.SonataFlowPlatform
}
Expand Down Expand Up @@ -599,13 +603,13 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata
}
serviceName := d.GetServiceName()
return []client.Object{
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", pathJobs, platform)}, nil
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil
}

func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
Expand Down Expand Up @@ -717,6 +721,13 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
return resultObjs, nil
}

func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) {
if j.GetSink() != nil { //job services has sink configured
return knative.CheckKSinkInjected(j.GetServiceName(), j.platform.Namespace)
}
return true, nil
}

func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool {
if plf.Spec.Services != nil {
if plf.Spec.Services.DataIndex != nil {
Expand Down
1 change: 1 addition & 0 deletions controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
KogitoProcessDefinitionsEventsErrorsEnabled = "kogito.events.processdefinitions.errors.propagate"
KogitoProcessDefinitionsEventsPath = "/definitions"
KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled"
KogitoJobsPath = "/jobs"
// KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part
// of its start health check.
KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled"
Expand Down
12 changes: 10 additions & 2 deletions controllers/sonataflowplatform_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -190,6 +191,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)

di := services.NewDataIndexHandler(ksp)

Expand Down Expand Up @@ -289,6 +291,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -397,6 +400,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -488,6 +492,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -574,6 +579,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
js := services.NewJobServiceHandler(ksp)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}
Expand Down Expand Up @@ -648,6 +654,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
di := services.NewDataIndexHandler(ksp)
js := services.NewJobServiceHandler(ksp)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
Expand Down Expand Up @@ -718,6 +725,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(kscp, ksp, ksp2).WithStatusSubresource(kscp, ksp, ksp2).Build()
utils.SetClient(cl)

// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}
Expand Down Expand Up @@ -867,7 +875,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
},
}
_, err := r.Reconcile(context.TODO(), req)
if err != nil {
if err != nil && err.Error() != "waiting for K_SINK injection for sonataflow-platform-jobs-service to complete" {
t.Fatalf("reconcile: (%v)", err)
}

Expand Down Expand Up @@ -967,7 +975,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
},
}
_, err := r.Reconcile(context.TODO(), req)
if err != nil {
if err != nil && err.Error() != "waiting for K_SINK injection for sonataflow-platform-jobs-service to complete" {
t.Fatalf("reconcile: (%v)", err)
}

Expand Down
4 changes: 2 additions & 2 deletions hack/ci/create-kind-cluster-with-registry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ nodes:
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
system-reserved: memory=1Gi
system-reserved: memory=2Gi
- role: worker
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
system-reserved: memory=2Gi
system-reserved: memory=4Gi
containerdConfigPatches:
- |-
[plugins."io.containerd.grpc.v1.cri".registry]
Expand Down
Loading

0 comments on commit d2945f6

Please sign in to comment.