Skip to content

Commit

Permalink
rbd: fix resync issue
Browse files Browse the repository at this point in the history
During the Demote volume store
the image creation timestamp.

During Resync do below operation

* Check image creation timestamp
stored during Demote operation
and current creation timestamp during Resync
and check both are equal and its for
force resync then issue resync
* If the image on both sides is
not in unknown state, check
last_snapshot_timestamp on the
local mirror description, if its present
send volumeReady as false or else return
error message.

If both the images are in up+unknown the
send volumeReady as true.

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 authored and mergify[bot] committed Aug 30, 2023
1 parent 4016876 commit e013cfe
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 111 deletions.
121 changes: 102 additions & 19 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const (
// imageMirrorModeJournal uses journaling to propagate RBD images between
// ceph clusters.
imageMirrorModeJournal imageMirroringMode = "journal"

// imageCreationTimeKey is the key to get/set the image creation timestamp
// on the image metadata. The key is starting with `.rbd` so that it will
// not get replicated to remote cluster.
imageCreationTimeKey = ".rbd.image.creation_time"
)

const (
Expand Down Expand Up @@ -480,6 +485,14 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

return nil, err
}

creationTime, err := rbdVol.GetImageCreationTime()
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}

mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand All @@ -497,6 +510,17 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

// demote image to secondary
if mirroringInfo.Primary {
// store the image creation time for resync
_, err = rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
}
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}

err = rbdVol.DemoteImage()
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -538,6 +562,8 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirro
// ResyncVolume extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in demoted state.
// If yes it will resync the image to correct the split-brain.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
req *replication.ResyncVolumeRequest,
) (*replication.ResyncVolumeResponse, error) {
Expand Down Expand Up @@ -578,7 +604,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
// it takes time for this operation.
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Aborted, err.Error())
return nil, status.Errorf(codes.Aborted, err.Error())
}

if mirroringInfo.State != librbd.MirrorImageEnabled {
Expand Down Expand Up @@ -637,14 +663,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
ready = checkRemoteSiteStatus(ctx, mirrorStatus)
}

err = rbdVol.ResyncVol(localStatus, req.Force)
creationTime, err := rbdVol.GetImageCreationTime()
if err != nil {
return nil, getGRPCError(err)
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error())
}

err = checkVolumeResyncStatus(localStatus)
// image creation time is stored in the image metadata. it looks like
// `"seconds:1692879841 nanos:631526669"`
savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Errorf(codes.Internal,
"failed to get %s key from image metadata for %s: %s",
imageCreationTimeKey,
rbdVol,
err.Error())
}

st, err := timestampFromString(savedImageTime)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", err.Error())
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())

if req.Force && st.Equal(creationTime.AsTime()) {
err = rbdVol.ResyncVol(localStatus)
if err != nil {
return nil, getGRPCError(err)
}
}

if !ready {
err = checkVolumeResyncStatus(localStatus)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

err = rbdVol.RepairResyncedImageID(ctx, ready)
Expand All @@ -659,6 +711,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return resp, nil
}

// timestampToString converts the time.Time object to string.
func timestampToString(st *timestamppb.Timestamp) string {
return fmt.Sprintf("seconds:%d nanos:%d", st.Seconds, st.Nanos)
}

// timestampFromString parses the timestamp string and returns the time.Time
// object.
func timestampFromString(timestamp string) (time.Time, error) {
st := time.Time{}
parts := strings.Fields(timestamp)
if len(parts) != 2 {
return st, fmt.Errorf("failed to parse image creation time: %s", timestamp)
}
if len(strings.Split(parts[0], ":")) != 2 || len(strings.Split(parts[1], ":")) != 2 {
return st, fmt.Errorf("failed to parse image creation time: %s", timestamp)
}
secondsStr := strings.Split(parts[0], ":")[1]
nanosStr := strings.Split(parts[1], ":")[1]

seconds, err := strconv.ParseInt(secondsStr, 10, 64)
if err != nil {
return st, fmt.Errorf("failed to parse image creation time seconds: %s", err.Error())
}

nanos, err := strconv.ParseInt(nanosStr, 10, 32)
if err != nil {
return st, fmt.Errorf("failed to parse image creation time nenos: %s", err.Error())
}

st = time.Unix(seconds, nanos)

return st, nil
}

