diff --git a/kvmagent/kvmagent/plugins/shared_block_plugin.py b/kvmagent/kvmagent/plugins/shared_block_plugin.py index dc0a5f4220..bdb71b8906 100755 --- a/kvmagent/kvmagent/plugins/shared_block_plugin.py +++ b/kvmagent/kvmagent/plugins/shared_block_plugin.py @@ -17,6 +17,7 @@ from zstacklib.utils import linux from zstacklib.utils import lock from zstacklib.utils import lvm +from zstacklib.utils import list_ops from zstacklib.utils import bash from zstacklib.utils import qemu_img, qcow2 from zstacklib.utils import traceable_shell @@ -353,6 +354,7 @@ class SharedBlockPlugin(kvmagent.KvmAgent): CONVERT_VOLUME_FORMAT_PATH = "/sharedblock/volume/convertformat" SHRINK_SNAPSHOT_PATH = "/sharedblock/snapshot/shrink" GET_QCOW2_HASH_VALUE_PATH = "/sharedblock/getqcow2hash" + CHECK_LOCK_PATH = "/sharedblock/lock/check" vgs_in_progress = set() vg_size = {} @@ -401,6 +403,7 @@ def start(self): http_server.register_async_uri(self.GET_DOWNLOAD_BITS_FROM_KVM_HOST_PROGRESS_PATH, self.get_download_bits_from_kvmhost_progress) http_server.register_async_uri(self.SHRINK_SNAPSHOT_PATH, self.shrink_snapshot) http_server.register_async_uri(self.GET_QCOW2_HASH_VALUE_PATH, self.get_qcow2_hashvalue) + http_server.register_async_uri(self.CHECK_LOCK_PATH, self.check_lock) self.imagestore_client = ImageStoreClient() @@ -444,10 +447,10 @@ def get_disk_paths(disks): return diskPaths - def create_vg_if_not_found(self, vgUuid, disks, hostUuid, allDisks, forceWipe=False): + def create_vg_if_not_found(self, vgUuid, disks, hostUuid, allDisks, forceWipe=False, is_first_create_vg=False): # type: (str, set([CheckDisk]), str, set([CheckDisk]), bool) -> bool @linux.retry(times=5, sleep_time=random.uniform(0.1, 3)) - def find_vg(vgUuid, raise_exception = True): + def find_vg(vgUuid, raise_exception=True): cmd = shell.ShellCmd("timeout 5 vgscan --ignorelockingfailure; vgs --nolocking -t %s -otags | grep %s" % (vgUuid, INIT_TAG)) cmd(is_exception=False) if cmd.return_code != 0 and raise_exception: @@ -457,14 
+460,17 @@ def find_vg(vgUuid, raise_exception = True): return True @linux.retry(times=3, sleep_time=random.uniform(0.1, 3)) - def create_vg(hostUuid, vgUuid, diskPaths, raise_excption = True): + def create_vg(hostUuid, vgUuid, diskPaths, raise_exception=True): + if not is_first_create_vg: + raise Exception("vg %s has already been created before, and there may be a risk of data loss during " + "secondary creation. Please check your storage" % vgUuid) cmd = shell.ShellCmd("vgcreate -qq --shared --addtag '%s::%s::%s::%s' --metadatasize %s %s %s" % (INIT_TAG, hostUuid, time.time(), linux.get_hostname(), DEFAULT_VG_METADATA_SIZE, vgUuid, " ".join(diskPaths))) cmd(is_exception=False) logger.debug("created vg %s, ret: %s, stdout: %s, stderr: %s" % (vgUuid, cmd.return_code, cmd.stdout, cmd.stderr)) - if cmd.return_code != 0 and raise_excption: + if cmd.return_code != 0 and raise_exception: raise RetryException("ret: %s, stdout: %s, stderr: %s" % (cmd.return_code, cmd.stdout, cmd.stderr)) elif cmd.return_code != 0: @@ -499,7 +505,7 @@ def ping(self, req): cmd = jsonobject.loads(req[http.REQUEST_BODY]) rsp = AgentRsp() size_cache = self.vg_size.get(cmd.vgUuid) - if size_cache != None and linux.get_current_timestamp() - size_cache['currentTimestamp'] < 60: + if size_cache is not None and linux.get_current_timestamp() - size_cache['currentTimestamp'] < 60: rsp.totalCapacity = size_cache['totalCapacity'] rsp.availableCapacity = size_cache['availableCapacity'] elif cmd.vgUuid not in self.vgs_in_progress: @@ -640,7 +646,7 @@ def config_lvm(host_id, enableLvmetad=False): lvm.start_lvmlockd(cmd.ioTimeout) logger.debug("find/create vg %s lock..." 
@kvmagent.replyerror
def check_lock(self, req):
    """Run vgck on the requested VGs and report those read without a lock.

    The request body is JSON carrying ``vgUuids``. When it is missing or
    empty, a plain AgentRsp is returned and ``failedVgs`` is not set.
    Otherwise ``rsp.failedVgs`` maps each affected vg uuid to the complete
    vgck output for that vg.

    :param req: http request; only ``req[http.REQUEST_BODY]`` is read
    :return: the AgentRsp serialised with ``jsonobject.dumps``
    """
    cmd = jsonobject.loads(req[http.REQUEST_BODY])
    rsp = AgentRsp()
    if not cmd.vgUuids:
        return jsonobject.dumps(rsp)

    rsp.failedVgs = {}

    def is_read_without_lock(vg_uuid, output):
        # The first line matching any of the three known messages decides
        # the result: the first two end the scan without flagging the vg,
        # and only the "without a lock" message flags it.
        no_lock_msg = "Reading VG %s without a lock" % vg_uuid
        for line in output.strip().splitlines():
            if "lock start in progress" in line:
                return False
            if "held by other host" in line:
                return False
            if no_lock_msg in line:
                return True
        return False

    def vgck(vg_group):
        # Iterate instead of recursing on vg_group.pop(0): stack depth no
        # longer grows with the group size and the group is not consumed.
        for vg_uuid in vg_group:
            # 10 is the timeout argument of lvm.vgck; only the command
            # output is inspected, the return code is ignored as before.
            _, output, _ = lvm.vgck(vg_uuid, 10)
            if output and is_read_without_lock(vg_uuid, output):
                rsp.failedVgs[vg_uuid] = output

    # up to three worker threads executing vgck
    threads_maxnum = 3
    vg_groups = list_ops.list_split(cmd.vgUuids, threads_maxnum)

    threads = [thread.ThreadFacade.run_in_thread(vgck, (vg_group,))
               for vg_group in vg_groups if vg_group]

    # Wait for every worker so failedVgs is complete before replying.
    for t in threads:
        t.join()

    return jsonobject.dumps(rsp)
allSharedBlockUuids, vgUuid, hostUuid, hostId, forceWipe=False): + def connect(self, sharedBlockUuids, allSharedBlockUuids, vgUuid, hostUuid, hostId, forceWipe=False, isFirst=True): return sharedblock_utils.shareblock_connect( sharedBlockUuids=sharedBlockUuids, allSharedBlockUuids=allSharedBlockUuids, vgUuid=vgUuid, hostId=hostId, hostUuid=hostUuid, - forceWipe=forceWipe + forceWipe=forceWipe, + isFirst=isFirst ) def logout(self, vgUuid, hostUuid): diff --git a/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_connect.py b/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_connect.py index a4ad8f5787..ca9543605c 100644 --- a/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_connect.py +++ b/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_connect.py @@ -1,8 +1,9 @@ +import os.path import random from kvmagent.test.shareblock_testsuite.shared_block_plugin_teststub import SharedBlockPluginTestStub from kvmagent.test.utils import sharedblock_utils,pytest_utils,storage_device_utils -from zstacklib.utils import bash +from zstacklib.utils import bash,lvm from unittest import TestCase from zstacklib.test.utils import misc,env import concurrent.futures @@ -50,4 +51,19 @@ def test_sharedblock_connect(self): to_do.append(future) for future in concurrent.futures.as_completed(to_do): - self.assertEqual(future.result().success, True, future.result().error) \ No newline at end of file + if not future.result().success: + self.assertEqual("other thread is connecting now" in future.result().error, True, future.result().error) + + r, o ,e = bash.bash_roe("vgchange --lockstop %s" % vgUuid) + self.assertEqual(0, r, e) + + r, o = bash.bash_ro(" pvs -oname --noheading -Svg_name=%s" % vgUuid) + disk = o.strip().splitlines()[0].strip() + bash.bash_roe("wipefs -af %s" % disk) + r = bash.bash_r("vgs | grep %s" % vgUuid) + self.assertNotEqual(0, r) + + rsp = self.connect([blockUuid], [blockUuid], vgUuid, hostUuid, hostId, forceWipe=True, isFirst=False) 
+ self.assertEqual(False, rsp.success, rsp.error) + r = bash.bash_r("vgs | grep %s" % vgUuid) + self.assertNotEqual(0, r) diff --git a/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_iothreadpin.py b/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_iothreadpin.py index f893adc6f1..6f29c64b64 100644 --- a/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_iothreadpin.py +++ b/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_iothreadpin.py @@ -11,7 +11,6 @@ storage_device_utils.init_storagedevice_plugin() -init_kvmagent() vm_utils.init_vm_plugin() diff --git a/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_multi_queues.py b/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_multi_queues.py index e1dd8d6705..94c00b3b46 100644 --- a/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_multi_queues.py +++ b/kvmagent/kvmagent/test/shareblock_testsuite/test_shareblock_data_volume_with_multi_queues.py @@ -11,7 +11,6 @@ storage_device_utils.init_storagedevice_plugin() -init_kvmagent() vm_utils.init_vm_plugin() diff --git a/kvmagent/kvmagent/test/shareblock_testsuite/test_sharedblock_check_lock.py b/kvmagent/kvmagent/test/shareblock_testsuite/test_sharedblock_check_lock.py new file mode 100644 index 0000000000..aa3a80d49e --- /dev/null +++ b/kvmagent/kvmagent/test/shareblock_testsuite/test_sharedblock_check_lock.py @@ -0,0 +1,91 @@ +# coding=utf-8 +from kvmagent.test.shareblock_testsuite.shared_block_plugin_teststub import SharedBlockPluginTestStub +from kvmagent.test.utils import sharedblock_utils,pytest_utils,storage_device_utils +from zstacklib.utils import bash, lvm, jsonobject +from unittest import TestCase +from zstacklib.test.utils import misc,env + + +storage_device_utils.init_storagedevice_plugin() + +PKG_NAME = __name__ + +# must create iSCSI stroage before run test 
__ENV_SETUP__ = {
    'self': {
        'xml':'http://smb.zstack.io/mirror/ztest/xml/threeDiskVm.xml',
        'init':['bash ./createTwoLunsIscsiStorage.sh']
    }
}

