Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix wo…
Browse files Browse the repository at this point in the history
…rkflow deletion hanging issue
  • Loading branch information
jianrongzhang89 committed Sep 11, 2024
1 parent 449afcf commit 5b0adea
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 35 deletions.
1 change: 0 additions & 1 deletion controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (
JobServiceJobEventsPath = "/v2/jobs/events"
JobServiceLeaderCheckExpirationInSeconds = "kogito.jobs-service.management.leader-check.expiration-in-seconds"
DefaultJobServiceLeaderCheckExpirationInSeconds = "60"
JobServiceRequestEventsURL = "mp.messaging.outgoing.kogito-job-service-job-request-events.url"

KogitoProcessInstancesEventsConnector = "mp.messaging.outgoing.kogito-processinstances-events.connector"
KogitoProcessInstancesEventsMethod = "mp.messaging.outgoing.kogito-processinstances-events.method"
Expand Down
28 changes: 9 additions & 19 deletions controllers/sonataflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,15 @@ func (r *SonataFlowReconciler) setDefaults(workflow *operatorapi.SonataFlow) {
}

func (r *SonataFlowReconciler) cleanupTriggers(ctx context.Context, workflow *operatorapi.SonataFlow) error {
plf, _ := platform.GetActivePlatform(ctx, r.Client, workflow.Namespace)
if plf == nil || len(plf.Status.Triggers) == 0 {
return nil
}
avail, err := knative.GetKnativeAvailability(r.Config)
if err != nil {
return err
}
if avail.Eventing {
for _, triggerRef := range workflow.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil {
return err
}
for _, triggerRef := range workflow.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil && !errors.IsNotFound(err) {
return err
}
}
controllerutil.RemoveFinalizer(workflow, constants.TriggerFinalizer)
Expand Down
24 changes: 9 additions & 15 deletions controllers/sonataflowplatform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,15 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc
}

func (r *SonataFlowPlatformReconciler) cleanupTriggers(ctx context.Context, platform *operatorapi.SonataFlowPlatform) error {
avail, err := knative.GetKnativeAvailability(r.Config)
if err != nil {
return err
}
if avail.Eventing {
for _, triggerRef := range platform.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil {
return err
}
for _, triggerRef := range platform.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil && !errors.IsNotFound(err) {
return err
}
}
controllerutil.RemoveFinalizer(platform, constants.TriggerFinalizer)
Expand Down

0 comments on commit 5b0adea

Please sign in to comment.