diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 6709765fa27..7d6ef585232 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -573,7 +573,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C // are more than the `minSnapshotOnImage` Add a task to flatten all the // temporary cloned images. func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { - snaps, err := rbdVol.listSnapshots() + snaps, children, err := rbdVol.listSnapAndChildren() if err != nil { if errors.Is(err, ErrImageNotFound) { return status.Error(codes.InvalidArgument, err.Error()) @@ -589,9 +589,19 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut len(snaps), rbdVol, maxSnapshotsOnImage) + + if len(children) == 0 { + // if none of the child images(are in trash) exist, we can't flatten them. + // return ResourceExhausted error message as we have reached the hard limit. + log.ErrorLog(ctx, "child images of image %q cannot be flatten", rbdVol) + + return status.Errorf(codes.ResourceExhausted, + "rbd image %q has %d snapshots but child images cannot be flattened", + rbdVol, len(snaps)) + } err = flattenClonedRbdImages( ctx, - snaps, + children, rbdVol.Pool, rbdVol.Monitors, rbdVol.RbdImageName, @@ -610,13 +620,21 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut len(snaps), rbdVol, minSnapshotsOnImageToStartFlatten) + if len(children) == 0 { + // if none of the child images(are in trash) exist, we can't flatten them. + // return nil since we have only reach the soft limit. + log.DebugLog(ctx, "child images of image %q cannot be flatten", rbdVol) + + return nil + } // If we start flattening all the snapshots at one shot the volume // creation time will be affected,so we will flatten only the extra // snapshots. - snaps = snaps[minSnapshotsOnImageToStartFlatten-1:] + extraSnapshots := min(len(snaps)-int(minSnapshotsOnImageToStartFlatten), len(children)) + children = children[:extraSnapshots] err = flattenClonedRbdImages( ctx, - snaps, + children, rbdVol.Pool, rbdVol.Monitors, rbdVol.RbdImageName, diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index fc423feeb99..ec86eedae4b 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -781,13 +781,9 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { } } -type trashSnapInfo struct { - origSnapName string -} - func flattenClonedRbdImages( ctx context.Context, - snaps []librbd.SnapInfo, + children []string, pool, monitors, rbdImageName string, cr *util.Credentials, ) error { @@ -803,26 +799,9 @@ func flattenClonedRbdImages( return err } - var origNameList []trashSnapInfo - for _, snapInfo := range snaps { - // check if the snapshot belongs to trash namespace. - isTrash, retErr := rv.isTrashSnap(snapInfo.Id) - if retErr != nil { - return retErr - } - if isTrash { - // get original snap name for the snapshot in trash namespace - origSnapName, retErr := rv.getOrigSnapName(snapInfo.Id) - if retErr != nil { - return retErr - } - origNameList = append(origNameList, trashSnapInfo{origSnapName}) - } - } - - for _, snapName := range origNameList { - rv.RbdImageName = snapName.origSnapName + for _, childName := range children { + rv.RbdImageName = childName err = rv.flattenRbdImage(ctx, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if err != nil { log.ErrorLog(ctx, "failed to flatten %s; err %v", rv, err) @@ -2048,57 +2027,26 @@ func (ri *rbdImage) DisableDeepFlatten() error { return image.UpdateFeatures(librbd.FeatureDeepFlatten, false) } -func (ri *rbdImage) listSnapshots() ([]librbd.SnapInfo, error) { +// listSnapAndChildren returns list of names of snapshots and child images. +func (ri *rbdImage) listSnapAndChildren() ([]librbd.SnapInfo, []string, error) { image, err := ri.open() if err != nil { - return nil, err + return nil, nil, err } defer image.Close() - snapInfoList, err := image.GetSnapshotNames() - if err != nil { - return nil, err - } - - return snapInfoList, nil -} - -// isTrashSnap returns true if the snapshot belongs to trash namespace. -func (ri *rbdImage) isTrashSnap(snapID uint64) (bool, error) { - image, err := ri.open() - if err != nil { - return false, err - } - defer image.Close() - - // Get namespace type for the snapshot - nsType, err := image.GetSnapNamespaceType(snapID) - if err != nil { - return false, err - } - - if nsType == librbd.SnapNamespaceTypeTrash { - return true, nil - } - - return false, nil -} - -// getOrigSnapName returns the original snap name for -// the snapshots in Trash Namespace. -func (ri *rbdImage) getOrigSnapName(snapID uint64) (string, error) { - image, err := ri.open() + snaps, err := image.GetSnapshotNames() if err != nil { - return "", err + return nil, nil, err } - defer image.Close() - origSnapName, err := image.GetSnapTrashNamespace(snapID) + // ListChildren() returns pools, images, err. + _, children, err := image.ListChildren() if err != nil { - return "", err + return nil, nil, err } - return origSnapName, nil + return snaps, children, nil } func (ri *rbdImage) isCompatibleEncryption(dst *rbdImage) error {