From 3c95aac632f3b454ebe6fcdea81001422e34f34c Mon Sep 17 00:00:00 2001 From: jianrongzhang89 Date: Thu, 1 Feb 2024 12:29:12 -0500 Subject: [PATCH] [issue-368] knative integration with DataIndex and JobService --- .gitignore | 2 + Makefile | 8 +- api/condition_types.go | 2 + api/v1alpha08/sonataflow_types.go | 12 + .../sonataflowplatform_services_types.go | 31 +- api/v1alpha08/sonataflowplatform_types.go | 13 + api/v1alpha08/zz_generated.deepcopy.go | 102 ++++++- ...taflow-operator.clusterserviceversion.yaml | 9 + .../sonataflow.org_sonataflowplatforms.yaml | 204 +++++++++++++ .../manifests/sonataflow.org_sonataflows.yaml | 57 ++++ .../sonataflow.org_sonataflowplatforms.yaml | 204 +++++++++++++ .../crd/bases/sonataflow.org_sonataflows.yaml | 57 ++++ ...taflow-operator.clusterserviceversion.yaml | 9 + controllers/knative/knative.go | 38 +++ controllers/platform/k8s.go | 39 ++- controllers/platform/services/knative.go | 48 +++ controllers/platform/services/properties.go | 30 +- .../services/properties_services_test.go | 12 +- controllers/platform/services/services.go | 289 +++++++++++++++--- .../common/constants/platform_services.go | 27 +- .../profiles/common/constants/workflows.go | 2 +- controllers/profiles/common/ensurer.go | 52 +++- .../profiles/common/knative_eventing.go | 89 +++++- .../profiles/common/mutate_visitors.go | 11 +- .../profiles/common/object_creators.go | 115 +++++-- .../profiles/common/object_creators_test.go | 8 +- .../profiles/common/properties/knative.go | 15 +- .../profiles/common/properties/managed.go | 2 +- .../common/properties/managed_test.go | 18 +- .../profiles/dev/object_creators_dev.go | 3 +- .../profiles/dev/object_creators_dev_test.go | 2 +- controllers/profiles/dev/states_dev.go | 6 +- controllers/profiles/factory/factory.go | 1 + .../profiles/preview/deployment_handler.go | 35 ++- .../profiles/preview/states_preview.go | 15 +- controllers/sonataflow_controller.go | 55 +++- .../sonataflowplatform_controller_test.go | 165 +++++++--- go.mod | 2 +- operator.yaml | 261 ++++++++++++++++ test/kubernetes_cli.go | 9 +- utils/kubernetes/deployment.go | 9 +- workflowproj/operator.go | 7 +- 42 files changed, 1893 insertions(+), 182 deletions(-) create mode 100644 controllers/platform/services/knative.go diff --git a/.gitignore b/.gitignore index b72576ce8..34fb20c94 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,7 @@ Dockerfile /.idea/ /.vscode/ /target/ +/__debug* +database/ e2e-test-report.xml diff --git a/Makefile b/Makefile index ce87602d8..d8349ffdb 100644 --- a/Makefile +++ b/Makefile @@ -174,7 +174,7 @@ docker-buildx: generate ## Build and push docker image for the manager for cross sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross - docker buildx create --name project-v3-builder docker buildx use project-v3-builder - - docker buildx build --build-arg SOURCE_DATE_EPOCH=$(shell git log -1 --pretty=%ct) --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross + - docker buildx build --build-arg SOURCE_DATE_EPOCH=$(shell git log -1 --pretty=%ct) --push . --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross - docker buildx rm project-v3-builder rm Dockerfile.cross @@ -297,12 +297,16 @@ ifneq ($(origin CATALOG_BASE_IMG), undefined) FROM_INDEX_OPT := --from-index $(CATALOG_BASE_IMG) endif +PLATFORM ?= linux/amd64 + # Build a catalog image by adding bundle images to an empty catalog using the operator package manager tool, 'opm'. # This recipe invokes 'opm' in 'semver' bundle add mode. For more information on add modes, see: # https://github.com/operator-framework/community-operators/blob/7f1438c/docs/packaging-operator.md#updating-your-existing-operator .PHONY: catalog-build catalog-build: opm ## Build a catalog image. - $(OPM) index add --container-tool $(BUILDER) --mode semver --tag $(CATALOG_IMG) --bundles $(BUNDLE_IMGS) $(FROM_INDEX_OPT) + $(OPM) index add --container-tool $(BUILDER) --mode semver --tag $(CATALOG_IMG) --bundles $(BUNDLE_IMGS) $(FROM_INDEX_OPT) --generate -d ./index.Dockerfile + $(BUILDER) build --platform $(PLATFORM) -f ./index.Dockerfile -t $(CATALOG_IMG) . + rm ./index.Dockerfile # Push the catalog image. .PHONY: catalog-push diff --git a/api/condition_types.go b/api/condition_types.go index 69b6e3be4..f71576197 100644 --- a/api/condition_types.go +++ b/api/condition_types.go @@ -39,6 +39,8 @@ const ( SucceedConditionType ConditionType = "Succeed" // BuiltConditionType describes the condition of a resource that needs to be build. BuiltConditionType ConditionType = "Built" + // DeployedConditionType describes the condition of a resource that needs to be deployed. + DeployedConditionType ConditionType = "Deployed" ) const ( diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go index 7763dc23d..f1766cc09 100644 --- a/api/v1alpha08/sonataflow_types.go +++ b/api/v1alpha08/sonataflow_types.go @@ -161,6 +161,18 @@ type SonataFlowSpec struct { // Sink describes the sinkBinding details of this SonataFlow instance. //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink" Sink *duckv1.Destination `json:"sink,omitempty"` + // Sources describes the list of sources used to create triggers for events consumed by this SonataFlow instance. + //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sources" + Sources []SonataFlowSourceSpec `json:"sources,omitempty"` +} + +// SonataFlowSourceSpec defines the desired state of a source used for trigger creation +// +k8s:openapi-gen=true +type SonataFlowSourceSpec struct { + // Defines the eventType to filter the events + EventType string `json:"eventType"` + // Defines the broker used + duckv1.Destination `json:",inline"` } // SonataFlowStatus defines the observed state of SonataFlow diff --git a/api/v1alpha08/sonataflowplatform_services_types.go b/api/v1alpha08/sonataflowplatform_services_types.go index a16bc2782..229bb2faf 100644 --- a/api/v1alpha08/sonataflowplatform_services_types.go +++ b/api/v1alpha08/sonataflowplatform_services_types.go @@ -14,14 +14,41 @@ package v1alpha08 +import ( + duckv1 "knative.dev/pkg/apis/duck/v1" +) + // ServicesPlatformSpec describes the desired service configuration for workflows without the `sonataflow.org/profile: dev` annotation. type ServicesPlatformSpec struct { // Deploys the Data Index service for use by workflows without the `sonataflow.org/profile: dev` annotation. // +optional - DataIndex *ServiceSpec `json:"dataIndex,omitempty"` + DataIndex *DataIndexServiceSpec `json:"dataIndex,omitempty"` // Deploys the Job service for use by workflows without the `sonataflow.org/profile: dev` annotation. // +optional - JobService *ServiceSpec `json:"jobService,omitempty"` + JobService *JobServiceServiceSpec `json:"jobService,omitempty"` +} + +// DataIndexServiceSpec defines the desired state of Dataindex service +// +k8s:openapi-gen=true +type DataIndexServiceSpec struct { + // Defines the common spec of a platform service + ServiceSpec `json:",inline"` + // Defines the source where the Dataindex receives events from + // +optional + Source *duckv1.Destination `json:"source,omitempty"` +} + +// JobServiceServiceSpec defines the desired state of Jobservice service +// +k8s:openapi-gen=true +type JobServiceServiceSpec struct { + // Defines the common spec of a platform service + ServiceSpec `json:",inline"` + // Defines the sink where the Jobservice sends events to + // +optional + Sink *duckv1.Destination `json:"sink,omitempty"` + // Defines the source where the Jobservice receives events from + // +optional + Source *duckv1.Destination `json:"source,omitempty"` } // ServiceSpec defines the desired state of a platform service diff --git a/api/v1alpha08/sonataflowplatform_types.go b/api/v1alpha08/sonataflowplatform_types.go index 26276fb8b..1144405fc 100644 --- a/api/v1alpha08/sonataflowplatform_types.go +++ b/api/v1alpha08/sonataflowplatform_types.go @@ -21,6 +21,7 @@ package v1alpha08 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" "github.com/apache/incubator-kie-kogito-serverless-operator/api" ) @@ -47,6 +48,9 @@ type SonataFlowPlatformSpec struct { // +optional // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Services" Services *ServicesPlatformSpec `json:"services,omitempty"` + // Eventing describes the information required for Knative Eventing integration in the platform. + // +optional + Eventing *PlatformEventingSpec `json:"eventing,omitempty"` // Persistence defines the platform persistence configuration. When this field is set, // the configuration is used as the persistence for platform services and SonataFlow instances // that don't provide one of their own. @@ -61,6 +65,15 @@ type SonataFlowPlatformSpec struct { Properties *PropertyPlatformSpec `json:"properties,omitempty"` } +// PlatformEventingSpec specifies the Knative Eventing integration details in the platform. +// +k8s:openapi-gen=true +type PlatformEventingSpec struct { + // Broker to communicate with workflow deployment. It can be the default broker when the workflow, Dataindex, or Jobservice does not have a sink or source specified. + // +optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="broker" + Broker *duckv1.Destination `json:"broker,omitempty"` +} + // PlatformCluster is the kind of orchestration cluster the platform is installed into // +kubebuilder:validation:Enum=kubernetes;openshift type PlatformCluster string diff --git a/api/v1alpha08/zz_generated.deepcopy.go b/api/v1alpha08/zz_generated.deepcopy.go index 59f245632..f621782e2 100644 --- a/api/v1alpha08/zz_generated.deepcopy.go +++ b/api/v1alpha08/zz_generated.deepcopy.go @@ -213,6 +213,27 @@ func (in *ContainerSpec) DeepCopy() *ContainerSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataIndexServiceSpec) DeepCopyInto(out *DataIndexServiceSpec) { + *out = *in + in.ServiceSpec.DeepCopyInto(&out.ServiceSpec) + if in.Source != nil { + in, out := &in.Source, &out.Source + *out = new(duckv1.Destination) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataIndexServiceSpec. +func (in *DataIndexServiceSpec) DeepCopy() *DataIndexServiceSpec { + if in == nil { + return nil + } + out := new(DataIndexServiceSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DevModePlatformSpec) DeepCopyInto(out *DevModePlatformSpec) { *out = *in @@ -342,6 +363,32 @@ func (in *FlowPodTemplateSpec) DeepCopy() *FlowPodTemplateSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobServiceServiceSpec) DeepCopyInto(out *JobServiceServiceSpec) { + *out = *in + in.ServiceSpec.DeepCopyInto(&out.ServiceSpec) + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(duckv1.Destination) + (*in).DeepCopyInto(*out) + } + if in.Source != nil { + in, out := &in.Source, &out.Source + *out = new(duckv1.Destination) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobServiceServiceSpec. +func (in *JobServiceServiceSpec) DeepCopy() *JobServiceServiceSpec { + if in == nil { + return nil + } + out := new(JobServiceServiceSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PersistenceOptionsSpec) DeepCopyInto(out *PersistenceOptionsSpec) { *out = *in @@ -383,6 +430,26 @@ func (in *PersistencePostgreSQL) DeepCopy() *PersistencePostgreSQL { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PlatformEventingSpec) DeepCopyInto(out *PlatformEventingSpec) { + *out = *in + if in.Broker != nil { + in, out := &in.Broker, &out.Broker + *out = new(duckv1.Destination) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PlatformEventingSpec. +func (in *PlatformEventingSpec) DeepCopy() *PlatformEventingSpec { + if in == nil { + return nil + } + out := new(PlatformEventingSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PlatformPersistenceOptionsSpec) DeepCopyInto(out *PlatformPersistenceOptionsSpec) { *out = *in @@ -817,12 +884,12 @@ func (in *ServicesPlatformSpec) DeepCopyInto(out *ServicesPlatformSpec) { *out = *in if in.DataIndex != nil { in, out := &in.DataIndex, &out.DataIndex - *out = new(ServiceSpec) + *out = new(DataIndexServiceSpec) (*in).DeepCopyInto(*out) } if in.JobService != nil { in, out := &in.JobService, &out.JobService - *out = new(ServiceSpec) + *out = new(JobServiceServiceSpec) (*in).DeepCopyInto(*out) } } @@ -1208,6 +1275,11 @@ func (in *SonataFlowPlatformSpec) DeepCopyInto(out *SonataFlowPlatformSpec) { *out = new(ServicesPlatformSpec) (*in).DeepCopyInto(*out) } + if in.Eventing != nil { + in, out := &in.Eventing, &out.Eventing + *out = new(PlatformEventingSpec) + (*in).DeepCopyInto(*out) + } if in.Persistence != nil { in, out := &in.Persistence, &out.Persistence *out = new(PlatformPersistenceOptionsSpec) @@ -1258,6 +1330,22 @@ func (in *SonataFlowPlatformStatus) DeepCopy() *SonataFlowPlatformStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowSourceSpec) DeepCopyInto(out *SonataFlowSourceSpec) { + *out = *in + in.Destination.DeepCopyInto(&out.Destination) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowSourceSpec. +func (in *SonataFlowSourceSpec) DeepCopy() *SonataFlowSourceSpec { + if in == nil { + return nil + } + out := new(SonataFlowSourceSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SonataFlowSpec) DeepCopyInto(out *SonataFlowSpec) { *out = *in @@ -1274,10 +1362,12 @@ func (in *SonataFlowSpec) DeepCopyInto(out *SonataFlowSpec) { *out = new(duckv1.Destination) (*in).DeepCopyInto(*out) } - if in.Sink != nil { - in, out := &in.Sink, &out.Sink - *out = new(duckv1.Destination) - (*in).DeepCopyInto(*out) + if in.Sources != nil { + in, out := &in.Sources, &out.Sources + *out = make([]SonataFlowSourceSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } diff --git a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml index 7e78264fa..6fc3c17a8 100644 --- a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml +++ b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml @@ -248,6 +248,11 @@ spec: no build required) displayName: DevMode path: devMode + - description: Broker to communicate with workflow deployment. It can be the + default broker when the workflow, Dataindex, or Jobservice does not have + a sink or source specified. + displayName: broker + path: eventing.broker - description: 'Services attributes for deploying supporting applications like Data Index & Job Service. Only workflows without the `sonataflow.org/profile: dev` annotation will be configured to use these service(s). Setting this @@ -318,6 +323,10 @@ spec: - description: Sink describes the sinkBinding details of this SonataFlow instance. displayName: sink path: sink + - description: Sources describes the list of sources used to create triggers + for events consumed by this SonataFlow instance. + displayName: sources + path: sources statusDescriptors: - description: Address is used as a part of Addressable interface (status.address.url) for knative diff --git a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml index 09a2f2544..565f51d66 100644 --- a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml +++ b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml @@ -420,6 +420,60 @@ spec: of the operator's default. type: string type: object + eventing: + description: Eventing describes the information required for Knative + Eventing integration in the platform. + properties: + broker: + description: Broker to communicate with workflow deployment. It + can be the default broker when the workflow, Dataindex, or Jobservice + does not have a sink or source specified. + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of + the group. This can be used as an alternative to the + APIVersion, and then resolved using ResolveGroup. Note: + This API is EXPERIMENTAL and might break anytime. For + more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and + non-empty host) pointing to the target or a relative URI. + Relative URIs will be resolved using the base URI retrieved + from Ref. + type: string + type: object + type: object persistence: description: Persistence defines the platform persistence configuration. When this field is set, the configuration is used as the persistence @@ -8427,6 +8481,56 @@ spec: type: object type: array type: object + source: + description: Defines the source where the Dataindex receives + events from + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object type: object jobService: description: 'Deploys the Job service for use by workflows without @@ -16299,6 +16403,106 @@ spec: type: object type: array type: object + sink: + description: Defines the sink where the Jobservice sends events + to + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object + source: + description: Defines the source where the Jobservice receives + events from + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object type: object type: object type: object diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml b/bundle/manifests/sonataflow.org_sonataflows.yaml index 6f2cb2e75..109cdb84d 100644 --- a/bundle/manifests/sonataflow.org_sonataflows.yaml +++ b/bundle/manifests/sonataflow.org_sonataflows.yaml @@ -9402,6 +9402,63 @@ spec: will be resolved using the base URI retrieved from Ref. type: string type: object + sources: + description: Sources describes the list of sources used to create + triggers for events consumed by this SonataFlow instance. + items: + description: SonataFlowSourceSpec defines the desired state of a + source used for trigger creation + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + eventType: + description: Defines the eventType to filter the events + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the + group. This can be used as an alternative to the APIVersion, + and then resolved using ResolveGroup. Note: This API is + EXPERIMENTAL and might break anytime. For more details: + https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and + non-empty host) pointing to the target or a relative URI. + Relative URIs will be resolved using the base URI retrieved + from Ref. + type: string + required: + - eventType + type: object + type: array required: - flow type: object diff --git a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml index 648b1bcde..c5c84e448 100644 --- a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml +++ b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml @@ -421,6 +421,60 @@ spec: of the operator's default. type: string type: object + eventing: + description: Eventing describes the information required for Knative + Eventing integration in the platform. + properties: + broker: + description: Broker to communicate with workflow deployment. It + can be the default broker when the workflow, Dataindex, or Jobservice + does not have a sink or source specified. + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of + the group. This can be used as an alternative to the + APIVersion, and then resolved using ResolveGroup. Note: + This API is EXPERIMENTAL and might break anytime. For + more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and + non-empty host) pointing to the target or a relative URI. + Relative URIs will be resolved using the base URI retrieved + from Ref. + type: string + type: object + type: object persistence: description: Persistence defines the platform persistence configuration. When this field is set, the configuration is used as the persistence @@ -8428,6 +8482,56 @@ spec: type: object type: array type: object + source: + description: Defines the source where the Dataindex receives + events from + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object type: object jobService: description: 'Deploys the Job service for use by workflows without @@ -16300,6 +16404,106 @@ spec: type: object type: array type: object + sink: + description: Defines the sink where the Jobservice sends events + to + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object + source: + description: Defines the source where the Jobservice receives + events from + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object type: object type: object type: object diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml b/config/crd/bases/sonataflow.org_sonataflows.yaml index 61002c32f..dc9bcec81 100644 --- a/config/crd/bases/sonataflow.org_sonataflows.yaml +++ b/config/crd/bases/sonataflow.org_sonataflows.yaml @@ -9403,6 +9403,63 @@ spec: will be resolved using the base URI retrieved from Ref. type: string type: object + sources: + description: Sources describes the list of sources used to create + triggers for events consumed by this SonataFlow instance. + items: + description: SonataFlowSourceSpec defines the desired state of a + source used for trigger creation + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + eventType: + description: Defines the eventType to filter the events + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the + group. This can be used as an alternative to the APIVersion, + and then resolved using ResolveGroup. Note: This API is + EXPERIMENTAL and might break anytime. For more details: + https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and + non-empty host) pointing to the target or a relative URI. + Relative URIs will be resolved using the base URI retrieved + from Ref. + type: string + required: + - eventType + type: object + type: array required: - flow type: object diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml index c6fdf4e6a..b592f46dd 100644 --- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml @@ -132,6 +132,11 @@ spec: no build required) displayName: DevMode path: devMode + - description: Broker to communicate with workflow deployment. It can be the + default broker when the workflow, Dataindex, or Jobservice does not have + a sink or source specified. + displayName: broker + path: eventing.broker - description: 'Services attributes for deploying supporting applications like Data Index & Job Service. Only workflows without the `sonataflow.org/profile: dev` annotation will be configured to use these service(s). Setting this @@ -202,6 +207,10 @@ spec: - description: Sink describes the sinkBinding details of this SonataFlow instance. displayName: sink path: sink + - description: Sources describes the list of sources used to create triggers + for events consumed by this SonataFlow instance. + displayName: sources + path: sources statusDescriptors: - description: Address is used as a part of Addressable interface (status.address.url) for knative diff --git a/controllers/knative/knative.go b/controllers/knative/knative.go index 929a96cc5..208ac86b7 100644 --- a/controllers/knative/knative.go +++ b/controllers/knative/knative.go @@ -20,9 +20,16 @@ package knative import ( + "context" + "fmt" + + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1" ) @@ -34,6 +41,10 @@ type Availability struct { Serving bool } +const ( + KSink = "K_SINK" +) + func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) { if servingClient == nil { if knServingClient, err := NewKnativeServingClient(cfg); err != nil { @@ -84,3 +95,30 @@ func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) { return result, nil } } + +func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) { + if workflow == nil { + return nil, nil + } + if workflow.Spec.Sink != nil { + return workflow.Spec.Sink, nil + } + if pl != nil && pl.Spec.Eventing != nil { + // no sink defined in the workflow, use the platform broker + return pl.Spec.Eventing.Broker, nil + } else if pl.Status.ClusterPlatformRef != nil { + // Find the platform referred by the cluster platform + platform := &operatorapi.SonataFlowPlatform{} + if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Namespace: pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil { + return nil, fmt.Errorf("error reading the platform referred by the cluster platform") + } + if platform.Spec.Eventing != nil { + return platform.Spec.Eventing.Broker, nil + } + } + return nil, nil +} + +func IsKnativeBroker(kRef *duckv1.KReference) bool { + return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == "Broker" +} diff --git a/controllers/platform/k8s.go b/controllers/platform/k8s.go index 62d8b3699..4a1b291d6 100644 --- a/controllers/platform/k8s.go +++ b/controllers/platform/k8s.go @@ -31,6 +31,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/utils" kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" + "github.com/imdario/mergo" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -85,7 +86,10 @@ func createOrUpdateServiceComponents(ctx context.Context, client client.Client, if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil { return err } - return createOrUpdateService(ctx, client, platform, psh) + if err := createOrUpdateService(ctx, client, platform, psh); err != nil { + return err + } + return createOrUpdateKnativeResources(ctx, client, platform, psh) } func createOrUpdateDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { @@ -182,8 +186,10 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor // Create or Update the deployment if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error { - serviceDeployment.Spec = serviceDeploymentSpec - + err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec) + if err != nil { + return err + } return nil }); err != nil { return err @@ -248,6 +254,7 @@ func createOrUpdateConfigMap(ctx context.Context, client client.Client, platform return err } lbl, _ := getLabels(platform, psh) + dataStr := handler.Build() configMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: psh.GetServiceCmName(), @@ -255,7 +262,7 @@ func createOrUpdateConfigMap(ctx context.Context, client client.Client, platform Labels: lbl, }, Data: map[string]string{ - workflowproj.ApplicationPropertiesFileName: handler.Build(), + workflowproj.ApplicationPropertiesFileName: dataStr, }, } if err := controllerutil.SetControllerReference(platform, configMap, client.Scheme()); err != nil { @@ -264,7 +271,7 @@ func createOrUpdateConfigMap(ctx context.Context, client client.Client, platform // Create or Update the service if op, err := controllerutil.CreateOrUpdate(ctx, client, configMap, func() error { - configMap.Data[workflowproj.ApplicationPropertiesFileName] = handler.WithUserProperties(configMap.Data[workflowproj.ApplicationPropertiesFileName]).Build() + configMap.Data[workflowproj.ApplicationPropertiesFileName] = handler.WithUserProperties(dataStr).Build() return nil }); err != nil { @@ -272,6 +279,28 @@ func createOrUpdateConfigMap(ctx context.Context, client client.Client, platform } else { klog.V(log.I).InfoS("ConfigMap successfully reconciled", "operation", op) } + return nil +} +func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { + + lbl, _ := getLabels(platform, psh) + if objs, err := psh.GenerateKnativeResources(platform, lbl); err != nil { + return err + } else if len(objs) > 0 { + for _, obj := range objs { + if op, err := controllerutil.CreateOrUpdate(ctx, client, obj, func() error { + if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil { + return err + } + return nil + }); err != nil { + return err + } else { + klog.V(log.I).InfoS("Knative Eventing resources successfully created", "operation", op) + } + } + return nil + } return nil } diff --git a/controllers/platform/services/knative.go b/controllers/platform/services/knative.go new file mode 100644 index 000000000..09ea1b4d5 --- /dev/null +++ b/controllers/platform/services/knative.go @@ -0,0 +1,48 @@ +// Copyright 2024 Apache Software Foundation (ASF) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "k8s.io/utils/pointer" +) + +func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool { + if plf.Spec.Services != nil { + if plf.Spec.Services.DataIndex != nil { + return pointer.BoolDeref(plf.Spec.Services.DataIndex.Enabled, false) + } + return false + } + // Check if DataIndex is enabled in the platform status + if plf.Status.ClusterPlatformRef != nil && plf.Status.ClusterPlatformRef.Services != nil && plf.Status.ClusterPlatformRef.Services.DataIndexRef != nil && len(plf.Status.ClusterPlatformRef.Services.DataIndexRef.Url) > 0 { + return true + } + return false +} + +func IsJobServiceEnabled(plf *operatorapi.SonataFlowPlatform) bool { + if plf.Spec.Services != nil { + if plf.Spec.Services.JobService != nil { + return pointer.BoolDeref(plf.Spec.Services.JobService.Enabled, false) + } + return false + } + // Check if JobService is enabled in the platform status + if plf.Status.ClusterPlatformRef != nil && plf.Status.ClusterPlatformRef.Services != nil && plf.Status.ClusterPlatformRef.Services.JobServiceRef != nil && len(plf.Status.ClusterPlatformRef.Services.JobServiceRef.Url) > 0 { + return true + } + return false +} diff --git a/controllers/platform/services/properties.go b/controllers/platform/services/properties.go index c1deedcbb..14a1f1506 100644 --- a/controllers/platform/services/properties.go +++ b/controllers/platform/services/properties.go @@ -29,6 +29,7 @@ import ( "k8s.io/klog/v2" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef" @@ -161,6 +162,10 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf props := properties.NewProperties() props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false") props.Set(constants.KogitoProcessInstancesEventsEnabled, "false") + sink, err := knative.GetWorkflowSink(workflow, platform) + if err != nil { + return nil, err + } di := NewDataIndexHandler(platform) if !profiles.IsDevProfile(workflow) && workflow != nil && workflow.Status.Services != nil && workflow.Status.Services.DataIndexRef != nil { serviceBaseUrl := workflow.Status.Services.DataIndexRef.Url @@ -170,8 +175,17 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf props.Set(constants.KogitoProcessDefinitionsEventsErrorsEnabled, "true") props.Set(constants.KogitoDataIndexHealthCheckEnabled, "true") props.Set(constants.KogitoDataIndexURL, serviceBaseUrl) - props.Set(constants.KogitoProcessDefinitionsEventsURL, serviceBaseUrl+constants.KogitoProcessDefinitionsEventsPath) - props.Set(constants.KogitoProcessInstancesEventsURL, serviceBaseUrl+constants.KogitoProcessInstancesEventsPath) + if sink != nil { + props.Set(constants.KogitoProcessDefinitionsEventsConnector, constants.QuarkusHTTP) + props.Set(constants.KogitoProcessInstancesEventsConnector, constants.QuarkusHTTP) + props.Set(constants.KogitoProcessDefinitionsEventsURL, constants.KnativeInjectedEnvVar) + props.Set(constants.KogitoProcessInstancesEventsURL, constants.KnativeInjectedEnvVar) + props.Set(constants.KogitoProcessDefinitionsEventsMethod, constants.Post) + props.Set(constants.KogitoProcessInstancesEventsMethod, constants.Post) + } else { + props.Set(constants.KogitoProcessDefinitionsEventsURL, serviceBaseUrl+constants.KogitoProcessDefinitionsEventsPath) + props.Set(constants.KogitoProcessInstancesEventsURL, serviceBaseUrl+constants.KogitoProcessInstancesEventsPath) + } } } props.Sort() @@ -187,6 +201,10 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat props := properties.NewProperties() props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP) props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol)) + sink, err := knative.GetWorkflowSink(workflow, platform) + if err != nil { + return nil, err + } js := NewJobServiceHandler(platform) if !profiles.IsDevProfile(workflow) && workflow != nil && workflow.Status.Services != nil && workflow.Status.Services.JobServiceRef != nil { serviceBaseUrl := workflow.Status.Services.JobServiceRef.Url @@ -195,7 +213,13 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat props.Set(constants.KogitoJobServiceHealthCheckEnabled, "true") } props.Set(constants.KogitoJobServiceURL, serviceBaseUrl) - props.Set(constants.JobServiceRequestEventsURL, serviceBaseUrl+constants.JobServiceJobEventsPath) + if sink != nil { + props.Set(constants.JobServiceRequestEventsURL, constants.KnativeInjectedEnvVar) + props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP) + props.Set(constants.JobServiceRequestEventsMethod, constants.Post) + } else { + props.Set(constants.JobServiceRequestEventsURL, serviceBaseUrl+constants.JobServiceJobEventsPath) + } } } props.Sort() diff --git a/controllers/platform/services/properties_services_test.go b/controllers/platform/services/properties_services_test.go index 46ea31a44..4e43898bd 100644 --- a/controllers/platform/services/properties_services_test.go +++ b/controllers/platform/services/properties_services_test.go @@ -177,7 +177,7 @@ func setJobServiceEnabledValue(v *bool) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.JobService == nil { - p.Spec.Services.JobService = &operatorapi.ServiceSpec{} + p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{} } p.Spec.Services.JobService.Enabled = v } @@ -189,7 +189,7 @@ func setDataIndexEnabledValue(v *bool) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.DataIndex == nil { - p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} + p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{} } p.Spec.Services.DataIndex.Enabled = v } @@ -201,7 +201,7 @@ func emptyDataIndexServiceSpec() plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.DataIndex == nil { - p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} + p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{} } } } @@ -212,7 +212,7 @@ func emptyJobServiceSpec() plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.JobService == nil { - p.Spec.Services.JobService = &operatorapi.ServiceSpec{} + p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{} } } } @@ -235,7 +235,7 @@ func setJobServiceJDBC(jdbc string) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.JobService == nil { - p.Spec.Services.JobService = &operatorapi.ServiceSpec{} + p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{} } if p.Spec.Services.JobService.Persistence == nil { p.Spec.Services.JobService.Persistence = &operatorapi.PersistenceOptionsSpec{} @@ -253,7 +253,7 @@ func setDataIndexJDBC(jdbc string) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.DataIndex == nil { - p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} + p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{} } if p.Spec.Services.DataIndex.Persistence == nil { p.Spec.Services.DataIndex.Persistence = &operatorapi.PersistenceOptionsSpec{} diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index 4cd9c02ba..1a614d710 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -23,23 +23,33 @@ import ( "fmt" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/cfg" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles" "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" + "github.com/imdario/mergo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "github.com/magiconair/properties" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/tracker" + "sigs.k8s.io/controller-runtime/pkg/client" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/persistence" - "github.com/magiconair/properties" - "github.com/apache/incubator-kie-kogito-serverless-operator/version" - "github.com/imdario/mergo" ) const ( quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION" quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START" + pathProcesses string = "/processes" + pathDefinitions string = "/definitions" + pathJobs string = "/jobs" ) type PlatformServiceHandler interface { @@ -73,6 +83,8 @@ type PlatformServiceHandler interface { MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) // GenerateServiceProperties returns a property object that contains the application properties required by the service deployment GenerateServiceProperties() (*properties.Properties, error) + // GenerateKnativeResources returns knative resources that bridge between workflow deploys and the service + GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) // IsServiceSetInSpec returns true if the service is set in the spec. IsServiceSetInSpec() bool @@ -89,6 +101,8 @@ type PlatformServiceHandler interface { SetServiceUrlInPlatformStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) // SetServiceUrlInWorkflowStatus sets the service url in a workflow's status. SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow) + + GetServiceSource() *duckv1.Destination } type DataIndexHandler struct { @@ -96,10 +110,10 @@ type DataIndexHandler struct { } func NewDataIndexHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler { - return DataIndexHandler{platform: platform} + return &DataIndexHandler{platform: platform} } -func (d DataIndexHandler) GetContainerName() string { +func (d *DataIndexHandler) GetContainerName() string { return constants.DataIndexServiceName } @@ -121,7 +135,7 @@ func (d DataIndexHandler) GetServiceImageName(persistenceType constants.Persiste return fmt.Sprintf("%s-%s-%s:%s", constants.ImageNamePrefix, constants.DataIndexName, persistenceType.String()+suffix, tag) } -func (d DataIndexHandler) GetServiceName() string { +func (d *DataIndexHandler) GetServiceName() string { return fmt.Sprintf("%s-%s", d.platform.Name, constants.DataIndexServiceName) } @@ -154,21 +168,21 @@ func (d DataIndexHandler) IsServiceSetInSpec() bool { return isDataIndexSet(d.platform) } -func (d DataIndexHandler) IsServiceEnabledInSpec() bool { +func (d *DataIndexHandler) IsServiceEnabledInSpec() bool { return isDataIndexEnabled(d.platform) } -func (d DataIndexHandler) isServiceEnabledInStatus() bool { +func (d *DataIndexHandler) isServiceEnabledInStatus() bool { return d.platform != nil && d.platform.Status.ClusterPlatformRef != nil && d.platform.Status.ClusterPlatformRef.Services != nil && d.platform.Status.ClusterPlatformRef.Services.DataIndexRef != nil && !isServicesSet(d.platform) } -func (d DataIndexHandler) IsServiceEnabled() bool { +func (d *DataIndexHandler) IsServiceEnabled() bool { return d.IsServiceEnabledInSpec() || d.isServiceEnabledInStatus() } -func (d DataIndexHandler) GetServiceBaseUrl() string { +func (d *DataIndexHandler) GetServiceBaseUrl() string { if d.IsServiceEnabledInSpec() { return d.GetLocalServiceBaseUrl() } @@ -178,11 +192,11 @@ func (d DataIndexHandler) GetServiceBaseUrl() string { return "" } -func (d DataIndexHandler) GetLocalServiceBaseUrl() string { +func (d *DataIndexHandler) GetLocalServiceBaseUrl() string { return GenerateServiceURL(constants.KogitoServiceURLProtocol, d.platform.Namespace, d.GetServiceName()) } -func (d DataIndexHandler) GetEnvironmentVariables() []corev1.EnvVar { +func (d *DataIndexHandler) GetEnvironmentVariables() []corev1.EnvVar { return []corev1.EnvVar{ { Name: "KOGITO_DATA_INDEX_QUARKUS_PROFILE", @@ -199,7 +213,7 @@ func (d DataIndexHandler) GetEnvironmentVariables() []corev1.EnvVar { } } -func (d DataIndexHandler) GetPodResourceRequirements() corev1.ResourceRequirements { +func (d *DataIndexHandler) GetPodResourceRequirements() corev1.ResourceRequirements { return corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("100m"), @@ -212,20 +226,20 @@ func (d DataIndexHandler) GetPodResourceRequirements() corev1.ResourceRequiremen } } -func (d DataIndexHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { +func (d *DataIndexHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { c := podSpec.DeepCopy() err := mergo.Merge(c, d.platform.Spec.Services.DataIndex.PodTemplate.PodSpec.ToPodSpec(), mergo.WithOverride) return *c, err } // hasPostgreSQLConfigured returns true when either the SonataFlow Platform PostgreSQL CR's structure or the one in the Data Index service specification is not nil -func (d DataIndexHandler) hasPostgreSQLConfigured() bool { +func (d *DataIndexHandler) hasPostgreSQLConfigured() bool { return d.IsServiceSetInSpec() && ((d.platform.Spec.Services.DataIndex.Persistence != nil && d.platform.Spec.Services.DataIndex.Persistence.PostgreSQL != nil) || (d.platform.Spec.Persistence != nil && d.platform.Spec.Persistence.PostgreSQL != nil)) } -func (d DataIndexHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { +func (d *DataIndexHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { if d.hasPostgreSQLConfigured() { p := persistence.RetrieveConfiguration(d.platform.Spec.Services.DataIndex.Persistence, d.platform.Spec.Persistence, d.GetServiceName()) c := containerSpec.DeepCopy() @@ -242,18 +256,25 @@ func (d DataIndexHandler) MergeContainerSpec(containerSpec *corev1.Container) (* return mergeContainerSpec(containerSpec, &d.platform.Spec.Services.DataIndex.PodTemplate.Container) } -func (d DataIndexHandler) GetReplicaCount() int32 { +func (d *DataIndexHandler) GetReplicaCount() int32 { if d.platform.Spec.Services.DataIndex.PodTemplate.Replicas != nil { return *d.platform.Spec.Services.DataIndex.PodTemplate.Replicas } return 1 } -func (d DataIndexHandler) GetServiceCmName() string { +func (d *DataIndexHandler) GetServiceCmName() string { return fmt.Sprintf("%s-props", d.GetServiceName()) } -func (d DataIndexHandler) GenerateServiceProperties() (*properties.Properties, error) { +func (d *DataIndexHandler) GetServiceSource() *duckv1.Destination { + if d.platform.Spec.Services.DataIndex.Source != nil { + return d.platform.Spec.Services.DataIndex.Source + } + return GetPlatformBroker(d.platform) +} + +func (d *DataIndexHandler) GenerateServiceProperties() (*properties.Properties, error) { props := properties.NewProperties() props.Set(constants.KogitoServiceURLProperty, d.GetLocalServiceBaseUrl()) props.Set(constants.DataIndexKafkaSmallRyeHealthProperty, "false") @@ -265,10 +286,10 @@ type JobServiceHandler struct { } func NewJobServiceHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler { - return JobServiceHandler{platform: platform} + return &JobServiceHandler{platform: platform} } -func (j JobServiceHandler) GetContainerName() string { +func (j *JobServiceHandler) GetContainerName() string { return constants.JobServiceName } @@ -290,11 +311,11 @@ func (j JobServiceHandler) GetServiceImageName(persistenceType constants.Persist return fmt.Sprintf("%s-%s-%s:%s", constants.ImageNamePrefix, constants.JobServiceName, persistenceType.String()+suffix, tag) } -func (j JobServiceHandler) GetServiceName() string { +func (j *JobServiceHandler) GetServiceName() string { return fmt.Sprintf("%s-%s", j.platform.Name, constants.JobServiceName) } -func (j JobServiceHandler) GetServiceCmName() string { +func (j *JobServiceHandler) GetServiceCmName() string { return fmt.Sprintf("%s-props", j.GetServiceName()) } @@ -327,21 +348,21 @@ func (j JobServiceHandler) IsServiceSetInSpec() bool { return isJobServiceSet(j.platform) } -func (j JobServiceHandler) IsServiceEnabledInSpec() bool { +func (j *JobServiceHandler) IsServiceEnabledInSpec() bool { return isJobServiceEnabled(j.platform) } -func (j JobServiceHandler) isServiceEnabledInStatus() bool { +func (j *JobServiceHandler) isServiceEnabledInStatus() bool { return j.platform != nil && j.platform.Status.ClusterPlatformRef != nil && j.platform.Status.ClusterPlatformRef.Services != nil && j.platform.Status.ClusterPlatformRef.Services.JobServiceRef != nil && !isServicesSet(j.platform) } -func (j JobServiceHandler) IsServiceEnabled() bool { +func (j *JobServiceHandler) IsServiceEnabled() bool { return j.IsServiceEnabledInSpec() || j.isServiceEnabledInStatus() } -func (j JobServiceHandler) GetServiceBaseUrl() string { +func (j *JobServiceHandler) GetServiceBaseUrl() string { if j.IsServiceEnabledInSpec() { return j.GetLocalServiceBaseUrl() } @@ -351,11 +372,11 @@ func (j JobServiceHandler) GetServiceBaseUrl() string { return "" } -func (j JobServiceHandler) GetLocalServiceBaseUrl() string { +func (j *JobServiceHandler) GetLocalServiceBaseUrl() string { return GenerateServiceURL(constants.JobServiceURLProtocol, j.platform.Namespace, j.GetServiceName()) } -func (j JobServiceHandler) GetEnvironmentVariables() []corev1.EnvVar { +func (j *JobServiceHandler) GetEnvironmentVariables() []corev1.EnvVar { return []corev1.EnvVar{ { Name: "QUARKUS_HTTP_CORS", @@ -368,7 +389,7 @@ func (j JobServiceHandler) GetEnvironmentVariables() []corev1.EnvVar { } } -func (j JobServiceHandler) GetPodResourceRequirements() corev1.ResourceRequirements { +func (j *JobServiceHandler) GetPodResourceRequirements() corev1.ResourceRequirements { return corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("250m"), @@ -381,7 +402,7 @@ func (j JobServiceHandler) GetPodResourceRequirements() corev1.ResourceRequireme } } -func (j JobServiceHandler) GetReplicaCount() int32 { +func (j *JobServiceHandler) GetReplicaCount() int32 { return 1 } @@ -390,13 +411,13 @@ func (j JobServiceHandler) MergeContainerSpec(containerSpec *corev1.Container) ( } // hasPostgreSQLConfigured returns true when either the SonataFlow Platform PostgreSQL CR's structure or the one in the Job service specification is not nil -func (j JobServiceHandler) hasPostgreSQLConfigured() bool { +func (j *JobServiceHandler) hasPostgreSQLConfigured() bool { return j.IsServiceSetInSpec() && ((j.platform.Spec.Services.JobService.Persistence != nil && j.platform.Spec.Services.JobService.Persistence.PostgreSQL != nil) || (j.platform.Spec.Persistence != nil && j.platform.Spec.Persistence.PostgreSQL != nil)) } -func (j JobServiceHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { +func (j *JobServiceHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { if j.hasPostgreSQLConfigured() { c := containerSpec.DeepCopy() @@ -411,16 +432,22 @@ func (j JobServiceHandler) ConfigurePersistence(containerSpec *corev1.Container) return containerSpec } -func (j JobServiceHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { +func (j *JobServiceHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { c := podSpec.DeepCopy() err := mergo.Merge(c, j.platform.Spec.Services.JobService.PodTemplate.PodSpec.ToPodSpec(), mergo.WithOverride) return *c, err } -func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties, error) { +func (j *JobServiceHandler) GenerateServiceProperties() (*properties.Properties, error) { props := properties.NewProperties() + props.Set(constants.KogitoServiceURLProperty, GenerateServiceURL(constants.KogitoServiceURLProtocol, j.platform.Namespace, j.GetServiceName())) - props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "false") + if j.GetServiceSource() == nil { + props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "false") + } else { + props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "true") + } + // add data source reactive URL if j.hasPostgreSQLConfigured() { p := persistence.RetrieveConfiguration(j.platform.Spec.Services.JobService.Persistence, j.platform.Spec.Persistence, j.GetServiceName()) @@ -432,9 +459,15 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties, } if isDataIndexEnabled(j.platform) { - di := NewDataIndexHandler(j.platform) props.Set(constants.JobServiceStatusChangeEvents, "true") - props.Set(constants.JobServiceStatusChangeEventsURL, di.GetLocalServiceBaseUrl()+"/jobs") + if /*!isKnativeEnvInjected ||*/ j.GetServiceSource() == nil { + di := NewDataIndexHandler(j.platform) + props.Set(constants.JobServiceStatusChangeEventsURL, di.GetLocalServiceBaseUrl()+"/jobs") + } else { + props.Set(constants.JobServiceStatusChangeEventsURL, constants.KnativeInjectedEnvVar) + props.Set(constants.JobServiceStatusChangeEventsConnector, constants.QuarkusHTTP) + props.Set(constants.JobServiceStatusChangeEventsMethod, constants.Post) + } } props.Sort() return props, nil @@ -449,6 +482,20 @@ func SetServiceUrlsInWorkflowStatus(pl *operatorapi.SonataFlowPlatform, workflow tpsJS.SetServiceUrlInWorkflowStatus(workflow) } +func (j *JobServiceHandler) GetServiceSource() *duckv1.Destination { + if j.platform.Spec.Services.JobService.Source != nil { + return j.platform.Spec.Services.JobService.Source + } + return GetPlatformBroker(j.platform) +} + +func (j *JobServiceHandler) GetServiceSink() *duckv1.Destination { + if j.platform.Spec.Services.JobService.Sink != nil { + return j.platform.Spec.Services.JobService.Sink + } + return GetPlatformBroker(j.platform) +} + func isDataIndexEnabled(platform *operatorapi.SonataFlowPlatform) bool { return isDataIndexSet(platform) && platform.Spec.Services.DataIndex.Enabled != nil && *platform.Spec.Services.DataIndex.Enabled @@ -505,3 +552,167 @@ func mergeContainerPreservingEnvVars(dest *corev1.Container, source *corev1.Cont } return nil } + +// GetPlatformBroker gets the default broker for the platform. +func GetPlatformBroker(platform *operatorapi.SonataFlowPlatform) *duckv1.Destination { + if platform != nil && platform.Spec.Eventing != nil && platform.Spec.Eventing.Broker != nil { + return platform.Spec.Eventing.Broker + } + return nil +} + +func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination { + if d.platform != nil && d.platform.Spec.Services.DataIndex.Source != nil && d.platform.Spec.Services.DataIndex.Source.Ref != nil { + return d.platform.Spec.Services.DataIndex.Source + } + return GetPlatformBroker(d.platform) +} + +func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, namespace, platformName, serviceName, tag, eventType, path string) *eventingv1.Trigger { + return &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-data-index-%s-trigger", platformName, tag), + Namespace: namespace, + Labels: labels, + }, + Spec: eventingv1.TriggerSpec{ + Broker: brokerName, + Filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": eventType, + }, + }, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: serviceName, + Namespace: namespace, + APIVersion: "v1", + Kind: "Service", + }, + URI: &apis.URL{ + Path: path, + }, + }, + }, + } +} +func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) { + broker := d.GetSourceBroker() + if broker == nil { + return nil, nil // Nothing to do + } + brokerName := broker.Ref.Name + namespace := platform.Namespace + serviceName := d.GetServiceName() + return []client.Object{ + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses), + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses), + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses), + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses), + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", pathProcesses), + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions), + d.newTrigger(lbl, brokerName, namespace, platform.Name, serviceName, "jobs", "JobEvent", pathJobs)}, nil +} + +func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { + if d.platform.Spec.Services.JobService.Source != nil && d.platform.Spec.Services.JobService.Source.Ref != nil { + return d.platform.Spec.Services.JobService.Source + } + return GetPlatformBroker(d.platform) +} + +func (d JobServiceHandler) GetSink() *duckv1.Destination { + if d.platform.Spec.Services.JobService.Sink != nil { + return d.platform.Spec.Services.JobService.Source + } + return GetPlatformBroker(d.platform) +} + +func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) { + broker := j.GetSourceBroker() + sink := j.GetSink() + namespace := platform.Namespace + resultObjs := []client.Object{} + + if broker != nil { + brokerName := broker.Ref.Name + jobCreateTrigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "jobs-service-create-job-trigger", + Namespace: namespace, + Labels: lbl, + }, + Spec: eventingv1.TriggerSpec{ + Broker: brokerName, + Filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "job.create", + }, + }, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: j.GetServiceName(), + Namespace: namespace, + APIVersion: "v1", + Kind: "Service", + }, + URI: &apis.URL{ + Path: "/v2/jobs/events", + }, + }, + }, + } + resultObjs = append(resultObjs, jobCreateTrigger) + jobDeleteTrigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "jobs-service-delete-job-trigger", + Namespace: namespace, + Labels: lbl, + }, + Spec: eventingv1.TriggerSpec{ + Broker: brokerName, + Filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "job.delete", + }, + }, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: j.GetServiceName(), + Namespace: namespace, + APIVersion: "v1", + Kind: "Service", + }, + URI: &apis.URL{ + Path: "/v2/jobs/events", + }, + }, + }, + } + resultObjs = append(resultObjs, jobDeleteTrigger) + } + if sink != nil { + sinkBinding := &sourcesv1.SinkBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "jobs-service-sb", + Namespace: namespace, + Labels: lbl, + }, + Spec: sourcesv1.SinkBindingSpec{ + SourceSpec: duckv1.SourceSpec{ + Sink: *sink, + }, + BindingSpec: duckv1.BindingSpec{ + Subject: tracker.Reference{ + Name: j.GetServiceName(), + Namespace: namespace, + APIVersion: "apps/v1", + Kind: "Deployment", + }, + }, + }, + } + resultObjs = append(resultObjs, sinkBinding) + } + return resultObjs, nil +} diff --git a/controllers/profiles/common/constants/platform_services.go b/controllers/profiles/common/constants/platform_services.go index 30b25fb2f..da8f70669 100644 --- a/controllers/profiles/common/constants/platform_services.go +++ b/controllers/profiles/common/constants/platform_services.go @@ -20,21 +20,29 @@ package constants const ( - QuarkusHTTP = "quarkus-http" - + QuarkusHTTP = "quarkus-http" + Post = "POST" ConfigMapWorkflowPropsVolumeName = "workflow-properties" - JobServiceRequestEventsURL = "mp.messaging.outgoing.kogito-job-service-job-request-events.url" - JobServiceRequestEventsConnector = "mp.messaging.outgoing.kogito-job-service-job-request-events.connector" - JobServiceStatusChangeEvents = "kogito.jobs-service.http.job-status-change-events" - JobServiceStatusChangeEventsURL = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.url" - JobServiceURLProtocol = "http" - JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url" - JobServiceJobEventsPath = "/v2/jobs/events" + JobServiceRequestEventsURL = "mp.messaging.outgoing.kogito-job-service-job-request-events.url" + JobServiceRequestEventsMethod = "mp.messaging.outgoing.kogito-job-service-job-request-events.method" + JobServiceRequestEventsConnector = "mp.messaging.outgoing.kogito-job-service-job-request-events.connector" + JobServiceStatusChangeEvents = "kogito.jobs-service.http.job-status-change-events" + JobServiceStatusChangeEventsURL = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.url" + JobServiceStatusChangeEventsConnector = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.connector" + JobServiceStatusChangeEventsMethod = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.method" + JobServiceURLProtocol = "http" + JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url" + JobServiceJobEventsPath = "/v2/jobs/events" + KogitoProcessEventsProtocol = "http" + KogitoProcessInstancesEventsConnector = "mp.messaging.outgoing.kogito-processinstances-events.connector" + KogitoProcessInstancesEventsMethod = "mp.messaging.outgoing.kogito-processinstances-events.method" KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" KogitoProcessInstancesEventsEnabled = "kogito.events.processinstances.enabled" KogitoProcessInstancesEventsPath = "/processes" + KogitoProcessDefinitionsEventsConnector = "mp.messaging.outgoing.kogito-processdefinitions-events.connector" + KogitoProcessDefinitionsEventsMethod = "mp.messaging.outgoing.kogito-processdefinitions-events.method" KogitoProcessDefinitionsEventsURL = "mp.messaging.outgoing.kogito-processdefinitions-events.url" KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled" KogitoProcessDefinitionsEventsErrorsEnabled = "kogito.events.processdefinitions.errors.propagate" @@ -45,6 +53,7 @@ const ( KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled" // KogitoDataIndexURL configures the data index url, this value can be used internally by the workflow. KogitoDataIndexURL = "kogito.data-index.url" + // KogitoJobServiceHealthCheckEnabled configures if a workflow must check for the job service availability as part // of its start health check. KogitoJobServiceHealthCheckEnabled = "kogito.jobs-service.health-enabled" diff --git a/controllers/profiles/common/constants/workflows.go b/controllers/profiles/common/constants/workflows.go index 8087f963b..2c7b684b3 100644 --- a/controllers/profiles/common/constants/workflows.go +++ b/controllers/profiles/common/constants/workflows.go @@ -22,5 +22,5 @@ const ( KogitoIncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path" KnativeHealthEnabled = "org.kie.kogito.addons.knative.eventing.health-enabled" KnativeInjectedEnvVar = "${K_SINK}" - KnativeEventingBrokerDefault = "default" + WorkflowTriggerFinalizer = "trigger-deletion" ) diff --git a/controllers/profiles/common/ensurer.go b/controllers/profiles/common/ensurer.go index ad4558d25..0428d639d 100644 --- a/controllers/profiles/common/ensurer.go +++ b/controllers/profiles/common/ensurer.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" ) var _ ObjectEnsurer = &defaultObjectEnsurer{} @@ -60,7 +61,7 @@ func NewObjectEnsurer(client client.Client, creator ObjectCreator) ObjectEnsurer } } -// NewObjectEnsurerWithPlatform see defaultObjectEnsurerWithPlatform +// NewObjectEnsurerWithPlatform see defaultObjectEnsurerWithPLatform func NewObjectEnsurerWithPlatform(client client.Client, creator ObjectCreatorWithPlatform) ObjectEnsurerWithPlatform { return &defaultObjectEnsurerWithPlatform{ c: client, @@ -137,6 +138,42 @@ type ObjectEnsurerResult struct { Error error } +// ObjectsEnsurer is an ensurer to apply multiple objects +type ObjectsEnsurerWithPlatform interface { + Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) []ObjectEnsurerResult +} + +func NewObjectsEnsurerWithPlatform(client client.Client, creator ObjectsCreatorWithPlatform) ObjectsEnsurerWithPlatform { + return &defaultObjectsEnsurerWithPlatform{ + c: client, + creator: creator, + } +} + +type defaultObjectsEnsurerWithPlatform struct { + ObjectsEnsurer + c client.Client + creator ObjectsCreatorWithPlatform +} + +func (d *defaultObjectsEnsurerWithPlatform) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) []ObjectEnsurerResult { + result := controllerutil.OperationResultNone + + objects, err := d.creator(workflow, pl) + if err != nil { + return []ObjectEnsurerResult{{nil, result, err}} + } + var ensureResult []ObjectEnsurerResult + for _, object := range objects { + ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object) + ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err}) + if err != nil { + return ensureResult + } + } + return ensureResult +} + func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer { return &defaultObjectsEnsurer{ c: client, @@ -168,6 +205,14 @@ func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorap return ensureResult } +func setWorkflowFinalizer(ctx context.Context, c client.Client, workflow *operatorapi.SonataFlow) error { + if !controllerutil.ContainsFinalizer(workflow, constants.WorkflowTriggerFinalizer) { + controllerutil.AddFinalizer(workflow, constants.WorkflowTriggerFinalizer) + return c.Update(ctx, workflow) + } + return nil +} + func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) { if result, err := controllerutil.CreateOrPatch(ctx, c, object, func() error { @@ -176,6 +221,11 @@ func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitor return visitorErr } } + if workflow.Namespace != object.GetNamespace() { + // This is for Knative trigger in a different namespace + // Set the finalizer for trigger cleanup when the workflow is deleted + return setWorkflowFinalizer(ctx, c, workflow) + } return controllerutil.SetControllerReference(workflow, object, c.Scheme()) }); err != nil { return nil, result, err diff --git a/controllers/profiles/common/knative_eventing.go b/controllers/profiles/common/knative_eventing.go index 13f5bc49c..34684796e 100644 --- a/controllers/profiles/common/knative_eventing.go +++ b/controllers/profiles/common/knative_eventing.go @@ -17,26 +17,33 @@ package common import ( "context" - "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/log" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + KnativeBundleVolume = "kne-bundle-volume" +) + var _ KnativeEventingHandler = &knativeObjectManager{} type knativeObjectManager struct { - sinkBinding ObjectEnsurer - trigger ObjectsEnsurer + sinkBinding ObjectEnsurerWithPlatform + trigger ObjectsEnsurerWithPlatform + platform *operatorapi.SonataFlowPlatform *StateSupport } -func NewKnativeEventingHandler(support *StateSupport) KnativeEventingHandler { +func NewKnativeEventingHandler(support *StateSupport, pl *operatorapi.SonataFlowPlatform) KnativeEventingHandler { return &knativeObjectManager{ - sinkBinding: NewObjectEnsurer(support.C, SinkBindingCreator), - trigger: NewObjectsEnsurer(support.C, TriggersCreator), + sinkBinding: NewObjectEnsurerWithPlatform(support.C, SinkBindingCreator), + trigger: NewObjectsEnsurerWithPlatform(support.C, TriggersCreator), + platform: pl, StateSupport: support, } } @@ -48,23 +55,23 @@ type KnativeEventingHandler interface { func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) { var objs []client.Object - if workflow.Spec.Flow.Events == nil { - // skip if no event is found - klog.V(log.I).InfoS("skip knative resource creation as no event is found") - } else if workflow.Spec.Sink == nil { - klog.V(log.I).InfoS("Spec.Sink is not provided") - } else if knativeAvail, err := knative.GetKnativeAvailability(k.Cfg); err != nil || knativeAvail == nil || !knativeAvail.Eventing { + knativeAvail, err := knative.GetKnativeAvailability(k.Cfg) + if err != nil { + klog.V(log.I).InfoS("Error checking Knative Eventing: %v", err) + return nil, err + } + if !knativeAvail.Eventing { klog.V(log.I).InfoS("Knative Eventing is not installed") } else { // create sinkBinding and trigger - sinkBinding, _, err := k.sinkBinding.Ensure(ctx, workflow) + sinkBinding, _, err := k.sinkBinding.Ensure(ctx, workflow, k.platform) if err != nil { return objs, err } else if sinkBinding != nil { objs = append(objs, sinkBinding) } - triggers := k.trigger.Ensure(ctx, workflow) + triggers := k.trigger.Ensure(ctx, workflow, k.platform) for _, trigger := range triggers { if trigger.Error != nil { return objs, trigger.Error @@ -74,3 +81,55 @@ func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi. } return objs, nil } + +func moveKnativeVolumeToEnd(vols []corev1.Volume) { + for i := 0; i < len(vols)-1; i++ { + if vols[i].Name == KnativeBundleVolume { + vols[i], vols[i+1] = vols[i+1], vols[i] + } + } +} + +func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) { + for i := 0; i < len(mounts)-1; i++ { + if mounts[i].Name == KnativeBundleVolume { + mounts[i], mounts[i+1] = mounts[i+1], mounts[i] + } + } +} + +// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The volume and volume mount +// must be in the end of the array to avoid repeadly restarting of the workflow pod +func restoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) { + moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes) + for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ { + moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts) + } +} + +func preserveKnativeVolumeMount(object *appsv1.Deployment) { + var kneVol *corev1.Volume = nil + for _, v := range object.Spec.Template.Spec.Volumes { + if v.Name == KnativeBundleVolume { + kneVol = &v + } + } + if kneVol != nil { + object.Spec.Template.Spec.Volumes = []corev1.Volume{*kneVol} + } else { + object.Spec.Template.Spec.Volumes = nil + } + for i := range object.Spec.Template.Spec.Containers { + var kneVolMount *corev1.VolumeMount = nil + for _, mount := range object.Spec.Template.Spec.Containers[i].VolumeMounts { + if mount.Name == KnativeBundleVolume { + kneVolMount = &mount + } + } + if kneVolMount == nil { + object.Spec.Template.Spec.Containers[i].VolumeMounts = nil + } else { + object.Spec.Template.Spec.Containers[i].VolumeMounts = []corev1.VolumeMount{*kneVolMount} + } + } +} diff --git a/controllers/profiles/common/mutate_visitors.go b/controllers/profiles/common/mutate_visitors.go index 426154ee0..f16ff5678 100644 --- a/controllers/profiles/common/mutate_visitors.go +++ b/controllers/profiles/common/mutate_visitors.go @@ -96,15 +96,13 @@ func EnsureDeployment(original *appsv1.Deployment, object *appsv1.Deployment) er object.Spec.Replicas = original.Spec.Replicas object.Spec.Selector = original.Spec.Selector object.Labels = original.GetLabels() + object.Finalizers = original.Finalizers // Clean up the volumes, they are inherited from original, additional are added by other visitors - object.Spec.Template.Spec.Volumes = nil - for i := range object.Spec.Template.Spec.Containers { - object.Spec.Template.Spec.Containers[i].VolumeMounts = nil - } - + // However, the knative mount path must be preserved + preserveKnativeVolumeMount(object) // we do a merge to not keep changing the spec since k8s will set default values to the podSpec - return mergo.Merge(&object.Spec.Template.Spec, original.Spec.Template.Spec, mergo.WithOverride) + return mergo.Merge(&object.Spec, original.Spec) } // KServiceMutateVisitor guarantees the state of the default Knative Service object @@ -190,6 +188,7 @@ func RolloutDeploymentIfCMChangedMutateVisitor(workflow *operatorapi.SonataFlow, return func(object client.Object) controllerutil.MutateFn { return func() error { deployment := object.(*appsv1.Deployment) + restoreKnativeVolumeAndVolumeMount(deployment) err := kubeutil.AnnotateDeploymentConfigChecksum(workflow, deployment, userPropsCM, managedPropsCM) return err } diff --git a/controllers/profiles/common/object_creators.go b/controllers/profiles/common/object_creators.go index c1ac67030..179a3c164 100644 --- a/controllers/profiles/common/object_creators.go +++ b/controllers/profiles/common/object_creators.go @@ -20,6 +20,7 @@ package common import ( + "context" "fmt" "strings" @@ -31,7 +32,9 @@ import ( "github.com/imdario/mergo" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -39,6 +42,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/persistence" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/properties" @@ -60,6 +65,9 @@ type ObjectCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform * // ObjectsCreator creates multiple resources type ObjectsCreator func(workflow *operatorapi.SonataFlow) ([]client.Object, error) +// ObjectsCreatorWithPlatform creates multiple resources +type ObjectsCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) ([]client.Object, error) + const ( defaultHTTPServicePort = 80 @@ -70,6 +78,7 @@ const ( healthStartedFailureThreshold = 5 healthStartedPeriodSeconds = 15 healthStartedInitialDelaySeconds = 10 + healthSuccessThreshold = 1 ) // DeploymentCreator is an objectCreator for a base Kubernetes Deployments for profiles that need to deploy the workflow on a vanilla deployment. @@ -165,34 +174,44 @@ func defaultContainer(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataF LivenessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ - Path: constants.QuarkusHealthPathLive, - Port: variables.DefaultHTTPWorkflowPortIntStr, + Path: constants.QuarkusHealthPathLive, + Port: variables.DefaultHTTPWorkflowPortIntStr, + Scheme: corev1.URISchemeHTTP, }, }, - TimeoutSeconds: healthTimeoutSeconds, - PeriodSeconds: healthStartedPeriodSeconds, + InitialDelaySeconds: healthStartedInitialDelaySeconds, + TimeoutSeconds: healthTimeoutSeconds, + FailureThreshold: healthStartedFailureThreshold, + PeriodSeconds: healthStartedPeriodSeconds, + SuccessThreshold: healthSuccessThreshold, }, ReadinessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ - Path: constants.QuarkusHealthPathReady, - Port: variables.DefaultHTTPWorkflowPortIntStr, + Path: constants.QuarkusHealthPathReady, + Port: variables.DefaultHTTPWorkflowPortIntStr, + Scheme: corev1.URISchemeHTTP, }, }, - TimeoutSeconds: healthTimeoutSeconds, - PeriodSeconds: healthStartedPeriodSeconds, + InitialDelaySeconds: healthStartedInitialDelaySeconds, + TimeoutSeconds: healthTimeoutSeconds, + FailureThreshold: healthStartedFailureThreshold, + PeriodSeconds: healthStartedPeriodSeconds, + SuccessThreshold: healthSuccessThreshold, }, StartupProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ - Path: constants.QuarkusHealthPathStarted, - Port: variables.DefaultHTTPWorkflowPortIntStr, + Path: constants.QuarkusHealthPathStarted, + Port: variables.DefaultHTTPWorkflowPortIntStr, + Scheme: corev1.URISchemeHTTP, }, }, InitialDelaySeconds: healthStartedInitialDelaySeconds, TimeoutSeconds: healthTimeoutSeconds, FailureThreshold: healthStartedFailureThreshold, PeriodSeconds: healthStartedPeriodSeconds, + SuccessThreshold: healthSuccessThreshold, }, SecurityContext: kubeutil.SecurityDefaults(), } @@ -252,15 +271,23 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { // SinkBindingCreator is an ObjectsCreator for SinkBinding. // It will create v1.SinkBinding based on events defined in workflow. -func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { +func SinkBindingCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) (client.Object, error) { lbl := workflowproj.GetMergedLabels(workflow) - // skip if no produced event is found - if workflow.Spec.Sink == nil || !workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) { - return nil, nil + sink, err := knative.GetWorkflowSink(workflow, plf) + if err != nil { + return nil, err } + dataIndexEnabled := services.IsDataIndexEnabled(plf) + jobServiceEnabled := services.IsJobServiceEnabled(plf) - sink := workflow.Spec.Sink + // skip if no produced event is found and there is no DataIndex/JobService enabled + if sink == nil { + if dataIndexEnabled || jobServiceEnabled || workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) { + return nil, fmt.Errorf("a sink in the SonataFlow %s or broker in the SonataFlowPlatform %s should be configured when DataIndex or JobService is enabled", workflow.Name, plf.Name) + } + return nil, nil /*nothing to do*/ + } // subject must be deployment to inject K_SINK, service won't work sinkBinding := &sourcesv1.SinkBinding{ @@ -286,9 +313,48 @@ func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, error) return sinkBinding, nil } +func getBrokerRefFromPlatform(plf *operatorapi.SonataFlowPlatform) (*duckv1.KReference, error) { + // check the local platform + if plf.Spec.Eventing != nil && plf.Spec.Eventing.Broker != nil && plf.Spec.Eventing.Broker.Ref != nil { + ref := plf.Spec.Eventing.Broker.Ref.DeepCopy() + if len(ref.Namespace) == 0 { + ref.Namespace = plf.Namespace // default to the platform namespace + } + return ref, nil + } + // Check the cluster platform + if plf.Status.ClusterPlatformRef != nil && len(plf.Status.ClusterPlatformRef.PlatformRef.Name) > 0 { + platform := &operatorapi.SonataFlowPlatform{} + if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Namespace: plf.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: plf.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil { + if errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + return getBrokerRefFromPlatform(platform) + + } + return nil, nil +} + +func getBrokerRefForEventType(eventType string, workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) (*duckv1.KReference, error) { + // Check the workflow + for _, source := range workflow.Spec.Sources { + if source.EventType == eventType { + ref := source.Ref.DeepCopy() + if len(ref.Namespace) == 0 { + ref.Namespace = workflow.Namespace // default to the workflow namespace + } + return ref, nil + } + } + // get the broker from the local platform or cluster platform + return getBrokerRefFromPlatform(plf) +} + // TriggersCreator is an ObjectsCreator for Triggers. // It will create a list of eventingv1.Trigger based on events defined in workflow. -func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, error) { +func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) ([]client.Object, error) { var resultObjects []client.Object lbl := workflowproj.GetMergedLabels(workflow) @@ -299,16 +365,26 @@ func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, error) if event.Kind == cncfmodel.EventKindProduced { continue } - + brokerRef, err := getBrokerRefForEventType(event.Type, workflow, plf) + if err != nil { + return nil, err + } + if brokerRef == nil { + return nil, fmt.Errorf("no broker configured for eventType %s in SonataFlow %s", event.Type, workflow.Name) + } + if !knative.IsKnativeBroker(brokerRef) { + return nil, fmt.Errorf("no valid broker configured for eventType %s in SonataFlow %s", event.Type, workflow.Name) + } // construct eventingv1.Trigger + // The trigger must be created in the same namespace as the broker trigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ Name: strings.ToLower(fmt.Sprintf("%s-%s-trigger", workflow.Name, event.Name)), - Namespace: workflow.Namespace, + Namespace: brokerRef.Namespace, Labels: lbl, }, Spec: eventingv1.TriggerSpec{ - Broker: constants.KnativeEventingBrokerDefault, + Broker: brokerRef.Name, Filter: &eventingv1.TriggerFilter{ Attributes: eventingv1.TriggerFilterAttributes{ "type": event.Type, @@ -344,6 +420,7 @@ func UserPropsConfigMapCreator(workflow *operatorapi.SonataFlow) (client.Object, // ManagedPropsConfigMapCreator creates an empty ConfigMap to hold the external application properties func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error) { + props, err := properties.ApplicationManagedProperties(workflow, platform) if err != nil { return nil, err diff --git a/controllers/profiles/common/object_creators_test.go b/controllers/profiles/common/object_creators_test.go index 5057b2180..0c701a4e2 100644 --- a/controllers/profiles/common/object_creators_test.go +++ b/controllers/profiles/common/object_creators_test.go @@ -179,9 +179,9 @@ func TestMergePodSpec_OverrideContainers(t *testing.T) { func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) { workflow := test.GetVetEventSonataFlow(t.Name()) - + plf := test.GetBasePlatform() //On Kubernetes we want the service exposed in Dev with NodePort - sinkBinding, _ := SinkBindingCreator(workflow) + sinkBinding, _ := SinkBindingCreator(workflow, plf) sinkBinding.SetUID("1") sinkBinding.SetResourceVersion("1") @@ -197,9 +197,9 @@ func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) { func Test_ensureWorkflowTriggersAreCreated(t *testing.T) { workflow := test.GetVetEventSonataFlow(t.Name()) - + plf := test.GetBasePlatform() //On Kubernetes we want the service exposed in Dev with NodePort - triggers, _ := TriggersCreator(workflow) + triggers, _ := TriggersCreator(workflow, plf) assert.NotEmpty(t, triggers) assert.Len(t, triggers, 2) diff --git a/controllers/profiles/common/properties/knative.go b/controllers/profiles/common/properties/knative.go index 195dd21fa..ab5716bf2 100644 --- a/controllers/profiles/common/properties/knative.go +++ b/controllers/profiles/common/properties/knative.go @@ -16,6 +16,7 @@ package properties import ( operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef" "github.com/magiconair/properties" @@ -25,13 +26,17 @@ import ( // generateKnativeEventingWorkflowProperties returns the set of application properties required for the workflow to produce or consume // Knative Events. // Never nil. -func generateKnativeEventingWorkflowProperties(workflow *operatorapi.SonataFlow) (*properties.Properties, error) { +func generateKnativeEventingWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) { props := properties.NewProperties() - if workflow == nil || workflow.Spec.Sink == nil { + props.Set(constants.KnativeHealthEnabled, "false") + sink, err := knative.GetWorkflowSink(workflow, platform) + if err != nil { + return nil, err + } + if workflow == nil || sink == nil { props.Set(constants.KnativeHealthEnabled, "false") return props, nil } - // verify ${K_SINK} props.Set(constants.KnativeHealthEnabled, "true") if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) { props.Set(constants.KogitoOutgoingEventsConnector, constants.QuarkusHTTP) @@ -40,8 +45,8 @@ func generateKnativeEventingWorkflowProperties(workflow *operatorapi.SonataFlow) if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindConsumed) { props.Set(constants.KogitoIncomingEventsConnector, constants.QuarkusHTTP) var path = "/" - if workflow.Spec.Sink.URI != nil { - path = workflow.Spec.Sink.URI.Path + if sink.URI != nil { + path = sink.URI.Path } props.Set(constants.KogitoIncomingEventsPath, path) } diff --git a/controllers/profiles/common/properties/managed.go b/controllers/profiles/common/properties/managed.go index 98a73ad33..c77fe6636 100644 --- a/controllers/profiles/common/properties/managed.go +++ b/controllers/profiles/common/properties/managed.go @@ -172,7 +172,7 @@ func NewManagedPropertyHandler(workflow *operatorapi.SonataFlow, platform *opera props.Merge(p) } - p, err := generateKnativeEventingWorkflowProperties(workflow) + p, err := generateKnativeEventingWorkflowProperties(workflow, platform) if err != nil { return nil, err } diff --git a/controllers/profiles/common/properties/managed_test.go b/controllers/profiles/common/properties/managed_test.go index d27d69e1d..757041bf3 100644 --- a/controllers/profiles/common/properties/managed_test.go +++ b/controllers/profiles/common/properties/managed_test.go @@ -197,11 +197,15 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { platform.Namespace = ns platform.Spec = operatorapi.SonataFlowPlatformSpec{ Services: &operatorapi.ServicesPlatformSpec{ - DataIndex: &operatorapi.ServiceSpec{ - Enabled: &enabled, + DataIndex: &operatorapi.DataIndexServiceSpec{ + ServiceSpec: operatorapi.ServiceSpec{ + Enabled: &enabled, + }, }, - JobService: &operatorapi.ServiceSpec{ - Enabled: &enabled, + JobService: &operatorapi.JobServiceServiceSpec{ + ServiceSpec: operatorapi.ServiceSpec{ + Enabled: &enabled, + }, }, }, } @@ -636,7 +640,7 @@ func setJobServiceEnabledValue(v *bool) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.JobService == nil { - p.Spec.Services.JobService = &operatorapi.ServiceSpec{} + p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{} } p.Spec.Services.JobService.Enabled = v } @@ -648,7 +652,7 @@ func setDataIndexEnabledValue(v *bool) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.DataIndex == nil { - p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} + p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{} } p.Spec.Services.DataIndex.Enabled = v } @@ -672,7 +676,7 @@ func setJobServiceJDBC(jdbc string) plfmOptionFn { p.Spec.Services = &operatorapi.ServicesPlatformSpec{} } if p.Spec.Services.JobService == nil { - p.Spec.Services.JobService = &operatorapi.ServiceSpec{} + p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{} } if p.Spec.Services.JobService.Persistence == nil { p.Spec.Services.JobService.Persistence = &operatorapi.PersistenceOptionsSpec{} diff --git a/controllers/profiles/dev/object_creators_dev.go b/controllers/profiles/dev/object_creators_dev.go index 0f6069f18..6db048ab1 100644 --- a/controllers/profiles/dev/object_creators_dev.go +++ b/controllers/profiles/dev/object_creators_dev.go @@ -51,6 +51,7 @@ func serviceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { } func deploymentCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) (client.Object, error) { + obj, err := common.DeploymentCreator(workflow, plf) if err != nil { return nil, err @@ -95,7 +96,7 @@ func deploymentMutateVisitor(workflow *operatorapi.SonataFlow, plf *operatorapi. } } -func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow) common.MutateVisitor { +func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow, support *common.StateSupport) common.MutateVisitor { return func(object client.Object) controllerutil.MutateFn { return func() error { if kubeutil.IsObjectNew(object) { diff --git a/controllers/profiles/dev/object_creators_dev_test.go b/controllers/profiles/dev/object_creators_dev_test.go index 25fdc8ee2..8c200d841 100644 --- a/controllers/profiles/dev/object_creators_dev_test.go +++ b/controllers/profiles/dev/object_creators_dev_test.go @@ -43,5 +43,5 @@ func Test_ensureWorkflowDevServiceIsExposed(t *testing.T) { assert.Equal(t, reflectService.Spec.Type, v1.ServiceTypeNodePort) assert.NotNil(t, reflectService.ObjectMeta) assert.NotNil(t, reflectService.ObjectMeta.Labels) - assert.Equal(t, reflectService.ObjectMeta.Labels, map[string]string{"test": "test", "app": "greeting", "sonataflow.org/workflow-app": "greeting"}) + assert.Equal(t, reflectService.ObjectMeta.Labels, map[string]string{"test": "test", "app": "greeting", "sonataflow.org/workflow-app": "greeting", "sonataflow.org/workflow-namespace": "Test_ensureWorkflowDevServiceIsExposed"}) } diff --git a/controllers/profiles/dev/states_dev.go b/controllers/profiles/dev/states_dev.go index 17bd5a80a..730c9d385 100644 --- a/controllers/profiles/dev/states_dev.go +++ b/controllers/profiles/dev/states_dev.go @@ -62,7 +62,7 @@ func (e *ensureRunningWorkflowState) CanReconcile(workflow *operatorapi.SonataFl func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) { var objs []client.Object - flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx, workflow, ensureWorkflowDefConfigMapMutator(workflow)) + flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx, workflow, ensureWorkflowDefConfigMapMutator(workflow, e.StateSupport)) if err != nil { return ctrl.Result{Requeue: false}, objs, err } @@ -81,7 +81,7 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora if err != nil { return ctrl.Result{Requeue: false}, objs, err } - managedPropsCM, _, err := e.ensurers.managedPropsConfigMap.Ensure(ctx, workflow, pl, common.ManagedPropertiesMutateVisitor(ctx, e.StateSupport.Catalog, workflow, pl, userPropsCM.(*corev1.ConfigMap))) + managedPropsCM, _, err := e.ensurers.managedPropsConfigMap.Ensure(ctx, workflow, pl, common.ManagedPropertiesMutateVisitor(ctx, e.Catalog, workflow, pl, userPropsCM.(*corev1.ConfigMap))) if err != nil { return ctrl.Result{Requeue: false}, objs, err } @@ -117,7 +117,7 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora } objs = append(objs, route) - if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport).Ensure(ctx, workflow); err != nil { + if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport, pl).Ensure(ctx, workflow); err != nil { return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, objs, err } else { objs = append(objs, knativeObjs...) diff --git a/controllers/profiles/factory/factory.go b/controllers/profiles/factory/factory.go index 511c6ca36..bd028b4c2 100644 --- a/controllers/profiles/factory/factory.go +++ b/controllers/profiles/factory/factory.go @@ -23,6 +23,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/gitops" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/preview" "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" diff --git a/controllers/profiles/preview/deployment_handler.go b/controllers/profiles/preview/deployment_handler.go index dfbac2538..0c2d6821f 100644 --- a/controllers/profiles/preview/deployment_handler.go +++ b/controllers/profiles/preview/deployment_handler.go @@ -16,6 +16,7 @@ package preview import ( "context" + "fmt" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" v1 "k8s.io/api/core/v1" @@ -26,6 +27,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" @@ -53,6 +55,11 @@ func (d *DeploymentReconciler) reconcileWithImage(ctx context.Context, workflow return reconcile.Result{Requeue: false}, nil, err } + // Checks if the workflow has sink configured. + if requires, err := d.ensureKnativeSinkConfigured(workflow); requires || err != nil { + return reconcile.Result{Requeue: false}, nil, err + } + // Ensure objects result, objs, err := d.ensureObjects(ctx, workflow, image) if err != nil || result.Requeue { @@ -89,6 +96,32 @@ func (d *DeploymentReconciler) ensureKnativeServingRequired(workflow *operatorap return false, nil } +// if Knative Eventing is available, the workflow should have a sink configured, or the platform should have a broker defined +func (d *DeploymentReconciler) ensureKnativeSinkConfigured(workflow *operatorapi.SonataFlow) (bool, error) { + avail, err := knative.GetKnativeAvailability(d.Cfg) + if err != nil { + return true, err + } + if !avail.Eventing { + return false, nil + } + platform, err := platform.GetActivePlatform(context.TODO(), d.C, workflow.Namespace) + if err != nil { + return true, err + } + sink, err := knative.GetWorkflowSink(workflow, platform) + if err != nil { + return true, err + } + if sink == nil && (services.IsDataIndexEnabled(platform) || services.IsJobServiceEnabled(platform)) { + d.Recorder.Eventf(workflow, v1.EventTypeWarning, + "KnativeSinkNotConfigured", + "Failed to deploy workflow. No sink configured in the workflow or the platform when Job Service or Data Index Service is enabled.") + return true, fmt.Errorf("no sink configured in the workflow or the platform when Job Service or Data Index Service is enabled") + } + return false, nil +} + func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow *operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, error) { pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace) userPropsCM, _, err := d.ensurers.userPropsConfigMap.Ensure(ctx, workflow) @@ -121,7 +154,7 @@ func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow *oper return reconcile.Result{}, nil, err } - eventingObjs, err := common.NewKnativeEventingHandler(d.StateSupport).Ensure(ctx, workflow) + eventingObjs, err := common.NewKnativeEventingHandler(d.StateSupport, pl).Ensure(ctx, workflow) if err != nil { return reconcile.Result{}, nil, err } diff --git a/controllers/profiles/preview/states_preview.go b/controllers/profiles/preview/states_preview.go index 333c33976..eefd1e54b 100644 --- a/controllers/profiles/preview/states_preview.go +++ b/controllers/profiles/preview/states_preview.go @@ -21,6 +21,7 @@ package preview import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -70,7 +71,7 @@ func (h *newBuilderState) Do(ctx context.Context, workflow *operatorapi.SonataFl // available at build time. userPropsCM, _, err := h.ensurers.userPropsConfigMap.Ensure(ctx, workflow) if err != nil { - workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, "Unable to retrieve the user properties config map") + workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, fmt.Sprintf("Unable to retrieve the user properties config map: %v", err)) _, err = h.PerformStatusUpdate(ctx, workflow) return ctrl.Result{}, nil, err } @@ -78,7 +79,7 @@ func (h *newBuilderState) Do(ctx context.Context, workflow *operatorapi.SonataFl _, _, err = h.ensurers.managedPropsConfigMap.Ensure(ctx, workflow, pl, common.ManagedPropertiesMutateVisitor(ctx, h.StateSupport.Catalog, workflow, pl, userPropsCM.(*corev1.ConfigMap))) if err != nil { - workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, "Unable to retrieve the managed properties config map") + workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, fmt.Sprintf("Unable to retrieve the managed properties config map: %v", err)) _, err = h.PerformStatusUpdate(ctx, workflow) return ctrl.Result{}, nil, err } @@ -210,7 +211,15 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato } // didn't change, business as usual - return NewDeploymentReconciler(h.StateSupport, h.ensurers).reconcileWithImage(ctx, workflow, build.Status.ImageTag) + result, objs, err := NewDeploymentReconciler(h.StateSupport, h.ensurers).reconcileWithImage(ctx, workflow, build.Status.ImageTag) + if err != nil { + workflow.Status.Manager().MarkFalse(api.DeployedConditionType, api.DeploymentFailureReason, fmt.Sprintf("Error in deploy the workflow:%s", err)) + _, err = h.PerformStatusUpdate(ctx, workflow) + return result, nil, err + } + workflow.Status.Manager().MarkTrue(api.DeployedConditionType) + _, err = h.PerformStatusUpdate(ctx, workflow) + return result, objs, err } func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { diff --git a/controllers/sonataflow_controller.go b/controllers/sonataflow_controller.go index 7c1d28abb..943c89462 100644 --- a/controllers/sonataflow_controller.go +++ b/controllers/sonataflow_controller.go @@ -26,26 +26,31 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" "k8s.io/klog/v2" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" profiles "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/factory" - + "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/apache/incubator-kie-kogito-serverless-operator/api" - "github.com/apache/incubator-kie-kogito-serverless-operator/log" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform" ) @@ -91,6 +96,15 @@ func (r *SonataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) } r.setDefaults(workflow) + // If the workflow is being deleted, clean up the triggers on a different namespace + if workflow.DeletionTimestamp != nil && controllerutil.ContainsFinalizer(workflow, constants.WorkflowTriggerFinalizer) { + err := r.cleanupTriggers(ctx, workflow) + if err != nil { + klog.V(log.E).ErrorS(err, "Failed to clean up triggers") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } // Only process resources assigned to the operator if !platform.IsOperatorHandlerConsideringLock(ctx, r.Client, req.Namespace, workflow) { @@ -112,6 +126,43 @@ func (r *SonataFlowReconciler) setDefaults(workflow *operatorapi.SonataFlow) { } } +func (r *SonataFlowReconciler) cleanupTriggers(ctx context.Context, workflow *operatorapi.SonataFlow) error { + plf, _ := platform.GetActivePlatform(ctx, r.Client, workflow.Namespace) + if plf.Status.ClusterPlatformRef == nil { + return nil + } + avail, err := knative.GetKnativeAvailability(r.Config) + if err != nil { + return err + } + if avail.Eventing { + triggers := &eventingv1.TriggerList{} + opts := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + workflowproj.LabelWorkflow: workflow.Name, + workflowproj.LabelWorkflowNamespace: workflow.Namespace, + }, + ), + Namespace: plf.Status.ClusterPlatformRef.PlatformRef.Namespace, + } + if err := r.Client.List(ctx, triggers, opts); err != nil { + return err + } + for _, trigger := range triggers.Items { + if err := r.Client.Delete(ctx, &trigger); err != nil { + return err + } + } + } + controllerutil.RemoveFinalizer(workflow, constants.WorkflowTriggerFinalizer) + return r.Client.Update(ctx, workflow) +} + +// Delete implements a handler for the Delete event. +func (r *SonataFlowReconciler) Delete(e event.DeleteEvent) error { + return nil +} + func platformEnqueueRequestsFromMapFunc(c client.Client, p *operatorapi.SonataFlowPlatform) []reconcile.Request { var requests []reconcile.Request diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index 022c76063..f1cec70c6 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -23,6 +23,12 @@ import ( "context" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + v1 "knative.dev/pkg/apis/duck/v1" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" @@ -88,7 +94,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{}, + DataIndex: &v1alpha08.DataIndexServiceSpec{}, } // Create a fake client to mock API calls. @@ -167,20 +173,24 @@ func TestSonataFlowPlatformController(t *testing.T) { ksp := test.GetBasePlatformInReadyPhase(namespace) var replicas int32 = 2 ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{ - PodTemplate: v1alpha08.PodTemplateSpec{ - Replicas: &replicas, - Container: v1alpha08.ContainerSpec{ - Command: []string{"test:latest"}, + DataIndex: &v1alpha08.DataIndexServiceSpec{ + ServiceSpec: v1alpha08.ServiceSpec{ + PodTemplate: v1alpha08.PodTemplateSpec{ + Replicas: &replicas, + Container: v1alpha08.ContainerSpec{ + Command: []string{"test:latest"}, + }, }, }, + Source: nil, }, } - di := services.NewDataIndexHandler(ksp) - // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + + di := services.NewDataIndexHandler(ksp) + // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -260,8 +270,8 @@ func TestSonataFlowPlatformController(t *testing.T) { // Check with persistence set ksp.Spec = v1alpha08.SonataFlowPlatformSpec{ Services: &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{}, - JobService: &v1alpha08.ServiceSpec{}, + DataIndex: &v1alpha08.DataIndexServiceSpec{}, + JobService: &v1alpha08.JobServiceServiceSpec{}, }, Persistence: &v1alpha08.PlatformPersistenceOptionsSpec{ PostgreSQL: &v1alpha08.PlatformPersistencePostgreSQL{ @@ -354,19 +364,23 @@ func TestSonataFlowPlatformController(t *testing.T) { urlJS := "jdbc:postgresql://localhost:5432/database?currentSchema=job-service" ksp.Spec = v1alpha08.SonataFlowPlatformSpec{ Services: &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{ - Persistence: &v1alpha08.PersistenceOptionsSpec{ - PostgreSQL: &v1alpha08.PersistencePostgreSQL{ - SecretRef: v1alpha08.PostgreSQLSecretOptions{Name: "dataIndex"}, - JdbcUrl: urlDI, + DataIndex: &v1alpha08.DataIndexServiceSpec{ + ServiceSpec: v1alpha08.ServiceSpec{ + Persistence: &v1alpha08.PersistenceOptionsSpec{ + PostgreSQL: &v1alpha08.PersistencePostgreSQL{ + SecretRef: v1alpha08.PostgreSQLSecretOptions{Name: "dataIndex"}, + JdbcUrl: urlDI, + }, }, }, }, - JobService: &v1alpha08.ServiceSpec{ - Persistence: &v1alpha08.PersistenceOptionsSpec{ - PostgreSQL: &v1alpha08.PersistencePostgreSQL{ - SecretRef: v1alpha08.PostgreSQLSecretOptions{Name: "job"}, - JdbcUrl: urlJS, + JobService: &v1alpha08.JobServiceServiceSpec{ + ServiceSpec: v1alpha08.ServiceSpec{ + Persistence: &v1alpha08.PersistenceOptionsSpec{ + PostgreSQL: &v1alpha08.PersistencePostgreSQL{ + SecretRef: v1alpha08.PostgreSQLSecretOptions{Name: "job"}, + JdbcUrl: urlJS, + }, }, }, }, @@ -467,7 +481,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - JobService: &v1alpha08.ServiceSpec{}, + JobService: &v1alpha08.JobServiceServiceSpec{}, } // Create a fake client to mock API calls. @@ -544,20 +558,21 @@ func TestSonataFlowPlatformController(t *testing.T) { ksp := test.GetBasePlatformInReadyPhase(namespace) var replicas int32 = 2 ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - JobService: &v1alpha08.ServiceSpec{ - PodTemplate: v1alpha08.PodTemplateSpec{ - Replicas: &replicas, - Container: v1alpha08.ContainerSpec{ - Command: []string{"test:latest"}, + JobService: &v1alpha08.JobServiceServiceSpec{ + ServiceSpec: v1alpha08.ServiceSpec{ + PodTemplate: v1alpha08.PodTemplateSpec{ + Replicas: &replicas, + Container: v1alpha08.ContainerSpec{ + Command: []string{"test:latest"}, + }, }, }, }, } - js := services.NewJobServiceHandler(ksp) - // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + js := services.NewJobServiceHandler(ksp) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -625,14 +640,14 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{}, - JobService: &v1alpha08.ServiceSpec{}, + DataIndex: &v1alpha08.DataIndexServiceSpec{}, + JobService: &v1alpha08.JobServiceServiceSpec{}, } - di := services.NewDataIndexHandler(ksp) - js := services.NewJobServiceHandler(ksp) // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() + di := services.NewDataIndexHandler(ksp) + js := services.NewJobServiceHandler(ksp) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -693,8 +708,8 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{}, - JobService: &v1alpha08.ServiceSpec{}, + DataIndex: &v1alpha08.DataIndexServiceSpec{}, + JobService: &v1alpha08.JobServiceServiceSpec{}, } ksp2 := test.GetBasePlatformInReadyPhase(namespace) ksp2.Name = "ksp2" @@ -827,4 +842,86 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.NotNil(t, ksp2.Status.ClusterPlatformRef) assert.Nil(t, ksp2.Status.ClusterPlatformRef.Services) }) + t.Run("verify that knative resources creation for job service and data index service is performed without error", func(t *testing.T) { + namespace := t.Name() + // Create a SonataFlowPlatform object with metadata and spec. + ksp := test.GetBasePlatformInReadyPhase(namespace) + brokerName := "default" + enabled := true + broker := &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: brokerName, + Namespace: namespace, + }, + } + ksp.Spec.Eventing = &v1alpha08.PlatformEventingSpec{ + Broker: &v1.Destination{ + Ref: &v1.KReference{ + APIVersion: v1.SchemeGroupVersion.Version, + Kind: "Broker", + Namespace: namespace, + Name: brokerName, + }, + }, + } + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ + DataIndex: &v1alpha08.DataIndexServiceSpec{ + ServiceSpec: v1alpha08.ServiceSpec{ + Enabled: &enabled, + }, + }, + JobService: &v1alpha08.JobServiceServiceSpec{ + ServiceSpec: v1alpha08.ServiceSpec{ + Enabled: &enabled, + }, + }, + } + + // Create a fake client to mock API calls. + cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp, broker).WithStatusSubresource(ksp, broker).Build() + // Create a SonataFlowPlatformReconciler object with the scheme and fake client. + r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} + + // Mock request to simulate Reconcile() being called on an event for a + // watched resource . + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: ksp.Name, + Namespace: ksp.Namespace, + }, + } + _, err := r.Reconcile(context.TODO(), req) + if err != nil { + t.Fatalf("reconcile: (%v)", err) + } + + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: ksp.Name, Namespace: ksp.Namespace}, ksp)) + + // Perform some checks on the created CR + assert.Equal(t, "quay.io/kiegroup", ksp.Spec.Build.Config.Registry.Address) + assert.Equal(t, "regcred", ksp.Spec.Build.Config.Registry.Secret) + assert.Equal(t, v1alpha08.OperatorBuildStrategy, ksp.Spec.Build.Config.BuildStrategy) + assert.NotNil(t, ksp.Spec.Services.DataIndex) + assert.NotNil(t, ksp.Spec.Services.DataIndex.Enabled) + assert.Equal(t, true, *ksp.Spec.Services.DataIndex.Enabled) + assert.NotNil(t, ksp.Spec.Services.JobService) + assert.NotNil(t, ksp.Spec.Services.JobService.Enabled) + assert.Equal(t, true, *ksp.Spec.Services.JobService.Enabled) + assert.Equal(t, v1alpha08.PlatformClusterKubernetes, ksp.Status.Cluster) + + assert.Equal(t, "", ksp.Status.GetTopLevelCondition().Reason) + + // Check Triggers + trigger := &eventingv1.Trigger{} + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-create-job-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-delete-job-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "data-index-service-jobs-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "data-index-service-processes-definition-trigger", Namespace: ksp.Namespace}, trigger)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "data-index-service-processes-instance-trigger", Namespace: ksp.Namespace}, trigger)) + + // Check SinkBinding + sinkBinding := &sourcesv1.SinkBinding{} + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: "jobs-service-sb", Namespace: ksp.Namespace}, sinkBinding)) + + }) } diff --git a/go.mod b/go.mod index 7e75b4c90..463059463 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/imdario/mergo v0.3.16 k8s.io/klog/v2 v2.100.1 + k8s.io/utils v0.0.0-20230711102312-30195339c3c7 knative.dev/eventing v0.26.0 ) @@ -128,7 +129,6 @@ require ( k8s.io/apiextensions-apiserver v0.27.6 // indirect k8s.io/component-base v0.27.6 // indirect k8s.io/kube-openapi v0.0.0-20230525220651-2546d827e515 // indirect - k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect knative.dev/networking v0.0.0-20231017124814-2a7676e912b7 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect diff --git a/operator.yaml b/operator.yaml index e9995c511..7fd80e037 100644 --- a/operator.yaml +++ b/operator.yaml @@ -912,6 +912,60 @@ spec: of the operator's default. type: string type: object + eventing: + description: Eventing describes the information required for Knative + Eventing integration in the platform. + properties: + broker: + description: Broker to communicate with workflow deployment. It + can be the default broker when the workflow, Dataindex, or Jobservice + does not have a sink or source specified. + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of + the group. This can be used as an alternative to the + APIVersion, and then resolved using ResolveGroup. Note: + This API is EXPERIMENTAL and might break anytime. For + more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and + non-empty host) pointing to the target or a relative URI. + Relative URIs will be resolved using the base URI retrieved + from Ref. + type: string + type: object + type: object persistence: description: Persistence defines the platform persistence configuration. When this field is set, the configuration is used as the persistence @@ -8919,6 +8973,56 @@ spec: type: object type: array type: object + source: + description: Defines the source where the Dataindex receives + events from + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object type: object jobService: description: 'Deploys the Job service for use by workflows without @@ -16791,6 +16895,106 @@ spec: type: object type: array type: object + sink: + description: Defines the sink where the Jobservice sends events + to + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object + source: + description: Defines the source where the Jobservice receives + events from + properties: + CACerts: + description: CACerts are Certification Authority (CA) + certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address + Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version + of the group. This can be used as an alternative + to the APIVersion, and then resolved using ResolveGroup. + Note: This API is EXPERIMENTAL and might break anytime. + For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the + object holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme + and non-empty host) pointing to the target or a relative + URI. Relative URIs will be resolved using the base URI + retrieved from Ref. + type: string + type: object type: object type: object type: object @@ -26299,6 +26503,63 @@ spec: will be resolved using the base URI retrieved from Ref. type: string type: object + sources: + description: Sources describes the list of sources used to create + triggers for events consumed by this SonataFlow instance. + items: + description: SonataFlowSourceSpec defines the desired state of a + source used for trigger creation + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided + by the Addressable target, if any. + type: string + eventType: + description: Defines the eventType to filter the events + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the + group. This can be used as an alternative to the APIVersion, + and then resolved using ResolveGroup. Note: This API is + EXPERIMENTAL and might break anytime. For more details: + https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and + non-empty host) pointing to the target or a relative URI. + Relative URIs will be resolved using the base URI retrieved + from Ref. + type: string + required: + - eventType + type: object + type: array required: - flow type: object diff --git a/test/kubernetes_cli.go b/test/kubernetes_cli.go index 72029fd1c..ab87558f3 100644 --- a/test/kubernetes_cli.go +++ b/test/kubernetes_cli.go @@ -21,8 +21,13 @@ package test import ( "context" + "testing" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + buildv1 "github.com/openshift/api/build/v1" imgv1 "github.com/openshift/api/image/v1" routev1 "github.com/openshift/api/route/v1" @@ -33,7 +38,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" - servingv1 "knative.dev/serving/pkg/apis/serving/v1" ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -66,6 +70,9 @@ func NewKogitoClientBuilderWithOpenShift() *fake.ClientBuilder { utilruntime.Must(buildv1.Install(s)) utilruntime.Must(imgv1.Install(s)) utilruntime.Must(operatorapi.AddToScheme(s)) + utilruntime.Must(eventingv1.AddToScheme(s)) + utilruntime.Must(sourcesv1.AddToScheme(s)) + utilruntime.Must(servingv1.AddToScheme(s)) return fake.NewClientBuilder().WithScheme(s) } diff --git a/utils/kubernetes/deployment.go b/utils/kubernetes/deployment.go index 676c49924..4bdeb287a 100644 --- a/utils/kubernetes/deployment.go +++ b/utils/kubernetes/deployment.go @@ -151,8 +151,13 @@ func dataFromCM(cm *v1.ConfigMap, key string) string { } func calculateHash(userPropsCM, managedPropsCM *v1.ConfigMap, workflow *operatorapi.SonataFlow) (string, error) { - aggregatedProps := fmt.Sprintf("%s,%s", dataFromCM(userPropsCM, workflowproj.ApplicationPropertiesFileName), - dataFromCM(managedPropsCM, workflowproj.GetManagedPropertiesFileName(workflow))) + aggregatedProps := "" + if workflow == nil { + aggregatedProps = dataFromCM(userPropsCM, workflowproj.ApplicationPropertiesFileName) + } else { + aggregatedProps = fmt.Sprintf("%s,%s", dataFromCM(userPropsCM, workflowproj.ApplicationPropertiesFileName), + dataFromCM(managedPropsCM, workflowproj.GetManagedPropertiesFileName(workflow))) + } hash := sha256.New() _, err := hash.Write([]byte(aggregatedProps)) if err != nil { diff --git a/workflowproj/operator.go b/workflowproj/operator.go index 2383f34fa..bc44b254b 100644 --- a/workflowproj/operator.go +++ b/workflowproj/operator.go @@ -41,6 +41,8 @@ const ( LabelService = metadata.Domain + "/service" // LabelWorkflow specialized label managed by the controller LabelWorkflow = metadata.Domain + "/workflow-app" + // LabelWorkflowNamespace specialized label managed by the controller indicating the namespace of the workflow + LabelWorkflowNamespace = metadata.Domain + "/workflow-namespace" ) // SetTypeToObject sets the Kind and ApiVersion to a given object since the default constructor won't do it. @@ -85,8 +87,9 @@ func GetManagedPropertiesFileName(workflow *operatorapi.SonataFlow) string { // GetDefaultLabels gets the default labels based on the given workflow. func GetDefaultLabels(workflow *operatorapi.SonataFlow) map[string]string { return map[string]string{ - LabelApp: workflow.Name, - LabelWorkflow: workflow.Name, + LabelApp: workflow.Name, + LabelWorkflow: workflow.Name, + LabelWorkflowNamespace: workflow.Namespace, } }