Skip to content

Commit

Permalink
Merge pull request #702 from DanielYang59/consistent-ruff
Browse files Browse the repository at this point in the history
fix `TestClass` name, make module level variable all capital and other code cleanup
  • Loading branch information
shyuep authored Jan 9, 2025
2 parents 6e12c2d + 9a25bfd commit 802692f
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 77 deletions.
1 change: 0 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ jobs:
- name: mypy
run: |
mypy --version
rm -rf .mypy_cache
mypy src
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
exclude: ^(docs|tests/files|cmd_line|tasks.py)
exclude: ^(docs|tests/test_files|tasks.py)

ci:
autoupdate_schedule: monthly
Expand Down
3 changes: 0 additions & 3 deletions src/monty/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,9 @@ def run(self, timeout: Optional[float] = None, **kwargs) -> Self:

def target(**kw):
try:
# print('Thread started')
with Popen(self.command, **kw) as self.process:
self.output, self.error = self.process.communicate()
self.retcode = self.process.returncode
# print('Thread stopped')
except Exception:
self.error = traceback.format_exc()
self.retcode = -1
Expand All @@ -96,7 +94,6 @@ def target(**kw):
thread.join(timeout)

if thread.is_alive() and self.process is not None:
# print("Terminating process")
self.process.terminate()
self.killed = True
thread.join()
Expand Down
14 changes: 2 additions & 12 deletions src/monty/termcolor.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,7 @@ def cprint(
It accepts arguments of print function.
"""
try:
print((colored(text, color, on_color, attrs)), **kwargs)
except TypeError:
# flush is not supported by py2.7
kwargs.pop("flush", None)
print((colored(text, color, on_color, attrs)), **kwargs)
print((colored(text, color, on_color, attrs)), **kwargs)


def colored_map(text: str, cmap: dict) -> str:
Expand Down Expand Up @@ -162,12 +157,7 @@ def cprint_map(text: str, cmap: dict, **kwargs) -> None:
Examples:
cprint_map("Hello world", {"Hello": "red"})
"""
try:
print(colored_map(text, cmap), **kwargs)
except TypeError:
# flush is not supported by py2.7
kwargs.pop("flush", None)
print(colored_map(text, cmap), **kwargs)
print(colored_map(text, cmap), **kwargs)


def get_terminal_size():
Expand Down
16 changes: 8 additions & 8 deletions tests/test_design_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,32 @@ def test_pickle(self):


@cached_class
class TestClass:
class ClassForTest:
def __init__(self, value: Any) -> None:
self.value = value


def test_caching():
# Test that instances are cached
inst1 = TestClass(1)
inst2 = TestClass(1)
inst1 = ClassForTest(1)
inst2 = ClassForTest(1)
assert inst1 is inst2

inst3 = TestClass(2)
inst3 = ClassForTest(2)
assert inst1 is not inst3


def test_picklability():
# Test that instances can be pickled and unpickled
original = TestClass(42)
original = ClassForTest(42)
pickled = pickle.dumps(original)
unpickled = pickle.loads(pickled)

# Check that the unpickled instance has the same value
assert original.value == unpickled.value

# Check that the unpickled instance is the same as a newly created instance
new_instance = TestClass(42)
new_instance = ClassForTest(42)
assert unpickled is new_instance


Expand All @@ -98,8 +98,8 @@ def __init__(self):

def test_class_identity():
# Ensure the decorated class is still recognized as the original class
assert isinstance(TestClass(1), TestClass)
assert type(TestClass(1)) is TestClass
assert isinstance(ClassForTest(1), ClassForTest)
assert type(ClassForTest(1)) is ClassForTest


def test_multiple_arguments():
Expand Down
6 changes: 3 additions & 3 deletions tests/test_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class TestClassNew:
"""A dummy class for tests."""

def __post_init__(self):
print("Hello.")
pass

def method_a(self):
pass
Expand All @@ -156,7 +156,7 @@ class TestClassOld:
class_attrib_old = "OLD_ATTRIB"

def __post_init__(self):
print("Hello.")
pass

def method_b(self):
"""This is method_b."""
Expand All @@ -180,7 +180,7 @@ def test_requires():

@requires(fictitious_mod is not None, err_msg)
def use_fictitious_mod():
print("success")
pass

with pytest.raises(RuntimeError, match=err_msg):
use_fictitious_mod()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ def test_pint_quantity(self):
cls = ClassContainingQuantity(qty=pint.Quantity("9.81 m/s**2"))

d = json.loads(MontyEncoder().encode(cls))
print(d)
assert isinstance(d, dict)

assert d["qty"]["@module"] == "pint"
assert d["qty"]["@class"] == "Quantity"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ def __str__(self):
],
)

print(draw_tree(root))
assert draw_tree(root)
90 changes: 45 additions & 45 deletions tests/test_shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,61 +20,61 @@
remove,
)

