From 4ab37b8dca43fe4e8b757f9e93f6b021a875f3e0 Mon Sep 17 00:00:00 2001 From: Roman Bednar Date: Tue, 1 Oct 2024 16:53:21 +0200 Subject: [PATCH] add new controller based on workload controller --- .../csicontrollerset/csi_controller_set.go | 36 ++ ...si_driver_controller_service_controller.go | 1 + ...iver_controller_service_controller_test.go | 319 +------------- ..._controller_service_workload_controller.go | 106 +++++ ...roller_service_workload_controller_test.go | 389 ++++++++++++++++++ .../deployment_workload_controller.go | 244 +++++++++++ 6 files changed, 777 insertions(+), 318 deletions(-) create mode 100644 pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller.go create mode 100644 pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller_test.go create mode 100644 pkg/operator/deploymentcontroller/deployment_workload_controller.go diff --git a/pkg/operator/csi/csicontrollerset/csi_controller_set.go b/pkg/operator/csi/csicontrollerset/csi_controller_set.go index 1e1b4fc626..e41550784e 100644 --- a/pkg/operator/csi/csicontrollerset/csi_controller_set.go +++ b/pkg/operator/csi/csicontrollerset/csi_controller_set.go @@ -200,6 +200,42 @@ func (c *CSIControllerSet) WithCSIDriverControllerService( return c } +func (c *CSIControllerSet) WithCSIDriverControllerServiceWorkload( + name string, + operandNamespace string, + assetFunc resourceapply.AssetFunc, + file string, + kubeClient kubernetes.Interface, + kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, + configInformer configinformers.SharedInformerFactory, + preconditions []deploymentcontroller.PreconditionFunc, + optionalInformers []factory.Informer, + optionalDeploymentHooks ...deploymentcontroller.DeploymentHookFunc, +) *CSIControllerSet { + manifestFile, err := assetFunc(file) + if err != nil { + panic(fmt.Sprintf("asset: Asset(%v): %v", file, err)) + } + + namespacedInformerFactory := kubeInformersForNamespaces.InformersFor(operandNamespace) + + c.csiDriverControllerServiceController = csidrivercontrollerservicecontroller.NewCSIDriverControllerServiceWorkloadController( + name, + operandNamespace, + manifestFile, + c.eventRecorder, + c.operatorClient, + kubeClient, + kubeInformersForNamespaces, + namespacedInformerFactory.Apps().V1().Deployments(), + configInformer, + preconditions, + optionalInformers, + optionalDeploymentHooks..., + ) + return c +} + func (c *CSIControllerSet) WithCSIDriverNodeService( name string, assetFunc resourceapply.AssetFunc, diff --git a/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller.go b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller.go index c89caa5800..53f7c5cea4 100644 --- a/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller.go +++ b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller.go @@ -63,6 +63,7 @@ import ( // Progressing: indicates that the CSI Controller Service is being deployed. // Degraded: produced when the sync() method returns an error. 
+// TODO: remove after all CSI operators migrate to NewCSIDriverControllerServiceWorkloadController func NewCSIDriverControllerServiceController( name string, manifest []byte, diff --git a/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller_test.go b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller_test.go index c841c1d897..ee8700ced9 100644 --- a/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller_test.go +++ b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_controller_test.go @@ -2,204 +2,21 @@ package csidrivercontrollerservicecontroller import ( "context" - "fmt" - "strings" "testing" - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" coreinformers "k8s.io/client-go/informers" fakecore "k8s.io/client-go/kubernetes/fake" - configv1 "github.com/openshift/api/config/v1" - opv1 "github.com/openshift/api/operator/v1" fakeconfig "github.com/openshift/client-go/config/clientset/versioned/fake" configinformers "github.com/openshift/client-go/config/informers/externalversions" "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" - "github.com/openshift/library-go/pkg/operator/resource/resourceread" "github.com/openshift/library-go/pkg/operator/v1helpers" ) -const ( - controllerName = "TestCSIDriverControllerServiceController" - deploymentName = "test-csi-driver-controller" - operandName = "test-csi-driver" - operandNamespace = "openshift-test-csi-driver" - - csiDriverContainerName = "csi-driver" - provisionerContainerName = "csi-provisioner" - attacherContainerName = "csi-attacher" - resizerContainerName = "csi-resizer" - snapshotterContainerName = "csi-snapshotter" - livenessProbeContainerName = "csi-liveness-probe" - kubeRBACProxyContainerName = "provisioner-kube-rbac-proxy" - - defaultClusterID = "ID1234" - - hookDeploymentAnnKey = "operator.openshift.io/foo" - hookDeploymentAnnVal = "bar" -) - -type images struct { - csiDriver string - attacher string - provisioner string - resizer string - snapshotter string - livenessProbe string - kubeRBACProxy string -} - -// Drivers - -type driverModifier func(*fakeDriverInstance) *fakeDriverInstance - -func makeFakeDriverInstance(modifiers ...driverModifier) *fakeDriverInstance { - instance := &fakeDriverInstance{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster", - Generation: 0, - }, - Spec: opv1.OperatorSpec{ - ManagementState: opv1.Managed, - }, - Status: opv1.OperatorStatus{}, - } - for _, modifier := range modifiers { - instance = modifier(instance) - } - return instance -} - -func getIndex(containers []v1.Container, name string) int { - for i := range containers { - if containers[i].Name == name { - return i - } - } - return -1 -} - -// Deployments - -type deploymentModifier func(*appsv1.Deployment) *appsv1.Deployment - -func makeDeployment(clusterID string, logLevel int, images images, modifiers ...deploymentModifier) *appsv1.Deployment { - manifest := makeFakeManifest() - dep := resourceread.ReadDeploymentV1OrDie(manifest) - - // Replace the placeholders in the manifest (, ${DRIVER_IMAGE}, ${LOG_LEVEL}) - containers := dep.Spec.Template.Spec.Containers - if images.csiDriver != "" { - if idx := getIndex(containers, csiDriverContainerName); idx > -1 { - containers[idx].Image = images.csiDriver - for j, arg := range containers[idx].Args { 
- if strings.HasPrefix(arg, "--k8s-tag-cluster-id=") { - dep.Spec.Template.Spec.Containers[idx].Args[j] = fmt.Sprintf("--k8s-tag-cluster-id=%s", clusterID) - } - } - } - } - - if images.provisioner != "" { - if idx := getIndex(containers, provisionerContainerName); idx > -1 { - containers[idx].Image = images.provisioner - } - } - - if images.attacher != "" { - if idx := getIndex(containers, attacherContainerName); idx > -1 { - containers[idx].Image = images.attacher - } - } - - if images.resizer != "" { - if idx := getIndex(containers, resizerContainerName); idx > -1 { - containers[idx].Image = images.resizer - } - } - - if images.snapshotter != "" { - if idx := getIndex(containers, snapshotterContainerName); idx > -1 { - containers[idx].Image = images.snapshotter - } - } - - if images.livenessProbe != "" { - if idx := getIndex(containers, livenessProbeContainerName); idx > -1 { - containers[idx].Image = images.livenessProbe - } - } - - if images.kubeRBACProxy != "" { - if idx := getIndex(containers, kubeRBACProxyContainerName); idx > -1 { - containers[idx].Image = images.kubeRBACProxy - } - } - - for i, container := range dep.Spec.Template.Spec.Containers { - for j, arg := range container.Args { - if strings.HasPrefix(arg, "--v=") { - dep.Spec.Template.Spec.Containers[i].Args[j] = fmt.Sprintf("--v=%d", logLevel) - } - } - } - - var one int32 = 1 - dep.Spec.Replicas = &one - - for _, modifier := range modifiers { - dep = modifier(dep) - } - - return dep -} - -func withDeploymentReplicas(replicas int32) deploymentModifier { - return func(instance *appsv1.Deployment) *appsv1.Deployment { - instance.Spec.Replicas = &replicas - return instance - } -} - -func withDeploymentGeneration(generations ...int64) deploymentModifier { - return func(instance *appsv1.Deployment) *appsv1.Deployment { - instance.Generation = generations[0] - if len(generations) > 1 { - instance.Status.ObservedGeneration = generations[1] - } - return instance - } -} - -// Infrastructure -func makeInfra() *configv1.Infrastructure { - return &configv1.Infrastructure{ - ObjectMeta: metav1.ObjectMeta{ - Name: infraConfigName, - Namespace: v1.NamespaceAll, - }, - Status: configv1.InfrastructureStatus{ - InfrastructureName: defaultClusterID, - Platform: configv1.AWSPlatformType, - PlatformStatus: &configv1.PlatformStatus{ - AWS: &configv1.AWSPlatformStatus{}, - }, - }, - } -} - -func deploymentAnnotationHook(opSpec *opv1.OperatorSpec, instance *appsv1.Deployment) error { - if instance.Annotations == nil { - instance.Annotations = map[string]string{} - } - instance.Annotations[hookDeploymentAnnKey] = hookDeploymentAnnVal - return nil -} - +// TODO: remove after all CSI operators migrate to NewCSIDriverControllerServiceWorkloadController func TestDeploymentHook(t *testing.T) { // Initialize coreClient := fakecore.NewSimpleClientset() @@ -239,137 +56,3 @@ func TestDeploymentHook(t *testing.T) { t.Fatalf("Annotation %q not found in Deployment", hookDeploymentAnnKey) } } - -func defaultImages() images { - return images{ - csiDriver: "quay.io/openshift/origin-test-csi-driver:latest", - provisioner: "quay.io/openshift/origin-csi-external-provisioner:latest", - attacher: "quay.io/openshift/origin-csi-external-attacher:latest", - resizer: "quay.io/openshift/origin-csi-external-resizer:latest", - snapshotter: "quay.io/openshift/origin-csi-external-snapshotter:latest", - livenessProbe: "quay.io/openshift/origin-csi-livenessprobe:latest", - kubeRBACProxy: "quay.io/openshift/origin-kube-rbac-proxy:latest", - } -} - -// fakeInstance is a fake 
CSI driver instance that also fullfils the OperatorClient interface -type fakeDriverInstance struct { - metav1.ObjectMeta - Spec opv1.OperatorSpec - Status opv1.OperatorStatus -} - -func makeFakeManifest() []byte { - return []byte(` -kind: Deployment -apiVersion: apps/v1 -metadata: - name: test-csi-driver-controller - namespace: openshift-test-csi-driver -spec: - selector: - matchLabels: - app: test-csi-driver-controller - serviceName: test-csi-driver-controller - replicas: 1 - template: - metadata: - labels: - app: test-csi-driver-controller - spec: - nodeSelector: - node-role.kubernetes.io/master: "" - containers: - - name: csi-driver - image: ${DRIVER_IMAGE} - args: - - --endpoint=$(CSI_ENDPOINT) - - --k8s-tag-cluster-id=${CLUSTER_ID} - - --logtostderr - - --v=${LOG_LEVEL} - env: - - name: CSI_ENDPOINT - value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock - ports: - - name: healthz - containerPort: 19808 - protocol: TCP - volumeMounts: - - name: socket-dir - mountPath: /var/lib/csi/sockets/pluginproxy/ - - name: csi-provisioner - image: ${PROVISIONER_IMAGE} - args: - - --provisioner=test.csi.openshift.io - - --csi-address=$(ADDRESS) - - --feature-gates=Topology=true - - --http-endpoint=localhost:8202 - - --v=${LOG_LEVEL} - env: - - name: ADDRESS - value: /var/lib/csi/sockets/pluginproxy/csi.sock - volumeMounts: - - name: socket-dir - mountPath: /var/lib/csi/sockets/pluginproxy/ - # In reality, each sidecar needs its own kube-rbac-proxy. Using just one for the unit tests. - - name: provisioner-kube-rbac-proxy - args: - - --secure-listen-address=0.0.0.0:9202 - - --upstream=http://127.0.0.1:8202/ - - --tls-cert-file=/etc/tls/private/tls.crt - - --tls-private-key-file=/etc/tls/private/tls.key - - --logtostderr=true - image: ${KUBE_RBAC_PROXY_IMAGE} - imagePullPolicy: IfNotPresent - ports: - - containerPort: 9202 - name: provisioner-m - protocol: TCP - resources: - requests: - memory: 20Mi - cpu: 10m - volumeMounts: - - mountPath: /etc/tls/private - name: metrics-serving-cert - - name: csi-attacher - image: ${ATTACHER_IMAGE} - args: - - --csi-address=$(ADDRESS) - - --v=${LOG_LEVEL} - env: - - name: ADDRESS - value: /var/lib/csi/sockets/pluginproxy/csi.sock - volumeMounts: - - name: socket-dir - mountPath: /var/lib/csi/sockets/pluginproxy/ - - name: csi-resizer - image: ${RESIZER_IMAGE} - args: - - --csi-address=$(ADDRESS) - - --v=${LOG_LEVEL} - env: - - name: ADDRESS - value: /var/lib/csi/sockets/pluginproxy/csi.sock - volumeMounts: - - name: socket-dir - mountPath: /var/lib/csi/sockets/pluginproxy/ - - name: csi-snapshotter - image: ${SNAPSHOTTER_IMAGE} - args: - - --csi-address=$(ADDRESS) - - --v=${LOG_LEVEL} - env: - - name: ADDRESS - value: /var/lib/csi/sockets/pluginproxy/csi.sock - volumeMounts: - - mountPath: /var/lib/csi/sockets/pluginproxy/ - name: socket-dir - volumes: - - name: socket-dir - emptyDir: {} - - name: metrics-serving-cert - secret: - secretName: gcp-pd-csi-driver-controller-metrics-serving-cert -`) -} diff --git a/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller.go b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller.go new file mode 100644 index 0000000000..f56b723e45 --- /dev/null +++ b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller.go @@ -0,0 +1,106 @@ +package csidrivercontrollerservicecontroller + +import ( + configv1 "github.com/openshift/api/config/v1" + 
"github.com/openshift/library-go/pkg/config/leaderelection" + appsinformersv1 "k8s.io/client-go/informers/apps/v1" + "k8s.io/client-go/kubernetes" + + configinformers "github.com/openshift/client-go/config/informers/externalversions" + "github.com/openshift/library-go/pkg/controller/factory" + dc "github.com/openshift/library-go/pkg/operator/deploymentcontroller" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" +) + +// NewCSIDriverControllerServiceWorkloadController is a controller that deploys a CSI Controller Service to a given namespace. +// +// The CSI Controller Service is represented by a Deployment. The reason it's a Deployment is because this object +// can be evicted and it's shut down on node drain, which is important for master nodes. This Deployment deploys a +// pod with the CSI driver and sidecars containers (provisioner, attacher, resizer, snapshotter, liveness-probe). +// +// On every sync, this controller reads the Deployment from a static file and overrides a few fields: +// +// 1. Container image locations +// +// The controller will replace the images specified in the static file if their name follows a certain nomenclature AND its +// respective environemnt variable is set. This is a list of environment variables that the controller understands: +// +// DRIVER_IMAGE +// PROVISIONER_IMAGE +// ATTACHER_IMAGE +// RESIZER_IMAGE +// SNAPSHOTTER_IMAGE +// LIVENESS_PROBE_IMAGE +// +// The names above should be wrapped by a ${}, e.g., ${DIVER_IMAGE} in static file. +// +// 2. Log level +// +// The controller can also override the log level passed in to the CSI driver container. +// In order to do that, the placeholder ${LOG_LEVEL} from the manifest file is replaced with the value specified +// in the OperatorClient resource (Spec.LogLevel). +// +// 3. Cluster ID +// +// The placeholder ${CLUSTER_ID} specified in the static file is replaced with the cluster ID (sometimes referred as infra ID). +// This is mostly used by CSI drivers to tag volumes and snapshots so that those resources can be cleaned up on cluster deletion. +// +// 4. Leader election parameters +// The placeholders ${LEADER_ELECTION_LEASE_DURATION}, ${LEADER_ELECTION_RENEW_DEADLINE} and ${LEADER_ELECTION_RETRY_PERIOD} +// are replaced with OpenShift's recommended parameters for leader election. +// +// 5. TLS Cipher Suites +// +// The placeholders ${TLS_CIPHER_SUITES} and ${TLS_MIN_VERSION} are replaced with recommended OCP defaults. +// These are primarily meant for Kube RBAC sidecars, which may allow some insecure TLS versions and ciphers suites. +// +// This controller supports removable operands, as configured in pkg/operator/management. +// +// This controller produces the following conditions: +// +// Available: indicates that the CSI Controller Service was successfully deployed and at least one Deployment replica is available. +// Progressing: indicates that the CSI Controller Service is being deployed. +// Degraded: produced when the sync() method returns an error. 
+func NewCSIDriverControllerServiceWorkloadController(
+ name string,
+ operandNamespace string,
+ manifest []byte,
+ recorder events.Recorder,
+ operatorClient v1helpers.OperatorClientWithFinalizers,
+ kubeClient kubernetes.Interface,
+ kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
+ deployInformer appsinformersv1.DeploymentInformer,
+ configInformer configinformers.SharedInformerFactory,
+ preconditions []dc.PreconditionFunc,
+ optionalInformers []factory.Informer,
+ optionalDeploymentHooks ...dc.DeploymentHookFunc,
+) factory.Controller {
+ optionalInformers = append(optionalInformers, configInformer.Config().V1().Infrastructures().Informer())
+ var optionalManifestHooks []dc.ManifestHookFunc
+ optionalManifestHooks = append(optionalManifestHooks, WithPlaceholdersHook(configInformer))
+ optionalManifestHooks = append(optionalManifestHooks, WithServingInfo())
+ leConfig := leaderelection.LeaderElectionDefaulting(configv1.LeaderElection{}, "default", "default")
+ optionalManifestHooks = append(optionalManifestHooks, WithLeaderElectionReplacerHook(leConfig))
+
+ var deploymentHooks []dc.DeploymentHookFunc
+ deploymentHooks = append(deploymentHooks, WithControlPlaneTopologyHook(configInformer))
+ deploymentHooks = append(deploymentHooks, optionalDeploymentHooks...)
+
+ newDeploymentController := dc.NewWorkloadController(
+ name,
+ operandNamespace,
+ manifest,
+ recorder,
+ operatorClient,
+ kubeClient,
+ deployInformer,
+ kubeInformersForNamespaces,
+ preconditions,
+ optionalInformers,
+ optionalManifestHooks,
+ deploymentHooks...,
+ )
+ return newDeploymentController
+}
diff --git a/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller_test.go b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller_test.go
new file mode 100644
index 0000000000..9265299db6
--- /dev/null
+++ b/pkg/operator/csi/csidrivercontrollerservicecontroller/csi_driver_controller_service_workload_controller_test.go
@@ -0,0 +1,389 @@
+package csidrivercontrollerservicecontroller
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+
+ configv1 "github.com/openshift/api/config/v1"
+ opv1 "github.com/openshift/api/operator/v1"
+ fakeconfig "github.com/openshift/client-go/config/clientset/versioned/fake"
+ configinformers "github.com/openshift/client-go/config/informers/externalversions"
+ "github.com/openshift/library-go/pkg/controller/factory"
+ dc "github.com/openshift/library-go/pkg/operator/deploymentcontroller"
+ "github.com/openshift/library-go/pkg/operator/events"
+ "github.com/openshift/library-go/pkg/operator/resource/resourceread"
+ "github.com/openshift/library-go/pkg/operator/v1helpers"
+ appsv1 "k8s.io/api/apps/v1"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ coreinformers "k8s.io/client-go/informers"
+ fakecore "k8s.io/client-go/kubernetes/fake"
+)
+
+const (
+ controllerName = "TestCSIDriverControllerServiceController"
+ deploymentName = "test-csi-driver-controller"
+ operandName = "test-csi-driver"
+ operandNamespace = "openshift-test-csi-driver"
+
+ csiDriverContainerName = "csi-driver"
+ provisionerContainerName = "csi-provisioner"
+ attacherContainerName = "csi-attacher"
+ resizerContainerName = "csi-resizer"
+ snapshotterContainerName = "csi-snapshotter"
+ livenessProbeContainerName = "csi-liveness-probe"
+ kubeRBACProxyContainerName = "provisioner-kube-rbac-proxy"
+
+ defaultClusterID = "ID1234"
+
+ hookDeploymentAnnKey = "operator.openshift.io/foo"
+ hookDeploymentAnnVal = "bar"
+)
+
+type images struct {
+ csiDriver string
+ attacher string
+ provisioner string
+ resizer string
+ snapshotter string
+ livenessProbe string
+ kubeRBACProxy string
+}
+
+// Drivers
+
+type driverModifier func(*fakeDriverInstance) *fakeDriverInstance
+
+func makeFakeDriverInstance(modifiers ...driverModifier) *fakeDriverInstance {
+ instance := &fakeDriverInstance{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "cluster",
+ Generation: 0,
+ },
+ Spec: opv1.OperatorSpec{
+ ManagementState: opv1.Managed,
+ },
+ Status: opv1.OperatorStatus{},
+ }
+ for _, modifier := range modifiers {
+ instance = modifier(instance)
+ }
+ return instance
+}
+
+func getIndex(containers []v1.Container, name string) int {
+ for i := range containers {
+ if containers[i].Name == name {
+ return i
+ }
+ }
+ return -1
+}
+
+// Deployments

+type deploymentModifier func(*appsv1.Deployment) *appsv1.Deployment
+
+func makeDeployment(clusterID string, logLevel int, images images, modifiers ...deploymentModifier) *appsv1.Deployment {
+ manifest := makeFakeManifest()
+ dep := resourceread.ReadDeploymentV1OrDie(manifest)
+
+ // Replace the placeholders in the manifest (${CLUSTER_ID}, ${DRIVER_IMAGE}, ${LOG_LEVEL})
+ containers := dep.Spec.Template.Spec.Containers
+ if images.csiDriver != "" {
+ if idx := getIndex(containers, csiDriverContainerName); idx > -1 {
+ containers[idx].Image = images.csiDriver
+ for j, arg := range containers[idx].Args {
+ if strings.HasPrefix(arg, "--k8s-tag-cluster-id=") {
+ dep.Spec.Template.Spec.Containers[idx].Args[j] = fmt.Sprintf("--k8s-tag-cluster-id=%s", clusterID)
+ }
+ }
+ }
+ }
+
+ if images.provisioner != "" {
+ if idx := getIndex(containers, provisionerContainerName); idx > -1 {
+ containers[idx].Image = images.provisioner
+ }
+ }
+
+ if images.attacher != "" {
+ if idx := getIndex(containers, attacherContainerName); idx > -1 {
+ containers[idx].Image = images.attacher
+ }
+ }
+
+ if images.resizer != "" {
+ if idx := getIndex(containers, resizerContainerName); idx > -1 {
+ containers[idx].Image = images.resizer
+ }
+ }
+
+ if images.snapshotter != "" {
+ if idx := getIndex(containers, snapshotterContainerName); idx > -1 {
+ containers[idx].Image = images.snapshotter
+ }
+ }
+
+ if images.livenessProbe != "" {
+ if idx := getIndex(containers, livenessProbeContainerName); idx > -1 {
+ containers[idx].Image = images.livenessProbe
+ }
+ }
+
+ if images.kubeRBACProxy != "" {
+ if idx := getIndex(containers, kubeRBACProxyContainerName); idx > -1 {
+ containers[idx].Image = images.kubeRBACProxy
+ }
+ }
+
+ for i, container := range dep.Spec.Template.Spec.Containers {
+ for j, arg := range container.Args {
+ if strings.HasPrefix(arg, "--v=") {
+ dep.Spec.Template.Spec.Containers[i].Args[j] = fmt.Sprintf("--v=%d", logLevel)
+ }
+ }
+ }
+
+ var one int32 = 1
+ dep.Spec.Replicas = &one
+
+ for _, modifier := range modifiers {
+ dep = modifier(dep)
+ }
+
+ return dep
+}
+
+func withDeploymentReplicas(replicas int32) deploymentModifier {
+ return func(instance *appsv1.Deployment) *appsv1.Deployment {
+ instance.Spec.Replicas = &replicas
+ return instance
+ }
+}
+
+func withDeploymentGeneration(generations ...int64) deploymentModifier {
+ return func(instance *appsv1.Deployment) *appsv1.Deployment {
+ instance.Generation = generations[0]
+ if len(generations) > 1 {
+ instance.Status.ObservedGeneration = generations[1]
+ }
+ return instance
+ }
+}
+
+// Infrastructure
+func makeInfra() *configv1.Infrastructure {
+ return &configv1.Infrastructure{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: infraConfigName,
+ Namespace: v1.NamespaceAll,
+ },
+ Status: configv1.InfrastructureStatus{
+ InfrastructureName: defaultClusterID,
+ Platform: configv1.AWSPlatformType,
+ PlatformStatus: &configv1.PlatformStatus{
+ AWS: &configv1.AWSPlatformStatus{},
+ },
+ },
+ }
+}
+
+func deploymentAnnotationHook(opSpec *opv1.OperatorSpec, instance *appsv1.Deployment) error {
+ if instance.Annotations == nil {
+ instance.Annotations = map[string]string{}
+ }
+ instance.Annotations[hookDeploymentAnnKey] = hookDeploymentAnnVal
+ return nil
+}
+
+func TestWorkloadHook(t *testing.T) {
+ // Initialize
+ coreClient := fakecore.NewSimpleClientset()
+ kubeInformers := v1helpers.NewKubeInformersForNamespaces(coreClient, operandNamespace)
+ initialInfras := []runtime.Object{makeInfra()}
+ configClient := fakeconfig.NewSimpleClientset(initialInfras...)
+ configInformerFactory := configinformers.NewSharedInformerFactory(configClient, 0)
+ configInformerFactory.Config().V1().Infrastructures().Informer().GetIndexer().Add(initialInfras[0])
+ driverInstance := makeFakeDriverInstance()
+ fakeOperatorClient := v1helpers.NewFakeOperatorClient(&driverInstance.Spec, &driverInstance.Status, nil /*triggerErr func*/)
+ namespacedInformerFactory := coreinformers.NewSharedInformerFactoryWithOptions(coreClient, 0, coreinformers.WithNamespace(operandNamespace))
+
+ controller := NewCSIDriverControllerServiceWorkloadController(
+ controllerName,
+ operandNamespace,
+ makeFakeManifest(),
+ events.NewInMemoryRecorder(operandName),
+ fakeOperatorClient,
+ coreClient,
+ kubeInformers,
+ namespacedInformerFactory.Apps().V1().Deployments(),
+ configInformerFactory,
+ getPreconditions(),
+ nil,
+ deploymentAnnotationHook,
+ )
+
+ // Act
+ err := controller.Sync(context.TODO(), factory.NewSyncContext(controllerName, events.NewInMemoryRecorder("test-csi-driver")))
+ if err != nil {
+ t.Fatalf("sync() returned unexpected error: %v", err)
+ }
+
+ // Assert
+ actualDeployment, err := coreClient.AppsV1().Deployments(operandNamespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
+ if err != nil {
+ t.Fatalf("Failed to get Deployment %s: %v", deploymentName, err)
+ }
+
+ // Deployment should have the annotation specified in the hook function
+ if actualDeployment.Annotations[hookDeploymentAnnKey] != hookDeploymentAnnVal {
+ t.Fatalf("Annotation %q not found in Deployment", hookDeploymentAnnKey)
+ }
+}
+
+func getPreconditions() []dc.PreconditionFunc {
+ return []dc.PreconditionFunc{
+ func(ctx context.Context) (bool, error) {
+ return true, nil
+ },
+ }
+}
+
+func defaultImages() images {
+ return images{
+ csiDriver: "quay.io/openshift/origin-test-csi-driver:latest",
+ provisioner: "quay.io/openshift/origin-csi-external-provisioner:latest",
+ attacher: "quay.io/openshift/origin-csi-external-attacher:latest",
+ resizer: "quay.io/openshift/origin-csi-external-resizer:latest",
+ snapshotter: "quay.io/openshift/origin-csi-external-snapshotter:latest",
+ livenessProbe: "quay.io/openshift/origin-csi-livenessprobe:latest",
+ kubeRBACProxy: "quay.io/openshift/origin-kube-rbac-proxy:latest",
+ }
+}
+
+// fakeDriverInstance is a fake CSI driver instance that also fulfills the OperatorClient interface
+type fakeDriverInstance struct {
+ metav1.ObjectMeta
+ Spec opv1.OperatorSpec
+ Status opv1.OperatorStatus
+}
+
+func makeFakeManifest() []byte {
+ return []byte(`
+kind: Deployment
+apiVersion: apps/v1
+metadata:
+ name: test-csi-driver-controller
+ namespace: 
openshift-test-csi-driver +spec: + selector: + matchLabels: + app: test-csi-driver-controller + serviceName: test-csi-driver-controller + replicas: 1 + template: + metadata: + labels: + app: test-csi-driver-controller + spec: + nodeSelector: + node-role.kubernetes.io/master: "" + containers: + - name: csi-driver + image: ${DRIVER_IMAGE} + args: + - --endpoint=$(CSI_ENDPOINT) + - --k8s-tag-cluster-id=${CLUSTER_ID} + - --logtostderr + - --v=${LOG_LEVEL} + env: + - name: CSI_ENDPOINT + value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock + ports: + - name: healthz + containerPort: 19808 + protocol: TCP + volumeMounts: + - name: socket-dir + mountPath: /var/lib/csi/sockets/pluginproxy/ + - name: csi-provisioner + image: ${PROVISIONER_IMAGE} + args: + - --provisioner=test.csi.openshift.io + - --csi-address=$(ADDRESS) + - --feature-gates=Topology=true + - --http-endpoint=localhost:8202 + - --v=${LOG_LEVEL} + env: + - name: ADDRESS + value: /var/lib/csi/sockets/pluginproxy/csi.sock + volumeMounts: + - name: socket-dir + mountPath: /var/lib/csi/sockets/pluginproxy/ + # In reality, each sidecar needs its own kube-rbac-proxy. Using just one for the unit tests. + - name: provisioner-kube-rbac-proxy + args: + - --secure-listen-address=0.0.0.0:9202 + - --upstream=http://127.0.0.1:8202/ + - --tls-cert-file=/etc/tls/private/tls.crt + - --tls-private-key-file=/etc/tls/private/tls.key + - --logtostderr=true + image: ${KUBE_RBAC_PROXY_IMAGE} + imagePullPolicy: IfNotPresent + ports: + - containerPort: 9202 + name: provisioner-m + protocol: TCP + resources: + requests: + memory: 20Mi + cpu: 10m + volumeMounts: + - mountPath: /etc/tls/private + name: metrics-serving-cert + - name: csi-attacher + image: ${ATTACHER_IMAGE} + args: + - --csi-address=$(ADDRESS) + - --v=${LOG_LEVEL} + env: + - name: ADDRESS + value: /var/lib/csi/sockets/pluginproxy/csi.sock + volumeMounts: + - name: socket-dir + mountPath: /var/lib/csi/sockets/pluginproxy/ + - name: csi-resizer + image: ${RESIZER_IMAGE} + args: + - --csi-address=$(ADDRESS) + - --v=${LOG_LEVEL} + env: + - name: ADDRESS + value: /var/lib/csi/sockets/pluginproxy/csi.sock + volumeMounts: + - name: socket-dir + mountPath: /var/lib/csi/sockets/pluginproxy/ + - name: csi-snapshotter + image: ${SNAPSHOTTER_IMAGE} + args: + - --csi-address=$(ADDRESS) + - --v=${LOG_LEVEL} + env: + - name: ADDRESS + value: /var/lib/csi/sockets/pluginproxy/csi.sock + volumeMounts: + - mountPath: /var/lib/csi/sockets/pluginproxy/ + name: socket-dir + volumes: + - name: socket-dir + emptyDir: {} + - name: metrics-serving-cert + secret: + secretName: gcp-pd-csi-driver-controller-metrics-serving-cert +`) +} diff --git a/pkg/operator/deploymentcontroller/deployment_workload_controller.go b/pkg/operator/deploymentcontroller/deployment_workload_controller.go new file mode 100644 index 0000000000..b21192e7e9 --- /dev/null +++ b/pkg/operator/deploymentcontroller/deployment_workload_controller.go @@ -0,0 +1,244 @@ +package deploymentcontroller + +import ( + "context" + "fmt" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/management" + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + appsinformersv1 "k8s.io/client-go/informers/apps/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + opv1 "github.com/openshift/api/operator/v1" + "github.com/openshift/library-go/pkg/operator/apiserver/controller/workload" + "github.com/openshift/library-go/pkg/operator/events" + 
"github.com/openshift/library-go/pkg/operator/resource/resourceapply" + "github.com/openshift/library-go/pkg/operator/resource/resourcemerge" + "github.com/openshift/library-go/pkg/operator/resource/resourceread" + "github.com/openshift/library-go/pkg/operator/status" + "github.com/openshift/library-go/pkg/operator/v1helpers" +) + +type MyController struct { + DeploymentController + name string + preconditions []PreconditionFunc +} + +type PreconditionFunc func(context.Context) (bool, error) + +func NewWorkloadController( + name string, + operandNamespace string, + manifest []byte, + recorder events.Recorder, + operatorClient v1helpers.OperatorClientWithFinalizers, + kubeClient kubernetes.Interface, + deployInformer appsinformersv1.DeploymentInformer, + kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, + preconditions []PreconditionFunc, + optionalInformers []factory.Informer, + optionalManifestHooks []ManifestHookFunc, + optionalDeploymentHooks ...DeploymentHookFunc, +) factory.Controller { + c := DelegateBuilder( + manifest, + recorder, + operatorClient, + kubeClient, + deployInformer, + ).WithPrecondition( + preconditions, + ).WithManifestHooks( + optionalManifestHooks..., + ).WithDeploymentHooks( + optionalDeploymentHooks..., + ) + + workloadController := workload.NewController( + name, + operandNamespace, + operandNamespace, + status.VersionForOperandFromEnv(), + "", //Not needed. + "", //Not needed. + operatorClient, + kubeClient, + kubeInformersForNamespaces.PodLister(), + optionalInformers, + []factory.Informer{kubeInformersForNamespaces.InformersFor(operandNamespace).Core().V1().Namespaces().Informer()}, + c, + nil, //Not used? + c.recorder, + status.NewVersionGetter(), + ) + + return workloadController +} + +func DelegateBuilder( + manifest []byte, + recorder events.Recorder, + operatorClient v1helpers.OperatorClientWithFinalizers, + kubeClient kubernetes.Interface, + deployInformer appsinformersv1.DeploymentInformer, +) *MyController { + return &MyController{ + DeploymentController: DeploymentController{ + manifest: manifest, + recorder: recorder, + operatorClient: operatorClient, + kubeClient: kubeClient, + deployInformer: deployInformer, + }, + } +} + +func (c *MyController) WithPrecondition(preconditions []PreconditionFunc) *MyController { + c.preconditions = append(c.preconditions, preconditions...) + return c +} + +func (c *MyController) WithManifestHooks(hooks ...ManifestHookFunc) *MyController { + c.optionalManifestHooks = hooks + return c +} + +func (c *MyController) WithDeploymentHooks(hooks ...DeploymentHookFunc) *MyController { + c.optionalDeploymentHooks = hooks + return c +} + +// Name returns the name of the DeploymentController. +func (c *MyController) Name() string { + return c.name +} + +// PreconditionFulfilled is a function that indicates whether all prerequisites are met and we can Sync. 
+func (c *MyController) PreconditionFulfilled(ctx context.Context) (bool, error) { + return c.preconditionFulfilledInternal(ctx) +} + +func (c *MyController) preconditionFulfilledInternal(ctx context.Context) (bool, error) { + klog.V(4).Infof("Precondition checks started.") + var errs []error + ok := true + if c.preconditions != nil { + for _, precondition := range c.preconditions { + if ok, err := precondition(ctx); err != nil || !ok { + errs = append(errs, err) + ok = false + } + } + } + preconditionErrors := v1helpers.NewMultiLineAggregate(errs) + + return ok, preconditionErrors +} + +func (c *MyController) Sync(ctx context.Context, syncContext factory.SyncContext) (*appsv1.Deployment, bool, []error) { + errors := []error{} + + opSpec, opStatus, _, err := c.operatorClient.GetOperatorState() + if apierrors.IsNotFound(err) && management.IsOperatorRemovable() { + return nil, true, errors + } + if err != nil { + errors = append(errors, err) + } + + meta, err := c.operatorClient.GetObjectMeta() + if err != nil { + errors = append(errors, err) + } + + required, err := c.getDeployment(opSpec) + if err != nil { + errors = append(errors, err) + return nil, true, errors + } + + // TODO: Reconsider the approach for calculating controller conditions. + // Currently, conditions are derived from a Deployment loaded from YAML manifests, which may not be ideal. + // Consider removing Available/Progressing/Degraded conditions from controller and allowing operators to add their own specific conditions as needed. + var deployment *appsv1.Deployment + if management.IsOperatorRemovable() && meta.DeletionTimestamp != nil { + deployment, err = c.syncDeleting(ctx, required) + if err != nil { + errors = append(errors, err) + } + } else { + deployment, err = c.syncManaged(ctx, required, opStatus, syncContext) + if err != nil { + errors = append(errors, err) + } + } + + //TODO: Returning operatorConfigAtHighestGeneration=true always, we don't have status.observedGeneration in operator. 
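+ // For context, the returned tuple follows the workload.Delegate contract: the Deployment is used
+ // by the workload controller to derive the Available/Progressing conditions, the bool reports
+ // whether the operator config is at its highest observed generation, and the errors are
+ // aggregated into the Degraded condition.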
+ return deployment, true, errors
+}
+
+func (c *MyController) syncManaged(ctx context.Context, required *appsv1.Deployment, opStatus *opv1.OperatorStatus, syncContext factory.SyncContext) (*appsv1.Deployment, error) {
+ klog.V(4).Infof("syncManaged")
+
+ if management.IsOperatorRemovable() {
+ if err := v1helpers.EnsureFinalizer(ctx, c.operatorClient, c.name); err != nil {
+ return nil, err
+ }
+ }
+
+ deployment, _, err := resourceapply.ApplyDeployment(
+ ctx,
+ c.kubeClient.AppsV1(),
+ syncContext.Recorder(),
+ required,
+ resourcemerge.ExpectedDeploymentGeneration(required, opStatus.Generations),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return deployment, nil
+}
+
+func (c *MyController) syncDeleting(ctx context.Context, required *appsv1.Deployment) (*appsv1.Deployment, error) {
+ klog.V(4).Infof("syncDeleting")
+
+ err := c.kubeClient.AppsV1().Deployments(required.Namespace).Delete(ctx, required.Name, metav1.DeleteOptions{})
+ if err != nil && !apierrors.IsNotFound(err) {
+ return nil, err
+ }
+ klog.V(2).Infof("Deleted Deployment %s/%s", required.Namespace, required.Name)
+
+ // All removed, remove the finalizer as the last step
+ if err := v1helpers.RemoveFinalizer(ctx, c.operatorClient, c.name); err != nil {
+ return nil, err
+ }
+
+ return required, nil
+}
+
+func (c *MyController) getDeployment(opSpec *opv1.OperatorSpec) (*appsv1.Deployment, error) {
+ manifest := c.manifest
+ for i := range c.optionalManifestHooks {
+ var err error
+ manifest, err = c.optionalManifestHooks[i](opSpec, manifest)
+ if err != nil {
+ return nil, fmt.Errorf("error running hook function (index=%d): %w", i, err)
+ }
+ }
+
+ required := resourceread.ReadDeploymentV1OrDie(manifest)
+
+ for i := range c.optionalDeploymentHooks {
+ err := c.optionalDeploymentHooks[i](opSpec, required)
+ if err != nil {
+ return nil, fmt.Errorf("error running hook function (index=%d): %w", i, err)
+ }
+ }
+ return required, nil
+}
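--
Usage sketch (illustrative only): a CSI operator can wire the new controller through the
CSIControllerSet helper added in this patch. The controller name, namespace, and asset path
below are assumptions for the example, not values defined by this change.

	csiControllerSet := csicontrollerset.NewCSIControllerSet(operatorClient, eventRecorder).
		WithCSIDriverControllerServiceWorkload(
			"ExampleCSIDriverControllerServiceController",
			"openshift-example-csi-driver",
			assets.ReadFile,   // resourceapply.AssetFunc, e.g. an embed.FS ReadFile
			"controller.yaml", // Deployment manifest with ${} placeholders
			kubeClient,
			kubeInformersForNamespaces,
			configInformer,
			nil, // preconditions
			nil, // optional informers
		)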