Skip to content

Commit

Permalink
Merge pull request #352 from red-hat-storage/sync_us--devel
Browse files Browse the repository at this point in the history
Syncing latest changes from upstream devel for ceph-csi
  • Loading branch information
openshift-merge-bot[bot] authored Aug 1, 2024
2 parents e97b32c + e3697f4 commit 0da37f9
Show file tree
Hide file tree
Showing 19 changed files with 298 additions and 156 deletions.
1 change: 1 addition & 0 deletions PendingReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
- cephfs: support omap data store in radosnamespace via cli argument in [PR](https://github.com/ceph/ceph-csi/pull/4652)
- deploy: radosNamespaceCephFS can be configured for ceph-csi-cephfs chart in [PR](https://github.com/ceph/ceph-csi/pull/4652)
- build: update ceph release to squid in [PR](https://github.com/ceph/ceph-csi/pull/4735)
- build: CentOS Stream 9 is used as OS in the container-images [PR](https://github.com/ceph/ceph-csi/pull/4735)

## NOTE
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/go-ceph v0.28.0
github.com/container-storage-interface/spec v1.10.0
github.com/csi-addons/spec v0.2.1-0.20240718113938-dc98b454ba65
github.com/csi-addons/spec v0.2.1-0.20240730084235-3958a5b17d24
github.com/gemalto/kmip-go v0.0.10
github.com/golang/protobuf v1.5.4
github.com/google/fscrypt v0.3.6-0.20240502174735-068b9f8f5dec
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,8 @@ github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/csi-addons/spec v0.2.1-0.20240718113938-dc98b454ba65 h1:i9JGGQTEmRQXSpQQPR96+DV4D4o+V1+gjAWf+bpxQxk=
github.com/csi-addons/spec v0.2.1-0.20240718113938-dc98b454ba65/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.1-0.20240730084235-3958a5b17d24 h1:tJETaYbnnzlCSaqDXQzbszYyuAtG/sFzm6DargeVzJA=
github.com/csi-addons/spec v0.2.1-0.20240730084235-3958a5b17d24/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down
89 changes: 23 additions & 66 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
Expand All @@ -309,7 +300,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -322,7 +313,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,

return nil, getGRPCError(err)
}
err = mirror.EnableMirroring(mirroringMode)
err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand Down Expand Up @@ -361,16 +352,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
Expand All @@ -383,7 +365,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
return nil, err
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -396,7 +378,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling.String():
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled.String():
err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force)
err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
if err != nil {
return nil, getGRPCError(err)
}
Expand Down Expand Up @@ -438,23 +420,14 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -474,9 +447,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed
err = mirror.ForcePromote(cr)
err = mirror.ForcePromote(ctx, cr)
} else {
err = mirror.Promote(req.GetForce())
err = mirror.Promote(ctx, req.GetForce())
}
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -540,16 +513,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
Expand All @@ -563,7 +527,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand Down Expand Up @@ -592,7 +556,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

err = mirror.Demote()
err = mirror.Demote(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand Down Expand Up @@ -659,23 +623,14 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
// in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation.
Expand All @@ -693,7 +648,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is in primary state")
}

sts, err := mirror.GetGlobalMirroringStatus()
sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, corerbd.ErrImageNotFound) {
Expand Down Expand Up @@ -764,7 +719,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = mirror.Resync()
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)
}
Expand Down Expand Up @@ -830,6 +785,8 @@ func getGRPCError(err error) error {
}

errorStatusMap := map[error]codes.Code{
corerbd.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
Expand Down Expand Up @@ -875,9 +832,9 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
err = status.Errorf(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
err = status.Errorf(codes.NotFound, err.Error())
default:
err = status.Errorf(codes.Internal, err.Error())
}
Expand All @@ -889,7 +846,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -905,7 +862,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
}

mirrorStatus, err := mirror.GetGlobalMirroringStatus()
mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
if errors.Is(err, corerbd.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
Expand Down
11 changes: 11 additions & 0 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
Expand Down Expand Up @@ -594,6 +595,16 @@ func TestGetGRPCError(t *testing.T) {
err: nil,
expectedErr: status.Error(codes.OK, "ok string"),
},
{
name: "ErrImageNotFound",
err: corerbd.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, corerbd.ErrImageNotFound.Error()),
},
{
name: "ErrPoolNotFound",
err: util.ErrPoolNotFound,
expectedErr: status.Error(codes.NotFound, util.ErrPoolNotFound.Error()),
},
}

for _, tt := range tests {
Expand Down
59 changes: 52 additions & 7 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rbd

import (
"context"
"fmt"
"slices"

"github.com/ceph/ceph-csi/internal/rbd"
Expand Down Expand Up @@ -87,6 +88,11 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(

// resolve all volumes
volumes := make([]types.Volume, len(req.GetVolumeIds()))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
for i, id := range req.GetVolumeIds() {
vol, err := mgr.GetVolumeByID(ctx, id)
if err != nil {
Expand All @@ -97,9 +103,6 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
req.GetName(),
err.Error())
}

//nolint:gocritic // need to call .Destroy() for all volumes
defer vol.Destroy(ctx)
volumes[i] = vol
}

Expand All @@ -117,6 +120,21 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(

log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", req.GetName(), vg)

// extract the flatten mode
flattenMode, err := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
// Flatten the image if the flatten mode is set to FlattenModeForce
// before adding it to the volume group.
for _, v := range volumes {
err = v.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)

return nil, getGRPCError(err)
}
}
// add each rbd-image to the RBDVolumeGroup
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
Expand Down Expand Up @@ -328,17 +346,44 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
}
}

// add the new volumes to the group
for _, id := range toAdd {
vol, getErr := mgr.GetVolumeByID(ctx, id)
if getErr != nil {
// resolve all volumes
volumes := make([]types.Volume, len(toAdd))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
for i, id := range toAdd {
var vol types.Volume
vol, err = mgr.GetVolumeByID(ctx, id)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to find a volume with CSI ID %q: %v",
id,
err)
}
volumes[i] = vol
}

// extract the flatten mode
flattenMode, err := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
// Flatten the image if the flatten mode is set to FlattenModeForce
// before adding it to the volume group.
for _, vol := range volumes {
err = vol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)

return nil, getGRPCError(err)
}
}

// add the new volumes to the group
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
if err != nil {
return nil, status.Errorf(
Expand Down
5 changes: 5 additions & 0 deletions internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func GetIDFromReplication(req interface{}) string {
if src != nil && src.GetVolume() != nil {
reqID = src.GetVolume().GetVolumeId()
}
if reqID == "" {
if src != nil && src.GetVolumegroup() != nil {
reqID = src.GetVolumegroup().GetVolumeGroupId()
}
}
if reqID == "" {
reqID = r.GetVolumeId() //nolint:nolintlint,staticcheck // req.VolumeId is deprecated
}
Expand Down
Loading

0 comments on commit 0da37f9

Please sign in to comment.