Skip to content

Commit

Permalink
Replace dictzip with idzip [2] #518 by @bergentroll
Browse files Browse the repository at this point in the history
  • Loading branch information
ilius committed Nov 19, 2023
1 parent 8116a54 commit 15143fc
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 54 deletions.
5 changes: 3 additions & 2 deletions pyglossary/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ def emit(self, record: logging.LogRecord) -> None:
else:
self.recordsByLevel[level] = [record]

def popLog(self, level: int, msg: str) -> "logging.LogRecord | None":
def popLog(self, level: int, msg: str, partial=False) -> "logging.LogRecord | None":
if level not in self.recordsByLevel:
return None
records = self.recordsByLevel[level]
for index, record in enumerate(records):
if record.getMessage() == msg:
rec_msg = record.getMessage()
if msg == rec_msg or (msg in rec_msg and partial):
return records.pop(index)
return None

Expand Down
44 changes: 23 additions & 21 deletions pyglossary/os_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,32 +87,34 @@ def _dictzip(filename: str) -> bool:
dictzipCmd = shutil.which("dictzip")
if not dictzipCmd:
return False
b_out, b_err = subprocess.Popen(
[dictzipCmd, filename],
stdout=subprocess.PIPE).communicate()
log.debug(f"dictzip command: {dictzipCmd!r}")
if b_err:
err = b_err.decode("utf-8").replace('\n', ' ')
log.error(f"dictzip error: {err}")
if b_out:
out = b_out.decode("utf-8").replace('\n', ' ')
log.error(f"dictzip error: {out}")
try:
subprocess.run(
[dictzipCmd, filename],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as proc_err:
err_msg = proc_err.output.decode("utf-8").replace("\n", ";")
retcode = proc_err.returncode
log.error(f"dictzip exit {retcode}: {err_msg}")
return True


def _nozip(filename: str) -> bool:
log.warning(
"Dictzip compression requires idzip module or dictzip utility,"
f" run `{core.pip} install python-idzip` to install or make sure"
" dictzip is in your $PATH")
return False


def runDictzip(filename: str) -> None:
def runDictzip(filename: str | Path, method="") -> None:
"""Compress file into dictzip format."""
for fun in (_idzip, _dictzip, _nozip):
if fun(filename):
return
res = None
if method in ["", "idzip"]:
res = _idzip(filename)
if not res and method in ["", "dictzip"]:
res = _dictzip(filename)
if not res:
log.warning(
"Dictzip compression requires idzip module or dictzip utility,"
f" run `{core.pip} install python-idzip` to install or make sure"
" dictzip is in your $PATH",
)


def _rmtreeError(
Expand Down
92 changes: 62 additions & 30 deletions tests/dictzip_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import gzip
import tempfile
import logging
from pathlib import Path

import idzip as _idzip # noqa: F401
from glossary_errors_test import TestGlossaryErrorsBase
from glossary_v2_errors_test import TestGlossaryErrorsBase

from pyglossary.os_utils import runDictzip

Expand All @@ -15,35 +14,68 @@
fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
culpa qui officia deserunt mollit anim id est laborum.
"""
MISSING_DEP_MARK = "Dictzip compression requires idzip module or dictzip utility,"


# TODO Test also dictzip func
# TODO Check if dictzip in GH Action and avoid if not
class AsciiLowerUpperTest(TestGlossaryErrorsBase):
def make_dz(self, path: Path) -> Path:
"""Get path of dzipped file contains TEXT."""
test_file_path = Path(path)/"test_file.txt"
result_file_path = test_file_path.parent/(test_file_path.name + ".dz")
with open(test_file_path, "a") as tmp_file:
class TestDictzip(TestGlossaryErrorsBase):
def setUp(self) -> None:
super().setUp()
self.test_file_path = Path(self.tempDir)/"test_file.txt"
filename = self.test_file_path.name + ".dz"
self.result_file_path = self.test_file_path.parent/filename
with open(self.test_file_path, "a") as tmp_file:
tmp_file.write(TEXT)
runDictzip(str(test_file_path))
return result_file_path

def test_compressed_exists(self):
with tempfile.TemporaryDirectory() as tmp_dir:
result_file_path = self.make_dz(tmp_dir)
self.assertTrue(result_file_path.exists())
self.assertTrue(result_file_path.is_file())

def test_compressed_matches(self):
with tempfile.TemporaryDirectory() as tmp_dir:
result_file_path = self.make_dz(tmp_dir)
with gzip.open(result_file_path, 'r') as file:
result = file.read().decode()

def skip_on_dep(self, method: str) -> None:
warn = self.mockLog.popLog(logging.WARNING, MISSING_DEP_MARK, partial=True)
if warn:
self.skipTest(f"Missing {method} dependency")

def test_idzip_compressed_exists(self) -> None:
method="idzip"
runDictzip(self.test_file_path, method)
self.skip_on_dep(method)
self.assertTrue(self.result_file_path.exists())
self.assertTrue(self.result_file_path.is_file())

def test_idzip_compressed_matches(self) -> None:
method="idzip"
runDictzip(self.test_file_path, method)
self.skip_on_dep(method)
with gzip.open(self.result_file_path, "r") as file:
result = file.read().decode()
self.assertEqual(result, TEXT)

def test_dictzip_compressed_exists(self) -> None:
method="dictzip"
runDictzip(self.test_file_path, method)
self.skip_on_dep(method)
self.assertTrue(self.result_file_path.exists())
self.assertTrue(self.result_file_path.is_file())

def test_dictzip_compressed_matches(self) -> None:
method="dictzip"
runDictzip(self.test_file_path, method)
self.skip_on_dep(method)
with gzip.open(self.result_file_path, "r") as file:
result = file.read().decode()
self.assertEqual(result, TEXT)

def test_missing_target(self):
filename = "/NOT_EXISTING_PATH/file.txt"
expected_msg = f"[Errno 2] No such file or directory: '{filename}'"
runDictzip(filename)
self.assertLogError(expected_msg)
def test_dictzip_missing_target(self) -> None:
method="idzip"
filename = "/NOT_EXISTED_PATH/file.txt"
expected = f"No such file or directory: '{filename}'"
runDictzip(filename, method)
self.skip_on_dep(method)
err = self.mockLog.popLog(logging.ERROR, expected, partial=True)
self.assertIsNotNone(err)

def test_idzip_missing_target(self) -> None:
method="dictzip"
filename = "/NOT_EXISTED_PATH/boilerplate.txt"
expected = f'Cannot open "{filename}"'
runDictzip(filename, method)
self.skip_on_dep(method)
err = self.mockLog.popLog(logging.ERROR, expected, partial=True)
self.assertIsNotNone(err)

2 changes: 1 addition & 1 deletion tests/g_aard2_slob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, *args, **kwargs):
"100-en-fa-res.slob": "0216d006",
"100-en-fa-res-slob.txt": "c73100b3",
"100-en-fa-res-slob-sort.txt": "8253fe96",
"300-ru-en.txt": "77cfee2f",
"300-ru-en.txt": "77cfee2f",
})

def setUp(self):
Expand Down

0 comments on commit 15143fc

Please sign in to comment.