Skip to content

Commit

Permalink
filesystem.resolve_filesystem_and_path: Fix host for HDFS
Browse files Browse the repository at this point in the history
Using `fsspec.url_to_fs` resulted in using an incorrect host value
for instantiating HDFS filesystems.

For example `fsspec.url_to_fs("viewfs://root/user/someone")` would
call `fsspec.filesystem("viewfs", host="root")`, which could cause
errors. Instead in this case we need the host to be `viewfs://root`, so
we restore most of the code of that function from before #128.

However since we're using fsspec we can generalize to all its supported
file system implementations.
  • Loading branch information
npennequin committed Nov 27, 2024
1 parent 087d658 commit 1729706
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
26 changes: 21 additions & 5 deletions cluster_pack/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
import logging
from typing import Tuple, Any
from urllib.parse import urlparse

import fsspec
import fsspec.implementations.arrow

# Support viewfs:// protocol for HDFS
fsspec.register_implementation("viewfs", fsspec.implementations.arrow.HadoopFileSystem)

_logger = logging.getLogger(__name__)


def resolve_filesystem_and_path(uri: str, **kwargs: Any) -> Tuple[fsspec.AbstractFileSystem, str]:
fs, fs_path = fsspec.url_to_fs(uri, **kwargs)
parsed_uri = urlparse(uri)
fs_path = parsed_uri.path
if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs':
netloc_split = parsed_uri.netloc.split(':')
host = netloc_split[0]
if host == '':
host = 'default'
else:
host = parsed_uri.scheme + "://" + host
port = 0
if len(netloc_split) == 2 and netloc_split[1].isnumeric():
port = int(netloc_split[1])

fs = fsspec.filesystem("hdfs", host=host, port=port, **kwargs)
elif parsed_uri.scheme == '':
# Input is local path such as /home/user/myfile.parquet
fs = fsspec.filesystem('file', **kwargs)
else:
fs = fsspec.filesystem(parsed_uri.scheme, **kwargs)

_logger.info(f"Resolved base filesystem: {type(fs)}")
return fs, fs_path
23 changes: 23 additions & 0 deletions tests/test_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
from unittest import mock

import pytest
import subprocess
import tempfile
Expand Down Expand Up @@ -141,3 +143,24 @@ def test_put():

assert os.path.exists(remote_file)
assert os.stat(remote_file).st_mode & 0o777 == 0o755


@pytest.mark.parametrize(
"uri,expected_protocol,expected_kwargs,expected_path",
[
("viewfs:///path/", "hdfs", {"host": "default", "port": 0}, "/path/"),
("viewfs://root/path/", "hdfs", {"host": "viewfs://root", "port": 0}, "/path/"),
("viewfs://localhost:1234/path/", "hdfs", {"host": "viewfs://localhost", "port": 1234},
"/path/"),
("hdfs://root/path/", "hdfs", {"host": "hdfs://root", "port": 0}, "/path/"),
("/path/", "file", {}, "/path/"),
("file:///path/", "file", {}, "/path/"),
]
)
def test_resolve_filesystem_and_path(uri, expected_protocol, expected_kwargs, expected_path):
with mock.patch("fsspec.filesystem") as fsspec_filesystem_mock:
fs, path = filesystem.resolve_filesystem_and_path(uri)
args, kwargs = fsspec_filesystem_mock.call_args
assert args == (expected_protocol,)
assert kwargs == expected_kwargs
assert path == expected_path

0 comments on commit 1729706

Please sign in to comment.