diff --git a/bundle/manifests/sonataflow-operator-controllers-config_v1_configmap.yaml b/bundle/manifests/sonataflow-operator-controllers-config_v1_configmap.yaml index ba91602d6..be219212e 100644 --- a/bundle/manifests/sonataflow-operator-controllers-config_v1_configmap.yaml +++ b/bundle/manifests/sonataflow-operator-controllers-config_v1_configmap.yaml @@ -1,6 +1,6 @@ apiVersion: v1 data: - controllers_cfg.yaml: | + controllers_cfg.yaml: |- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -58,6 +58,9 @@ data: - groupId: org.kie artifactId: kie-addons-quarkus-persistence-jdbc version: 999-20240912-SNAPSHOT + # If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data + # Index Service reducing the number of produced events. Set to false to send individual events. + kogitoEventsGrouping: true kind: ConfigMap metadata: name: sonataflow-operator-controllers-config diff --git a/config/manager/controllers_cfg.yaml b/config/manager/controllers_cfg.yaml index 0a518963e..b4d5bcf43 100644 --- a/config/manager/controllers_cfg.yaml +++ b/config/manager/controllers_cfg.yaml @@ -55,3 +55,6 @@ postgreSQLPersistenceExtensions: - groupId: org.kie artifactId: kie-addons-quarkus-persistence-jdbc version: 999-20240912-SNAPSHOT +# If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data +# Index Service reducing the number of produced events. Set to false to send individual events. +kogitoEventsGrouping: true \ No newline at end of file diff --git a/internal/controller/cfg/controllers_cfg.go b/internal/controller/cfg/controllers_cfg.go index 7a99d7131..0a8e76f00 100644 --- a/internal/controller/cfg/controllers_cfg.go +++ b/internal/controller/cfg/controllers_cfg.go @@ -71,6 +71,7 @@ type ControllersCfg struct { SonataFlowDevModeImageTag string `yaml:"sonataFlowDevModeImageTag,omitempty"` BuilderConfigMapName string `yaml:"builderConfigMapName,omitempty"` PostgreSQLPersistenceExtensions []GAV `yaml:"postgreSQLPersistenceExtensions,omitempty"` + KogitoEventsGrouping bool `yaml:"kogitoEventsGrouping,omitempty"` } // InitializeControllersCfg initializes the platform configuration for this instance. diff --git a/internal/controller/cfg/controllers_cfg_test.go b/internal/controller/cfg/controllers_cfg_test.go index 0dfe51358..e72ea5bb0 100644 --- a/internal/controller/cfg/controllers_cfg_test.go +++ b/internal/controller/cfg/controllers_cfg_test.go @@ -54,6 +54,7 @@ func TestInitializeControllersCfgAt_ValidFile(t *testing.T) { ArtifactId: "kie-addons-quarkus-persistence-jdbc", Version: "999-SNAPSHOT", }, postgresExtensions[2]) + assert.True(t, cfg.KogitoEventsGrouping) } func TestInitializeControllersCfgAt_FileNotFound(t *testing.T) { diff --git a/internal/controller/cfg/testdata/controllers-cfg-test.yaml b/internal/controller/cfg/testdata/controllers-cfg-test.yaml index 415167784..33babade5 100644 --- a/internal/controller/cfg/testdata/controllers-cfg-test.yaml +++ b/internal/controller/cfg/testdata/controllers-cfg-test.yaml @@ -34,4 +34,5 @@ postgreSQLPersistenceExtensions: version: 3.8.6 - groupId: org.kie artifactId: kie-addons-quarkus-persistence-jdbc - version: 999-SNAPSHOT \ No newline at end of file + version: 999-SNAPSHOT +kogitoEventsGrouping: true \ No newline at end of file diff --git a/internal/controller/platform/services/services.go b/internal/controller/platform/services/services.go index 20673b75b..4fb67a35b 100644 --- a/internal/controller/platform/services/services.go +++ b/internal/controller/platform/services/services.go @@ -603,6 +603,7 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), + d.newTrigger(lbl, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform), d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil } diff --git a/internal/controller/profiles/common/constants/platform_services.go b/internal/controller/profiles/common/constants/platform_services.go index 4416ed7d3..185f78427 100644 --- a/internal/controller/profiles/common/constants/platform_services.go +++ b/internal/controller/profiles/common/constants/platform_services.go @@ -38,11 +38,13 @@ const ( JobServiceLeaderCheckExpirationInSeconds = "kogito.jobs-service.management.leader-check.expiration-in-seconds" DefaultJobServiceLeaderCheckExpirationInSeconds = "60" - KogitoProcessInstancesEventsConnector = "mp.messaging.outgoing.kogito-processinstances-events.connector" - KogitoProcessInstancesEventsMethod = "mp.messaging.outgoing.kogito-processinstances-events.method" - KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" - KogitoProcessInstancesEventsEnabled = "kogito.events.processinstances.enabled" - KogitoProcessInstancesEventsPath = "/processes" + KogitoProcessInstancesEventsConnector = "mp.messaging.outgoing.kogito-processinstances-events.connector" + KogitoProcessInstancesEventsMethod = "mp.messaging.outgoing.kogito-processinstances-events.method" + KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" + KogitoProcessInstancesEventsEnabled = "kogito.events.processinstances.enabled" + KogitoProcessInstancesEventsPath = "/processes" + // KogitoProcessInstancesMultiEventsPath Same value as KogitoProcessInstancesEventsPath intentionally + KogitoProcessInstancesMultiEventsPath = "/processes" KogitoProcessDefinitionsEventsConnector = "mp.messaging.outgoing.kogito-processdefinitions-events.connector" KogitoProcessDefinitionsEventsMethod = "mp.messaging.outgoing.kogito-processdefinitions-events.method" KogitoProcessDefinitionsEventsURL = "mp.messaging.outgoing.kogito-processdefinitions-events.url" diff --git a/internal/controller/profiles/common/constants/workflows.go b/internal/controller/profiles/common/constants/workflows.go index a51b1d1f9..edb4864f7 100644 --- a/internal/controller/profiles/common/constants/workflows.go +++ b/internal/controller/profiles/common/constants/workflows.go @@ -29,4 +29,5 @@ const ( QuarkusDevUICorsEnabled = "quarkus.dev-ui.cors.enabled" QuarkusHttpCors = "quarkus.http.cors" QuarkusHttpCorsOrigins = "quarkus.http.cors.origins" + KogitoEventsGrouping = "kogito.events.grouping" ) diff --git a/internal/controller/profiles/common/properties/managed.go b/internal/controller/profiles/common/properties/managed.go index 3a6bb0dc2..f47747659 100644 --- a/internal/controller/profiles/common/properties/managed.go +++ b/internal/controller/profiles/common/properties/managed.go @@ -23,6 +23,8 @@ import ( "context" "fmt" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/cfg" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/persistence" @@ -157,6 +159,7 @@ func NewManagedPropertyHandler(workflow *operatorapi.SonataFlow, platform *opera if profiles.IsDevProfile(workflow) { setDevProfileProperties(props) } + setControllersConfigProperties(workflow, props) props.Set(constants.KogitoUserTasksEventsEnabled, "false") if platform != nil { p, err := resolvePlatformWorkflowProperties(platform) @@ -192,6 +195,12 @@ func NewManagedPropertyHandler(workflow *operatorapi.SonataFlow, platform *opera return handler.withKogitoServiceUrl(), nil } +func setControllersConfigProperties(workflow *operatorapi.SonataFlow, props *properties.Properties) { + if !profiles.IsDevProfile(workflow) && cfg.GetCfg().KogitoEventsGrouping { + props.Set(constants.KogitoEventsGrouping, "true") + } +} + func setDevProfileProperties(props *properties.Properties) { props.Set(constants.QuarkusDevUICorsEnabled, "false") } diff --git a/internal/controller/profiles/common/properties/managed_test.go b/internal/controller/profiles/common/properties/managed_test.go index db4501c7a..0ee9293f1 100644 --- a/internal/controller/profiles/common/properties/managed_test.go +++ b/internal/controller/profiles/common/properties/managed_test.go @@ -24,6 +24,8 @@ import ( "fmt" "testing" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/cfg" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -315,6 +317,48 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) } +func Test_appPropertyHandler_KogitoEventsGroupingTrueWithDevProfile(t *testing.T) { + doTestManagedPropsForKogitoEventsGrouping(t, metadata.DevProfile, true, false) +} + +func Test_appPropertyHandler_KogitoEventsGroupingTrueWithPreviewProfile(t *testing.T) { + doTestManagedPropsForKogitoEventsGrouping(t, metadata.PreviewProfile, true, true) +} + +func Test_appPropertyHandler_KogitoEventsGroupingTrueWithGitOpsProfile(t *testing.T) { + doTestManagedPropsForKogitoEventsGrouping(t, metadata.GitOpsProfile, true, true) +} + +func Test_appPropertyHandler_KogitoEventsGroupingFalseWithDevProfile(t *testing.T) { + doTestManagedPropsForKogitoEventsGrouping(t, metadata.DevProfile, false, false) +} + +func Test_appPropertyHandler_KogitoEventsGroupingFalseWithPreviewProfile(t *testing.T) { + doTestManagedPropsForKogitoEventsGrouping(t, metadata.PreviewProfile, false, false) +} + +func Test_appPropertyHandler_KogitoEventsGroupingFalseWithGitOpsProfile(t *testing.T) { + doTestManagedPropsForKogitoEventsGrouping(t, metadata.GitOpsProfile, false, false) +} + +func doTestManagedPropsForKogitoEventsGrouping(t *testing.T, profile metadata.ProfileType, kogitoEventsGrouping bool, shouldContain bool) { + currentKogitoEventGroupingValue := cfg.GetCfg().KogitoEventsGrouping + cfg.GetCfg().KogitoEventsGrouping = kogitoEventsGrouping + workflow := test.GetBaseSonataFlow("default") + setProfileInFlow(profile)(workflow) + platform := test.GetBasePlatform() + handler, err := NewManagedPropertyHandler(workflow, platform) + cfg.GetCfg().KogitoEventsGrouping = currentKogitoEventGroupingValue + assert.NoError(t, err) + generatedProps, propsErr := properties.LoadString(handler.Build()) + assert.NoError(t, propsErr) + if shouldContain { + assertHasProperty(t, generatedProps, "kogito.events.grouping", "true") + } else { + assert.NotContains(t, generatedProps.Keys(), "kogito.events.grouping") + } +} + var _ = Describe("Platform properties", func() { var _ = Context("for workflow properties", func() { diff --git a/operator.yaml b/operator.yaml index 3a4203fc6..1a74d064c 100644 --- a/operator.yaml +++ b/operator.yaml @@ -28217,7 +28217,7 @@ metadata: --- apiVersion: v1 data: - controllers_cfg.yaml: | + controllers_cfg.yaml: |- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -28275,6 +28275,9 @@ data: - groupId: org.kie artifactId: kie-addons-quarkus-persistence-jdbc version: 999-20240912-SNAPSHOT + # If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data + # Index Service reducing the number of produced events. Set to false to send individual events. + kogitoEventsGrouping: true kind: ConfigMap metadata: name: sonataflow-operator-controllers-config