Skip to content

Commit

Permalink
Merge pull request #1182 from leonardoce/informer-lister
Browse files Browse the repository at this point in the history
Add PV informer and indexer
  • Loading branch information
k8s-ci-robot authored Nov 8, 2024
2 parents ab40e9d + 65bbcad commit 2965ff7
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 87 deletions.
1 change: 1 addition & 0 deletions cmd/snapshot-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func main() {
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
coreFactory.Core().V1().PersistentVolumes(),
nodeInformer,
metricsManager,
*resyncPeriod,
Expand Down
1 change: 1 addition & 0 deletions pkg/common-controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
coreFactory.Core().V1().PersistentVolumes(),
nil,
metricsManager,
60*time.Second,
Expand Down
52 changes: 39 additions & 13 deletions pkg/common-controller/groupsnapshot_controller_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,6 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
groupSnapshotContent.Name, err)
}

// TODO(leonardoce): this API server call is very expensive. We need to introduce a
// PV lister on the controller and an indexer on spec.csi.driver + "^" + spec.csi.volumeHandle
// to be used for fast lookups
pvs, err := ctrl.client.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return groupSnapshotContent, fmt.Errorf("createSnapshotsForGroupSnapshotContent: error get PersistentVolumes list from API server: %v", err)
}

// Phase 1: create the VolumeSnapshotContent and VolumeSnapshot objects
klog.V(4).Infof(
"createSnapshotsForGroupSnapshotContent[%s]: creating volumesnapshots and volumesnapshotcontent for group snapshot content",
Expand All @@ -550,12 +542,13 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
snapshotHandle := snapshot.SnapshotHandle
volumeHandle := snapshot.VolumeHandle

pv := utils.GetPersistentVolumeFromHandle(pvs, groupSnapshotContent.Spec.Driver, volumeHandle)
if pv == nil {
pv, err := ctrl.findPersistentVolumeByCSIDriverHandle(groupSnapshotContent.Spec.Driver, volumeHandle)
if err != nil {
klog.Errorf(
"updateGroupSnapshotContentStatus: unable to find PV for volumeHandle:[%s] and CSI driver:[%s]",
"updateGroupSnapshotContentStatus: error while finding PV for volumeHandle:[%s] and CSI driver:[%s]: %s",
volumeHandle,
groupSnapshotContent.Spec.Driver)
groupSnapshotContent.Spec.Driver,
err)
}

volumeSnapshotContentName := getSnapshotContentNameForVolumeGroupSnapshotContent(
Expand Down Expand Up @@ -615,7 +608,7 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
// The status will be set by VolumeSnapshot reconciler
}

_, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(ctx, volumeSnapshotContent, metav1.CreateOptions{})
_, err = ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(ctx, volumeSnapshotContent, metav1.CreateOptions{})
if err != nil && !apierrs.IsAlreadyExists(err) {
return groupSnapshotContent, fmt.Errorf(
"createSnapshotsForGroupSnapshotContent: creating volumesnapshotcontent %w", err)
Expand Down Expand Up @@ -693,6 +686,39 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
return newGroupSnapshotObj, nil
}

// findPersistentVolumeByCSIDriverHandle looks at an existing PersistentVolume
// by CSI driver name and volume handle.
func (ctrl *csiSnapshotCommonController) findPersistentVolumeByCSIDriverHandle(driverName, volumeHandle string) (*v1.PersistentVolume, error) {
pvList, err := ctrl.pvIndexer.ByIndex(
utils.CSIDriverHandleIndexName,
utils.PersistentVolumeKeyFuncByCSIDriverHandle(driverName, volumeHandle),
)
switch {
case err != nil:
return nil, err

case len(pvList) == 0:
return nil, nil

case len(pvList) > 1:
klog.Errorf(
"findPersistentVolumeByCSIDriverHandle: multiple PVs found for for volumeHandle:[%s] and CSI driver:[%s]",
volumeHandle,
driverName)
return nil, fmt.Errorf("multiple PVs found")

default:
if pvObject, ok := pvList[0].(*v1.PersistentVolume); ok {
return pvObject, nil
}

klog.Errorf(
"findPersistentVolumeByCSIDriverHandle: found erroneous content in the index")
klog.V(5).Info("findPersistentVolumeByCSIDriverHandle: erroneous content", pvList[0])
return nil, fmt.Errorf("found erroneous indexed content")
}
}

// getSnapshotNameForVolumeGroupSnapshotContent returns a unique snapshot name for a VolumeGroupSnapshotContent.
func getSnapshotNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, volumeHandle string) string {
return fmt.Sprintf("snapshot-%x", sha256.Sum256([]byte(groupSnapshotContentUUID+volumeHandle)))
Expand Down
29 changes: 28 additions & 1 deletion pkg/common-controller/snapshot_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type csiSnapshotCommonController struct {
classListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
groupSnapshotLister groupsnapshotlisters.VolumeGroupSnapshotLister
Expand All @@ -83,6 +85,8 @@ type csiSnapshotCommonController struct {
enableDistributedSnapshotting bool
preventVolumeModeConversion bool
enableVolumeGroupSnapshots bool

pvIndexer cache.Indexer
}

// NewCSISnapshotController returns a new *csiSnapshotCommonController
Expand All @@ -96,6 +100,7 @@ func NewCSISnapshotCommonController(
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
nodeInformer coreinformers.NodeInformer,
metricsManager metrics.MetricsManager,
resyncPeriod time.Duration,
Expand Down Expand Up @@ -128,6 +133,22 @@ func NewCSISnapshotCommonController(
ctrl.pvcLister = pvcInformer.Lister()
ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced

ctrl.pvLister = pvInformer.Lister()
ctrl.pvListerSynced = pvInformer.Informer().HasSynced

pvInformer.Informer().AddIndexers(map[string]cache.IndexFunc{
utils.CSIDriverHandleIndexName: func(obj interface{}) ([]string, error) {
if pv, ok := obj.(*v1.PersistentVolume); ok {
if key := utils.PersistentVolumeKeyFunc(pv); key != "" {
return []string{key}, nil
}
}

return nil, nil
},
})
ctrl.pvIndexer = pvInformer.Informer().GetIndexer()

volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
Expand Down Expand Up @@ -212,7 +233,13 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
klog.Infof("Starting snapshot controller")
defer klog.Infof("Shutting snapshot controller")

informersSynced := []cache.InformerSynced{ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced}
informersSynced := []cache.InformerSynced{
ctrl.snapshotListerSynced,
ctrl.contentListerSynced,
ctrl.classListerSynced,
ctrl.pvcListerSynced,
ctrl.pvListerSynced,
}
if ctrl.enableDistributedSnapshotting {
informersSynced = append(informersSynced, ctrl.nodeListerSynced)
}
Expand Down
35 changes: 20 additions & 15 deletions pkg/utils/pvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,26 @@ limitations under the License.

package utils

import v1 "k8s.io/api/core/v1"

// GetPersistentVolumeFromHandle looks for the PV having a certain CSI driver name
// and corresponding to a volume with a given handle, in a PV List.
// If the PV is not found, returns nil
func GetPersistentVolumeFromHandle(pvList *v1.PersistentVolumeList, driverName, volumeHandle string) *v1.PersistentVolume {
for i := range pvList.Items {
if pvList.Items[i].Spec.CSI == nil {
continue
}

if pvList.Items[i].Spec.CSI.Driver == driverName && pvList.Items[i].Spec.CSI.VolumeHandle == volumeHandle {
return &pvList.Items[i]
}
import (
"fmt"

v1 "k8s.io/api/core/v1"
)

const CSIDriverHandleIndexName = "ByVolumeHandle"

// PersistentVolumeKeyFunc maps a persistent volume to a string usable
// as KeyFunc to recover it from the CSI driver name and the volume handle.
// If the passed PV is not CSI-based, it will return the empty string
func PersistentVolumeKeyFunc(pv *v1.PersistentVolume) string {
if pv != nil && pv.Spec.CSI != nil {
return fmt.Sprintf("%s^%s", pv.Spec.CSI.Driver, pv.Spec.CSI.VolumeHandle)
}
return ""
}

return nil
// PersistentVolumeKeyFuncByCSIDriverHandle returns the key to be used form
// the individual data components
func PersistentVolumeKeyFuncByCSIDriverHandle(driverName, volumeHandle string) string {
return fmt.Sprintf("%s^%s", driverName, volumeHandle)
}
92 changes: 34 additions & 58 deletions pkg/utils/pvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,84 +23,60 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetPersistentVolumeFromHandle(t *testing.T) {
func TestPersistentVolumeKeyFunc(t *testing.T) {
testDriverName := "hostpath.csi.k8s.io"
testVolumeHandle := "df39ea9e-1296-11ef-adde-baf37ed30dae"
testPvName := "pv-name"

pvListTest := v1.PersistentVolumeList{
Items: []v1.PersistentVolume{
{
ObjectMeta: metav1.ObjectMeta{
Name: testPvName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: testDriverName,
VolumeHandle: testVolumeHandle,
},
},
csiPV := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: testPvName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: testDriverName,
VolumeHandle: testVolumeHandle,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pv-no-csi",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
HostPath: &v1.HostPathVolumeSource{},
},
},
},
}
hostPathPV := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pv-no-csi",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
HostPath: &v1.HostPathVolumeSource{},
},
},
}

tests := []struct {
testName string
driverName string
volumeHandle string
pvList v1.PersistentVolumeList
pvName string
testName string
pv *v1.PersistentVolume
expectedKey string
}{
{
testName: "empty-pv-list",
driverName: testDriverName,
volumeHandle: testVolumeHandle,
pvName: "",
},
{
testName: "pv-in-list",
driverName: testDriverName,
volumeHandle: testVolumeHandle,
pvList: pvListTest,
pvName: testPvName,
testName: "nil-pv",
pv: nil,
expectedKey: "",
},
{
testName: "not-existing-volume-handle",
driverName: testDriverName,
volumeHandle: "not-existing-volume-handle",
pvList: pvListTest,
pvName: "",
testName: "csi-pv",
pv: &csiPV,
expectedKey: "hostpath.csi.k8s.io^df39ea9e-1296-11ef-adde-baf37ed30dae",
},
{
testName: "invalid-driver-name",
driverName: "invalid-driver-name",
volumeHandle: testVolumeHandle,
pvList: pvListTest,
pvName: "",
testName: "hostpath-pv",
pv: &hostPathPV,
expectedKey: "",
},
}
for _, tt := range tests {
got := GetPersistentVolumeFromHandle(&tt.pvList, tt.driverName, tt.volumeHandle)
if got == nil {
if len(tt.pvName) != 0 {
t.Errorf("%v: GetPersistentVolumeFromHandle = %v WANT %v", tt.testName, got, tt.pvName)
}
} else {
if tt.pvName != got.Name {
t.Errorf("%v: GetPersistentVolumeFromHandle = %v WANT %v", tt.testName, got.Name, tt.pvName)
}
got := PersistentVolumeKeyFunc(tt.pv)
if got != tt.expectedKey {
t.Errorf("%v: PersistentVolumeKeyFunc = %#v WANT %#v", tt.testName, got, tt.expectedKey)
}
}
}

0 comments on commit 2965ff7

Please sign in to comment.