Skip to content

Commit

Permalink
[issue-521] Enhance error messages in SonataFlowPlatform CR status fo…
Browse files Browse the repository at this point in the history
…r knative integration
  • Loading branch information
jianrongzhang89 committed Oct 23, 2024
1 parent 5eddf46 commit 9ea09df
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 55 deletions.
6 changes: 3 additions & 3 deletions internal/controller/platform/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package platform
import (
"context"

"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"

v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
corev1 "k8s.io/api/core/v1"
)

// Action --.
Expand All @@ -38,7 +38,7 @@ type Action interface {
CanHandle(platform *v08.SonataFlowPlatform) bool

// executes the handling function
Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error)
Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error)
}

type baseAction struct {
Expand Down
6 changes: 4 additions & 2 deletions internal/controller/platform/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package platform
import (
"context"

corev1 "k8s.io/api/core/v1"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
)
Expand All @@ -43,9 +45,9 @@ func (action *createAction) CanHandle(platform *v08.SonataFlowPlatform) bool {
return platform.Status.IsCreating()
}

func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) {
func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) {
//TODO: Perform the actions needed for the Platform creation
platform.Status.Manager().MarkTrue(api.SucceedConditionType)

return platform, nil
return platform, nil, nil
}
16 changes: 8 additions & 8 deletions internal/controller/platform/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ func (action *initializeAction) CanHandle(platform *operatorapi.SonataFlowPlatfo
return platform.Status.GetTopLevelCondition().IsUnknown() || platform.Status.IsDuplicated()
}

func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
duplicate, err := action.isPrimaryDuplicate(ctx, platform)
if err != nil {
return nil, err
return nil, nil, err
}
if duplicate {
// another platform already present in the namespace
if !platform.Status.IsDuplicated() {
plat := platform.DeepCopy()
plat.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "")
return plat, nil
return plat, nil, nil
}

return nil, nil
return nil, nil, nil
}

if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil {
return nil, err
return nil, nil, err
}
// nolint: staticcheck
if platform.Spec.Build.Config.BuildStrategy == operatorapi.OperatorBuildStrategy {
Expand All @@ -88,13 +88,13 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
klog.V(log.I).InfoS("Create persistent volume claim")
err := createPersistentVolumeClaim(ctx, action.client, platform)
if err != nil {
return nil, err
return nil, nil, err
}
// Create the Kaniko warmer pod that caches the base image into the SonataFlow builder volume
klog.V(log.I).InfoS("Create Kaniko cache warmer pod")
err = createKanikoCacheWarmerPod(ctx, action.client, platform)
if err != nil {
return nil, err
return nil, nil, err
}
platform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformWarmingReason, "")
} else {
Expand All @@ -106,7 +106,7 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
}
platform.Status.Version = metadata.SpecVersion

return platform, nil
return platform, nil, nil
}

// TODO: move this to Kaniko packages based on the platform context
Expand Down
52 changes: 29 additions & 23 deletions internal/controller/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,38 @@ func (action *serviceAction) CanHandle(platform *operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}

func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, err
return nil, nil, err
}

psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, err
if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, event, err
}
}

psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, err
if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, event, err
}
}

return platform, nil
return platform, nil, nil
}

func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) {
if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
if err := createOrUpdateService(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
return createOrUpdateKnativeResources(ctx, client, platform, psh)
}
Expand Down Expand Up @@ -307,24 +307,24 @@ func setSonataFlowPlatformFinalizer(ctx context.Context, c client.Client, platfo
return nil
}

func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) {
lbl, _ := getLabels(platform, psh)
objs, err := psh.GenerateKnativeResources(platform, lbl)
objs, event, err := psh.GenerateKnativeResources(platform, lbl)
if err != nil {
return err
return event, err
}
// Create or update triggers
for _, obj := range objs {
if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
if platform.Namespace == obj.GetNamespace() {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
return nil, err
}
} else {
// This is for Knative trigger in a different namespace
// Set the finalizer for trigger cleanup when the platform is deleted
if err := setSonataFlowPlatformFinalizer(ctx, client, platform); err != nil {
return err
return nil, err
}
}
trigger := &eventingv1.Trigger{
Expand All @@ -335,21 +335,21 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
return nil
})
if err != nil {
return err
return nil, err
}
addToSonataFlowPlatformTriggerList(platform, trigger)
}
}