test_dir = os.path.join(os.path.dirname(__file__), "test_files")
TEST_DIR = os.path.join(os.path.dirname(__file__), "test_files")


class TestCopyR:
def setup_method(self):
os.mkdir(os.path.join(test_dir, "cpr_src"))
os.mkdir(os.path.join(TEST_DIR, "cpr_src"))
with open(
os.path.join(test_dir, "cpr_src", "test"), "w", encoding="utf-8"
os.path.join(TEST_DIR, "cpr_src", "test"), "w", encoding="utf-8"
) as f:
f.write("what")
os.mkdir(os.path.join(test_dir, "cpr_src", "sub"))
os.mkdir(os.path.join(TEST_DIR, "cpr_src", "sub"))
with open(
os.path.join(test_dir, "cpr_src", "sub", "testr"), "w", encoding="utf-8"
os.path.join(TEST_DIR, "cpr_src", "sub", "testr"), "w", encoding="utf-8"
) as f:
f.write("what2")
if platform.system() != "Windows":
os.symlink(
os.path.join(test_dir, "cpr_src", "test"),
os.path.join(test_dir, "cpr_src", "mysymlink"),
os.path.join(TEST_DIR, "cpr_src", "test"),
os.path.join(TEST_DIR, "cpr_src", "mysymlink"),
)

def test_recursive_copy_and_compress(self):
copy_r(os.path.join(test_dir, "cpr_src"), os.path.join(test_dir, "cpr_dst"))
assert os.path.exists(os.path.join(test_dir, "cpr_dst", "test"))
assert os.path.exists(os.path.join(test_dir, "cpr_dst", "sub", "testr"))

compress_dir(os.path.join(test_dir, "cpr_src"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "test.gz"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "sub", "testr.gz"))

decompress_dir(os.path.join(test_dir, "cpr_src"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "test"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "sub", "testr"))
with open(os.path.join(test_dir, "cpr_src", "test"), encoding="utf-8") as f:
copy_r(os.path.join(TEST_DIR, "cpr_src"), os.path.join(TEST_DIR, "cpr_dst"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_dst", "test"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_dst", "sub", "testr"))

compress_dir(os.path.join(TEST_DIR, "cpr_src"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "test.gz"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "sub", "testr.gz"))

decompress_dir(os.path.join(TEST_DIR, "cpr_src"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "test"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "sub", "testr"))
with open(os.path.join(TEST_DIR, "cpr_src", "test"), encoding="utf-8") as f:
txt = f.read()
assert txt == "what"

def test_pathlib(self):
test_path = Path(test_dir)
test_path = Path(TEST_DIR)
copy_r(test_path / "cpr_src", test_path / "cpr_dst")
assert (test_path / "cpr_dst" / "test").exists()
assert (test_path / "cpr_dst" / "sub" / "testr").exists()

def teardown_method(self):
shutil.rmtree(os.path.join(test_dir, "cpr_src"))
shutil.rmtree(os.path.join(test_dir, "cpr_dst"))
shutil.rmtree(os.path.join(TEST_DIR, "cpr_src"))
shutil.rmtree(os.path.join(TEST_DIR, "cpr_dst"))


class TestCompressFileDir:
def setup_method(self):
with open(os.path.join(test_dir, "tempfile"), "w", encoding="utf-8") as f:
with open(os.path.join(TEST_DIR, "tempfile"), "w", encoding="utf-8") as f:
f.write("hello world")

def test_compress_and_decompress_file(self):
fname = os.path.join(test_dir, "tempfile")
fname = os.path.join(TEST_DIR, "tempfile")

for fmt in ["gz", "bz2"]:
compress_file(fname, fmt)
Expand All @@ -97,8 +97,8 @@ def test_compress_and_decompress_file(self):
assert decompress_file("non-existent.bz2") is None

def test_compress_and_decompress_with_target_dir(self):
fname = os.path.join(test_dir, "tempfile")
target_dir = os.path.join(test_dir, "temp_target_dir")
fname = os.path.join(TEST_DIR, "tempfile")
target_dir = os.path.join(TEST_DIR, "temp_target_dir")

for fmt in ["gz", "bz2"]:
compress_file(fname, fmt, target_dir)
Expand All @@ -121,22 +121,22 @@ def test_compress_and_decompress_with_target_dir(self):
assert f.read() == "hello world"

def teardown_method(self):
os.remove(os.path.join(test_dir, "tempfile"))
os.remove(os.path.join(TEST_DIR, "tempfile"))


class TestGzipDir:
def setup_method(self):
os.mkdir(os.path.join(test_dir, "gzip_dir"))
os.mkdir(os.path.join(TEST_DIR, "gzip_dir"))
with open(
os.path.join(test_dir, "gzip_dir", "tempfile"), "w", encoding="utf-8"
os.path.join(TEST_DIR, "gzip_dir", "tempfile"), "w", encoding="utf-8"
) as f:
f.write("what")

