Skip to content

Commit

Permalink
Implementing prod ops reconciliation flow
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Zanini <[email protected]>
  • Loading branch information
ricardozanini committed Sep 14, 2023
1 parent 40430c5 commit f1b9c8f
Show file tree
Hide file tree
Showing 13 changed files with 257 additions and 131 deletions.
4 changes: 4 additions & 0 deletions api/metadata/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (

type ProfileType string

func (p ProfileType) String() string {
return string(p)
}

const (
DevProfile ProfileType = "dev"
ProdProfile ProfileType = "prod"
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ type SonataFlow struct {
Status SonataFlowStatus `json:"status,omitempty"`
}

func (s *SonataFlow) HasFlowContainerImage() bool {
return len(s.Spec.PodSpecTemplate.FlowContainer.Image) > 0
}

// SonataFlowList contains a list of SonataFlow
// +kubebuilder:object:root=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
8 changes: 7 additions & 1 deletion controllers/profiles/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,25 @@ import (

const (
defaultProfile = metadata.ProdProfile
// internal profile
opsProfile metadata.ProfileType = "prod_for_ops"
)

type reconcilerBuilder func(client client.Client, config *rest.Config) profiles.ProfileReconciler

var profileBuilders = map[metadata.ProfileType]reconcilerBuilder{
metadata.ProdProfile: prod.NewProfileReconciler,
metadata.DevProfile: dev.NewProfileReconciler,
opsProfile: prod.NewProfileForOpsReconciler,
}

func profileBuilder(workflow *operatorapi.SonataFlow) reconcilerBuilder {
profile := workflow.Annotations[metadata.Profile]
if len(profile) == 0 {
return profileBuilders[defaultProfile]
profile = defaultProfile.String()
}
if profile == metadata.ProdProfile.String() && workflow.HasFlowContainerImage() {
return profileBuilders[opsProfile]
}
if _, ok := profileBuilders[metadata.ProfileType(profile)]; !ok {
return profileBuilders[defaultProfile]
Expand Down
14 changes: 9 additions & 5 deletions controllers/profiles/common/mutate_visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ import (
// Only overrides the image if .spec.podSpecTemplate.flowContainer.Image is empty.
func ImageDeploymentMutateVisitor(workflow *operatorapi.SonataFlow, image string) MutateVisitor {
return func(object client.Object) controllerutil.MutateFn {
return func() error {
if len(workflow.Spec.PodSpecTemplate.FlowContainer.Image) == 0 {
deployment := object.(*appsv1.Deployment)
_, idx := kubeutil.GetContainerByName(DefaultContainerName, &deployment.Spec.Template.Spec)
deployment.Spec.Template.Spec.Containers[idx].Image = image
// noop since we already have an image in the flow container defined by the user.
if workflow.HasFlowContainerImage() {
return func() error {
return nil
}
}
return func() error {
deployment := object.(*appsv1.Deployment)
_, idx := kubeutil.GetContainerByName(DefaultContainerName, &deployment.Spec.Template.Spec)
deployment.Spec.Template.Spec.Containers[idx].Image = image
return nil
}
}
Expand Down
2 changes: 0 additions & 2 deletions controllers/profiles/common/object_creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ func defaultContainer(workflow *operatorapi.SonataFlow) (*corev1.Container, erro
}
// immutable
defaultFlowContainer.Name = DefaultContainerName

// TODO: write a replace function
portIdx := -1
for i := range defaultFlowContainer.Ports {
if defaultFlowContainer.Ports[i].Name == DefaultHTTPWorkflowPortName ||
Expand Down
2 changes: 0 additions & 2 deletions controllers/profiles/common/status_enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"github.com/kiegroup/kogito-serverless-operator/log"
)

// TODO: review the need for this

// NewStatusEnricher ...
func NewStatusEnricher(client client.Client, enricher StatusEnricherFn) *StatusEnricher {
return &StatusEnricher{
Expand Down
4 changes: 2 additions & 2 deletions controllers/profiles/dev/states_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora
}

devBaseContainerImage := workflowdef.GetDefaultWorkflowDevModeImageTag()
pl, errPl := platform.GetActivePlatform(ctx, e.C, workflow.Namespace)
pl, err := platform.GetActivePlatform(ctx, e.C, workflow.Namespace)
// check if the Platform available
if errPl == nil && len(pl.Spec.DevMode.BaseImage) > 0 {
if err == nil && len(pl.Spec.DevMode.BaseImage) > 0 {
devBaseContainerImage = pl.Spec.DevMode.BaseImage
}

Expand Down
137 changes: 137 additions & 0 deletions controllers/profiles/prod/deployment_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 Red Hat, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prod

import (
"context"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kiegroup/kogito-serverless-operator/api"
operatorapi "github.com/kiegroup/kogito-serverless-operator/api/v1alpha08"
"github.com/kiegroup/kogito-serverless-operator/controllers/profiles/common"
"github.com/kiegroup/kogito-serverless-operator/log"
"github.com/kiegroup/kogito-serverless-operator/utils"
)

type deploymentHandler struct {
*common.StateSupport
ensurers *prodObjectEnsurers
}

func newDeploymentHandler(stateSupport *common.StateSupport, ensurer *prodObjectEnsurers) *deploymentHandler {
return &deploymentHandler{
StateSupport: stateSupport,
ensurers: ensurer,
}
}

func (d *deploymentHandler) handle(ctx context.Context, workflow *operatorapi.SonataFlow) (reconcile.Result, []client.Object, error) {
return d.handleWithImage(ctx, workflow, "")
}

func (d *deploymentHandler) handleWithImage(ctx context.Context, workflow *operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, error) {
propsCM, _, err := d.ensurers.propertiesConfigMap.Ensure(ctx, workflow, common.WorkflowPropertiesMutateVisitor(workflow, common.DefaultApplicationProperties))
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, "Unable to retrieve the properties config map")
_, err = d.PerformStatusUpdate(ctx, workflow)
return ctrl.Result{}, nil, err
}

// Check if this Deployment already exists
// TODO: we should NOT do this. The ensurers are there to do exactly this fetch. Review once we refactor this reconciliation algorithm. See https://issues.redhat.com/browse/KOGITO-8524
existingDeployment := &appsv1.Deployment{}
requeue := false
if err := d.C.Get(ctx, client.ObjectKeyFromObject(workflow), existingDeployment); err != nil {
if !errors.IsNotFound(err) {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentUnavailableReason, "Unable to verify if deployment is available due to ", err)
_, err = d.PerformStatusUpdate(ctx, workflow)
return reconcile.Result{Requeue: false}, nil, err
}
deployment, _, err :=
d.ensurers.deployment.Ensure(
ctx,
workflow,
d.getDeploymentMutateVisitors(workflow, image, propsCM.(*v1.ConfigMap))...,
)
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "Unable to perform the deploy due to ", err)
_, err = d.PerformStatusUpdate(ctx, workflow)
return reconcile.Result{}, nil, err
}
existingDeployment, _ = deployment.(*appsv1.Deployment)
requeue = true
}
// TODO: verify if deployment is ready. See https://issues.redhat.com/browse/KOGITO-8524

existingService := &v1.Service{}
if err := d.C.Get(ctx, client.ObjectKeyFromObject(workflow), existingService); err != nil {
if !errors.IsNotFound(err) {
return reconcile.Result{Requeue: false}, nil, err
}
service, _, err := d.ensurers.service.Ensure(ctx, workflow, common.ServiceMutateVisitor(workflow))
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentUnavailableReason, "Unable to make the service available due to ", err)
_, err = d.PerformStatusUpdate(ctx, workflow)
return reconcile.Result{}, nil, err
}
existingService, _ = service.(*v1.Service)
requeue = true
}
// TODO: verify if service is ready. See https://issues.redhat.com/browse/KOGITO-8524

objs := []client.Object{existingDeployment, existingService, propsCM}

if !requeue {
klog.V(log.I).InfoS("Skip reconcile: Deployment and service already exists",
"Deployment.Namespace", existingDeployment.Namespace, "Deployment.Name", existingDeployment.Name)
result, err := common.DeploymentHandler(d.C).SyncDeploymentStatus(ctx, workflow)
if err != nil {
return reconcile.Result{Requeue: false}, nil, err
}

if _, err := d.PerformStatusUpdate(ctx, workflow); err != nil {
return reconcile.Result{Requeue: false}, nil, err
}
return result, objs, nil
}

workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "")
if _, err := d.PerformStatusUpdate(ctx, workflow); err != nil {
return reconcile.Result{Requeue: false}, nil, err
}
return reconcile.Result{RequeueAfter: common.RequeueAfterFollowDeployment}, objs, nil
}

func (d *deploymentHandler) getDeploymentMutateVisitors(
workflow *operatorapi.SonataFlow,
image string,
configMap *v1.ConfigMap) []common.MutateVisitor {
if utils.IsOpenShift() {
return []common.MutateVisitor{common.DeploymentMutateVisitor(workflow),
mountProdConfigMapsMutateVisitor(configMap),
addOpenShiftImageTriggerDeploymentMutateVisitor(workflow, image),
common.ImageDeploymentMutateVisitor(workflow, image)}
}
return []common.MutateVisitor{common.DeploymentMutateVisitor(workflow),
common.ImageDeploymentMutateVisitor(workflow, image),
mountProdConfigMapsMutateVisitor(configMap)}
}
30 changes: 20 additions & 10 deletions controllers/profiles/prod/object_creators_prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/kiegroup/kogito-serverless-operator/api/v1alpha08"

"github.com/kiegroup/kogito-serverless-operator/controllers/profiles/common"
kubeutil "github.com/kiegroup/kogito-serverless-operator/utils/kubernetes"
"github.com/kiegroup/kogito-serverless-operator/workflowproj"
Expand All @@ -35,8 +37,14 @@ const (
// addOpenShiftImageTriggerDeploymentMutateVisitor adds the ImageStream trigger annotation to the Deployment
//
// See: https://docs.openshift.com/container-platform/4.13/openshift_images/triggering-updates-on-imagestream-changes.html
func addOpenShiftImageTriggerDeploymentMutateVisitor(image string) common.MutateVisitor {
func addOpenShiftImageTriggerDeploymentMutateVisitor(workflow *v1alpha08.SonataFlow, image string) common.MutateVisitor {
return func(object client.Object) controllerutil.MutateFn {
if workflow.HasFlowContainerImage() {
// noop since we don't need to build anything
return func() error {
return nil
}
}
return func() error {
annotations := make(map[string]string, len(object.(*appsv1.Deployment).Annotations)+1)
for k, v := range object.(*appsv1.Deployment).Annotations {
Expand All @@ -54,17 +62,19 @@ func mountProdConfigMapsMutateVisitor(propsCM *v1.ConfigMap) common.MutateVisito
return func(object client.Object) controllerutil.MutateFn {
return func() error {
deployment := object.(*appsv1.Deployment)
volumes := make([]v1.Volume, 0)
volumeMounts := make([]v1.VolumeMount, 0)
_, idx := kubeutil.GetContainerByName(common.DefaultContainerName, &deployment.Spec.Template.Spec)

volumes = append(volumes, kubeutil.VolumeConfigMap(common.ConfigMapWorkflowPropsVolumeName, propsCM.Name, v1.KeyToPath{Key: workflowproj.ApplicationPropertiesFileName, Path: workflowproj.ApplicationPropertiesFileName}))

volumeMounts = append(volumeMounts, kubeutil.VolumeMount(common.ConfigMapWorkflowPropsVolumeName, true, quarkusProdConfigMountPath))
if len(deployment.Spec.Template.Spec.Volumes) == 0 {
deployment.Spec.Template.Spec.Volumes = make([]v1.Volume, 1)
}
if len(deployment.Spec.Template.Spec.Containers[idx].VolumeMounts) == 0 {
deployment.Spec.Template.Spec.Containers[idx].VolumeMounts = make([]v1.VolumeMount, 1)
}

deployment.Spec.Template.Spec.Volumes = make([]v1.Volume, 0)
deployment.Spec.Template.Spec.Volumes = volumes
deployment.Spec.Template.Spec.Containers[0].VolumeMounts = make([]v1.VolumeMount, 0)
deployment.Spec.Template.Spec.Containers[0].VolumeMounts = volumeMounts
kubeutil.AddOrReplaceVolume(&deployment.Spec.Template.Spec,
kubeutil.VolumeConfigMap(common.ConfigMapWorkflowPropsVolumeName, propsCM.Name, v1.KeyToPath{Key: workflowproj.ApplicationPropertiesFileName, Path: workflowproj.ApplicationPropertiesFileName}))
kubeutil.AddOrReplaceVolumeMount(idx, &deployment.Spec.Template.Spec,
kubeutil.VolumeMount(common.ConfigMapWorkflowPropsVolumeName, true, quarkusProdConfigMountPath))

return nil
}
Expand Down
25 changes: 22 additions & 3 deletions controllers/profiles/prod/profile_prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,23 @@ const (

// prodObjectEnsurers is a struct for the objects that ReconciliationState needs to create in the platform for the Production profile.
// ReconciliationState that needs access to it must include this struct as an attribute and initialize it in the profile builder.
// Use newProdObjectEnsurers to facilitate building this struct
// Use newObjectEnsurers to facilitate building this struct
type prodObjectEnsurers struct {
deployment common.ObjectEnsurer
service common.ObjectEnsurer
propertiesConfigMap common.ObjectEnsurer
}

func newProdObjectEnsurers(support *common.StateSupport) *prodObjectEnsurers {
func newObjectEnsurers(support *common.StateSupport) *prodObjectEnsurers {
return &prodObjectEnsurers{
deployment: common.NewObjectEnsurer(support.C, common.DeploymentCreator),
service: common.NewObjectEnsurer(support.C, common.ServiceCreator),
propertiesConfigMap: common.NewObjectEnsurer(support.C, common.WorkflowPropsConfigMapCreator),
}
}

// NewProfileReconciler the default profile builder which includes a build state to run an internal build process
// to have an immutable workflow image deployed
func NewProfileReconciler(client client.Client, config *rest.Config) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
Expand All @@ -64,7 +66,24 @@ func NewProfileReconciler(client client.Client, config *rest.Config) profiles.Pr
stateMachine := common.NewReconciliationStateMachine(
&newBuilderState{StateSupport: support},
&followBuildStatusState{StateSupport: support},
&deployWorkflowState{StateSupport: support, ensurers: newProdObjectEnsurers(support)},
&deployWithBuildWorkflowState{StateSupport: support, ensurers: newObjectEnsurers(support)},
)
reconciler := &prodProfile{
BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine),
}

return reconciler
}

// NewProfileForOpsReconciler creates an alternative prod profile that won't require to build the workflow image in order to deploy
// the workflow application. It assumes that the image has been built somewhere else.
func NewProfileForOpsReconciler(client client.Client, config *rest.Config) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
}
// the reconciliation state machine
stateMachine := common.NewReconciliationStateMachine(
&followDeployWorkflowState{StateSupport: support, ensurers: newObjectEnsurers(support)},
)
reconciler := &prodProfile{
BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine),
Expand Down
12 changes: 6 additions & 6 deletions controllers/profiles/prod/profile_prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
WithRuntimeObjects(workflow, platform, build).
WithStatusSubresource(workflow, platform, build).
Build()
handler := &deployWorkflowState{
handler := &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: newProdObjectEnsurers(&common.StateSupport{C: client}),
ensurers: newObjectEnsurers(&common.StateSupport{C: client}),
}
result, objects, err := handler.Do(context.TODO(), workflow)
assert.Greater(t, result.RequeueAfter, int64(0))
Expand Down Expand Up @@ -149,9 +149,9 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
WithRuntimeObjects(workflow, platform).
WithStatusSubresource(workflow, platform, &operatorapi.SonataFlowBuild{}).Build()

handler := &deployWorkflowState{
handler := &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: newProdObjectEnsurers(&common.StateSupport{C: client}),
ensurers: newObjectEnsurers(&common.StateSupport{C: client}),
}
result, objects, err := handler.Do(context.TODO(), workflow)
assert.Greater(t, result.RequeueAfter, int64(time.Second))
Expand All @@ -168,9 +168,9 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
err = client.Update(context.TODO(), workflowChanged)
assert.NoError(t, err)
// reconcile
handler = &deployWorkflowState{
handler = &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: newProdObjectEnsurers(&common.StateSupport{C: client}),
ensurers: newObjectEnsurers(&common.StateSupport{C: client}),
}
result, objects, err = handler.Do(context.TODO(), workflowChanged)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit f1b9c8f

Please sign in to comment.