hostUuid = "8b12f74e6a834c5fa90304b8ea54b1dd"
hostId = 24
vgUuid = "36b02490bb944233b0b01990a450ba83"
vg2Uuid = "ee09b7986bbc4a1f85f439b168c3aee7"


## describe: case will manage by ztest
class TestSharedBlockPlugin(TestCase, SharedBlockPluginTestStub):
    """Exercise the sharedblock check_lock agent call against two VGs.

    Scenario: connect two VGs, verify a clean check, drop one VG lock and
    expect exactly that VG to be reported, then stop lvmlockd and expect
    both VGs to be reported. Each disruption is followed by a reconnect.
    """

    @classmethod
    def setUpClass(cls):
        pass

    def _connect_ok(self, block_uuids, all_block_uuids, vg_uuid, force_wipe):
        # Connect one VG and require the agent to report success.
        rsp = self.connect(block_uuids, all_block_uuids, vg_uuid, hostUuid, hostId, forceWipe=force_wipe)
        self.assertEqual(True, rsp.success, rsp.error)
        return rsp

    def _check_lock_ok(self):
        # Run check_lock for both VGs; the call itself must succeed.
        rsp = sharedblock_utils.sharedblock_check_lock(
            vgUuids=[vgUuid, vg2Uuid],
        )
        self.assertEqual(True, rsp.success, rsp.error)
        return rsp

    def _create_lv_on_both_vgs(self):
        # One small LV per VG, created in the same order as the scenario needs.
        for uuid in (vgUuid, vg2Uuid):
            bash.bash_errorout("lvcreate --size 10M %s" % uuid)

    @pytest_utils.ztest_decorater
    def test_sharedblock_check_lock(self):
        self_vm = env.get_vm_metadata('self')
        login_rsp = storage_device_utils.iscsi_login(self_vm.ip, "3260")
        self.assertEqual(login_rsp.success, True, login_rsp.error)

        # test connect shareblock
        _, listing = bash.bash_ro("ls /dev/disk/by-id | grep scsi|awk -F '-' '{print $2}'")
        blockUuids = listing.strip().splitlines()
        first_disk = blockUuids[0:1]
        second_disk = blockUuids[1:2]

        rsp = self._connect_ok(first_disk, blockUuids, vgUuid, True)
        vgs_output = bash.bash_o("vgs")
        self.assertEqual(True, rsp.success, vgs_output)

        self._connect_ok(second_disk, blockUuids, vg2Uuid, True)

        # normal
        rsp = self._check_lock_ok()
        self.assertEqual(0, len(rsp.failedVgs), rsp.failedVgs.__dict__)

        # vg without a lock
        lvm.drop_vg_lock(vgUuid)

        rsp = self._check_lock_ok()
        self.assertEqual(1, len(rsp.failedVgs), rsp.failedVgs.__dict__)
        self.assertEqual(rsp.failedVgs.hasattr(vgUuid), True, rsp.failedVgs.__dict__)

        self._connect_ok(first_disk, blockUuids, vgUuid, False)

        # If there is no lv, restarting lvmlockd may not restore vg lock(lvm 2.03.11)
        self._create_lv_on_both_vgs()

        # kill lvmlockd
        lvm.stop_lvmlockd()
        rsp = self._check_lock_ok()
        for uuid in (vgUuid, vg2Uuid):
            self.assertEqual(rsp.failedVgs.hasattr(uuid), True, str(rsp.failedVgs))

        self._connect_ok(first_disk, blockUuids, vgUuid, False)
        self._connect_ok(second_disk, blockUuids, vg2Uuid, False)
        self._create_lv_on_both_vgs()
