Skip to content

Commit

Permalink
Implemented sub-functions in jupynbm towards resolving error of exist…
Browse files Browse the repository at this point in the history
…ing notebook. Need to test with full command line. Started adding <output> subsection to text notebook in test_utils.text2nb. Added this to nb2text_with_output. Need to add it to nb2text after testing.
  • Loading branch information
JaumeAmoresDS committed Sep 15, 2024
1 parent 43d9e09 commit cbd19e5
Show file tree
Hide file tree
Showing 3 changed files with 370 additions and 25 deletions.
109 changes: 92 additions & 17 deletions nbmodular/jupynbm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@

# %% auto 0
__all__ = [
"export_jupytext_modules",
"jupynbm",
"parse_argv_and_run_jupynbm",
"jupynbm_export_cli",
"import_jupytext_modules",
"nbmjupy",
"parse_argv_and_run_nbmjupy",
"nbmjupy_import_cli",
]

# %% ../nbs/jupynbm.ipynb 2
# Standard
import argparse
from logging import warn
import sys
import shutil
from pathlib import Path
from typing import List
from typing import List, Tuple
import os
import glob
import warnings

# 3rd party
from jupytext.cli import jupytext
Expand All @@ -29,21 +31,28 @@


# %% ../nbs/jupynbm.ipynb 4
def export_jupytext_modules(jupytext_path: str, nbm_path: str) -> None:
def jupynbm(jupytext_path: str, nbm_path: str) -> None:
"""
Export jupytext modules to nbmodular notebooks
"""
current_path = os.getcwd()
os.chdir(jupytext_path)

# move notebooks to jupytext_path
migrate_files(nbm_path, jupytext_path, ".ipynb", ".py")

# update the notebooks in jupytext_path
os.chdir(jupytext_path)
jupytext(
"--set-formats ipynb,py:percent --format-options comment_magics=false *.py".split()
)
os.chdir(current_path)

for f in glob.glob(f"{jupytext_path}/*.ipynb"):
shutil.move(f, nbm_path)
# move updated notebooks to nbm_path
migrate_files(jupytext_path, nbm_path, ".ipynb", ".py")

# update:
# - the doc files: nbm_path => nbs_path
# - the py files: nbm_path => lib_path
nbm_export_all_paths(nbm_path)


Expand Down Expand Up @@ -71,33 +80,99 @@ def parse_argv_and_run_jupynbm(argv: List[str]):
if args.nbm is None:
args.nbm = str(Path(get_config()["nbm_path"]).resolve())
logger.info(f"Exporting jupytext python modules from {args.jupytext} to {args.nbm}")
export_jupytext_modules(args.jupytext, args.nbm)
jupynbm(args.jupytext, args.nbm)


def jupynbm_export_cli():
parse_argv_and_run_jupynbm(sys.argv[1:])


def srcpaths_in_dst(
src_mfe_files: List[Path],
src_path: str,
dst_path: str,
alternative_suffix: str,
) -> Tuple[List[Path], List[Path]]:

dst_mfe_files = []
dst_ae_files = []
for src_mfe_file in src_mfe_files:
dst_relative_path = src_mfe_file.relative_to(src_path)
dst_mfe_file = Path(dst_path) / dst_relative_path
dst_mfe_files.append(dst_mfe_file)
dst_ae_file = dst_mfe_file.with_suffix(f"{alternative_suffix}")
dst_ae_files.append(dst_ae_file)
return dst_mfe_files, dst_ae_files


def rename_mfe_files(src_mfe_files, dst_mfe_files, dst_ae_files):
for dst_ae_file, dst_mfe_file, src_mfe_file in zip(
dst_ae_files, dst_mfe_files, src_mfe_files
):
if not dst_ae_file.exists():
warnings.warn(f"{dst_ae_file} does not exist, but {src_mfe_file} does.")
dst_mfe_file.parent.mkdir(parents=True, exist_ok=True)
src_mfe_file.rename(dst_mfe_file)


def migrate_files(src_path, dst_path, mf_suffix, alternative_suffix):
"""
src can be nbm_path
dst can be jupytext_path
mf_suffix can be .ipynb
alternative_suffix can be .py
Parameters
----------
src_path : _type_
_description_
dst_path : _type_
_description_
mf_suffix : _type_
_description_
alternative_suffix : _type_
_description_
"""
# gather updated list of notebook files
src_mfe_files = list(Path(src_path).rglob(f"*{mf_suffix}"))
dst_mfe_files = list(Path(dst_path).rglob(f"*{mf_suffix}"))
if len(dst_mfe_files) > 0:
raise FileExistsError(
f"There are already files with extension {mf_suffix} in the destination path"
)

