From dbf61145c8c80fe700c33d04235d920b4b4bda48 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Thu, 26 Oct 2023 05:54:44 -0500
Subject: [PATCH] Support dd.read_parquet(..., filesystem="arrow")

This asks Boto3 for the relevant information to populate the Arrow
FileSystem object.

I'm assuming here that Boto3 can robustly find these credentials, but
someone with more experience should verify this.

I'm testing locally by verifying that the following returns:

```python
dd.read_parquet("s3://coiled-data/uber/", filesystem="arrow")
```

Happy to add that as a test here if preferred.
---
 dask/dataframe/io/parquet/arrow.py | 30 ++++++++++++++++++++++++------
 dask/dataframe/io/parquet/core.py  |  2 +-
 dask/dataframe/io/parquet/utils.py |  2 +-
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py
index a1c237f30287..614e752e0258 100644
--- a/dask/dataframe/io/parquet/arrow.py
+++ b/dask/dataframe/io/parquet/arrow.py
@@ -10,6 +10,7 @@
 import numpy as np
 import pandas as pd
+import pyarrow
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -17,7 +18,6 @@
 from fsspec.core import expand_paths_if_needed, stringify_path
 from fsspec.implementations.arrow import ArrowFSWrapper
 from pyarrow import dataset as pa_ds
-from pyarrow import fs as pa_fs
 
 import dask
 from dask.base import tokenize
@@ -445,7 +445,7 @@ def extract_filesystem(
             fs = filesystem
 
         # Handle pyarrow-based filesystem
-        if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
+        if isinstance(fs, pyarrow.fs.FileSystem) or fs in ("arrow", "pyarrow"):
             if isinstance(urlpath, (list, tuple, set)):
                 if not urlpath:
                     raise ValueError("empty urlpath sequence")
@@ -454,12 +454,30 @@
                 urlpath = [stringify_path(urlpath)]
 
             if fs in ("arrow", "pyarrow"):
-                fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
-                    **(storage_options or {})
-                )
+                if urlpath[0].startswith("s3://"):
+                    bucket = urlpath[0][5:].split("/")[0]
+                    import boto3
+
+                    session = boto3.session.Session()
+                    credentials = session.get_credentials()
+                    region = session.client("s3").get_bucket_location(Bucket=bucket)[
+                        "LocationConstraint"
+                    ]
+                    fs = pyarrow.fs.S3FileSystem(
+                        secret_key=credentials.secret_key,
+                        access_key=credentials.access_key,
+                        region=region,
+                        session_token=credentials.token,
+                    )
+                else:
+                    fs = type(pyarrow.fs.FileSystem.from_uri(urlpath[0])[0])(
+                        **(storage_options or {})
+                    )
 
             fsspec_fs = ArrowFSWrapper(fs)
-            if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
+            if urlpath[0].startswith("C:") and isinstance(
+                fs, pyarrow.fs.LocalFileSystem
+            ):
                 # ArrowFSWrapper._strip_protocol not reliable on windows
                 # See: https://github.com/fsspec/filesystem_spec/issues/1137
                 from fsspec.implementations.local import LocalFileSystem
diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py
index 584279e73823..ec153eea2f07 100644
--- a/dask/dataframe/io/parquet/core.py
+++ b/dask/dataframe/io/parquet/core.py
@@ -311,7 +311,7 @@ def read_parquet(
         automatically set ``split_row_groups`` to either 'adaptive' or ``False``.
     blocksize : int or str, default 'default'
         The desired size of each output ``DataFrame`` partition in terms of total
-        (uncompressed) parquet storage space. This argument is currenlty used to
+        (uncompressed) parquet storage space. This argument is currently used to
         set the default value of ``split_row_groups`` (using row-group metadata
         from a single file), and will be ignored if ``split_row_groups`` is not
         set to 'infer' or 'adaptive'. Default may be engine-dependant, but is
diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py
index 1344902af03b..87c8cf7b81e4 100644
--- a/dask/dataframe/io/parquet/utils.py
+++ b/dask/dataframe/io/parquet/utils.py
@@ -174,7 +174,7 @@ def read_metadata(
 
     @classmethod
     def default_blocksize(cls):
-        return "128 MiB"
+        return "256 MiB"
 
     @classmethod
     def read_partition(
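
For reference, the ``filesystem="arrow"`` branch added above does roughly the following, shown here as a standalone sketch rather than part of the patch. It reuses the ``s3://coiled-data/uber/`` path from the commit message and assumes a default Boto3 credential chain is configured; the filesystem object it builds is then passed explicitly to ``dd.read_parquet``, which is the already-supported ``pyarrow.fs.FileSystem`` code path.

```python
# Illustrative sketch (not part of the patch): build the same pyarrow
# S3FileSystem that the new filesystem="arrow" branch constructs, then
# hand it to dd.read_parquet explicitly.
import boto3
import pyarrow.fs

import dask.dataframe as dd

bucket = "coiled-data"

session = boto3.session.Session()
credentials = session.get_credentials()  # default Boto3 credential chain
# Note: get_bucket_location reports the LocationConstraint as None for
# buckets in us-east-1, so a robust implementation may need to handle that.
region = session.client("s3").get_bucket_location(Bucket=bucket)[
    "LocationConstraint"
]

fs = pyarrow.fs.S3FileSystem(
    access_key=credentials.access_key,
    secret_key=credentials.secret_key,
    session_token=credentials.token,
    region=region,
)

df = dd.read_parquet("s3://coiled-data/uber/", filesystem=fs)
```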