diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/mocks/storage_mock.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/mocks/storage_mock.go index e7f928d7af0..75a6fab9288 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/mocks/storage_mock.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/mocks/storage_mock.go @@ -234,6 +234,15 @@ func (s *StorageMock) GetSnapshotMeta( return args.Get(0).(*storage.SnapshotMeta), args.Error(1) } +func (s *StorageMock) GetIncremental( + ctx context.Context, + snapshotID string, +) (string, string, error) { + + args := s.Called(ctx, snapshotID) + return args.String(0), args.String(1), args.Error(2) +} + //////////////////////////////////////////////////////////////////////////////// func NewStorageMock() *StorageMock { diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go index a08d047afbe..94441f89a21 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go @@ -138,4 +138,9 @@ type Storage interface { ctx context.Context, snapshotID string, ) (*SnapshotMeta, error) + + GetIncremental( + ctx context.Context, + disk *types.Disk, + ) (string, string, error) } diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go index c12b9e24418..da398631f1e 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go @@ -277,3 +277,11 @@ func (s *legacyStorage) GetSnapshotMeta( return nil, task_errors.NewNonRetriableErrorf("not implemented") } + +func (s *legacyStorage) GetIncremental( + ctx context.Context, + disk *types.Disk, +) (string, string, error) { + + return "", "", task_errors.NewNonRetriableErrorf("not implemented") +} diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go index 572b2872e65..a321a9d183c 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go @@ -238,3 +238,22 @@ func (s *storageYDB) GetSnapshotMeta( ) return snapshotMeta, err } + +func (s *storageYDB) GetIncremental( + ctx context.Context, + disk *types.Disk, +) (snapshotID string, checkpointID string, err error) { + + err = s.db.Execute( + ctx, + func(ctx context.Context, session *persistence.Session) (err error) { + snapshotID, checkpointID, err = s.getIncremental( + ctx, + session, + disk, + ) + return err + }, + ) + return snapshotID, checkpointID, err +} diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go index bf382c08a66..4892b222f97 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go @@ -37,6 +37,31 @@ func makeShardID(s string) uint64 { //////////////////////////////////////////////////////////////////////////////// func (s *storageYDB) getIncremental( + ctx context.Context, + session *persistence.Session, + disk *types.Disk, +) (snapshotID string, checkpointID string, err error) { + + tx, err := session.BeginRWTransaction(ctx) + if err != nil { + return "", "", err + } + defer tx.Rollback(ctx) + + snapshotID, checkpointID, err = s.getIncrementalTx(ctx, tx, disk) + if err != nil { + return "", "", err + } + + err = tx.Commit(ctx) + if err != nil { + return "", "", err + } + + return snapshotID, checkpointID, err +} + +func (s *storageYDB) getIncrementalTx( ctx context.Context, tx *persistence.Transaction, disk *types.Disk, @@ -142,7 +167,7 @@ func (s *storageYDB) createSnapshot( state.diskID = snapshotMeta.Disk.DiskId state.checkpointID = snapshotMeta.CheckpointID - baseSnapshotID, baseCheckpointID, err := s.getIncremental( + baseSnapshotID, baseCheckpointID, err := s.getIncrementalTx( ctx, tx, snapshotMeta.Disk, diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_test.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_test.go index 361c9e2fdf7..0e4750dfaee 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_test.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_test.go @@ -415,6 +415,20 @@ func testCases() []differentChunkStorageTestCase { //////////////////////////////////////////////////////////////////////////////// +func checkIncremental( + t *testing.T, + f *fixture, + disk *types.Disk, + expectedSnapshotID string, + expectedCheckpointID string, +) { + + snapshotID, checkpointID, err := f.storage.GetIncremental(f.ctx, disk) + require.NoError(t, err) + require.Equal(t, expectedSnapshotID, snapshotID) + require.Equal(t, expectedCheckpointID, checkpointID) +} + func checkBaseSnapshot( t *testing.T, ctx context.Context, @@ -493,12 +507,14 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) { f := createFixture(t) defer f.teardown() + disk := types.Disk{ + ZoneId: "zone", + DiskId: "disk", + } + snapshot1 := SnapshotMeta{ - ID: "snapshot1", - Disk: &types.Disk{ - ZoneId: "zone", - DiskId: "disk", - }, + ID: "snapshot1", + Disk: &disk, CheckpointID: "checkpoint1", CreateTaskID: "create1", } @@ -508,6 +524,7 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) { require.NotNil(t, created) require.Empty(t, created.BaseSnapshotID) require.Empty(t, created.BaseCheckpointID) + checkIncremental(t, f, &disk, "", "") created, err = f.storage.CreateSnapshot(f.ctx, SnapshotMeta{ ID: "snapshot2", @@ -525,6 +542,7 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) { err = f.storage.SnapshotCreated(f.ctx, snapshot1.ID, 0, 0, 0, nil) require.NoError(t, err) + checkIncremental(t, f, &disk, "snapshot1", "checkpoint1") snapshot3 := SnapshotMeta{ ID: "snapshot3", @@ -545,6 +563,7 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) { err = f.storage.SnapshotCreated(f.ctx, snapshot3.ID, 0, 0, 0, nil) require.NoError(t, err) + checkIncremental(t, f, &disk, "snapshot3", "checkpoint3") snapshot4 := SnapshotMeta{ ID: "snapshot4", @@ -565,6 +584,7 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) { err = f.storage.SnapshotCreated(f.ctx, snapshot4.ID, 0, 0, 0, nil) require.NoError(t, err) + checkIncremental(t, f, &disk, "snapshot4", "checkpoint4") _, err = f.storage.DeletingSnapshot(f.ctx, snapshot1.ID, "delete1") require.NoError(t, err) diff --git a/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go b/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go index 60f8e27a616..a4bf6f23cde 100644 --- a/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go +++ b/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go @@ -238,7 +238,7 @@ func testImageServiceCreateImageFromDiskWithKind( require.NoError(t, err) require.Equal(t, float64(1), meta.Progress) - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) + testcommon.RequireCheckpoint(t, ctx, diskID, imageID) checkUnencryptedImage( t, @@ -1144,7 +1144,7 @@ func TestImageServiceCreateIncrementalImageFromDisk(t *testing.T) { err = internal_client.GetOperationMetadata(ctx, client, operation.Id, &meta) require.NoError(t, err) require.Equal(t, float64(1), meta.Progress) - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID1) + testcommon.RequireCheckpoint(t, ctx, diskID1, imageID1) nbsClient := testcommon.NewNbsTestingClient(t, ctx, "zone-a") waitForWrite, err := nbsClient.GoWriteRandomBlocksToNbsDisk(ctx, diskID1) @@ -1177,7 +1177,7 @@ func TestImageServiceCreateIncrementalImageFromDisk(t *testing.T) { err = internal_client.GetOperationMetadata(ctx, client, operation.Id, &meta) require.NoError(t, err) require.Equal(t, float64(1), meta.Progress) - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID1) + testcommon.RequireCheckpoint(t, ctx, diskID1, imageID2) testcommon.CheckBaseSnapshot(t, ctx, imageID2, imageID1) @@ -1223,6 +1223,6 @@ func TestImageServiceCreateIncrementalImageFromDisk(t *testing.T) { err = internal_client.WaitOperation(ctx, client, operation.Id) require.NoError(t, err) - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID1) + testcommon.RequireCheckpointsDoNotExist(t, ctx, diskID1) testcommon.CheckConsistency(t, ctx) } diff --git a/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go b/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go index 3b28520a8ee..8d40b4b2a68 100644 --- a/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go +++ b/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go @@ -12,6 +12,7 @@ import ( "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/clients/nbs" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/common" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/facade/testcommon" + "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/types" ) //////////////////////////////////////////////////////////////////////////////// @@ -77,7 +78,7 @@ func testCreateSnapshotFromDisk( require.NoError(t, err) require.Equal(t, float64(1), meta.Progress) - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) + testcommon.RequireCheckpoint(t, ctx, diskID, snapshotID) reqCtx = testcommon.GetRequestContext(t, ctx) operation, err = client.DeleteSnapshot(reqCtx, &disk_manager.DeleteSnapshotRequest{ @@ -604,17 +605,22 @@ func TestSnapshotServiceDeleteIncrementalSnapshotWhileCreating(t *testing.T) { err = internal_client.WaitOperation(ctx, client, deleteOperation.Id) require.NoError(t, err) - //nolint:sa9003 - // TODO: remove line above after - // https://github.com/ydb-platform/nbs/issues/2008 if creationErr == nil { - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) + testcommon.RequireCheckpointsDoNotExist(t, ctx, diskID) } else { - // Checkpoint that corresponds to base snapshot should not be deleted. - // NOTE: we use snapshot id as checkpoint id. - // TODO: enable this check after resolving issue - // https://github.com/ydb-platform/nbs/issues/2008. - // testcommon.RequireCheckpoint(t, ctx, diskID, baseSnapshotID) + snapshotID, _, err := testcommon.GetIncremental(ctx, &types.Disk{ + ZoneId: "zone-a", + DiskId: diskID, + }) + require.NoError(t, err) + + // If there is a record about this disk left in incrementality table, + // checkpoint that corresponds to base snapshot should not be deleted. + if len(snapshotID) > 0 { + testcommon.RequireCheckpoint(t, ctx, diskID, baseSnapshotID) + } else { + testcommon.RequireCheckpointsDoNotExist(t, ctx, diskID) + } } snapshotID2 := t.Name() + "2" @@ -702,7 +708,7 @@ func TestSnapshotServiceDeleteSnapshot(t *testing.T) { err = internal_client.WaitOperation(ctx, client, operation2.Id) require.NoError(t, err) - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) + testcommon.RequireCheckpointsDoNotExist(t, ctx, diskID) testcommon.CheckConsistency(t, ctx) } @@ -758,13 +764,13 @@ func TestSnapshotServiceDeleteSnapshotWhenCreationIsInFlight(t *testing.T) { err = internal_client.WaitOperation(ctx, client, operation.Id) require.NoError(t, err) - _ = internal_client.WaitOperation(ctx, client, createOp.Id) + createErr := internal_client.WaitOperation(ctx, client, createOp.Id) - // Should wait here because checkpoint is deleted on |createOp| operation - // cancel (and exact time of this event is unknown). - // TODO: enable this check after resolving issue - // https://github.com/ydb-platform/nbs/issues/2008. - // testcommon.WaitForCheckpointsAreEmpty(t, ctx, diskID) + if createErr != nil { + // Should wait here because checkpoint is deleted on |createOp| operation + // cancel (and exact time of this event is unknown). + testcommon.WaitForCheckpointsAreEmpty(t, ctx, diskID) + } testcommon.CheckConsistency(t, ctx) } diff --git a/cloud/disk_manager/internal/pkg/facade/testcommon/common.go b/cloud/disk_manager/internal/pkg/facade/testcommon/common.go index 1afe621460e..46bb33871f9 100644 --- a/cloud/disk_manager/internal/pkg/facade/testcommon/common.go +++ b/cloud/disk_manager/internal/pkg/facade/testcommon/common.go @@ -353,7 +353,7 @@ func RequireCheckpoint( require.EqualValues(t, checkpointID, checkpoints[0]) } -func RequireCheckpointsAreEmpty( +func RequireCheckpointsDoNotExist( t *testing.T, ctx context.Context, diskID string, @@ -626,6 +626,19 @@ func CheckConsistency(t *testing.T, ctx context.Context) { //////////////////////////////////////////////////////////////////////////////// +func GetIncremental( + ctx context.Context, + disk *types.Disk, +) (string, string, error) { + + storage, err := newSnapshotStorage(ctx) + if err != nil { + return "", "", err + } + + return storage.GetIncremental(ctx, disk) +} + func GetEncryptionKeyHash(encryptionDesc *types.EncryptionDesc) ([]byte, error) { switch key := encryptionDesc.Key.(type) { case *types.EncryptionDesc_KeyHash: