Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: update…
Browse files Browse the repository at this point in the history
… unit test cases
  • Loading branch information
jianrongzhang89 committed May 27, 2024
1 parent fbfc1f2 commit 5153141
Show file tree
Hide file tree
Showing 17 changed files with 560 additions and 137 deletions.
76 changes: 74 additions & 2 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
Expand All @@ -35,14 +38,17 @@ import (

var servingClient clientservingv1.ServingV1Interface
var eventingClient clienteventingv1.EventingV1Interface
var discoveryClient discovery.DiscoveryInterface

type Availability struct {
Eventing bool
Serving bool
}

const (
KSink = "K_SINK"
KSink = "K_SINK"
KnativeBundleVolume = "kne-bundle-volume"
KCeOverRides = "K_CE_OVERRIDES"
)

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
Expand Down Expand Up @@ -75,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config) (*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}

func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, error) {
if discoveryClient == nil {
if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil {
return nil, err
} else {
discoveryClient = cli
}
}
return discoveryClient, nil
}

func SetDisvoveryClient(cli discovery.DiscoveryInterface) {
discoveryClient = cli
}

func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil {
if cli, err := GetDisvoveryClient(cfg); err != nil {
return nil, err
} else {
apiList, err := cli.ServerGroups()
Expand Down Expand Up @@ -122,3 +143,54 @@ func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlo
func IsKnativeBroker(kRef *duckv1.KReference) bool {
return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == "Broker"
}

func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
for _, volume := range source.Volumes {
if volume.Name == KnativeBundleVolume {
kubeutil.AddOrReplaceVolume(dest, volume)
break
}
}
for _, container := range source.Containers {
for ind, destContainer := range dest.Containers {
if destContainer.Name == container.Name {
for _, mount := range container.VolumeMounts {
if mount.Name == KnativeBundleVolume {
kubeutil.AddOrReplaceVolumeMount(ind, dest, mount)
break
}
}
for _, env := range container.Env {
if env.Name == KSink || env.Name == KCeOverRides {
kubeutil.AddOrReplaceEnvVar(ind, dest, env)
}
}
}
}
}
}

func moveKnativeVolumeToEnd(vols []corev1.Volume) {
for i := 0; i < len(vols)-1; i++ {
if vols[i].Name == KnativeBundleVolume {
vols[i], vols[i+1] = vols[i+1], vols[i]
}
}
}

func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
for i := 0; i < len(mounts)-1; i++ {
if mounts[i].Name == KnativeBundleVolume {
mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
}
}
}

// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The volume and volume mount
// must be in the end of the array to avoid repeadly restarting of the workflow pod
func RestoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ {
moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts)
}
}
4 changes: 3 additions & 1 deletion controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/variables"
Expand Down Expand Up @@ -186,7 +187,8 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor

// Create or Update the deployment
if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error {
err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec)
knative.SaveKnativeData(&serviceDeploymentSpec.Template.Spec, &serviceDeployment.Spec.Template.Spec)
err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec, mergo.WithOverride)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {

func (d JobServiceHandler) GetSink() *duckv1.Destination {
if d.platform.Spec.Services.JobService.Sink != nil {
return d.platform.Spec.Services.JobService.Source
return d.platform.Spec.Services.JobService.Sink
}
return GetPlatformBroker(d.platform)
}
Expand All @@ -638,7 +638,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
brokerName := broker.Ref.Name
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-job-service-create-job-trigger", platform.Name),
Name: fmt.Sprintf("%s-jobs-service-create-job-trigger", platform.Name),
Namespace: namespace,
Labels: lbl,
},
Expand All @@ -665,7 +665,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
resultObjs = append(resultObjs, jobCreateTrigger)
jobDeleteTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-job-service-delete-job-trigger", platform.Name),
Name: fmt.Sprintf("%s-jobs-service-delete-job-trigger", platform.Name),
Namespace: namespace,
Labels: lbl,
},
Expand Down
3 changes: 3 additions & 0 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func (d *defaultObjectEnsurerWithPlatform) Ensure(ctx context.Context, workflow
if err != nil {
return nil, result, err
}
if object == nil {
return nil, result, nil
}
if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
func() error {
for _, v := range visitors {
Expand Down
58 changes: 0 additions & 58 deletions controllers/profiles/common/knative_eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@ import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
KnativeBundleVolume = "kne-bundle-volume"
)

var _ KnativeEventingHandler = &knativeObjectManager{}

type knativeObjectManager struct {
Expand Down Expand Up @@ -81,55 +75,3 @@ func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi.
}
return objs, nil
}

func moveKnativeVolumeToEnd(vols []corev1.Volume) {
for i := 0; i < len(vols)-1; i++ {
if vols[i].Name == KnativeBundleVolume {
vols[i], vols[i+1] = vols[i+1], vols[i]
}
}
}

func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
for i := 0; i < len(mounts)-1; i++ {
if mounts[i].Name == KnativeBundleVolume {
mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
}
}
}

// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The volume and volume mount
// must be in the end of the array to avoid repeadly restarting of the workflow pod
func restoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ {
moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts)
}
}

func preserveKnativeVolumeMount(object *appsv1.Deployment) {
var kneVol *corev1.Volume = nil
for _, v := range object.Spec.Template.Spec.Volumes {
if v.Name == KnativeBundleVolume {
kneVol = &v
}
}
if kneVol != nil {
object.Spec.Template.Spec.Volumes = []corev1.Volume{*kneVol}
} else {
object.Spec.Template.Spec.Volumes = nil
}
for i := range object.Spec.Template.Spec.Containers {
var kneVolMount *corev1.VolumeMount = nil
for _, mount := range object.Spec.Template.Spec.Containers[i].VolumeMounts {
if mount.Name == KnativeBundleVolume {
kneVolMount = &mount
}
}
if kneVolMount == nil {
object.Spec.Template.Spec.Containers[i].VolumeMounts = nil
} else {
object.Spec.Template.Spec.Containers[i].VolumeMounts = []corev1.VolumeMount{*kneVolMount}
}
}
}
24 changes: 15 additions & 9 deletions controllers/profiles/common/mutate_visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ package common
import (
"context"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/properties"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
)

// ImageDeploymentMutateVisitor creates a visitor that mutates a vanilla Kubernetes Deployment to apply the given image in the DefaultContainerName container
Expand Down Expand Up @@ -99,10 +99,16 @@ func EnsureDeployment(original *appsv1.Deployment, object *appsv1.Deployment) er
object.Finalizers = original.Finalizers

// Clean up the volumes, they are inherited from original, additional are added by other visitors
// However, the knative mount path must be preserved
preserveKnativeVolumeMount(object)
// However, the knative data (voulmes, volumes mounts) must be preserved
knative.SaveKnativeData(&original.Spec.Template.Spec, &object.Spec.Template.Spec)
// Clean up volumes and volume mounts
object.Spec.Template.Spec.Volumes = nil
for i := range object.Spec.Template.Spec.Containers {
object.Spec.Template.Spec.Containers[i].VolumeMounts = nil
}

// we do a merge to not keep changing the spec since k8s will set default values to the podSpec
return mergo.Merge(&object.Spec, original.Spec)
return mergo.Merge(&object.Spec.Template.Spec, original.Spec.Template.Spec, mergo.WithOverride)
}

// KServiceMutateVisitor guarantees the state of the default Knative Service object
Expand Down Expand Up @@ -132,7 +138,7 @@ func EnsureKService(original *servingv1.Service, object *servingv1.Service) erro
}

// we do a merge to not keep changing the spec since k8s will set default values to the podSpec
return mergo.Merge(&object.Spec.Template.Spec.PodSpec, original.Spec.Template.Spec.PodSpec, mergo.WithOverride)
return mergo.Merge(&object.Spec.Template.Spec.PodSpec, original.Spec.Template.Spec.PodSpec /*, mergo.WithOverride*/)
}

func ServiceMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor {
Expand Down Expand Up @@ -188,7 +194,7 @@ func RolloutDeploymentIfCMChangedMutateVisitor(workflow *operatorapi.SonataFlow,
return func(object client.Object) controllerutil.MutateFn {
return func() error {
deployment := object.(*appsv1.Deployment)
restoreKnativeVolumeAndVolumeMount(deployment)
knative.RestoreKnativeVolumeAndVolumeMount(deployment)
err := kubeutil.AnnotateDeploymentConfigChecksum(workflow, deployment, userPropsCM, managedPropsCM)
return err
}
Expand Down
Loading

0 comments on commit 5153141

Please sign in to comment.