if err := SafeUpdatePlatformStatus(ctx, platform); err != nil {
return err
return nil, err
}

// Create or update sinkbindings
for _, obj := range objs {
if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
return nil, err
}
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: sbDef.ObjectMeta,
Expand All @@ -359,18 +359,24 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
return nil
})
if err != nil {
return err
return nil, err
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return err
return nil, err
}
if !kSinkInjected {
return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName())
msg := fmt.Sprintf("waiting for K_SINK injection for %s to complete", psh.GetServiceName())
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: services.WaitingKnativeEventing,
Message: msg,
}
return event, fmt.Errorf(msg)
}
}
}
return nil
return nil, nil
}

func addToSonataFlowPlatformTriggerList(platform *operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) {
Expand Down
7 changes: 4 additions & 3 deletions internal/controller/platform/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package platform
import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
Expand All @@ -46,7 +47,7 @@ func (action *monitorAction) CanHandle(platform *operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}

func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
// Just track the version of the operator in the platform resource
if platform.Status.Version != metadata.SpecVersion {
platform.Status.Version = metadata.SpecVersion
Expand All @@ -55,8 +56,8 @@ func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.S

// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, err
return nil, nil, err
}

return platform, nil
return platform, nil, nil
}
27 changes: 19 additions & 8 deletions internal/controller/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
const (
quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START"
WaitingKnativeEventing = "WaitingKnativeEventing"
)

type PlatformServiceHandler interface {
Expand Down Expand Up @@ -83,7 +84,7 @@ type PlatformServiceHandler interface {
// GenerateServiceProperties returns a property object that contains the application properties required by the service deployment
GenerateServiceProperties() (*properties.Properties, error)
// GenerateKnativeResources returns knative resources that bridge between workflow deploys and the service
GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error)
GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error)

// IsServiceSetInSpec returns true if the service is set in the spec.
IsServiceSetInSpec() bool
Expand Down Expand Up @@ -582,18 +583,23 @@ func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, name
},
}
}
func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) {
func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) {
broker := d.GetSourceBroker()
if broker == nil || len(broker.Ref.Name) == 0 {
return nil, nil // Nothing to do
return nil, nil, nil // Nothing to do
}
brokerName := broker.Ref.Name
namespace := broker.Ref.Namespace
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
return nil, err
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: err.Error(),
}
return nil, event, err
}
serviceName := d.GetServiceName()
return []client.Object{
Expand All @@ -603,7 +609,7 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}

func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
Expand All @@ -620,7 +626,7 @@ func (d JobServiceHandler) GetSink() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, error) {
func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) {
broker := j.GetSourceBroker()
sink := j.GetSink()
resultObjs := []client.Object{}
Expand All @@ -632,7 +638,12 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
return nil, err
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: err.Error(),
}
return nil, event, err
}
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -712,7 +723,7 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
}
resultObjs = append(resultObjs, sinkBinding)
}
return resultObjs, nil
return resultObjs, nil, nil
}

func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) {
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/platform/warm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (action *warmAction) CanHandle(platform *operatorapi.SonataFlowPlatform) bo
return platform.Status.IsWarming()
}

func (action *warmAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *warmAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
// Check Kaniko warmer pod status
pod := corev1.Pod{
TypeMeta: metav1.TypeMeta{
Expand All @@ -69,19 +69,19 @@ func (action *warmAction) Handle(ctx context.Context, platform *operatorapi.Sona

err := action.reader.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, &pod)
if err != nil {
return nil, err
return nil, nil, err
}

switch pod.Status.Phase {
case corev1.PodSucceeded:
klog.V(log.D).InfoS("Kaniko cache successfully warmed up")
platform.Status.Manager().MarkTrueWithReason(api.SucceedConditionType, operatorapi.PlatformWarmingReason, "Kaniko cache successfully warmed up")
return platform, nil
return platform, nil, nil
case corev1.PodFailed:
return nil, errors.New("failed to warm up Kaniko cache")
return nil, nil, errors.New("failed to warm up Kaniko cache")
default:
klog.V(log.I).InfoS("Waiting for Kaniko cache to warm up...")
// Requeue
return nil, nil
return nil, nil, nil
}
}
Loading

0 comments on commit 9ea09df

Please sign in to comment.