# generate corresponding file in dst_path
dst_mfe_files, dst_ae_files = srcpaths_in_dst(
src_mfe_files, src_path, dst_path, alternative_suffix
)

# move src mfe files to their corresponding folders in dst_path
rename_mfe_files(src_mfe_files, dst_mfe_files, dst_ae_files)


# %% ../nbs/jupynbm.ipynb 6
def import_jupytext_modules(nbm_path: str, jupytext_path: str) -> None:
def nbmjupy(nbm_path: str, jupytext_path: str) -> None:
"""
Export jupytext modules to nbmodular notebooks
nbm notebooks to jupytext .py files
Moves the notebooks .ipynb to jupytext_path before converting to .py
"""
current_path = os.getcwd()

# update the notebooks in nbm_path: lib_path => nbm_path
nbm_update_all_paths(nbm_path)

# move notebooks from nbm to jupytext folder
for f in glob.glob(f"{nbm_path}/*.ipynb"):
shutil.move(f, jupytext_path)
# move updated notebooks to jupytext_path
migrate_files(nbm_path, jupytext_path, ".ipynb", ".py")

# convert notebooks to .py files
os.chdir(jupytext_path)
jupytext(
"--set-formats ipynb,py:percent --format-options comment_magics=false *.ipynb".split()
)

# move notebooks from jupytext to nbm folder
for f in glob.glob(f"{jupytext_path}/*.ipynb"):
shutil.move(f, nbm_path)
# move the .ipynb files back to nbm_path
migrate_files(jupytext_path, nbm_path, ".ipynb", ".py")

os.chdir(current_path)

Expand Down Expand Up @@ -126,7 +201,7 @@ def parse_argv_and_run_nbmjupy(argv: List[str]):
if args.nbm is None:
args.nbm = str(Path(get_config()["nbm_path"]).resolve())
logger.info(f"Exporting nbm notebooks from {args.nbm} to {args.jupytext}")
import_jupytext_modules(args.nbm, args.jupytext)
nbmjupy(args.nbm, args.jupytext)


def nbmjupy_import_cli():
Expand Down
214 changes: 214 additions & 0 deletions nbmodular/test_jupynbm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# %% imports
from pathlib import Path
import shutil
import warnings
from unittest.mock import patch, MagicMock

import pytest
from nbmodular.jupynbm import srcpaths_in_dst, rename_mfe_files, migrate_files


# %%
# test_rename_mfe_files_triggers_warning
src_mfe_files = [
Path("/src/path/file1.ipynb"),
Path("/src/path/subdir/file2.ipynb"),
]
src_path = "/src/path"
dst_path = "/dst/path"
alternative_suffix = ".py"

expected_dst_mfe_files = [
Path("/dst/path/file1.ipynb"),
Path("/dst/path/subdir/file2.ipynb"),
]
expected_dst_ae_files = [
Path("/dst/path/file1.py"),
Path("/dst/path/subdir/file2.py"),
]

dst_mfe_files, dst_ae_files = srcpaths_in_dst(
src_mfe_files, src_path, dst_path, alternative_suffix
)

assert dst_mfe_files == expected_dst_mfe_files
assert dst_ae_files == expected_dst_ae_files


# %%
# def test_rename_mfe_files_triggers_warning():
tmp_path = Path("tmp")
src_path = tmp_path / "src"
dst_path = tmp_path / "dst"
src_path.mkdir(parents=True, exist_ok=True)
dst_path.mkdir(parents=True, exist_ok=True)

src_mfe_files = [
src_path / "file1.ipynb",
src_path / "subdir" / "file2.ipynb",
]
for file in src_mfe_files:
file.parent.mkdir(parents=True, exist_ok=True)
file.touch()

dst_mfe_files = [
dst_path / "file1.ipynb",
dst_path / "subdir" / "file2.ipynb",
]
dst_ae_files = [
dst_path / "file1.py",
dst_path / "subdir" / "file2.py",
]


# %%
# (cont.)
# def test_rename_mfe_files_triggers_warning():

with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
rename_mfe_files(src_mfe_files, dst_mfe_files, dst_ae_files)
assert len(w) == 2
assert issubclass(w[-1].category, UserWarning)
assert "does not exist" in str(w[-1].message)

for dst_mfe_file in dst_mfe_files:
assert dst_mfe_file.exists()

for src_mfe_file in src_mfe_files:
assert not src_mfe_file.exists()


# %%
# (cont.)
# def test_rename_mfe_files_triggers_warning():
shutil.rmtree(tmp_path)


# %% test_rename_mfe_files2


@patch("pathlib.Path.rename")
@patch("pathlib.Path.exists", MagicMock(return_value=False))
@patch("pathlib.Path.mkdir")
def test_rename_mfe_files2(mock_mkdir, mock_rename):
src_mfe_files = [
Path("/src/path/file1.ipynb"),
Path("/src/path/subdir/file2.ipynb"),
]
dst_mfe_files = [
Path("/dst/path/file1.ipynb"),
Path("/dst/path/subdir/file2.ipynb"),
]
dst_ae_files = [
Path("/dst/path/file1.py"),
Path("/dst/path/subdir/file2.py"),
]

rename_mfe_files(src_mfe_files, dst_mfe_files, dst_ae_files)

assert mock_rename.call_count == 2
assert mock_mkdir.call_count == 2
mock_mkdir.assert_any_call(parents=True, exist_ok=True)


# %%
test_rename_mfe_files2()


# %%
# def test_rename_mfe_files ():
tmp_path = Path("tmp")
src_path = tmp_path / "src"
dst_path = tmp_path / "dst"
src_path.mkdir(parents=True, exist_ok=True)
dst_path.mkdir(parents=True, exist_ok=True)

src_mfe_files = [
src_path / "file1.ipynb",
src_path / "subdir" / "file2.ipynb",
]
dst_ae_files = [
dst_path / "file1.py",
dst_path / "subdir" / "file2.py",
]
for src_mfe_file, dst_ae_file in zip(src_mfe_files, dst_ae_files):
src_mfe_file.parent.mkdir(parents=True, exist_ok=True)
dst_ae_file.parent.mkdir(parents=True, exist_ok=True)
src_mfe_file.touch()
dst_ae_file.touch()


dst_mfe_files = [
dst_path / "file1.ipynb",
dst_path / "subdir" / "file2.ipynb",
]

with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
rename_mfe_files(src_mfe_files, dst_mfe_files, dst_ae_files)
assert len(w) == 0

shutil.rmtree(tmp_path)

# %%
# def test_migrate_files ():
tmp_path = Path("tmp")
src_path = tmp_path / "src"
dst_path = tmp_path / "dst"
src_path.mkdir(parents=True, exist_ok=True)
dst_path.mkdir(parents=True, exist_ok=True)

src_mfe_files = [
src_path / "file1.ipynb",
src_path / "subdir" / "file2.ipynb",
]
for file in src_mfe_files:
file.parent.mkdir(parents=True, exist_ok=True)
file.touch()

migrate_files(str(src_path), str(dst_path), ".ipynb", ".py")

dst_mfe_files = [
dst_path / "file1.ipynb",
dst_path / "subdir" / "file2.ipynb",
]
for dst_mfe_file in dst_mfe_files:
assert dst_mfe_file.exists()

for src_mfe_file in src_mfe_files:
assert not src_mfe_file.exists()

shutil.rmtree(tmp_path)


# %%
# def test_migrate_files_with_existing_files():
tmp_path = Path("tmp")
src_path = tmp_path / "src"
dst_path = tmp_path / "dst"
src_path.mkdir(parents=True, exist_ok=True)
dst_path.mkdir(parents=True, exist_ok=True)

src_mfe_files = [
src_path / "file1.ipynb",
src_path / "subdir" / "file2.ipynb",
]
for file in src_mfe_files:
file.parent.mkdir(parents=True, exist_ok=True)
file.touch()

dst_mfe_files = [
dst_path / "file1.ipynb",
dst_path / "subdir" / "file2.ipynb",
]
for file in dst_mfe_files:
file.parent.mkdir(parents=True, exist_ok=True)
file.touch()

with pytest.raises(FileExistsError):
migrate_files(str(src_path), str(dst_path), ".ipynb", ".py")

shutil.rmtree(tmp_path)

# %%
Loading

0 comments on commit cbd19e5

Please sign in to comment.