Skip to content

Commit

Permalink
feat: substitute macro variables (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
Snoopy1866 authored Dec 26, 2024
1 parent 9ca5326 commit 8fb041f
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 23 deletions.
2 changes: 1 addition & 1 deletion python/submit/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "submit"
version = "0.2.0"
version = "0.3.0"
description = "Extract code block which should be submitted to regulatory agency."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
69 changes: 51 additions & 18 deletions python/submit/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
COMMENT_NOT_SUBMIT_NEGIN: str = rf"\/\*{SYLBOMS}*NOT\s*SUBMIT\s*BEGIN{SYLBOMS}*\*\/"
COMMENT_NOT_SUBMIT_END: str = rf"\/\*{SYLBOMS}*NOT\s*SUBMIT\s*END{SYLBOMS}*\*\/"

# Macro variable reference.
# Only single-level references (e.g. &id) are supported; nested references
# (e.g. &&id, &&&id) are not: the negative lookbehind rejects any "&" that
# directly follows another "&", and the name must start with a letter or "_".
# The name may continue with letters, digits 0-9 and underscores.
MACRO_VAR: str = r"(?<!&)(&[A-Za-z_][A-Za-z0-9_]*)"


class ConvertMode(IntFlag):
"""转换模式。"""
Expand Down Expand Up @@ -53,14 +57,19 @@ def get_available_values(cls) -> list[ConvertMode]:


def copy_file(
sas_file: str, txt_file: str, convert_mode: ConvertMode = ConvertMode.BOTH, encoding: str | None = None
sas_file: str,
txt_file: str,
convert_mode: ConvertMode = ConvertMode.BOTH,
macro_subs: dict[str, str] | None = None,
encoding: str | None = None,
) -> None:
"""将 SAS 代码复制到 txt 文件中,并移除指定标记之间的内容。
Args:
sas_file (str): SAS 文件路径。
txt_file (str): TXT 文件路径。
convert_mode (ConvertMode, optional): 转换模式,默认值为 ConvertMode.BOTH。
macro_subs (dict[str, str] | None, optional): 一个字典,其键为 SAS 代码中的宏变量名称,值为替代的字符串,默认值为 None。
encoding (str | None, optional): 字符编码,默认值为 None,将自动检测编码。
"""

Expand All @@ -69,35 +78,36 @@ def copy_file(
encoding = detect(f.read())["encoding"]

with open(sas_file, "r", encoding=encoding) as f:
sas_code = f.read()
code = f.read()

# 提取代码片段
if convert_mode & ConvertMode.NEGATIVE:
# 移除不需要递交的代码片段
sas_code = re.sub(
rf"{COMMENT_NOT_SUBMIT_NEGIN}.*?{COMMENT_NOT_SUBMIT_END}",
"",
sas_code,
flags=re.I | re.S,
)
code = re.sub(rf"{COMMENT_NOT_SUBMIT_NEGIN}.*?{COMMENT_NOT_SUBMIT_END}", "", code, flags=re.I | re.S)

if convert_mode & ConvertMode.POSITIVE:
# 提取需要递交的代码片段
sas_code = re.findall(rf"{COMMENT_SUBMIT_BEGIN}(.*?){COMMENT_SUBMIT_END}", sas_code, re.I | re.S)
sas_code = "".join(sas_code)

txt_code = sas_code

txt_code_dir = os.path.dirname(txt_file)
if not os.path.exists(txt_code_dir):
os.makedirs(txt_code_dir)
code = re.findall(rf"{COMMENT_SUBMIT_BEGIN}(.*?){COMMENT_SUBMIT_END}", code, re.I | re.S)
code = "".join(code)

# 替换宏变量
if macro_subs is not None:
for key, value in macro_subs.items():
regex_macro = re.compile(rf"(?<!&)&{key}")
code = re.sub(regex_macro, value, code)

txt_file_dir = os.path.dirname(txt_file)
if not os.path.exists(txt_file_dir):
os.makedirs(txt_file_dir)
with open(txt_file, "w", encoding=encoding) as f:
f.write(txt_code)
f.write(code)


def copy_directory(
sas_dir: str,
txt_dir: str,
convert_mode: ConvertMode = ConvertMode.BOTH,
macro_subs: dict[str, str] | None = None,
exclude_files: list[str] = None,
exclude_dirs: list[str] = None,
encoding: str | None = None,
Expand All @@ -108,6 +118,7 @@ def copy_directory(
sas_dir (str): SAS 文件夹路径。
txt_dir (str): TXT 文件夹路径。
convert_mode (ConvertMode, optional): 转换模式,默认值为 ConvertMode.BOTH。
macro_subs (dict[str, str] | None, optional): 一个字典,其键为 SAS 代码中的宏变量名称,值为替代的字符串,默认值为 None。
exclude_files (list[str], optional): 排除文件列表,默认值为 None。
exclude_dirs (list[str], optional): 排除目录列表,默认值为 None。
encoding (str | None, optional): 字符编码,默认值为 None,将自动检测编码。
Expand All @@ -129,7 +140,24 @@ def copy_directory(
if file.endswith(".sas"):
sas_file = os.path.join(dirpath, file)
txt_file = os.path.join(txt_dir, ref_path, file.replace(".sas", ".txt"))
copy_file(sas_file, txt_file, convert_mode=convert_mode, encoding=encoding)
copy_file(sas_file, txt_file, convert_mode=convert_mode, macro_subs=macro_subs, encoding=encoding)


def parse_dict(arg: str) -> dict[str, str]:
    """Parse a ``{key1=value1,key2=value2}`` string into a dictionary.

    Intended as an ``argparse`` ``type=`` callable. Surrounding braces are
    optional. Items are separated by commas, and each item is split on its
    first ``=`` only, so a value may itself contain ``=``. Whitespace around
    keys and values is ignored, and one layer of surrounding single or double
    quotes is removed from each of them.

    Args:
        arg (str): The dictionary string, e.g. ``{id=01, pop="FAS"}``.

    Returns:
        dict[str, str]: The parsed mapping. An empty string or ``{}`` yields
            an empty dictionary.

    Raises:
        argparse.ArgumentTypeError: If an item has no ``=`` or its key is empty.
    """

    body = arg.strip().strip("{}")
    if not body.strip():
        # Nothing between the braces: no substitutions requested.
        return {}

    parsed: dict[str, str] = {}
    for item in body.split(","):
        # Split on the first "=" only so that values may contain "=".
        key, separator, value = item.partition("=")
        key = key.strip().strip("\"'")
        if not separator or not key:
            # An empty key is rejected because it cannot name a macro variable.
            raise argparse.ArgumentTypeError("无效的字典字符串")
        parsed[key] = value.strip().strip("\"'")
    return parsed


def main() -> None:
Expand All @@ -155,6 +183,9 @@ def main() -> None:
default="both",
help="转换模式(默认 both)",
)
parent_parser.add_argument(
"--macro-subs", type=parse_dict, help="宏变量替换,格式为 {key1=value1,key2=value2}(默认无)"
)
parent_parser.add_argument("--encoding", default=None, help="编码格式(默认自动检测)")

# 子命令 copyfile
Expand All @@ -176,13 +207,15 @@ def main() -> None:
sas_file=args.sas_file,
txt_file=args.txt_file,
convert_mode=args.convert_mode,
macro_subs=args.macro_subs,
encoding=args.encoding,
)
elif args.command == "copydir":
copy_directory(
sas_dir=args.sas_dir,
txt_dir=args.txt_dir,
convert_mode=args.convert_mode,
macro_subs=args.macro_subs,
exclude_files=args.exclude_files,
exclude_dirs=args.exclude_dirs,
encoding=args.encoding,
Expand Down
6 changes: 4 additions & 2 deletions python/submit/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ def shared_test_directory(tmp_path_factory: pytest.TempPathFactory) -> Path:
proc datasets library = work memtype = data kill noprint;
quit;
%let id = %str();
/*====SUBMIT BEGIN====*/
proc sql;
create table t2 as select * from adam.adae;
create table t2 as select * from adam.adeff&id;
quit;
/*====SUBMIT END====*/
Expand Down Expand Up @@ -175,7 +177,7 @@ def shared_validate_directory(tmp_path_factory: pytest.TempPathFactory) -> Path:
""")
(dir_tfl / "t2.txt").write_text("""
proc sql;
create table t2 as select * from adam.adae;
create table t2 as select * from adam.adeff;
quit;
""")
(dir_tfl / "t3.txt").write_text("""
Expand Down
4 changes: 3 additions & 1 deletion python/submit/tests/test_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ def test_copy_file(self, shared_test_directory: Path, shared_validate_directory:
assert re.sub(r"\s*", "", tmp_code) == re.sub(r"\s*", "", validate_code)

def test_copy_directory(self, shared_test_directory: Path, shared_validate_directory: Path, tmp_path: Path):
copy_directory(shared_test_directory, tmp_path, exclude_dirs=["other"], exclude_files=["fcmp.sas"])
copy_directory(
shared_test_directory, tmp_path, exclude_dirs=["other"], exclude_files=["fcmp.sas"], macro_subs={"id": ""}
)
copy_directory(shared_test_directory / "macro", tmp_path / "macro", convert_mode=ConvertMode.NEGATIVE)

for validate_file in shared_validate_directory.rglob("*.txt"):
Expand Down
2 changes: 1 addition & 1 deletion python/submit/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8fb041f

Please sign in to comment.