From 65bbcadf6165e849dde40598ce0428b93d9cc4de Mon Sep 17 00:00:00 2001 From: Leonardo Cecchi Date: Sat, 2 Nov 2024 10:05:01 +0000 Subject: [PATCH] Add PV informer and indexer --- cmd/snapshot-controller/main.go | 1 + pkg/common-controller/framework_test.go | 1 + .../groupsnapshot_controller_helper.go | 52 ++++++++--- .../snapshot_controller_base.go | 29 +++++- pkg/utils/pvs.go | 35 ++++--- pkg/utils/pvs_test.go | 92 +++++++------------ 6 files changed, 123 insertions(+), 87 deletions(-) diff --git a/cmd/snapshot-controller/main.go b/cmd/snapshot-controller/main.go index 1f5a157f6..cf7edcd29 100644 --- a/cmd/snapshot-controller/main.go +++ b/cmd/snapshot-controller/main.go @@ -218,6 +218,7 @@ func main() { factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(), factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(), coreFactory.Core().V1().PersistentVolumeClaims(), + coreFactory.Core().V1().PersistentVolumes(), nodeInformer, metricsManager, *resyncPeriod, diff --git a/pkg/common-controller/framework_test.go b/pkg/common-controller/framework_test.go index 2f90083fa..9627a3fb1 100644 --- a/pkg/common-controller/framework_test.go +++ b/pkg/common-controller/framework_test.go @@ -1200,6 +1200,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(), informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(), coreFactory.Core().V1().PersistentVolumeClaims(), + coreFactory.Core().V1().PersistentVolumes(), nil, metricsManager, 60*time.Second, diff --git a/pkg/common-controller/groupsnapshot_controller_helper.go b/pkg/common-controller/groupsnapshot_controller_helper.go index 85f2129e8..de92d9509 100644 --- a/pkg/common-controller/groupsnapshot_controller_helper.go +++ b/pkg/common-controller/groupsnapshot_controller_helper.go @@ -529,14 +529,6 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent( groupSnapshotContent.Name, err) } - // TODO(leonardoce): this API server call is very expensive. We need to introduce a - // PV lister on the controller and an indexer on spec.csi.driver + "^" + spec.csi.volumeHandle - // to be used for fast lookups - pvs, err := ctrl.client.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return groupSnapshotContent, fmt.Errorf("createSnapshotsForGroupSnapshotContent: error get PersistentVolumes list from API server: %v", err) - } - // Phase 1: create the VolumeSnapshotContent and VolumeSnapshot objects klog.V(4).Infof( "createSnapshotsForGroupSnapshotContent[%s]: creating volumesnapshots and volumesnapshotcontent for group snapshot content", @@ -550,12 +542,13 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent( snapshotHandle := snapshot.SnapshotHandle volumeHandle := snapshot.VolumeHandle - pv := utils.GetPersistentVolumeFromHandle(pvs, groupSnapshotContent.Spec.Driver, volumeHandle) - if pv == nil { + pv, err := ctrl.findPersistentVolumeByCSIDriverHandle(groupSnapshotContent.Spec.Driver, volumeHandle) + if err != nil { klog.Errorf( - "updateGroupSnapshotContentStatus: unable to find PV for volumeHandle:[%s] and CSI driver:[%s]", + "updateGroupSnapshotContentStatus: error while finding PV for volumeHandle:[%s] and CSI driver:[%s]: %s", volumeHandle, - groupSnapshotContent.Spec.Driver) + groupSnapshotContent.Spec.Driver, + err) } volumeSnapshotContentName := getSnapshotContentNameForVolumeGroupSnapshotContent( @@ -615,7 +608,7 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent( // The status will be set by VolumeSnapshot reconciler } - _, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(ctx, volumeSnapshotContent, metav1.CreateOptions{}) + _, err = ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(ctx, volumeSnapshotContent, metav1.CreateOptions{}) if err != nil && !apierrs.IsAlreadyExists(err) { return groupSnapshotContent, fmt.Errorf( "createSnapshotsForGroupSnapshotContent: creating volumesnapshotcontent %w", err) @@ -693,6 +686,39 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent( return newGroupSnapshotObj, nil } +// findPersistentVolumeByCSIDriverHandle looks at an existing PersistentVolume +// by CSI driver name and volume handle. +func (ctrl *csiSnapshotCommonController) findPersistentVolumeByCSIDriverHandle(driverName, volumeHandle string) (*v1.PersistentVolume, error) { + pvList, err := ctrl.pvIndexer.ByIndex( + utils.CSIDriverHandleIndexName, + utils.PersistentVolumeKeyFuncByCSIDriverHandle(driverName, volumeHandle), + ) + switch { + case err != nil: + return nil, err + + case len(pvList) == 0: + return nil, nil + + case len(pvList) > 1: + klog.Errorf( + "findPersistentVolumeByCSIDriverHandle: multiple PVs found for for volumeHandle:[%s] and CSI driver:[%s]", + volumeHandle, + driverName) + return nil, fmt.Errorf("multiple PVs found") + + default: + if pvObject, ok := pvList[0].(*v1.PersistentVolume); ok { + return pvObject, nil + } + + klog.Errorf( + "findPersistentVolumeByCSIDriverHandle: found erroneous content in the index") + klog.V(5).Info("findPersistentVolumeByCSIDriverHandle: erroneous content", pvList[0]) + return nil, fmt.Errorf("found erroneous indexed content") + } +} + // getSnapshotNameForVolumeGroupSnapshotContent returns a unique snapshot name for a VolumeGroupSnapshotContent. func getSnapshotNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, volumeHandle string) string { return fmt.Sprintf("snapshot-%x", sha256.Sum256([]byte(groupSnapshotContentUUID+volumeHandle))) diff --git a/pkg/common-controller/snapshot_controller_base.go b/pkg/common-controller/snapshot_controller_base.go index ba6ac43ea..28ee64836 100644 --- a/pkg/common-controller/snapshot_controller_base.go +++ b/pkg/common-controller/snapshot_controller_base.go @@ -62,6 +62,8 @@ type csiSnapshotCommonController struct { classListerSynced cache.InformerSynced pvcLister corelisters.PersistentVolumeClaimLister pvcListerSynced cache.InformerSynced + pvLister corelisters.PersistentVolumeLister + pvListerSynced cache.InformerSynced nodeLister corelisters.NodeLister nodeListerSynced cache.InformerSynced groupSnapshotLister groupsnapshotlisters.VolumeGroupSnapshotLister @@ -83,6 +85,8 @@ type csiSnapshotCommonController struct { enableDistributedSnapshotting bool preventVolumeModeConversion bool enableVolumeGroupSnapshots bool + + pvIndexer cache.Indexer } // NewCSISnapshotController returns a new *csiSnapshotCommonController @@ -96,6 +100,7 @@ func NewCSISnapshotCommonController( volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer, volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, + pvInformer coreinformers.PersistentVolumeInformer, nodeInformer coreinformers.NodeInformer, metricsManager metrics.MetricsManager, resyncPeriod time.Duration, @@ -128,6 +133,22 @@ func NewCSISnapshotCommonController( ctrl.pvcLister = pvcInformer.Lister() ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced + ctrl.pvLister = pvInformer.Lister() + ctrl.pvListerSynced = pvInformer.Informer().HasSynced + + pvInformer.Informer().AddIndexers(map[string]cache.IndexFunc{ + utils.CSIDriverHandleIndexName: func(obj interface{}) ([]string, error) { + if pv, ok := obj.(*v1.PersistentVolume); ok { + if key := utils.PersistentVolumeKeyFunc(pv); key != "" { + return []string{key}, nil + } + } + + return nil, nil + }, + }) + ctrl.pvIndexer = pvInformer.Informer().GetIndexer() + volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) }, @@ -212,7 +233,13 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{} klog.Infof("Starting snapshot controller") defer klog.Infof("Shutting snapshot controller") - informersSynced := []cache.InformerSynced{ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced} + informersSynced := []cache.InformerSynced{ + ctrl.snapshotListerSynced, + ctrl.contentListerSynced, + ctrl.classListerSynced, + ctrl.pvcListerSynced, + ctrl.pvListerSynced, + } if ctrl.enableDistributedSnapshotting { informersSynced = append(informersSynced, ctrl.nodeListerSynced) } diff --git a/pkg/utils/pvs.go b/pkg/utils/pvs.go index 675488a37..b2ba5a267 100644 --- a/pkg/utils/pvs.go +++ b/pkg/utils/pvs.go @@ -16,21 +16,26 @@ limitations under the License. package utils -import v1 "k8s.io/api/core/v1" - -// GetPersistentVolumeFromHandle looks for the PV having a certain CSI driver name -// and corresponding to a volume with a given handle, in a PV List. -// If the PV is not found, returns nil -func GetPersistentVolumeFromHandle(pvList *v1.PersistentVolumeList, driverName, volumeHandle string) *v1.PersistentVolume { - for i := range pvList.Items { - if pvList.Items[i].Spec.CSI == nil { - continue - } - - if pvList.Items[i].Spec.CSI.Driver == driverName && pvList.Items[i].Spec.CSI.VolumeHandle == volumeHandle { - return &pvList.Items[i] - } +import ( + "fmt" + + v1 "k8s.io/api/core/v1" +) + +const CSIDriverHandleIndexName = "ByVolumeHandle" + +// PersistentVolumeKeyFunc maps a persistent volume to a string usable +// as KeyFunc to recover it from the CSI driver name and the volume handle. +// If the passed PV is not CSI-based, it will return the empty string +func PersistentVolumeKeyFunc(pv *v1.PersistentVolume) string { + if pv != nil && pv.Spec.CSI != nil { + return fmt.Sprintf("%s^%s", pv.Spec.CSI.Driver, pv.Spec.CSI.VolumeHandle) } + return "" +} - return nil +// PersistentVolumeKeyFuncByCSIDriverHandle returns the key to be used form +// the individual data components +func PersistentVolumeKeyFuncByCSIDriverHandle(driverName, volumeHandle string) string { + return fmt.Sprintf("%s^%s", driverName, volumeHandle) } diff --git a/pkg/utils/pvs_test.go b/pkg/utils/pvs_test.go index f7fff7ff0..9c94e877c 100644 --- a/pkg/utils/pvs_test.go +++ b/pkg/utils/pvs_test.go @@ -23,84 +23,60 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestGetPersistentVolumeFromHandle(t *testing.T) { +func TestPersistentVolumeKeyFunc(t *testing.T) { testDriverName := "hostpath.csi.k8s.io" testVolumeHandle := "df39ea9e-1296-11ef-adde-baf37ed30dae" testPvName := "pv-name" - pvListTest := v1.PersistentVolumeList{ - Items: []v1.PersistentVolume{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: testPvName, - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - CSI: &v1.CSIPersistentVolumeSource{ - Driver: testDriverName, - VolumeHandle: testVolumeHandle, - }, - }, + csiPV := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPvName, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: testDriverName, + VolumeHandle: testVolumeHandle, }, }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pv-no-csi", - }, - Spec: v1.PersistentVolumeSpec{ - PersistentVolumeSource: v1.PersistentVolumeSource{ - HostPath: &v1.HostPathVolumeSource{}, - }, - }, + }, + } + hostPathPV := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv-no-csi", + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + HostPath: &v1.HostPathVolumeSource{}, }, }, } tests := []struct { - testName string - driverName string - volumeHandle string - pvList v1.PersistentVolumeList - pvName string + testName string + pv *v1.PersistentVolume + expectedKey string }{ { - testName: "empty-pv-list", - driverName: testDriverName, - volumeHandle: testVolumeHandle, - pvName: "", - }, - { - testName: "pv-in-list", - driverName: testDriverName, - volumeHandle: testVolumeHandle, - pvList: pvListTest, - pvName: testPvName, + testName: "nil-pv", + pv: nil, + expectedKey: "", }, { - testName: "not-existing-volume-handle", - driverName: testDriverName, - volumeHandle: "not-existing-volume-handle", - pvList: pvListTest, - pvName: "", + testName: "csi-pv", + pv: &csiPV, + expectedKey: "hostpath.csi.k8s.io^df39ea9e-1296-11ef-adde-baf37ed30dae", }, { - testName: "invalid-driver-name", - driverName: "invalid-driver-name", - volumeHandle: testVolumeHandle, - pvList: pvListTest, - pvName: "", + testName: "hostpath-pv", + pv: &hostPathPV, + expectedKey: "", }, } for _, tt := range tests { - got := GetPersistentVolumeFromHandle(&tt.pvList, tt.driverName, tt.volumeHandle) - if got == nil { - if len(tt.pvName) != 0 { - t.Errorf("%v: GetPersistentVolumeFromHandle = %v WANT %v", tt.testName, got, tt.pvName) - } - } else { - if tt.pvName != got.Name { - t.Errorf("%v: GetPersistentVolumeFromHandle = %v WANT %v", tt.testName, got.Name, tt.pvName) - } + got := PersistentVolumeKeyFunc(tt.pv) + if got != tt.expectedKey { + t.Errorf("%v: PersistentVolumeKeyFunc = %#v WANT %#v", tt.testName, got, tt.expectedKey) } } }