From ec64780c206cdb040eee740b17865e6f0ff81cd8 Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Tue, 17 Oct 2023 15:56:15 +0200 Subject: [PATCH] Tests: Refactor transport tests from `unittest` to `pytest` (#6152) Now detects all plugins using `get_entry_points` instead of manually parsing module files to detect `Transport` plugins. Uses `pytest.mark.parametrize` to run all tests for all registered transport plugins. --- tests/transports/test_all_plugins.py | 467 ++++++++++----------------- tests/transports/test_local.py | 50 +-- tests/transports/test_ssh.py | 132 ++++---- 3 files changed, 254 insertions(+), 395 deletions(-) diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py index 6781c76a22..262172bb00 100644 --- a/tests/transports/test_all_plugins.py +++ b/tests/transports/test_all_plugins.py @@ -7,7 +7,7 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### -# pylint: disable=too-many-lines,fixme +# pylint: disable=too-many-lines,fixme,redefined-outer-name """ This module contains a set of unittest test classes that can be loaded from the plugin. @@ -23,12 +23,13 @@ import string import tempfile import time -import unittest import uuid import psutil +import pytest -from aiida.plugins import SchedulerFactory +from aiida.plugins import SchedulerFactory, TransportFactory, entry_point +from aiida.transports import Transport # TODO : test for copy with pattern # TODO : test for copy with/without patterns, overwriting folder @@ -36,111 +37,37 @@ # TODO : silly cases of copy/put/get from self to self -def get_all_custom_transports(): - """ - Autodiscover all custom transports defined in the variable - plugin_transpors inside each test_* file in this folder. - - Therefore, do not move this function out of this file. - - :return: a dictionary of objects as defined in the various plugin_transport - variables of the different files (the key is the module in which - it was found) - """ - import importlib - - modulename = __name__.rpartition('.')[0] - this_full_fname = __file__ - thisdir, thisfname = os.path.split(this_full_fname) - - test_modules = [ - os.path.split(fname)[1][:-3] - for fname in os.listdir(thisdir) - if fname.endswith('.py') and fname.startswith('test_') - ] - - # Remove this module: note that I should be careful because __file__, from - # the second time on, is the pyc file rather than the py file - thisbasename = os.path.splitext(thisfname)[0] - try: - test_modules.remove(thisbasename) - except IndexError: - print(f'Warning, this module ({thisbasename}) was not found!') - - all_custom_transports = {} - for module in test_modules: - module = importlib.import_module('.'.join([modulename, module])) - custom_transport = module.__dict__.get('plugin_transport', None) - if custom_transport is None: - print(f'Define the plugin_transport variable inside the {module} module!') - else: - all_custom_transports[module] = custom_transport - - return all_custom_transports - - -def run_for_all_plugins(actual_test_method): - """ - Decorator method that actually run the methods with an additional - parameter (custom_transport), once for every custom_transport defined - in the test_* files [except this one]. - """ - - class CollectiveException(Exception): - pass - - all_custom_transports = get_all_custom_transports() +@pytest.fixture(scope='function', params=entry_point.get_entry_point_names('aiida.transports')) +def custom_transport(request) -> Transport: + """Fixture that parametrizes over all the registered implementations of the ``CommonRelaxWorkChain``.""" + if request.param == 'core.ssh': + kwargs = {'machine': 'localhost', 'timeout': 30, 'load_system_host_keys': True, 'key_policy': 'AutoAddPolicy'} + else: + kwargs = {} - def test_all_plugins(self): - """ - The wrapper function that calls the subfunction for each transport. - """ - exceptions = [] - for tr_name, custom_transport in all_custom_transports.items(): - try: - actual_test_method(self, custom_transport) - except Exception as exception: # pylint: disable=broad-except - import traceback - exceptions.append((exception, traceback.format_exc(), tr_name)) - - if exceptions: - if all(isinstance(exc[0], AssertionError) for exc in exceptions): - exception_to_raise = AssertionError - else: - exception_to_raise = CollectiveException + return TransportFactory(request.param)(**kwargs) - messages = ['*** At least one test for a subplugin failed. See below ***', ''] - for exc in exceptions: - messages.append(f"*** [For plugin {exc[2]}]: Exception '{type(exc[0]).__name__}': {exc[0]}") - messages.append(exc[1]) - raise exception_to_raise('\n'.join(messages)) - - return test_all_plugins - - -class TestBasicFunctionality(unittest.TestCase): +class TestBasicFunctionality: """ Tests to check basic functionality of transports. """ - @run_for_all_plugins def test_is_open(self, custom_transport): """Test that the is_open property works.""" - self.assertFalse(custom_transport.is_open) + assert not custom_transport.is_open with custom_transport: - self.assertTrue(custom_transport.is_open) + assert custom_transport.is_open - self.assertFalse(custom_transport.is_open) + assert not custom_transport.is_open -class TestDirectoryManipulation(unittest.TestCase): +class TestDirectoryManipulation: """ Tests to check, create and delete folders. """ - @run_for_all_plugins def test_makedirs(self, custom_transport): """ Verify the functioning of makedirs command @@ -150,7 +77,7 @@ def test_makedirs(self, custom_transport): directory = 'temp_dir_test' transport.chdir(location) - self.assertEqual(location, transport.getcwd()) + assert location == transport.getcwd() while transport.isdir(directory): # I append a random letter/number until it is unique directory += random.choice(string.ascii_uppercase + string.digits) @@ -162,11 +89,11 @@ def test_makedirs(self, custom_transport): # I create the tree transport.makedirs(dir_tree) # verify the existence - self.assertTrue(transport.isdir('1')) - self.assertTrue(dir_tree) + assert transport.isdir('1') + assert dir_tree # try to recreate the same folder - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.makedirs(dir_tree) # recreate but with ignore flag @@ -178,7 +105,6 @@ def test_makedirs(self, custom_transport): transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_rmtree(self, custom_transport): """ Verify the functioning of rmtree command @@ -188,7 +114,7 @@ def test_rmtree(self, custom_transport): directory = 'temp_dir_test' transport.chdir(location) - self.assertEqual(location, transport.getcwd()) + assert location == transport.getcwd() while transport.isdir(directory): # I append a random letter/number until it is unique directory += random.choice(string.ascii_uppercase + string.digits) @@ -202,7 +128,7 @@ def test_rmtree(self, custom_transport): # remove it transport.rmtree('1') # verify the removal - self.assertFalse(transport.isdir('1')) + assert not transport.isdir('1') # also tests that it works with a single file # create file @@ -213,12 +139,11 @@ def test_rmtree(self, custom_transport): # remove it transport.rmtree(local_file_name) # verify the removal - self.assertFalse(transport.isfile(local_file_name)) + assert not transport.isfile(local_file_name) transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_listdir(self, custom_transport): """ create directories, verify listdir, delete a folder with subfolders @@ -229,7 +154,7 @@ def test_listdir(self, custom_transport): directory = 'temp_dir_test' trans.chdir(location) - self.assertEqual(location, trans.getcwd()) + assert location == trans.getcwd() while trans.isdir(directory): # I append a random letter/number until it is unique directory += random.choice(string.ascii_uppercase + string.digits) @@ -246,11 +171,11 @@ def test_listdir(self, custom_transport): list_found = trans.listdir('.') - self.assertTrue(sorted(list_found) == sorted(list_of_dir + list_of_files)) + assert sorted(list_found) == sorted(list_of_dir + list_of_files) - self.assertTrue(sorted(trans.listdir('.', 'a*')), sorted(['as', 'a2', 'a4f'])) - self.assertTrue(sorted(trans.listdir('.', 'a?')), sorted(['as', 'a2'])) - self.assertTrue(sorted(trans.listdir('.', 'a[2-4]*')), sorted(['a2', 'a4f'])) + assert sorted(trans.listdir('.', 'a*')), sorted(['as', 'a2', 'a4f']) + assert sorted(trans.listdir('.', 'a?')), sorted(['as', 'a2']) + assert sorted(trans.listdir('.', 'a[2-4]*')), sorted(['a2', 'a4f']) for this_dir in list_of_dir: trans.rmdir(this_dir) @@ -261,7 +186,6 @@ def test_listdir(self, custom_transport): trans.chdir('..') trans.rmdir(directory) - @run_for_all_plugins def test_listdir_withattributes(self, custom_transport): """ create directories, verify listdir_withattributes, delete a folder with subfolders @@ -283,7 +207,7 @@ def simplify_attributes(data): directory = 'temp_dir_test' trans.chdir(location) - self.assertEqual(location, trans.getcwd()) + assert location == trans.getcwd() while trans.isdir(directory): # I append a random letter/number until it is unique directory += random.choice(string.ascii_uppercase + string.digits) @@ -301,23 +225,16 @@ def simplify_attributes(data): comparison_list = {k: True for k in list_of_dir} for k in list_of_files: comparison_list[k] = False - self.assertTrue(simplify_attributes(trans.listdir_withattributes('.')), comparison_list) - - self.assertTrue( - simplify_attributes(trans.listdir_withattributes('.', 'a*')), { - 'as': True, - 'a2': True, - 'a4f': True, - 'a': False - } - ) - self.assertTrue(simplify_attributes(trans.listdir_withattributes('.', 'a?')), {'as': True, 'a2': True}) - self.assertTrue( - simplify_attributes(trans.listdir_withattributes('.', 'a[2-4]*')), { - 'a2': True, - 'a4f': True - } - ) + + assert simplify_attributes(trans.listdir_withattributes('.')), comparison_list + assert simplify_attributes(trans.listdir_withattributes('.', 'a*')), { + 'as': True, + 'a2': True, + 'a4f': True, + 'a': False + } + assert simplify_attributes(trans.listdir_withattributes('.', 'a?')), {'as': True, 'a2': True} + assert simplify_attributes(trans.listdir_withattributes('.', 'a[2-4]*')), {'a2': True, 'a4f': True} for this_dir in list_of_dir: trans.rmdir(this_dir) @@ -328,7 +245,6 @@ def simplify_attributes(data): trans.chdir('..') trans.rmdir(directory) - @run_for_all_plugins def test_dir_creation_deletion(self, custom_transport): """Test creating and deleting directories.""" with custom_transport as transport: @@ -336,21 +252,20 @@ def test_dir_creation_deletion(self, custom_transport): directory = 'temp_dir_test' transport.chdir(location) - self.assertEqual(location, transport.getcwd()) + assert location == transport.getcwd() while transport.isdir(directory): # I append a random letter/number until it is unique directory += random.choice(string.ascii_uppercase + string.digits) transport.mkdir(directory) - with self.assertRaises(OSError): + with pytest.raises(OSError): # I create twice the same directory transport.mkdir(directory) transport.isdir(directory) - self.assertFalse(transport.isfile(directory)) + assert not transport.isfile(directory) transport.rmdir(directory) - @run_for_all_plugins def test_dir_copy(self, custom_transport): """ Verify if in the copy of a directory also the protection bits @@ -369,16 +284,15 @@ def test_dir_copy(self, custom_transport): dest_directory = f'{directory}_copy' transport.copy(directory, dest_directory) - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.copy(directory, '') - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.copy('', directory) transport.rmdir(directory) transport.rmdir(dest_directory) - @run_for_all_plugins def test_dir_permissions_creation_modification(self, custom_transport): # pylint: disable=invalid-name """ verify if chmod raises IOError when trying to change bits on a @@ -400,13 +314,13 @@ def test_dir_permissions_creation_modification(self, custom_transport): # pylin transport.chmod(directory, 0o777) # test if the security bits have changed - self.assertEqual(transport.get_mode(directory), 0o777) + assert transport.get_mode(directory) == 0o777 # change permissions transport.chmod(directory, 0o511) # test if the security bits have changed - self.assertEqual(transport.get_mode(directory), 0o511) + assert transport.get_mode(directory) == 0o511 # TODO : bug in paramiko. When changing the directory to very low \ # I cannot set it back to higher permissions @@ -419,18 +333,17 @@ def test_dir_permissions_creation_modification(self, custom_transport): # pylin # change permissions of an empty string, non existing folder. fake_dir = '' - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.chmod(fake_dir, 0o777) fake_dir = 'pippo' - with self.assertRaises(IOError): + with pytest.raises(IOError): # chmod to a non existing folder transport.chmod(fake_dir, 0o777) transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_dir_reading_permissions(self, custom_transport): """ Try to enter a directory with no read permissions. @@ -452,23 +365,22 @@ def test_dir_reading_permissions(self, custom_transport): transport.chmod(directory, 0) # test if the security bits have changed - self.assertEqual(transport.get_mode(directory), 0) + assert transport.get_mode(directory) == 0 old_cwd = transport.getcwd() - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.chdir(directory) new_cwd = transport.getcwd() - self.assertEqual(old_cwd, new_cwd) + assert old_cwd == new_cwd # TODO : the test leaves a directory even if it is successful # The bug is in paramiko. After lowering the permissions, # I cannot restore them to higher values # transport.rmdir(directory) - @run_for_all_plugins def test_isfile_isdir_to_empty_string(self, custom_transport): """ I check that isdir or isfile return False when executed on an @@ -477,10 +389,9 @@ def test_isfile_isdir_to_empty_string(self, custom_transport): with custom_transport as transport: location = transport.normalize(os.path.join('/', 'tmp')) transport.chdir(location) - self.assertFalse(transport.isdir('')) - self.assertFalse(transport.isfile('')) + assert not transport.isdir('') + assert not transport.isfile('') - @run_for_all_plugins def test_isfile_isdir_to_non_existing_string(self, custom_transport): """ I check that isdir or isfile return False when executed on an @@ -490,12 +401,11 @@ def test_isfile_isdir_to_non_existing_string(self, custom_transport): location = transport.normalize(os.path.join('/', 'tmp')) transport.chdir(location) fake_folder = 'pippo' - self.assertFalse(transport.isfile(fake_folder)) - self.assertFalse(transport.isdir(fake_folder)) - with self.assertRaises(IOError): + assert not transport.isfile(fake_folder) + assert not transport.isdir(fake_folder) + with pytest.raises(IOError): transport.chdir(fake_folder) - @run_for_all_plugins def test_chdir_to_empty_string(self, custom_transport): """ I check that if I pass an empty string to chdir, the cwd does @@ -506,10 +416,10 @@ def test_chdir_to_empty_string(self, custom_transport): new_dir = transport.normalize(os.path.join('/', 'tmp')) transport.chdir(new_dir) transport.chdir('') - self.assertEqual(new_dir, transport.getcwd()) + assert new_dir == transport.getcwd() -class TestPutGetFile(unittest.TestCase): +class TestPutGetFile: """ Test to verify whether the put and get functions behave correctly on files. 1) they work @@ -517,7 +427,6 @@ class TestPutGetFile(unittest.TestCase): 3) they reject empty strings """ - @run_for_all_plugins def test_put_and_get(self, custom_transport): """Test putting and getting files.""" local_dir = os.path.join('/', 'tmp') @@ -550,9 +459,9 @@ def test_put_and_get(self, custom_transport): list_of_files = transport.listdir('.') # it is False because local_file_name has the full path, # while list_of_files has not - self.assertFalse(local_file_name in list_of_files) - self.assertTrue(remote_file_name in list_of_files) - self.assertFalse(retrieved_file_name in list_of_files) + assert not local_file_name in list_of_files + assert remote_file_name in list_of_files + assert not retrieved_file_name in list_of_files os.remove(local_file_name) transport.remove(remote_file_name) @@ -561,7 +470,6 @@ def test_put_and_get(self, custom_transport): transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_put_get_abs_path(self, custom_transport): """ test of exception for non existing files and abs path @@ -587,30 +495,30 @@ def test_put_get_abs_path(self, custom_transport): pathlib.Path(local_file_name).touch() # partial_file_name is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.put(partial_file_name, remote_file_name) - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.putfile(partial_file_name, remote_file_name) # retrieved_file_name does not exist - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.put(retrieved_file_name, remote_file_name) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.putfile(retrieved_file_name, remote_file_name) # remote_file_name does not exist - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.get(remote_file_name, retrieved_file_name) - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.getfile(remote_file_name, retrieved_file_name) transport.put(local_file_name, remote_file_name) transport.putfile(local_file_name, remote_file_name) # local filename is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.get(remote_file_name, 'delete_me.txt') - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.getfile(remote_file_name, 'delete_me.txt') transport.remove(remote_file_name) @@ -619,7 +527,6 @@ def test_put_get_abs_path(self, custom_transport): transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_put_get_empty_string(self, custom_transport): """ test of exception put/get of empty strings @@ -648,15 +555,15 @@ def test_put_get_empty_string(self, custom_transport): # localpath is an empty string # ValueError because it is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.put('', remote_file_name) - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.putfile('', remote_file_name) # remote path is an empty string - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.put(local_file_name, '') - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.putfile(local_file_name, '') transport.put(local_file_name, remote_file_name) @@ -664,16 +571,16 @@ def test_put_get_empty_string(self, custom_transport): transport.putfile(local_file_name, remote_file_name) # remote path is an empty string - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.get('', retrieved_file_name) - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.getfile('', retrieved_file_name) # local path is an empty string # ValueError because it is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.get(remote_file_name, '') - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.getfile(remote_file_name, '') # TODO : get doesn't retrieve empty files. @@ -686,14 +593,14 @@ def test_put_get_empty_string(self, custom_transport): transport.remove(remote_file_name) # If it couldn't end the copy, it leaves what he did on # local file - self.assertTrue('file_retrieved.txt' in transport.listdir('.')) + assert 'file_retrieved.txt' in transport.listdir('.') os.remove(retrieved_file_name) transport.chdir('..') transport.rmdir(directory) -class TestPutGetTree(unittest.TestCase): +class TestPutGetTree: """ Test to verify whether the put and get functions behave correctly on folders. 1) they work @@ -701,7 +608,6 @@ class TestPutGetTree(unittest.TestCase): 3) they reject empty strings """ - @run_for_all_plugins def test_put_and_get(self, custom_transport): """Test putting and getting files.""" local_dir = os.path.join('/', 'tmp') @@ -744,16 +650,16 @@ def test_put_and_get(self, custom_transport): list_of_dirs = transport.listdir('.') # # it is False because local_file_name has the full path, # # while list_of_files has not - self.assertFalse(local_subfolder in list_of_dirs) - self.assertTrue(remote_subfolder in list_of_dirs) - self.assertFalse(retrieved_subfolder in list_of_dirs) - self.assertTrue('tmp1' in list_of_dirs) - self.assertTrue('tmp3' in list_of_dirs) + assert not local_subfolder in list_of_dirs + assert remote_subfolder in list_of_dirs + assert not retrieved_subfolder in list_of_dirs + assert 'tmp1' in list_of_dirs + assert 'tmp3' in list_of_dirs list_pushed_file = transport.listdir('tmp2') list_retrieved_file = transport.listdir('tmp3') - self.assertTrue('file.txt' in list_pushed_file) - self.assertTrue('file.txt' in list_retrieved_file) + assert 'file.txt' in list_pushed_file + assert 'file.txt' in list_retrieved_file shutil.rmtree(local_subfolder) shutil.rmtree(retrieved_subfolder) @@ -762,7 +668,6 @@ def test_put_and_get(self, custom_transport): transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_put_and_get_overwrite(self, custom_transport): """Test putting and getting files with overwrites.""" local_dir = os.path.join('/', 'tmp') @@ -798,13 +703,13 @@ def test_put_and_get_overwrite(self, custom_transport): transport.put(local_subfolder, remote_subfolder) transport.get(remote_subfolder, retrieved_subfolder) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.put(local_subfolder, remote_subfolder, overwrite=False) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.get(remote_subfolder, retrieved_subfolder, overwrite=False) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.puttree(local_subfolder, remote_subfolder, overwrite=False) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.gettree(remote_subfolder, retrieved_subfolder, overwrite=False) shutil.rmtree(local_subfolder) @@ -815,7 +720,6 @@ def test_put_and_get_overwrite(self, custom_transport): transport.chdir('..') transport.rmtree(directory) - @run_for_all_plugins def test_copy(self, custom_transport): """Test copying.""" local_dir = os.path.join('/', 'tmp') @@ -846,44 +750,43 @@ def test_copy(self, custom_transport): # first test the copy. Copy of two files matching patterns, into a folder transport.copy(os.path.join('local', '*.txt'), '.') - self.assertEqual(set(['a.txt', 'c.txt', 'local']), set(transport.listdir('.'))) + assert set(['a.txt', 'c.txt', 'local']) == set(transport.listdir('.')) transport.remove('a.txt') transport.remove('c.txt') # second test copy. Copy of two folders transport.copy('local', 'prova') - self.assertEqual(set(['prova', 'local']), set(transport.listdir('.'))) - self.assertEqual(set(['a.txt', 'b.tmp', 'c.txt']), set(transport.listdir('prova'))) + assert set(['prova', 'local']) == set(transport.listdir('.')) + assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir('prova')) transport.rmtree('prova') # third test copy. Can copy one file into a new file transport.copy(os.path.join('local', '*.tmp'), 'prova') - self.assertEqual(set(['prova', 'local']), set(transport.listdir('.'))) + assert set(['prova', 'local']) == set(transport.listdir('.')) transport.remove('prova') # fourth test copy: can't copy more than one file on the same file, # i.e., the destination should be a folder - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.copy(os.path.join('local', '*.txt'), 'prova') # fifth test, copying one file into a folder transport.mkdir('prova') transport.copy(os.path.join('local', 'a.txt'), 'prova') - self.assertEqual(set(transport.listdir('prova')), set(['a.txt'])) + assert set(transport.listdir('prova')) == set(['a.txt']) transport.rmtree('prova') # sixth test, copying one file into a file transport.copy(os.path.join('local', 'a.txt'), 'prova') - self.assertTrue(transport.isfile('prova')) + assert transport.isfile('prova') transport.remove('prova') # copy of folder into an existing folder # NOTE: the command cp has a different behavior on Mac vs Ubuntu # tests performed locally on a Mac may result in a failure. transport.mkdir('prova') transport.copy('local', 'prova') - self.assertEqual(set(['local']), set(transport.listdir('prova'))) - self.assertEqual(set(['a.txt', 'b.tmp', 'c.txt']), set(transport.listdir(os.path.join('prova', 'local')))) + assert set(['local']) == set(transport.listdir('prova')) + assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir(os.path.join('prova', 'local'))) transport.rmtree('prova') # exit transport.chdir('..') transport.rmtree(directory) - @run_for_all_plugins def test_put(self, custom_transport): """Test putting files.""" # pylint: disable=too-many-statements @@ -917,50 +820,49 @@ def test_put(self, custom_transport): # first test putransport. Copy of two files matching patterns, into a folder transport.put(os.path.join(local_base_dir, '*.txt'), '.') - self.assertEqual(set(['a.txt', 'c.txt', 'local']), set(transport.listdir('.'))) + assert set(['a.txt', 'c.txt', 'local']) == set(transport.listdir('.')) transport.remove('a.txt') transport.remove('c.txt') # second. Copy of folder into a non existing folder transport.put(local_base_dir, 'prova') - self.assertEqual(set(['prova', 'local']), set(transport.listdir('.'))) - self.assertEqual(set(['a.txt', 'b.tmp', 'c.txt']), set(transport.listdir('prova'))) + assert set(['prova', 'local']) == set(transport.listdir('.')) + assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir('prova')) transport.rmtree('prova') # third. copy of folder into an existing folder transport.mkdir('prova') transport.put(local_base_dir, 'prova') - self.assertEqual(set(['prova', 'local']), set(transport.listdir('.'))) - self.assertEqual(set(['local']), set(transport.listdir('prova'))) - self.assertEqual(set(['a.txt', 'b.tmp', 'c.txt']), set(transport.listdir(os.path.join('prova', 'local')))) + assert set(['prova', 'local']) == set(transport.listdir('.')) + assert set(['local']) == set(transport.listdir('prova')) + assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir(os.path.join('prova', 'local'))) transport.rmtree('prova') # third test copy. Can copy one file into a new file transport.put(os.path.join(local_base_dir, '*.tmp'), 'prova') - self.assertEqual(set(['prova', 'local']), set(transport.listdir('.'))) + assert set(['prova', 'local']) == set(transport.listdir('.')) transport.remove('prova') # fourth test copy: can't copy more than one file on the same file, # i.e., the destination should be a folder - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.put(os.path.join(local_base_dir, '*.txt'), 'prova') # copy of folder into file with open(os.path.join(local_dir, directory, 'existing.txt'), 'w', encoding='utf8') as fhandle: fhandle.write(text) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.put(os.path.join(local_base_dir), 'existing.txt') transport.remove('existing.txt') # fifth test, copying one file into a folder transport.mkdir('prova') transport.put(os.path.join(local_base_dir, 'a.txt'), 'prova') - self.assertEqual(set(transport.listdir('prova')), set(['a.txt'])) + assert set(transport.listdir('prova')) == set(['a.txt']) transport.rmtree('prova') # sixth test, copying one file into a file transport.put(os.path.join(local_base_dir, 'a.txt'), 'prova') - self.assertTrue(transport.isfile('prova')) + assert transport.isfile('prova') transport.remove('prova') # exit transport.chdir('..') transport.rmtree(directory) - @run_for_all_plugins def test_get(self, custom_transport): """Test getting files.""" # pylint: disable=too-many-statements @@ -995,54 +897,50 @@ def test_get(self, custom_transport): # first test put. Copy of two files matching patterns, into a folder transport.get(os.path.join('local', '*.txt'), local_destination) - self.assertEqual(set(['a.txt', 'c.txt', 'local']), set(os.listdir(local_destination))) + assert set(['a.txt', 'c.txt', 'local']) == set(os.listdir(local_destination)) os.remove(os.path.join(local_destination, 'a.txt')) os.remove(os.path.join(local_destination, 'c.txt')) # second. Copy of folder into a non existing folder transport.get('local', os.path.join(local_destination, 'prova')) - self.assertEqual(set(['prova', 'local']), set(os.listdir(local_destination))) - self.assertEqual( - set(['a.txt', 'b.tmp', 'c.txt']), set(os.listdir(os.path.join(local_destination, 'prova'))) - ) + assert set(['prova', 'local']) == set(os.listdir(local_destination)) + assert set(['a.txt', 'b.tmp', 'c.txt']) == set(os.listdir(os.path.join(local_destination, 'prova'))) shutil.rmtree(os.path.join(local_destination, 'prova')) # third. copy of folder into an existing folder os.mkdir(os.path.join(local_destination, 'prova')) transport.get('local', os.path.join(local_destination, 'prova')) - self.assertEqual(set(['prova', 'local']), set(os.listdir(local_destination))) - self.assertEqual(set(['local']), set(os.listdir(os.path.join(local_destination, 'prova')))) - self.assertEqual( - set(['a.txt', 'b.tmp', 'c.txt']), set(os.listdir(os.path.join(local_destination, 'prova', 'local'))) - ) + assert set(['prova', 'local']) == set(os.listdir(local_destination)) + assert set(['local']) == set(os.listdir(os.path.join(local_destination, 'prova'))) + assert set(['a.txt', 'b.tmp', + 'c.txt']) == set(os.listdir(os.path.join(local_destination, 'prova', 'local'))) shutil.rmtree(os.path.join(local_destination, 'prova')) # third test copy. Can copy one file into a new file transport.get(os.path.join('local', '*.tmp'), os.path.join(local_destination, 'prova')) - self.assertEqual(set(['prova', 'local']), set(os.listdir(local_destination))) + assert set(['prova', 'local']) == set(os.listdir(local_destination)) os.remove(os.path.join(local_destination, 'prova')) # fourth test copy: can't copy more than one file on the same file, # i.e., the destination should be a folder - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.get(os.path.join('local', '*.txt'), os.path.join(local_destination, 'prova')) # copy of folder into file with open(os.path.join(local_destination, 'existing.txt'), 'w', encoding='utf8') as fhandle: fhandle.write(text) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.get('local', os.path.join(local_destination, 'existing.txt')) os.remove(os.path.join(local_destination, 'existing.txt')) # fifth test, copying one file into a folder os.mkdir(os.path.join(local_destination, 'prova')) transport.get(os.path.join('local', 'a.txt'), os.path.join(local_destination, 'prova')) - self.assertEqual(set(os.listdir(os.path.join(local_destination, 'prova'))), set(['a.txt'])) + assert set(os.listdir(os.path.join(local_destination, 'prova'))) == set(['a.txt']) shutil.rmtree(os.path.join(local_destination, 'prova')) # sixth test, copying one file into a file transport.get(os.path.join('local', 'a.txt'), os.path.join(local_destination, 'prova')) - self.assertTrue(os.path.isfile(os.path.join(local_destination, 'prova'))) + assert os.path.isfile(os.path.join(local_destination, 'prova')) os.remove(os.path.join(local_destination, 'prova')) # exit transport.chdir('..') transport.rmtree(directory) - @run_for_all_plugins def test_put_get_abs_path(self, custom_transport): """ test of exception for non existing files and abs path @@ -1070,37 +968,37 @@ def test_put_get_abs_path(self, custom_transport): pathlib.Path(local_file_name).touch() # 'tmp1' is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.put('tmp1', remote_subfolder) - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.putfile('tmp1', remote_subfolder) - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.puttree('tmp1', remote_subfolder) # 'tmp3' does not exist - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.put(retrieved_subfolder, remote_subfolder) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.putfile(retrieved_subfolder, remote_subfolder) - with self.assertRaises(OSError): + with pytest.raises(OSError): transport.puttree(retrieved_subfolder, remote_subfolder) # remote_file_name does not exist - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.get('non_existing', retrieved_subfolder) - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.getfile('non_existing', retrieved_subfolder) - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.gettree('non_existing', retrieved_subfolder) transport.put(local_subfolder, remote_subfolder) # local filename is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.get(remote_subfolder, 'delete_me_tree') - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.getfile(remote_subfolder, 'delete_me_tree') - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.gettree(remote_subfolder, 'delete_me_tree') os.remove(os.path.join(local_subfolder, 'file.txt')) @@ -1110,7 +1008,6 @@ def test_put_get_abs_path(self, custom_transport): transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_put_get_empty_string(self, custom_transport): """ test of exception put/get of empty strings @@ -1143,22 +1040,22 @@ def test_put_get_empty_string(self, custom_transport): # localpath is an empty string # ValueError because it is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.puttree('', remote_subfolder) # remote path is an empty string - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.puttree(local_subfolder, '') transport.puttree(local_subfolder, remote_subfolder) # remote path is an empty string - with self.assertRaises(IOError): + with pytest.raises(IOError): transport.gettree('', retrieved_subfolder) # local path is an empty string # ValueError because it is not an abs path - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.gettree(remote_subfolder, '') # TODO : get doesn't retrieve empty files. @@ -1171,14 +1068,13 @@ def test_put_get_empty_string(self, custom_transport): transport.rmdir(remote_subfolder) # If it couldn't end the copy, it leaves what he did on local file # here I am mixing local with remote - self.assertTrue('file.txt' in transport.listdir('tmp3')) + assert 'file.txt' in transport.listdir('tmp3') os.remove(os.path.join(retrieved_subfolder, 'file.txt')) os.rmdir(retrieved_subfolder) transport.chdir('..') transport.rmdir(directory) - @run_for_all_plugins def test_gettree_nested_directory(self, custom_transport): """Test `gettree` for a nested directory.""" with tempfile.TemporaryDirectory() as dir_remote, tempfile.TemporaryDirectory() as dir_local: @@ -1193,14 +1089,13 @@ def test_gettree_nested_directory(self, custom_transport): transport.gettree(os.path.join(dir_remote, 'sub/path'), os.path.join(dir_local, 'sub/path')) -class TestExecuteCommandWait(unittest.TestCase): +class TestExecuteCommandWait: """ Test some simple command executions and stdin/stdout management. It also checks for escaping of the folder names. """ - @run_for_all_plugins def test_exec_pwd(self, custom_transport): """ I create a strange subfolder with a complicated name and @@ -1226,31 +1121,29 @@ def test_exec_pwd(self, custom_transport): delete_at_end = True transport.mkdir(subfolder) - self.assertTrue(transport.isdir(subfolder)) + assert transport.isdir(subfolder) transport.chdir(subfolder) - self.assertEqual(subfolder_fullpath, transport.getcwd()) + assert subfolder_fullpath == transport.getcwd() retcode, stdout, stderr = transport.exec_command_wait('pwd') - self.assertEqual(retcode, 0) + assert retcode == 0 # I have to strip it because 'pwd' returns a trailing \n - self.assertEqual(stdout.strip(), subfolder_fullpath) - self.assertEqual(stderr, '') + assert stdout.strip() == subfolder_fullpath + assert stderr == '' if delete_at_end: transport.chdir(location) transport.rmdir(subfolder) - @run_for_all_plugins def test_exec_with_stdin_string(self, custom_transport): """Test command execution with a stdin string.""" test_string = 'some_test String' with custom_transport as transport: retcode, stdout, stderr = transport.exec_command_wait('cat', stdin=test_string) - self.assertEqual(retcode, 0) - self.assertEqual(stdout, test_string) - self.assertEqual(stderr, '') + assert retcode == 0 + assert stdout == test_string + assert stderr == '' - @run_for_all_plugins def test_exec_with_stdin_bytes(self, custom_transport): """Test command execution with a stdin bytes. @@ -1260,22 +1153,20 @@ def test_exec_with_stdin_bytes(self, custom_transport): test_string = b'some_test bytes with non-unicode -> \xFA' with custom_transport as transport: retcode, stdout, stderr = transport.exec_command_wait_bytes('cat', stdin=test_string) - self.assertEqual(retcode, 0) - self.assertEqual(stdout, test_string) - self.assertEqual(stderr, b'') + assert retcode == 0 + assert stdout == test_string + assert stderr == b'' - @run_for_all_plugins def test_exec_with_stdin_filelike(self, custom_transport): """Test command execution with a stdin from filelike.""" test_string = 'some_test String' stdin = io.StringIO(test_string) with custom_transport as transport: retcode, stdout, stderr = transport.exec_command_wait('cat', stdin=stdin) - self.assertEqual(retcode, 0) - self.assertEqual(stdout, test_string) - self.assertEqual(stderr, '') + assert retcode == 0 + assert stdout == test_string + assert stderr == '' - @run_for_all_plugins def test_exec_with_stdin_filelike_bytes(self, custom_transport): """Test command execution with a stdin from filelike of type bytes. @@ -1290,11 +1181,10 @@ def test_exec_with_stdin_filelike_bytes(self, custom_transport): stdin = io.BytesIO(test_string) with custom_transport as transport: retcode, stdout, stderr = transport.exec_command_wait_bytes('cat', stdin=stdin) - self.assertEqual(retcode, 0) - self.assertEqual(stdout, test_string) - self.assertEqual(stderr, b'') + assert retcode == 0 + assert stdout == test_string + assert stderr == b'' - @run_for_all_plugins def test_exec_with_stdin_filelike_bytes_decoding(self, custom_transport): """Test command execution with a stdin from filelike of type bytes, trying to decode. @@ -1308,18 +1198,16 @@ def test_exec_with_stdin_filelike_bytes_decoding(self, custom_transport): test_string = b'some_test bytes with non-unicode -> \xFA' stdin = io.BytesIO(test_string) with custom_transport as transport: - with self.assertRaises(UnicodeDecodeError): + with pytest.raises(UnicodeDecodeError): transport.exec_command_wait('cat', stdin=stdin, encoding='utf-8') - @run_for_all_plugins def test_exec_with_wrong_stdin(self, custom_transport): """Test command execution with incorrect stdin string.""" # I pass a number with custom_transport as transport: - with self.assertRaises(ValueError): + with pytest.raises(ValueError): transport.exec_command_wait('cat', stdin=1) - @run_for_all_plugins def test_transfer_big_stdout(self, custom_transport): # pylint: disable=too-many-locals """Test the transfer of a large amount of data on stdout.""" # Create a "big" file of > 2MB (10MB here; in general, larger than the buffer size) @@ -1339,7 +1227,7 @@ def test_transfer_big_stdout(self, custom_transport): # pylint: disable=too-man # We cannot use tempfile.mkdtemp because we're on a remote folder location = trans.normalize(os.path.join('/', 'tmp')) trans.chdir(location) - self.assertEqual(location, trans.getcwd()) + assert location == trans.getcwd() directory = 'temp_dir_test_transfer_big_stdout' while trans.isdir(directory): @@ -1381,15 +1269,15 @@ def test_transfer_big_stdout(self, custom_transport): # pylint: disable=too-man # I get its content via the stdout; emulate also network slowness (note I cat twice) retcode, stdout, stderr = trans.exec_command_wait(f'cat {fname} ; sleep 1 ; cat {fname}') - self.assertEqual(stderr, '') - self.assertEqual(stdout, fcontent + fcontent) - self.assertEqual(retcode, 0) + assert stderr == '' + assert stdout == fcontent + fcontent + assert retcode == 0 # I get its content via the stderr; emulate also network slowness (note I cat twice) retcode, stdout, stderr = trans.exec_command_wait(f'cat {fname} >&2 ; sleep 1 ; cat {fname} >&2') - self.assertEqual(stderr, fcontent + fcontent) - self.assertEqual(stdout, '') - self.assertEqual(retcode, 0) + assert stderr == fcontent + fcontent + assert stdout == '' + assert retcode == 0 # This time, I cat one one on each of the two streams intermittently, to check # that this does not hang. @@ -1401,9 +1289,9 @@ def test_transfer_big_stdout(self, custom_transport): # pylint: disable=too-man retcode, stdout, stderr = trans.exec_command_wait(f'python3 {script_fname}') - self.assertEqual(stderr, fcontent) - self.assertEqual(stdout, fcontent) - self.assertEqual(retcode, 0) + assert stderr == fcontent + assert stdout == fcontent + assert retcode == 0 # Clean-up trans.remove(fname) @@ -1412,7 +1300,7 @@ def test_transfer_big_stdout(self, custom_transport): # pylint: disable=too-man trans.rmdir(directory) -class TestDirectScheduler(unittest.TestCase): +class TestDirectScheduler: """ Test how the direct scheduler works. @@ -1422,7 +1310,6 @@ class TestDirectScheduler(unittest.TestCase): the infrastructure to test on multiple transport plugins. """ - @run_for_all_plugins def test_asynchronous_execution(self, custom_transport): """Test that the execution of a long(ish) command via the direct scheduler does not block. @@ -1454,10 +1341,8 @@ def test_asynchronous_execution(self, custom_transport): # SSH connection etc.) and I don't want to have false failures. # Actually, if the time is short, it could mean also that the execution failed! # So I double check later that the execution was successful. - self.assertTrue( - elapsed_time < 5, 'Getting back control after remote execution took more than 5 seconds! ' - 'Probably submission is blocking' - ) + assert elapsed_time < 5, \ + 'Getting back control after remote execution took more than 5 seconds! Probably submission blocks' # Check that the job is still running # Wait 0.2 more seconds, so that I don't do a super-quick check that might return True @@ -1470,9 +1355,9 @@ def test_asynchronous_execution(self, custom_transport): # - a new scheduler is used and does not use the process PID, or the job_id of the 'direct' scheduler # is not anymore simply the job PID job_id = int(job_id_string) - self.assertTrue( - psutil.pid_exists(job_id), 'The job is not there after a bit more than 1 second! Probably it failed' - ) + assert psutil.pid_exists( + job_id + ), 'The job is not there after a bit more than 1 second! Probably it failed' finally: # Clean up by killing the remote job. # This assumes it's on the same machine; if we add tests on a different machine, diff --git a/tests/transports/test_local.py b/tests/transports/test_local.py index fa88e00468..d4dd39e396 100644 --- a/tests/transports/test_local.py +++ b/tests/transports/test_local.py @@ -7,54 +7,38 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### -"""Tests for the `LocalTransport`.""" -import unittest +"""Test :mod:`aiida.transports.plugins.local`.""" +import getpass + +import pytest from aiida.transports.plugins.local import LocalTransport from aiida.transports.transport import TransportInternalError -# This will be used by test_all_plugins - -plugin_transport = LocalTransport() - - -class TestGeneric(unittest.TestCase): - """ - Test whoami on localhost. - """ - def test_whoami(self): - """Test the `whoami` command.""" - import getpass +def test_basic(): + """Test constructor.""" + with LocalTransport(): + pass - with LocalTransport() as transport: - self.assertEqual(transport.whoami(), getpass.getuser()) +def test_whoami(): + """Test the `whoami` command.""" + with LocalTransport() as transport: + assert transport.whoami() == getpass.getuser() -class TestBasicConnection(unittest.TestCase): - """ - Test basic connections. - """ - - def test_closed_connection(self): - """Test running a command on a closed connection.""" - - with self.assertRaises(TransportInternalError): - transport = LocalTransport() - transport.listdir() - @staticmethod - def test_basic(): - """Test constructor.""" - with LocalTransport(): - pass +def test_closed_connection(): + """Test running a command on a closed connection.""" + with pytest.raises(TransportInternalError): + transport = LocalTransport() + transport.listdir() def test_gotocomputer(): """Test gotocomputer""" with LocalTransport() as transport: cmd_str = transport.gotocomputer_command('/remote_dir/') - expected_str = ( """bash -c "if [ -d '/remote_dir/' ] ;""" """ then cd '/remote_dir/' ; bash -l ; else echo ' ** The directory' ; """ diff --git a/tests/transports/test_ssh.py b/tests/transports/test_ssh.py index 8b2043f878..85b351a934 100644 --- a/tests/transports/test_ssh.py +++ b/tests/transports/test_ssh.py @@ -7,103 +7,93 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### -"""Test the `SshTransport` plugin on localhost.""" +"""Test :mod:`aiida.transports.plugins.ssh`.""" import logging -import unittest import paramiko +import pytest from aiida.transports.plugins.ssh import SshTransport from aiida.transports.transport import TransportInternalError -# This will be used by test_all_plugins -plugin_transport = SshTransport(machine='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy') +def test_closed_connection_ssh(): + """Test calling command on a closed connection.""" + with pytest.raises(TransportInternalError): + transport = SshTransport(machine='localhost') + transport._exec_command_internal('ls') # pylint: disable=protected-access -class TestBasicConnection(unittest.TestCase): - """ - Test basic connections. - """ +def test_closed_connection_sftp(): + """Test calling sftp command on a closed connection.""" + with pytest.raises(TransportInternalError): + transport = SshTransport(machine='localhost') + transport.listdir() - def test_closed_connection_ssh(self): - """Test calling command on a closed connection.""" - with self.assertRaises(TransportInternalError): - transport = SshTransport(machine='localhost') - transport._exec_command_internal('ls') # pylint: disable=protected-access - def test_closed_connection_sftp(self): - """Test calling sftp command on a closed connection.""" - with self.assertRaises(TransportInternalError): - transport = SshTransport(machine='localhost') - transport.listdir() +def test_auto_add_policy(): + """Test the auto add policy.""" + with SshTransport(machine='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy'): + pass - @staticmethod - def test_auto_add_policy(): - """Test the auto add policy.""" - with SshTransport(machine='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy'): - pass - @staticmethod - def test_proxy_jump(): - """Test the connection with a proxy jump or several""" - with SshTransport( - machine='localhost', - proxy_jump='localhost', - timeout=30, - load_system_host_keys=True, - key_policy='AutoAddPolicy' - ): - pass +def test_proxy_jump(): + """Test the connection with a proxy jump or several""" + with SshTransport( + machine='localhost', proxy_jump='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy' + ): + pass - # kind of pointless, but should work and to check that proxy chaining works - with SshTransport( - machine='localhost', - proxy_jump='localhost,localhost,localhost', - timeout=30, - load_system_host_keys=True, - key_policy='AutoAddPolicy' - ): - pass + # kind of pointless, but should work and to check that proxy chaining works + with SshTransport( + machine='localhost', + proxy_jump='localhost,localhost,localhost', + timeout=30, + load_system_host_keys=True, + key_policy='AutoAddPolicy' + ): + pass - def test_proxy_jump_invalid(self): - """Test proper error reporting when invalid host as a proxy""" - - # import is also that when Python is running with debug warnings `-Wd` - # no unclosed files are reported. - with self.assertRaises(paramiko.SSHException): - with SshTransport( - machine='localhost', - proxy_jump='localhost,nohost', - timeout=30, - load_system_host_keys=True, - key_policy='AutoAddPolicy' - ): - pass - - @staticmethod - def test_proxy_command(): - """Test the connection with a proxy command""" + +def test_proxy_jump_invalid(): + """Test proper error reporting when invalid host as a proxy""" + + # import is also that when Python is running with debug warnings `-Wd` + # no unclosed files are reported. + with pytest.raises(paramiko.SSHException): with SshTransport( machine='localhost', - proxy_command='ssh -W localhost:22 localhost', + proxy_jump='localhost,nohost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy' ): pass - def test_no_host_key(self): - """Test if there is no host key.""" - # Disable logging to avoid output during test - logging.disable(logging.ERROR) - with self.assertRaises(paramiko.SSHException): - with SshTransport(machine='localhost', timeout=30, load_system_host_keys=False): - pass +def test_proxy_command(): + """Test the connection with a proxy command""" + with SshTransport( + machine='localhost', + proxy_command='ssh -W localhost:22 localhost', + timeout=30, + load_system_host_keys=True, + key_policy='AutoAddPolicy' + ): + pass + + +def test_no_host_key(): + """Test if there is no host key.""" + # Disable logging to avoid output during test + logging.disable(logging.ERROR) + + with pytest.raises(paramiko.SSHException): + with SshTransport(machine='localhost', timeout=30, load_system_host_keys=False): + pass - # Reset logging level - logging.disable(logging.NOTSET) + # Reset logging level + logging.disable(logging.NOTSET) def test_gotocomputer():