diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py
index a1c237f30287..614e752e0258 100644
--- a/dask/dataframe/io/parquet/arrow.py
+++ b/dask/dataframe/io/parquet/arrow.py
@@ -10,6 +10,7 @@
 import numpy as np
 import pandas as pd
+import pyarrow
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -17,7 +18,6 @@
 from fsspec.core import expand_paths_if_needed, stringify_path
 from fsspec.implementations.arrow import ArrowFSWrapper
 from pyarrow import dataset as pa_ds
-from pyarrow import fs as pa_fs
 
 import dask
 from dask.base import tokenize
@@ -445,7 +445,7 @@ def extract_filesystem(
             fs = filesystem
 
         # Handle pyarrow-based filesystem
-        if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
+        if isinstance(fs, pyarrow.fs.FileSystem) or fs in ("arrow", "pyarrow"):
             if isinstance(urlpath, (list, tuple, set)):
                 if not urlpath:
                     raise ValueError("empty urlpath sequence")
@@ -454,12 +454,30 @@ def extract_filesystem(
                 urlpath = [stringify_path(urlpath)]
 
             if fs in ("arrow", "pyarrow"):
-                fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
-                    **(storage_options or {})
-                )
+                if urlpath[0].startswith("s3://"):
+                    bucket = urlpath[0][5:].split("/")[0]
+                    import boto3
+
+                    session = boto3.session.Session()
+                    credentials = session.get_credentials()
+                    region = session.client("s3").get_bucket_location(Bucket=bucket)[
+                        "LocationConstraint"
+                    ]
+                    fs = pyarrow.fs.S3FileSystem(
+                        secret_key=credentials.secret_key,
+                        access_key=credentials.access_key,
+                        region=region,
+                        session_token=credentials.token,
+                    )
+                else:
+                    fs = type(pyarrow.fs.FileSystem.from_uri(urlpath[0])[0])(
+                        **(storage_options or {})
+                    )
 
             fsspec_fs = ArrowFSWrapper(fs)
-            if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
+            if urlpath[0].startswith("C:") and isinstance(
+                fs, pyarrow.fs.LocalFileSystem
+            ):
                 # ArrowFSWrapper._strip_protocol not reliable on windows
                 # See: https://github.com/fsspec/filesystem_spec/issues/1137
                 from fsspec.implementations.local import LocalFileSystem
diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py
index 584279e73823..ec153eea2f07 100644
--- a/dask/dataframe/io/parquet/core.py
+++ b/dask/dataframe/io/parquet/core.py
@@ -311,7 +311,7 @@ def read_parquet(
         automatically set ``split_row_groups`` to either 'adaptive' or ``False``.
     blocksize : int or str, default 'default'
         The desired size of each output ``DataFrame`` partition in terms of total
-        (uncompressed) parquet storage space. This argument is currenlty used to
+        (uncompressed) parquet storage space. This argument is currently used to
         set the default value of ``split_row_groups`` (using row-group metadata
         from a single file), and will be ignored if ``split_row_groups`` is not
         set to 'infer' or 'adaptive'. Default may be engine-dependant, but is
diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py
index 1344902af03b..87c8cf7b81e4 100644
--- a/dask/dataframe/io/parquet/utils.py
+++ b/dask/dataframe/io/parquet/utils.py
@@ -174,7 +174,7 @@ def read_metadata(
 
     @classmethod
     def default_blocksize(cls):
-        return "128 MiB"
+        return "256 MiB"
 
     @classmethod
     def read_partition(