From 6aebbaa132c682951f8f29bd4c3a08e4355760f9 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Thu, 26 Oct 2023 05:54:44 -0500
Subject: [PATCH] Support dd.read_parquet(..., filesystem="arrow")

This asks Boto3 for the relevant information to populate the Arrow
FileSystem object. I'm assuming here that Boto3 can robustly find these
credentials, but someone with more experience should verify this.

I'm testing locally by verifying that the following returns:

```python
dd.read_parquet("s3://coiled-data/uber/", filesystem="arrow")
```

Happy to add that as a test here if preferred.
---
 dask/dataframe/io/parquet/arrow.py | 30 ++++++++++++++++++++++++------
 dask/dataframe/io/parquet/core.py  |  2 +-
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py
index a1c237f3028..614e752e025 100644
--- a/dask/dataframe/io/parquet/arrow.py
+++ b/dask/dataframe/io/parquet/arrow.py
@@ -10,6 +10,7 @@
 
 import numpy as np
 import pandas as pd
+import pyarrow
 import pyarrow as pa
 import pyarrow.parquet as pq
 
@@ -17,7 +18,6 @@
 from fsspec.core import expand_paths_if_needed, stringify_path
 from fsspec.implementations.arrow import ArrowFSWrapper
 from pyarrow import dataset as pa_ds
-from pyarrow import fs as pa_fs
 
 import dask
 from dask.base import tokenize
@@ -445,7 +445,7 @@ def extract_filesystem(
             fs = filesystem
 
         # Handle pyarrow-based filesystem
-        if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
+        if isinstance(fs, pyarrow.fs.FileSystem) or fs in ("arrow", "pyarrow"):
             if isinstance(urlpath, (list, tuple, set)):
                 if not urlpath:
                     raise ValueError("empty urlpath sequence")
@@ -454,12 +454,30 @@ def extract_filesystem(
                 urlpath = [stringify_path(urlpath)]
 
             if fs in ("arrow", "pyarrow"):
-                fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
-                    **(storage_options or {})
-                )
+                if urlpath[0].startswith("s3://"):
+                    bucket = urlpath[0][5:].split("/")[0]
+                    import boto3
+
+                    session = boto3.session.Session()
+                    credentials = session.get_credentials()
+                    region = session.client("s3").get_bucket_location(Bucket=bucket)[
+                        "LocationConstraint"
+                    ]
+                    fs = pyarrow.fs.S3FileSystem(
+                        secret_key=credentials.secret_key,
+                        access_key=credentials.access_key,
+                        region=region,
+                        session_token=credentials.token,
+                    )
+                else:
+                    fs = type(pyarrow.fs.FileSystem.from_uri(urlpath[0])[0])(
+                        **(storage_options or {})
+                    )
 
             fsspec_fs = ArrowFSWrapper(fs)
-            if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
+            if urlpath[0].startswith("C:") and isinstance(
+                fs, pyarrow.fs.LocalFileSystem
+            ):
                 # ArrowFSWrapper._strip_protocol not reliable on windows
                 # See: https://github.com/fsspec/filesystem_spec/issues/1137
                 from fsspec.implementations.local import LocalFileSystem
diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py
index 584279e7382..ec153eea2f0 100644
--- a/dask/dataframe/io/parquet/core.py
+++ b/dask/dataframe/io/parquet/core.py
@@ -311,7 +311,7 @@ def read_parquet(
         automatically set ``split_row_groups`` to either 'adaptive' or ``False``.
     blocksize : int or str, default 'default'
         The desired size of each output ``DataFrame`` partition in terms of total
-        (uncompressed) parquet storage space. This argument is currenlty used to
+        (uncompressed) parquet storage space. This argument is currently used to
         set the default value of ``split_row_groups`` (using row-group metadata
         from a single file), and will be ignored if ``split_row_groups`` is not
         set to 'infer' or 'adaptive'. Default may be engine-dependant, but is
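
One caveat worth checking alongside the credential question above: for
buckets in `us-east-1`, `get_bucket_location` returns a `LocationConstraint`
of `None`. Below is a minimal standalone sketch of the same Boto3-to-Arrow
handoff the patch performs, with that case normalized before the region is
handed to `pyarrow.fs.S3FileSystem` (this is not part of the patch; the
bucket name is a placeholder):

```python
import boto3
import pyarrow.fs

bucket = "coiled-data"  # placeholder; substitute a bucket you can access

session = boto3.session.Session()
credentials = session.get_credentials()
assert credentials is not None, "Boto3 found no credentials"

# get_bucket_location reports LocationConstraint=None for us-east-1,
# so fall back to that region explicitly.
location = session.client("s3").get_bucket_location(Bucket=bucket)
region = location["LocationConstraint"] or "us-east-1"

fs = pyarrow.fs.S3FileSystem(
    access_key=credentials.access_key,
    secret_key=credentials.secret_key,
    session_token=credentials.token,
    region=region,
)

# List the bucket root to confirm the filesystem object is usable.
print(fs.get_file_info(pyarrow.fs.FileSelector(bucket)))
```

If the `None` region turns out to matter in practice, the same
`or "us-east-1"` fallback could be applied where the new code reads
`LocationConstraint`.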