func getGRPCError(err error) error {
if err == nil {
return status.Error(codes.OK, codes.OK.String())
Expand Down Expand Up @@ -854,20 +940,17 @@ func getLastSyncInfo(description string) (*replication.GetVolumeReplicationInfoR
}

func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
// we are considering 2 states to check resync started and resync completed
// as below. all other states will be considered as an error state so that
// cephCSI can return error message and volume replication operator can
// mark the VolumeReplication status as not resyncing for the volume.

// If the state is Replaying means the resync is going on.
// Once the volume on remote cluster is demoted and resync
// is completed the image state will be moved to UNKNOWN.
// RBD mirror daemon should be always running on the primary cluster.
if !localStatus.Up || (localStatus.State != librbd.MirrorImageStatusStateReplaying &&
localStatus.State != librbd.MirrorImageStatusStateUnknown) {
return fmt.Errorf(
"not resyncing. Local status: daemon up=%t image is in %q state",
localStatus.Up, localStatus.State)
// we are considering local snapshot timestamp to check if the resync is
// started or not, if we dont see local_snapshot_timestamp in the
// description of localStatus, we are returning error. if we see the local
// snapshot timestamp in the description we return resyncing started.
description := localStatus.Description
resp, err := getLastSyncInfo(description)
if err != nil {
return fmt.Errorf("failed to get last sync info: %w", err)
}
if resp.LastSyncTime == nil {
return errors.New("last sync time is nil")
}

return nil
Expand Down
127 changes: 70 additions & 57 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,74 +225,26 @@ func TestCheckVolumeResyncStatus(t *testing.T) {
wantErr bool
}{
{
name: "test when rbd mirror daemon is not running",
name: "test when local_snapshot_timestamp is non zero",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateUnknown,
Up: false,
},
wantErr: true,
},
{
name: "test for unknown state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
wantErr: false,
},
{
name: "test for error state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateError,
Up: true,
},
wantErr: true,
},
{
name: "test for syncing state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateSyncing,
Up: true,
},
wantErr: true,
},
{
name: "test for starting_replay state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateStartingReplay,
Up: true,
},
wantErr: true,
},
{
name: "test for replaying state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateReplaying,
Up: true,
//nolint:lll // sample output cannot be split into multiple lines.
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
wantErr: false,
},
{
name: "test for stopping_replay state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateStoppingReplay,
Up: true,
},
wantErr: true,
},
{
name: "test for stopped state",
name: "test when local_snapshot_timestamp is zero",
//nolint:lll // sample output cannot be split into multiple lines.
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateStopped,
Up: true,
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
wantErr: true,
},
{
name: "test for invalid state",
name: "test when local_snapshot_timestamp is not present",
//nolint:lll // sample output cannot be split into multiple lines.
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusState(100),
Up: true,
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
wantErr: true,
},
Expand Down Expand Up @@ -644,3 +596,64 @@ func TestGetGRPCError(t *testing.T) {
})
}
}

func Test_timestampFromString(t *testing.T) {
tm := timestamppb.Now()
t.Parallel()
tests := []struct {
name string
timestamp string
want time.Time
wantErr bool
}{
{
name: "valid timestamp",
timestamp: timestampToString(tm),
want: tm.AsTime().Local(),
wantErr: false,
},
{
name: "invalid timestamp",
timestamp: "invalid",
want: time.Time{},
wantErr: true,
},
{
name: "empty timestamp",
timestamp: "",
want: time.Time{},
wantErr: true,
},
{
name: "invalid format",
timestamp: "seconds:%d nanos:%d",
want: time.Time{},
wantErr: true,
},
{
name: "missing nanos",
timestamp: "seconds:10",
want: time.Time{},
wantErr: true,
},
{
name: "missing seconds",
timestamp: "nanos:0",
want: time.Time{},
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got, err := timestampFromString(tt.timestamp)
if (err != nil) != tt.wantErr {
t.Errorf("timestampFromString() error = %v, wantErr %v", err, tt.wantErr)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("timestampFromString() = %v, want %v", got, tt.want)
}
})
}
}
13 changes: 13 additions & 0 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,19 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO
return nil
}

// GetImageCreationTime returns the creation time of the image. if the image
// creation time is not set, it queries the image info and returns the creation time.
func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) {
if ri.CreatedAt != nil {
return ri.CreatedAt, nil
}
if err := ri.getImageInfo(); err != nil {
return nil, err
}

return ri.CreatedAt, nil
}

// getImageInfo queries rbd about the given image and returns its metadata, and returns
// ErrImageNotFound if provided image is not found.
func (ri *rbdImage) getImageInfo() error {
Expand Down
Loading

0 comments on commit e013cfe

Please sign in to comment.