Skip to content

Commit

Permalink
integration/test_storage: Add tests for StorageVolumeSnapshot
Browse files Browse the repository at this point in the history
Signed-off-by: hamistao <[email protected]>
  • Loading branch information
hamistao committed Jun 12, 2024
1 parent a5c71f0 commit 4b50799
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 8 deletions.
86 changes: 85 additions & 1 deletion integration/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def setUp(self):
super().setUp()

if not self.client.has_api_extension("storage"):
self.skipTest("Required \'storage\' LXD API extension not available!")
self.skipTest("Required 'storage' LXD API extension not available!")


class TestStoragePools(StorageTestCase):
Expand Down Expand Up @@ -152,3 +152,87 @@ def test_patch(self):
# as we're not using ZFS (and can't in these integration tests) we
# can't really patch anything on a dir volume.
pass


class TestStorageVolumeSnapshot(StorageTestCase):
"""Tests for :py:class:`pylxd.models.storage_pool.StorageVolumeSnapshot"""

def setUp(self):
super().setUp()

if not self.client.has_api_extension("storage_api_volume_snapshots"):
self.skipTest(
"Required 'storage_api_volume_snapshots' LXD API extension not available!"
)

def test_create_get_restore_delete_volume_snapshot(self):
# Create pool and volume
pool = self.create_storage_pool()
self.addCleanup(self.delete_storage_volume, pool)

volume = self.create_storage_volume(pool)
self.addCleanup(self.delete_storage_volume, pool, volume.name)

# Create a few snapshots
first_snapshot = volume.snapshots.create()
self.assertEqual(first_snapshot.name, "snap0")

second_snapshot = volume.snapshots.create()
self.assertEqual(second_snapshot.name, "snap1")

# Try restoring the volume from one of the snapshots
first_snapshot.restore()
volume.restore("snap0")

# Create new snapshot with defined name and expiration date
custom_snapshot_name = "custom-snapshot"
custom_snapshot_expiry_date = "1983-06-16T00:00:00-00:00"

custom_snapshot = volume.snapshots.create(
name=custom_snapshot_name, expires_at=custom_snapshot_expiry_date
)
self.assertTrue(custom_snapshot.is_expired())
self.assertEqual(custom_snapshot.name, custom_snapshot_name)
self.assertEqual(custom_snapshot.expires_at, custom_snapshot_expiry_date)

# Try getting a snapshot
custom_snapshot_copy = volume.snapshots.get(custom_snapshot_name)
self.assertEqual(custom_snapshot, custom_snapshot_copy)

# Get all snapshots from the volume
all_snapshots = volume.snapshots.all()
self.assertEqual(len(all_snapshots), 3)

all_names = [snapshot.name for snapshot in all_snapshots]
for snapshot_name in ["snap0", "snap1", custom_snapshot_name]:
self.assertIn(snapshot_name, all_names)

# Delete a snapshot
second_snapshot.delete()

self.assertFalse(volume.snapshots.exists(second_snapshot.name))

self.assertRaises(
exceptions.NotFound, volume.snapshots.get, custom_snapshot_name
)

all_snapshots = volume.snapshots.all()
self.assertEqual(len(all_snapshots), 2)

all_names = [snapshot.name for snapshot in all_snapshots]
for snapshot_name in ["snap0", custom_snapshot_name]:
self.assertIn(snapshot_name, all_names)

self.assertFalse("snap1", all_names)

@unittest.skip("Can't test PUT on volume snapshots yet")
def test_put(self):
pass

@unittest.skip("Can't test .save() on volume snapshots yet")
def test_save(self):
pass

@unittest.skip("Can't test PATCH on volume snapshots yet")
def test_patch(self):
pass
12 changes: 5 additions & 7 deletions integration/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,11 @@ def delete_storage_pool(self, name):
pass

def create_storage_volume(self, pool_name):
# create a storage volume with a name in the form 'xxx1' on the provided pool.
volume_name = "".join(random.sample(string.ascii_lowercase, 3)) + "1"
try:
pool = self.lxd.storage_pools.get(pool_name)
return pool.volumes.create(volume_name)
except:
return ""
# create a storage volume with a name in the form 'volume-xxxxxx' on the provided pool.
volume_name = "-".join("volume", random.sample(string.ascii_lowercase, 6))
pool = self.lxd.storage_pools.get(pool_name)
volume = pool.volumes.create(volume_name)
return volume.name

def delete_storage_volume(self, pool_name, volume_name):
try:
Expand Down

0 comments on commit 4b50799

Please sign in to comment.