Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Create PR for #70 This PR is add decorator and test code for create new folder if it dose not exist. ```python def create_folder_if_not_exists(func: Any) -> Callable[..., int]: """Decorator to create folder if it does not exist.""" def wrapper(*args: Any, **kwargs: Any) -> int: try: filepath = kwargs["filepath"] except KeyError: filepath = args[0] folder = os.path.dirname(filepath) if not os.path.exists(folder) and folder != "": os.makedirs(folder) return func(*args, **kwargs) return wrapper ``` The reason why the try except statement is used is because other methods that call the write_csv function sometimes pass the filepath into args and sometimes kwarg. usage ```python @create_folder_if_not_exists def write_csv(filepath: Path, records: List[dict], schema: dict, **kwargs: Any) -> int: """Write a CSV file.""" if "properties" not in schema: raise ValueError("Stream's schema has no properties defined.") keys: List[str] = list(schema["properties"].keys()) with open(filepath, "w", encoding="utf-8", newline="") as fp: writer = csv.DictWriter(fp, fieldnames=keys, dialect="excel", **kwargs) writer.writeheader() for record_count, record in enumerate(records, start=1): writer.writerow(record) return record_count ``` If there is a better way please let me know --------- Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
- Loading branch information