
Support dd.read_parquet(..., filesystem="arrow")

This asks Boto3 for the credentials and bucket region needed to populate the Arrow
FileSystem object. I'm assuming here that Boto3 can robustly find these
credentials, but someone with more experience should verify this.

I'm testing locally by verifying that the following call returns successfully:

```python
dd.read_parquet("s3://coiled-data/uber/", filesystem="arrow")
```

Happy to add that as a test here if preferred.
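
For reference, a rough sketch of what such a test could look like. This is not part of the commit: the test name is a placeholder, and it assumes pytest, network access, AWS credentials that Boto3 can discover, and continued availability of the coiled-data bucket.

```python
import pytest

import dask.dataframe as dd

# Skip cleanly when the optional dependencies are missing.
pytest.importorskip("boto3")
pytest.importorskip("pyarrow")


def test_read_parquet_filesystem_arrow_s3():
    # Building the graph should succeed: extract_filesystem asks boto3 for
    # credentials and the bucket region and constructs a pyarrow S3FileSystem.
    df = dd.read_parquet("s3://coiled-data/uber/", filesystem="arrow")
    assert len(df.columns) > 0
```
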
mrocklin committed Oct 26, 2023
1 parent 1203b1b commit dbf6114
Showing 3 changed files with 26 additions and 8 deletions.

dask/dataframe/io/parquet/arrow.py (24 additions, 6 deletions)

```diff
@@ -10,14 +10,14 @@
 
 import numpy as np
 import pandas as pd
+import pyarrow
 import pyarrow as pa
 import pyarrow.parquet as pq
 
 # Check PyArrow version for feature support
 from fsspec.core import expand_paths_if_needed, stringify_path
 from fsspec.implementations.arrow import ArrowFSWrapper
 from pyarrow import dataset as pa_ds
 from pyarrow import fs as pa_fs
 
 import dask
 from dask.base import tokenize
@@ -445,7 +445,7 @@ def extract_filesystem(
             fs = filesystem
 
         # Handle pyarrow-based filesystem
-        if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
+        if isinstance(fs, pyarrow.fs.FileSystem) or fs in ("arrow", "pyarrow"):
             if isinstance(urlpath, (list, tuple, set)):
                 if not urlpath:
                     raise ValueError("empty urlpath sequence")
@@ -454,12 +454,30 @@
                 urlpath = [stringify_path(urlpath)]
 
             if fs in ("arrow", "pyarrow"):
-                fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
-                    **(storage_options or {})
-                )
+                if urlpath[0].startswith("s3://"):
+                    bucket = urlpath[0][5:].split("/")[0]
+                    import boto3
+
+                    session = boto3.session.Session()
+                    credentials = session.get_credentials()
+                    region = session.client("s3").get_bucket_location(Bucket=bucket)[
+                        "LocationConstraint"
+                    ]
+                    fs = pyarrow.fs.S3FileSystem(
+                        secret_key=credentials.secret_key,
+                        access_key=credentials.access_key,
+                        region=region,
+                        session_token=credentials.token,
+                    )
+                else:
+                    fs = type(pyarrow.fs.FileSystem.from_uri(urlpath[0])[0])(
+                        **(storage_options or {})
+                    )
 
             fsspec_fs = ArrowFSWrapper(fs)
-            if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
+            if urlpath[0].startswith("C:") and isinstance(
+                fs, pyarrow.fs.LocalFileSystem
+            ):
                 # ArrowFSWrapper._strip_protocol not reliable on windows
                 # See: https://github.com/fsspec/filesystem_spec/issues/1137
                 from fsspec.implementations.local import LocalFileSystem
```
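
To make the two branches above concrete, here is a hedged usage sketch: passing the string "arrow" now triggers the Boto3 lookup for s3:// paths, while passing an existing pyarrow filesystem object takes the pre-existing isinstance branch and is used as-is. The path is the example from the commit message; the region value is a placeholder, not something the commit prescribes.

```python
import pyarrow.fs

import dask.dataframe as dd

path = "s3://coiled-data/uber/"  # example dataset from the commit message

# String form: dask builds the S3FileSystem itself, asking boto3 for
# credentials and the bucket's region.
df1 = dd.read_parquet(path, filesystem="arrow")

# Explicit form: a user-supplied pyarrow FileSystem is passed through
# unchanged (no boto3 lookup). The region here is a placeholder.
fs = pyarrow.fs.S3FileSystem(region="us-east-2")
df2 = dd.read_parquet(path, filesystem=fs)
```
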

dask/dataframe/io/parquet/core.py (1 addition, 1 deletion)

```diff
@@ -311,7 +311,7 @@ def read_parquet(
         automatically set ``split_row_groups`` to either 'adaptive' or ``False``.
     blocksize : int or str, default 'default'
         The desired size of each output ``DataFrame`` partition in terms of total
         (uncompressed) parquet storage space. This argument is currently used to
         set the default value of ``split_row_groups`` (using row-group metadata
         from a single file), and will be ignored if ``split_row_groups`` is not
         set to 'infer' or 'adaptive'. Default may be engine-dependent, but is
```
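
A small illustration of the documented behaviour (a sketch only, using a hypothetical local path): ``blocksize`` feeds the default for ``split_row_groups`` and is ignored once ``split_row_groups`` is set explicitly.

```python
import dask.dataframe as dd

path = "./data/parquet/"  # hypothetical local dataset

# With the default blocksize, dask chooses split_row_groups automatically.
df_default = dd.read_parquet(path)

# Target roughly 512 MiB of uncompressed data per output partition.
df_larger = dd.read_parquet(path, blocksize="512 MiB")

# With split_row_groups given explicitly, blocksize has no effect.
df_rowgroups = dd.read_parquet(path, split_row_groups=True, blocksize="512 MiB")
```
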

dask/dataframe/io/parquet/utils.py (1 addition, 1 deletion)

```diff
@@ -174,7 +174,7 @@ def read_metadata(
 
     @classmethod
     def default_blocksize(cls):
-        return "128 MiB"
+        return "256 MiB"
 
     @classmethod
     def read_partition(
```
