Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing latest changes from upstream devel for ceph-csi #352

Merged
merged 11 commits into from
Aug 1, 2024
1 change: 1 addition & 0 deletions PendingReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
- cephfs: support omap data store in radosnamespace via cli argument in [PR](https://github.com/ceph/ceph-csi/pull/4652)
- deploy: radosNamespaceCephFS can be configured for ceph-csi-cephfs chart in [PR](https://github.com/ceph/ceph-csi/pull/4652)
- build: update ceph release to squid in [PR](https://github.com/ceph/ceph-csi/pull/4735)
- build: CentOS Stream 9 is used as OS in the container-images [PR](https://github.com/ceph/ceph-csi/pull/4735)

## NOTE
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/go-ceph v0.28.0
github.com/container-storage-interface/spec v1.10.0
github.com/csi-addons/spec v0.2.1-0.20240718113938-dc98b454ba65
github.com/csi-addons/spec v0.2.1-0.20240730084235-3958a5b17d24
github.com/gemalto/kmip-go v0.0.10
github.com/golang/protobuf v1.5.4
github.com/google/fscrypt v0.3.6-0.20240502174735-068b9f8f5dec
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,8 @@ github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/csi-addons/spec v0.2.1-0.20240718113938-dc98b454ba65 h1:i9JGGQTEmRQXSpQQPR96+DV4D4o+V1+gjAWf+bpxQxk=
github.com/csi-addons/spec v0.2.1-0.20240718113938-dc98b454ba65/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.1-0.20240730084235-3958a5b17d24 h1:tJETaYbnnzlCSaqDXQzbszYyuAtG/sFzm6DargeVzJA=
github.com/csi-addons/spec v0.2.1-0.20240730084235-3958a5b17d24/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down
89 changes: 23 additions & 66 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
Expand All @@ -309,7 +300,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -322,7 +313,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,

return nil, getGRPCError(err)
}
err = mirror.EnableMirroring(mirroringMode)
err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand Down Expand Up @@ -361,16 +352,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
Expand All @@ -383,7 +365,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
return nil, err
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -396,7 +378,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling.String():
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled.String():
err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force)
err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
if err != nil {
return nil, getGRPCError(err)
}
Expand Down Expand Up @@ -438,23 +420,14 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -474,9 +447,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed
err = mirror.ForcePromote(cr)
err = mirror.ForcePromote(ctx, cr)
} else {
err = mirror.Promote(req.GetForce())
err = mirror.Promote(ctx, req.GetForce())
}
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -540,16 +513,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
Expand All @@ -563,7 +527,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand Down Expand Up @@ -592,7 +556,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

err = mirror.Demote()
err = mirror.Demote(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand Down Expand Up @@ -659,23 +623,14 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
// in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation.
Expand All @@ -693,7 +648,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is in primary state")
}

sts, err := mirror.GetGlobalMirroringStatus()
sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, corerbd.ErrImageNotFound) {
Expand Down Expand Up @@ -764,7 +719,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = mirror.Resync()
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)
}
Expand Down Expand Up @@ -830,6 +785,8 @@ func getGRPCError(err error) error {
}

errorStatusMap := map[error]codes.Code{
corerbd.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
Expand Down Expand Up @@ -875,9 +832,9 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
err = status.Errorf(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
err = status.Errorf(codes.NotFound, err.Error())
default:
err = status.Errorf(codes.Internal, err.Error())
}
Expand All @@ -889,7 +846,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

Expand All @@ -905,7 +862,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
}

mirrorStatus, err := mirror.GetGlobalMirroringStatus()
mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
if errors.Is(err, corerbd.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
Expand Down
11 changes: 11 additions & 0 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
Expand Down Expand Up @@ -594,6 +595,16 @@ func TestGetGRPCError(t *testing.T) {
err: nil,
expectedErr: status.Error(codes.OK, "ok string"),
},
{
name: "ErrImageNotFound",
err: corerbd.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, corerbd.ErrImageNotFound.Error()),
},
{
name: "ErrPoolNotFound",
err: util.ErrPoolNotFound,
expectedErr: status.Error(codes.NotFound, util.ErrPoolNotFound.Error()),
},
}

for _, tt := range tests {
Expand Down
59 changes: 52 additions & 7 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rbd

import (
"context"
"fmt"
"slices"

"github.com/ceph/ceph-csi/internal/rbd"
Expand Down Expand Up @@ -87,6 +88,11 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(

// resolve all volumes
volumes := make([]types.Volume, len(req.GetVolumeIds()))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
for i, id := range req.GetVolumeIds() {
vol, err := mgr.GetVolumeByID(ctx, id)
if err != nil {
Expand All @@ -97,9 +103,6 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
req.GetName(),
err.Error())
}

//nolint:gocritic // need to call .Destroy() for all volumes
defer vol.Destroy(ctx)
volumes[i] = vol
}

Expand All @@ -117,6 +120,21 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(

log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", req.GetName(), vg)

// extract the flatten mode
flattenMode, err := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
// Flatten the image if the flatten mode is set to FlattenModeForce
// before adding it to the volume group.
for _, v := range volumes {
err = v.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)

return nil, getGRPCError(err)
}
}
// add each rbd-image to the RBDVolumeGroup
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
Expand Down Expand Up @@ -328,17 +346,44 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
}
}

// add the new volumes to the group
for _, id := range toAdd {
vol, getErr := mgr.GetVolumeByID(ctx, id)
if getErr != nil {
// resolve all volumes
volumes := make([]types.Volume, len(toAdd))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
for i, id := range toAdd {
var vol types.Volume
vol, err = mgr.GetVolumeByID(ctx, id)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to find a volume with CSI ID %q: %v",
id,
err)
}
volumes[i] = vol
}

// extract the flatten mode
flattenMode, err := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
// Flatten the image if the flatten mode is set to FlattenModeForce
// before adding it to the volume group.
for _, vol := range volumes {
err = vol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)

return nil, getGRPCError(err)
}
}

// add the new volumes to the group
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
if err != nil {
return nil, status.Errorf(
Expand Down
5 changes: 5 additions & 0 deletions internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func GetIDFromReplication(req interface{}) string {
if src != nil && src.GetVolume() != nil {
reqID = src.GetVolume().GetVolumeId()
}
if reqID == "" {
if src != nil && src.GetVolumegroup() != nil {
reqID = src.GetVolumegroup().GetVolumeGroupId()
}
}
if reqID == "" {
reqID = r.GetVolumeId() //nolint:nolintlint,staticcheck // req.VolumeId is deprecated
}
Expand Down
Loading