-
Notifications
You must be signed in to change notification settings - Fork 127
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add support for s3 checkpoint: need a fix on check_path_is_local
- Loading branch information
Showing
8 changed files
with
642 additions
and
49 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .fsspec import check_path_is_local, fs_copy, fs_open | ||
from .s3_mover import S3Mover | ||
|
||
__all__ = ["S3Mover", "fs_open", "fs_copy", "check_path_is_local"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import contextlib | ||
from pathlib import Path | ||
from typing import Tuple, Union | ||
|
||
import fsspec | ||
from fsspec.implementations import local | ||
|
||
|
||
def get_filesystem_and_path(path: Path, storage_options=None) -> Tuple[fsspec.AbstractFileSystem, str]: | ||
# Use supported filesystems in `fsspec`. If you need another one, please use `fsspec.registry.register_implementation` | ||
# DO NOT USE `mode` argument as it adds a suffix `0.part` when using `mode="w"`. | ||
fs, _, paths = fsspec.core.get_fs_token_paths(str(path), storage_options=storage_options) | ||
assert len(paths) == 1 | ||
return fs, paths[0] | ||
|
||
|
||
@contextlib.contextmanager | ||
def fs_open( | ||
file: Union[str, Path], | ||
mode="r", | ||
): | ||
# TODO @thomasw21: pass storage options | ||
fs, path = get_filesystem_and_path(file) | ||
with fs.open(path, mode=mode) as f: | ||
yield f | ||
|
||
|
||
def fs_copy( | ||
input_file: Union[str, Path], | ||
output_file: Union[str, Path], | ||
): | ||
"""Copy file from input to output (possibly on s3/other fs)""" | ||
with fs_open(input_file, mode="rb") as fi, fs_open(output_file, mode="wb") as fo: | ||
fo.write(fi.read()) | ||
|
||
|
||
def check_path_is_local(path: Path, storage_options=None) -> bool: | ||
return isinstance(get_filesystem_and_path(path=path, storage_options=storage_options)[0], local.LocalFileSystem) |
Oops, something went wrong.