Skip to content

Commit

Permalink
Merge pull request #283 from red-hat-storage/sync_us--devel
Browse files Browse the repository at this point in the history
Syncing latest changes from upstream devel for ceph-csi
  • Loading branch information
openshift-merge-bot[bot] authored Mar 29, 2024
2 parents fc96fd1 + 3df396e commit b2c0df2
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 59 deletions.
50 changes: 31 additions & 19 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func validateStriping(parameters map[string]string) error {
func (cs *ControllerServer) parseVolCreateRequest(
ctx context.Context,
req *csi.CreateVolumeRequest,
cr *util.Credentials,
) (*rbdVolume, error) {
// TODO (sbezverk) Last check for not exceeding total storage capacity

Expand Down Expand Up @@ -226,6 +227,13 @@ func (cs *ControllerServer) parseVolCreateRequest(
return nil, status.Error(codes.InvalidArgument, err.Error())
}

err = rbdVol.Connect(cr)
if err != nil {
log.ErrorLog(ctx, "failed to connect to volume %v: %v", rbdVol.RbdImageName, err)

return nil, status.Error(codes.Internal, err.Error())
}

// NOTE: rbdVol does not contain VolID and RbdImageName populated, everything
// else is populated post create request parsing
return rbdVol, nil
Expand Down Expand Up @@ -324,7 +332,7 @@ func (cs *ControllerServer) CreateVolume(
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()
rbdVol, err := cs.parseVolCreateRequest(ctx, req)
rbdVol, err := cs.parseVolCreateRequest(ctx, req, cr)
if err != nil {
return nil, err
}
Expand All @@ -337,17 +345,16 @@ func (cs *ControllerServer) CreateVolume(
}
defer cs.VolumeLocks.Release(req.GetName())

err = rbdVol.Connect(cr)
if err != nil {
log.ErrorLog(ctx, "failed to connect to volume %v: %v", rbdVol.RbdImageName, err)

return nil, status.Error(codes.Internal, err.Error())
}

parentVol, rbdSnap, err := checkContentSource(ctx, req, cr)
if err != nil {
return nil, err
}
if parentVol != nil {
defer parentVol.Destroy()
}
if rbdSnap != nil {
defer rbdSnap.Destroy()
}

err = updateTopologyConstraints(rbdVol, rbdSnap)
if err != nil {
Expand Down Expand Up @@ -638,15 +645,14 @@ func (cs *ControllerServer) createVolumeFromSnapshot(
rbdVol *rbdVolume,
snapshotID string,
) error {
rbdSnap := &rbdSnapshot{}
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotID)

return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID)
}
defer cs.SnapshotLocks.Release(snapshotID)

err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr, secrets)
rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, secrets)
if err != nil {
if errors.Is(err, util.ErrPoolNotFound) {
log.ErrorLog(ctx, "failed to get backend snapshot for %s: %v", snapshotID, err)
Expand All @@ -656,10 +662,11 @@ func (cs *ControllerServer) createVolumeFromSnapshot(

return status.Error(codes.Internal, err.Error())
}
defer rbdSnap.Destroy()

// update parent name(rbd image name in snapshot)
rbdSnap.RbdImageName = rbdSnap.RbdSnapName
parentVol := generateVolFromSnap(rbdSnap)
parentVol := rbdSnap.toVolume()
// as we are operating on single cluster reuse the connection
parentVol.conn = rbdVol.conn.Copy()

Expand Down Expand Up @@ -789,8 +796,8 @@ func checkContentSource(
if snapshotID == "" {
return nil, nil, status.Errorf(codes.NotFound, "volume Snapshot ID cannot be empty")
}
rbdSnap := &rbdSnapshot{}
if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr, req.GetSecrets()); err != nil {
rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend snapshot for %s: %v", snapshotID, err)
if !errors.Is(err, ErrSnapNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -1230,7 +1237,7 @@ func cloneFromSnapshot(
cr *util.Credentials,
parameters map[string]string,
) (*csi.CreateSnapshotResponse, error) {
vol := generateVolFromSnap(rbdSnap)
vol := rbdSnap.toVolume()
err := vol.Connect(cr)
if err != nil {
uErr := undoSnapshotCloning(ctx, rbdVol, rbdSnap, vol, cr)
Expand Down Expand Up @@ -1315,7 +1322,7 @@ func (cs *ControllerServer) doSnapshotClone(
cr *util.Credentials,
) (*rbdVolume, error) {
// generate cloned volume details from snapshot
cloneRbd := generateVolFromSnap(rbdSnap)
cloneRbd := rbdSnap.toVolume()
defer cloneRbd.Destroy()
// add image feature for cloneRbd
f := []string{librbd.FeatureNameLayering, librbd.FeatureNameDeepFlatten}
Expand Down Expand Up @@ -1429,8 +1436,8 @@ func (cs *ControllerServer) DeleteSnapshot(
}
defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)

rbdSnap := &rbdSnapshot{}
if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr, req.GetSecrets()); err != nil {
rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets())
if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we don't
// need to worry about deleting snapshot or omap data, return success
if errors.Is(err, util.ErrPoolNotFound) {
Expand All @@ -1443,12 +1450,16 @@ func (cs *ControllerServer) DeleteSnapshot(
// or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete
if errors.Is(err, util.ErrKeyNotFound) {
log.UsefulLog(ctx, "snapshot %s was been deleted already: %v", snapshotID, err)

return &csi.DeleteSnapshotResponse{}, nil
}

// if the error is ErrImageNotFound, We need to cleanup the image from
// trash and remove the metadata in OMAP.
if errors.Is(err, ErrImageNotFound) {
log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err)

err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -1459,6 +1470,7 @@ func (cs *ControllerServer) DeleteSnapshot(

return nil, status.Error(codes.Internal, err.Error())
}
defer rbdSnap.Destroy()

// safeguard against parallel create or delete requests against the same
// name
Expand All @@ -1472,7 +1484,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// Deleting snapshot and cloned volume
log.DebugLog(ctx, "deleting cloned rbd volume %s", rbdSnap.RbdSnapName)

rbdVol := generateVolFromSnap(rbdSnap)
rbdVol := rbdSnap.toVolume()

err = rbdVol.Connect(cr)
if err != nil {
Expand Down Expand Up @@ -1503,7 +1515,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// cleanUpImageAndSnapReservation cleans up the image from the trash and
// snapshot reservation in rados OMAP.
func cleanUpImageAndSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
rbdVol := generateVolFromSnap(rbdSnap)
rbdVol := rbdSnap.toVolume()
err := rbdVol.Connect(cr)
if err != nil {
return status.Error(codes.Internal, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion internal/rbd/rbd_journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func checkSnapCloneExists(
snapData.ImagePool, rbdSnap.Pool)
}

vol := generateVolFromSnap(rbdSnap)
vol := rbdSnap.toVolume()
defer vol.Destroy()
err = vol.Connect(cr)
if err != nil {
Expand Down
47 changes: 25 additions & 22 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,54 +949,57 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6

// genSnapFromSnapID generates a rbdSnapshot structure from the provided identifier, updating
// the structure with elements from on-disk snapshot metadata as well.
//
// NOTE: The returned rbdSnapshot can be non-nil in case of an error. That
// seems to be required for the DeleteSnapshot procedure, so that OMAP
// attributes can be cleaned-up.
func genSnapFromSnapID(
ctx context.Context,
rbdSnap *rbdSnapshot,
snapshotID string,
cr *util.Credentials,
secrets map[string]string,
) error {
) (*rbdSnapshot, error) {
var vi util.CSIIdentifier

rbdSnap.VolID = snapshotID

err := vi.DecomposeCSIID(rbdSnap.VolID)
err := vi.DecomposeCSIID(snapshotID)
if err != nil {
log.ErrorLog(ctx, "error decoding snapshot ID (%s) (%s)", err, rbdSnap.VolID)
log.ErrorLog(ctx, "error decoding snapshot ID (%s) (%s)", err, snapshotID)

return err
return nil, err
}

rbdSnap := &rbdSnapshot{}
rbdSnap.VolID = snapshotID
rbdSnap.ClusterID = vi.ClusterID

rbdSnap.Monitors, _, err = util.GetMonsAndClusterID(ctx, rbdSnap.ClusterID, false)
if err != nil {
log.ErrorLog(ctx, "failed getting mons (%s)", err)

return err
return nil, err
}

rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, cr, vi.LocationID)
if err != nil {
return err
return nil, err
}
rbdSnap.JournalPool = rbdSnap.Pool

rbdSnap.RadosNamespace, err = util.GetRadosNamespace(util.CsiConfigFile, rbdSnap.ClusterID)
if err != nil {
return err
return nil, err
}

j, err := snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
if err != nil {
return err
return nil, err
}
defer j.Destroy()

imageAttributes, err := j.GetImageAttributes(
ctx, rbdSnap.Pool, vi.ObjectUUID, true)
if err != nil {
return err
return rbdSnap, err
}
rbdSnap.ImageID = imageAttributes.ImageID
rbdSnap.RequestName = imageAttributes.RequestName
Expand All @@ -1009,48 +1012,48 @@ func genSnapFromSnapID(
rbdSnap.JournalPool, err = util.GetPoolName(rbdSnap.Monitors, cr, imageAttributes.JournalPoolID)
if err != nil {
// TODO: If pool is not found we may leak the image (as DeleteSnapshot will return success)
return err
return rbdSnap, err
}
}

err = rbdSnap.Connect(cr)
if err != nil {
return rbdSnap, fmt.Errorf("failed to connect to %q: %w",
rbdSnap, err)
}
defer func() {
if err != nil {
rbdSnap.Destroy()
}
}()
if err != nil {
return fmt.Errorf("failed to connect to %q: %w",
rbdSnap, err)
}

if imageAttributes.KmsID != "" && imageAttributes.EncryptionType == util.EncryptionTypeBlock {
err = rbdSnap.configureBlockEncryption(imageAttributes.KmsID, secrets)
if err != nil {
return fmt.Errorf("failed to configure block encryption for "+
return rbdSnap, fmt.Errorf("failed to configure block encryption for "+
"%q: %w", rbdSnap, err)
}
}
if imageAttributes.KmsID != "" && imageAttributes.EncryptionType == util.EncryptionTypeFile {
err = rbdSnap.configureFileEncryption(ctx, imageAttributes.KmsID, secrets)
if err != nil {
return fmt.Errorf("failed to configure file encryption for "+
return rbdSnap, fmt.Errorf("failed to configure file encryption for "+
"%q: %w", rbdSnap, err)
}
}

err = updateSnapshotDetails(rbdSnap)
if err != nil {
return fmt.Errorf("failed to update snapshot details for %q: %w", rbdSnap, err)
return rbdSnap, fmt.Errorf("failed to update snapshot details for %q: %w", rbdSnap, err)
}

return err
return rbdSnap, err
}

// updateSnapshotDetails will copy the details from the rbdVolume to the
// rbdSnapshot. example copying size from rbdVolume to rbdSnapshot.
func updateSnapshotDetails(rbdSnap *rbdSnapshot) error {
vol := generateVolFromSnap(rbdSnap)
vol := rbdSnap.toVolume()
err := vol.Connect(rbdSnap.conn.Creds)
if err != nil {
return err
Expand Down
35 changes: 18 additions & 17 deletions internal/rbd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,24 @@ func cleanUpSnapshot(
return nil
}

func generateVolFromSnap(rbdSnap *rbdSnapshot) *rbdVolume {
vol := new(rbdVolume)
vol.ClusterID = rbdSnap.ClusterID
vol.VolID = rbdSnap.VolID
vol.Monitors = rbdSnap.Monitors
vol.Pool = rbdSnap.Pool
vol.JournalPool = rbdSnap.JournalPool
vol.RadosNamespace = rbdSnap.RadosNamespace
vol.RbdImageName = rbdSnap.RbdSnapName
vol.ImageID = rbdSnap.ImageID
// copyEncryptionConfig cannot be used here because the volume and the
// snapshot will have the same volumeID which cases the panic in
// copyEncryptionConfig function.
vol.blockEncryption = rbdSnap.blockEncryption
vol.fileEncryption = rbdSnap.fileEncryption

return vol
func (rbdSnap *rbdSnapshot) toVolume() *rbdVolume {
return &rbdVolume{
rbdImage: rbdImage{
ClusterID: rbdSnap.ClusterID,
VolID: rbdSnap.VolID,
Monitors: rbdSnap.Monitors,
Pool: rbdSnap.Pool,
JournalPool: rbdSnap.JournalPool,
RadosNamespace: rbdSnap.RadosNamespace,
RbdImageName: rbdSnap.RbdSnapName,
ImageID: rbdSnap.ImageID,
// copyEncryptionConfig cannot be used here because the volume and the
// snapshot will have the same volumeID which cases the panic in
// copyEncryptionConfig function.
blockEncryption: rbdSnap.blockEncryption,
fileEncryption: rbdSnap.fileEncryption,
},
}
}

func undoSnapshotCloning(
Expand Down

0 comments on commit b2c0df2

Please sign in to comment.