def list_split(list, n):
    """Distribute the items of ``list`` round-robin over at most ``n`` sub-lists.

    Item ``i`` goes to sub-list ``i % n``, so relative order is kept inside
    each sub-list. The number of sub-lists is ``min(n, len(list))``, hence
    no sub-list is ever empty.

    :param list: sized iterable to split (the name shadows the builtin and
                 is kept so existing keyword callers continue to work)
    :param n: maximum number of sub-lists, must be positive for non-empty input
    :return: a new list of lists; ``[]`` when the input is empty
    :raises Exception: when the input is non-empty and ``n`` is not positive
    """
    if len(list) == 0:
        # Return a fresh list rather than the caller's own object, so the
        # result is always a new list and can be mutated without aliasing.
        return []

    if n <= 0:
        raise Exception("num must be positive")

    groups = [[] for _ in range(min(n, len(list)))]
    for index, item in enumerate(list):
        # index % n stays below len(groups): when n > len(list) every index
        # is smaller than len(list), which is the number of groups.
        groups[index % n].append(item)
    return groups
@bash.in_bash
def vgck(vgUuid, timeout):
    """Run ``vgck`` on a volume group under ``timeout -s SIGKILL``.

    stderr is redirected into stdout (``2>&1``) by the command line itself.

    :param vgUuid: name of the volume group handed to vgck
    :param timeout: duration handed to the ``timeout`` command
    :return: the result of ``bash.bash_roe`` for the command
    """
    vgck_cmd = 'timeout -s SIGKILL %s vgck %s 2>&1' % (timeout, vgUuid)
    return bash.bash_roe(vgck_cmd)