From 17297062fb37f1d46509faecbc9c39f0d2a55d59 Mon Sep 17 00:00:00 2001 From: Nicolas Pennequin Date: Tue, 26 Nov 2024 23:28:00 +0100 Subject: [PATCH] filesystem.resolve_filesystem_and_path: Fix host for HDFS Using `fsspec.url_to_fs` resulted in using an incorrect host value for instantiating HDFS filesystems. For example `fsspec.url_to_fs("viewfs://root/user/someone")` would call `fsspec.filesystem("viewfs", host="root")`, which could cause errors. Instead in this case we need the host to be `viewfs://root`, so we restore most of the code of that function from before #128. However since we're using fsspec we can generalize to all its supported file system implementations. --- cluster_pack/filesystem.py | 26 +++++++++++++++++++++----- tests/test_filesystem.py | 23 +++++++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/cluster_pack/filesystem.py b/cluster_pack/filesystem.py index a03ab33..2c5b068 100644 --- a/cluster_pack/filesystem.py +++ b/cluster_pack/filesystem.py @@ -1,16 +1,32 @@ import logging from typing import Tuple, Any +from urllib.parse import urlparse import fsspec -import fsspec.implementations.arrow - -# Support viewfs:// protocol for HDFS -fsspec.register_implementation("viewfs", fsspec.implementations.arrow.HadoopFileSystem) _logger = logging.getLogger(__name__) def resolve_filesystem_and_path(uri: str, **kwargs: Any) -> Tuple[fsspec.AbstractFileSystem, str]: - fs, fs_path = fsspec.url_to_fs(uri, **kwargs) + parsed_uri = urlparse(uri) + fs_path = parsed_uri.path + if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs': + netloc_split = parsed_uri.netloc.split(':') + host = netloc_split[0] + if host == '': + host = 'default' + else: + host = parsed_uri.scheme + "://" + host + port = 0 + if len(netloc_split) == 2 and netloc_split[1].isnumeric(): + port = int(netloc_split[1]) + + fs = fsspec.filesystem("hdfs", host=host, port=port, **kwargs) + elif parsed_uri.scheme == '': + # Input is local path such as /home/user/myfile.parquet + fs = fsspec.filesystem('file', **kwargs) + else: + fs = fsspec.filesystem(parsed_uri.scheme, **kwargs) + _logger.info(f"Resolved base filesystem: {type(fs)}") return fs, fs_path diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index 068d46a..ae70806 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -1,4 +1,6 @@ import os +from unittest import mock + import pytest import subprocess import tempfile @@ -141,3 +143,24 @@ def test_put(): assert os.path.exists(remote_file) assert os.stat(remote_file).st_mode & 0o777 == 0o755 + + +@pytest.mark.parametrize( + "uri,expected_protocol,expected_kwargs,expected_path", + [ + ("viewfs:///path/", "hdfs", {"host": "default", "port": 0}, "/path/"), + ("viewfs://root/path/", "hdfs", {"host": "viewfs://root", "port": 0}, "/path/"), + ("viewfs://localhost:1234/path/", "hdfs", {"host": "viewfs://localhost", "port": 1234}, + "/path/"), + ("hdfs://root/path/", "hdfs", {"host": "hdfs://root", "port": 0}, "/path/"), + ("/path/", "file", {}, "/path/"), + ("file:///path/", "file", {}, "/path/"), + ] +) +def test_resolve_filesystem_and_path(uri, expected_protocol, expected_kwargs, expected_path): + with mock.patch("fsspec.filesystem") as fsspec_filesystem_mock: + fs, path = filesystem.resolve_filesystem_and_path(uri) + args, kwargs = fsspec_filesystem_mock.call_args + assert args == (expected_protocol,) + assert kwargs == expected_kwargs + assert path == expected_path