self.mtime = os.path.getmtime(os.path.join(test_dir, "gzip_dir", "tempfile"))
self.mtime = os.path.getmtime(os.path.join(TEST_DIR, "gzip_dir", "tempfile"))

def test_gzip_dir(self):
full_f = os.path.join(test_dir, "gzip_dir", "tempfile")
gzip_dir(os.path.join(test_dir, "gzip_dir"))
full_f = os.path.join(TEST_DIR, "gzip_dir", "tempfile")
gzip_dir(os.path.join(TEST_DIR, "gzip_dir"))

assert os.path.exists(f"{full_f}.gz")
assert not os.path.exists(full_f)
Expand All @@ -148,7 +148,7 @@ def test_gzip_dir(self):

def test_gzip_dir_file_coexist(self):
"""Test case where both file and file.gz exist."""
full_f = os.path.join(test_dir, "gzip_dir", "temptestfile")
full_f = os.path.join(TEST_DIR, "gzip_dir", "temptestfile")
gz_f = f"{full_f}.gz"

# Create both the file and its gzipped version
Expand All @@ -160,7 +160,7 @@ def test_gzip_dir_file_coexist(self):
with pytest.warns(
UserWarning, match="Both temptestfile and temptestfile.gz exist."
):
gzip_dir(os.path.join(test_dir, "gzip_dir"))
gzip_dir(os.path.join(TEST_DIR, "gzip_dir"))

# Verify contents of the files
with open(full_f, "r", encoding="utf-8") as f:
Expand All @@ -170,13 +170,13 @@ def test_gzip_dir_file_coexist(self):
assert g.read() == b"gzipped"

def test_handle_sub_dirs(self):
sub_dir = os.path.join(test_dir, "gzip_dir", "sub_dir")
sub_dir = os.path.join(TEST_DIR, "gzip_dir", "sub_dir")
sub_file = os.path.join(sub_dir, "new_tempfile")
os.mkdir(sub_dir)
with open(sub_file, "w", encoding="utf-8") as f:
f.write("anotherwhat")

gzip_dir(os.path.join(test_dir, "gzip_dir"))
gzip_dir(os.path.join(TEST_DIR, "gzip_dir"))

assert os.path.exists(f"{sub_file}.gz")
assert not os.path.exists(sub_file)
Expand All @@ -185,31 +185,31 @@ def test_handle_sub_dirs(self):
assert g.readline().decode("utf-8") == "anotherwhat"

def teardown_method(self):
shutil.rmtree(os.path.join(test_dir, "gzip_dir"))
shutil.rmtree(os.path.join(TEST_DIR, "gzip_dir"))


class TestRemove:
@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_file(self):
tempdir = tempfile.mkdtemp(dir=test_dir)
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempf = tempfile.mkstemp(dir=tempdir)[1]
remove(tempf)
assert not os.path.isfile(tempf)
shutil.rmtree(tempdir)

@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_folder(self):
tempdir = tempfile.mkdtemp(dir=test_dir)
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
remove(tempdir)
assert not os.path.isdir(tempdir)

@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_symlink(self):
tempdir = tempfile.mkdtemp(dir=test_dir)
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempf = tempfile.mkstemp(dir=tempdir)[1]

os.symlink(tempdir, os.path.join(test_dir, "temp_link"))
templink = os.path.join(test_dir, "temp_link")
os.symlink(tempdir, os.path.join(TEST_DIR, "temp_link"))
templink = os.path.join(TEST_DIR, "temp_link")
remove(templink)
assert os.path.isfile(tempf)
assert os.path.isdir(tempdir)
Expand All @@ -218,11 +218,11 @@ def test_remove_symlink(self):

@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_symlink_follow(self):
tempdir = tempfile.mkdtemp(dir=test_dir)
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempf = tempfile.mkstemp(dir=tempdir)[1]

os.symlink(tempdir, os.path.join(test_dir, "temp_link"))
templink = os.path.join(test_dir, "temp_link")
os.symlink(tempdir, os.path.join(TEST_DIR, "temp_link"))
templink = os.path.join(TEST_DIR, "temp_link")
remove(templink, follow_symlink=True)
assert not os.path.isfile(tempf)
assert not os.path.isdir(tempdir)
Expand Down
5 changes: 3 additions & 2 deletions tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ def test_command():
sleep05 = Command("sleep 0.5")

sleep05.run(timeout=1)
print(sleep05)
assert sleep05.retcode == 0
# DEBUG: this unit test fail in Windows CI intermittently (PR702)
full_msg = f"{sleep05=}\n{sleep05.error=}\n{sleep05.output}"
assert sleep05.retcode == 0, full_msg
assert not sleep05.killed

sleep05.run(timeout=0.1)
Expand Down

0 comments on commit 802692f

Please sign in to comment.