diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 52a302f4..fbbe51bd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -25,43 +25,73 @@ jobs:
       matrix:
         numfocus_nightly: [false]
         os: ["ubuntu-latest"]
-        pyarrow: ["3.0.0", "4.0.1", "nightly"]
+        pandas: [""]
+        pyarrow: ["4.0.1", "nightly"]
         python: ["3.8"]
         include:
           - numfocus_nightly: true
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "4.0.1"
             python: "3.10"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: "1.5.3"
+            pyarrow: "4.0.1"
+            python: "3.11"
+          - numfocus_nightly: false
+            os: "ubuntu-latest"
+            pandas: "1.5.3"
+            pyarrow: "13.0.0"
+            python: "3.11"
+          - numfocus_nightly: false
+            os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "5.0.0"
             python: "3.9"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "6.0.1"
             python: "3.9"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "7.0.0"
             python: "3.10"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "8.0.1"
             python: "3.10"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "9.0.0"
             python: "3.10"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "10.0.1"
             python: "3.11"
           - numfocus_nightly: false
             os: "ubuntu-latest"
+            pandas: ""
             pyarrow: "11.0.0"
             python: "3.11"
+          - numfocus_nightly: false
+            os: "ubuntu-latest"
+            pandas: ""
+            pyarrow: "12.0.0"
+            python: "3.11"
+          - numfocus_nightly: false
+            os: "ubuntu-latest"
+            pandas: ""
+            pyarrow: "13.0.0"
+            python: "3.11"
           - numfocus_nightly: false
             os: "macos-latest"
+            pandas: ""
             pyarrow: "4.0.1"
             python: "3.8"
     continue-on-error: ${{ matrix.numfocus_nightly || matrix.pyarrow == 'nightly' }}
@@ -89,22 +119,28 @@ jobs:
           cache-env: true
           extra-specs: |
             python=${{ matrix.PYTHON_VERSION }}
-      - name: Install repository
-        run: python -m pip install --no-build-isolation --no-deps --disable-pip-version-check -e .
       - name: Install Pyarrow (non-nightly)
-        run: micromamba install pyarrow==${{ matrix.pyarrow }}
-        if: matrix.pyarrow != 'nightly'
+        # Don't pin python as older versions of pyarrow require older versions of python
+        # Pin asv so it doesn't get updated before the benchmarks are run
+        run: micromamba install -y --no-py-pin pyarrow==${{ matrix.pyarrow }} "pandas<2.1.0" "asv<0.6"
+        if: matrix.pyarrow != 'nightly' && matrix.pandas == ''
       - name: Install Pyarrow (nightly)
         # Install both arrow-cpp and pyarrow to make sure that we have the
         # latest nightly of both packages. It is sadly not guaranteed that the
         # nightlies and the latest release would otherwise work together.
         run: micromamba update -c arrow-nightlies -c conda-forge arrow-cpp pyarrow
         if: matrix.pyarrow == 'nightly'
-      - name: Pip Instal NumFOCUS nightly
+      - name: Install Pyarrow (downgrade pandas)
+        run: micromamba install -y --no-py-pin pyarrow==${{ matrix.pyarrow }} pandas==${{ matrix.pandas }}
+        if: matrix.pyarrow != 'nightly' && matrix.pandas != ''
+      - name: Pip Install NumFOCUS nightly
         # NumFOCUS nightly wheels, contains numpy and pandas
         # TODO(gh-45): Re-add numpy
-        run: python -m pip install --pre --upgrade --timeout=60 --extra-index-url https://pypi.anaconda.org/scipy-wheels-nightly/simple pandas
+        # TODO: Remove pandas version pin once https://github.com/pandas-dev/pandas/issues/55014 is fixed
+        run: python -m pip install --pre --upgrade --timeout=60 --extra-index-url https://pypi.anaconda.org/scipy-wheels-nightly/simple "pandas<2.1.0"
         if: matrix.numfocus_nightly
+      - name: Install repository
+        run: python -m pip install --no-build-isolation --no-deps --disable-pip-version-check -e .
       - name: Test import
         run: |
           python -c "import plateau"
diff --git a/CHANGES.rst b/CHANGES.rst
index 82747f66..952a02d7 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -2,6 +2,14 @@
 Changelog
 =========

+Plateau 4.2.0 (unreleased)
+==========================
+
+* Support pandas 2
+* Test pyarrow 12 and 13
+* Prevent dask from casting all object dtypes to strings
+* Remove tests for pyarrow<=3 as they fail with pandas>=2
+
 Plateau 4.1.5 (2023-03-14)
 ==========================

diff --git a/docs/environment-docs.yml b/docs/environment-docs.yml
index c130e24e..db25109f 100644
--- a/docs/environment-docs.yml
+++ b/docs/environment-docs.yml
@@ -3,13 +3,13 @@ channels:
   - conda-forge
 dependencies:
   - python>=3.8
-  - dask[dataframe]
+  - dask<2023.9.2
   - decorator
   - msgpack-python>=0.5.2
   # Currently dask and numpy==1.16.0 clash
   - numpy!=1.15.0,!=1.16.0
-  - pandas>=0.23.0, !=1.0.0
-  - pyarrow>=0.17.1,!=1.0.0
+  - pandas>=0.23.0,!=1.0.0,<2.1.0
+  - pyarrow>=4
   - simplejson
   - minimalkv
   - toolz
diff --git a/environment.yml b/environment.yml
index ad41e42d..99a00260 100644
--- a/environment.yml
+++ b/environment.yml
@@ -3,14 +3,15 @@ channels:
   - conda-forge
   - nodefaults
 dependencies:
-  - dask!=2021.5.1,!=2021.6.0 # gh475 - 2021.5.1 and 2021.6.0 broke ci, omit those versions
+  # TODO: Investigate issue with dask 2023.9.2
+  - dask!=2021.5.1,!=2021.6.0,<2023.9.2 # gh475 - 2021.5.1 and 2021.6.0 broke ci, omit those versions
   - decorator
   - msgpack-python>=0.5.2
   # Currently dask and numpy==1.16.0 clash
   # TODO: add support for numpy>=1.23
   - numpy!=1.15.0,!=1.16.0
-  - pandas>=0.23.0,!=1.0.0
-  - pyarrow>=0.17.1,!=1.0.0
+  - pandas>=0.23.0,!=1.0.0,<2.1.0
+  - pyarrow>=4
   - simplejson
   - minimalkv>=1.4.2
   - toolz
@@ -36,6 +37,6 @@ dependencies:
   # CLI
   - ipython
   # ASV // Benchmark
-  - asv
+  - asv<0.6
   # Packaging infrastructure
   - python-build
diff --git a/plateau/core/common_metadata.py b/plateau/core/common_metadata.py
index 31983786..efc72849 100644
--- a/plateau/core/common_metadata.py
+++ b/plateau/core/common_metadata.py
@@ -10,12 +10,14 @@
 import pyarrow.parquet as pq
 import simplejson
 from minimalkv import KeyValueStore
+from packaging import version

 from plateau.core import naming
 from plateau.core._compat import load_json
 from plateau.core.naming import SINGLE_TABLE
 from plateau.core.utils import ensure_string_type
 from plateau.serialization._parquet import PARQUET_VERSION
+from plateau.serialization._util import schema_metadata_bytes_to_object

 _logger = logging.getLogger()

@@ -28,6 +30,8 @@
     "normalize_column_order",
 )

+PYARROW_LT_13 = version.parse(pa.__version__) < version.parse("13")
+

 class SchemaWrapper:
     """Wrapper object for pyarrow.Schema to handle forwards and backwards
@@ -736,7 +740,9 @@ def _dict_to_binary(dct):
     return simplejson.dumps(dct, sort_keys=True).encode("utf8")


-def empty_dataframe_from_schema(schema, columns=None, date_as_object=False):
+def empty_dataframe_from_schema(
+    schema, columns=None, date_as_object=False, coerce_temporal_nanoseconds=True
+):
     """Create an empty DataFrame from provided schema.

     Parameters
@@ -746,14 +752,26 @@ def empty_dataframe_from_schema(schema, columns=None, date_as_object=False):
     columns: Union[None, List[str]]
         Optional list of columns that should be part of the resulting DataFrame. All columns in that
         list must also be part of the provided schema.
+    date_as_object: bool
+        Cast dates to objects.
+    coerce_temporal_nanoseconds: bool
+        Coerce date32, date64, duration and timestamp units to nanoseconds to retain behaviour of pandas 1.x.
+        Only applicable to pandas version >= 2.0 and PyArrow version >= 13.0.0.

     Returns
     -------
     DataFrame
         Empty DataFrame with requested columns and types.
     """
-
-    df = schema.internal().empty_table().to_pandas(date_as_object=date_as_object)
+    # HACK: Cast bytes to object in metadata until Pandas bug is fixed: https://github.com/pandas-dev/pandas/issues/50127
+    schema = schema_metadata_bytes_to_object(schema.internal())
+
+    # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+    # as it was introduced for backwards compatibility with pandas 1.x
+    _coerce = {}
+    if not PYARROW_LT_13:
+        _coerce["coerce_temporal_nanoseconds"] = coerce_temporal_nanoseconds
+    df = schema.empty_table().to_pandas(date_as_object=date_as_object, **_coerce)
     df.columns = df.columns.map(ensure_string_type)
     if columns is not None:
diff --git a/plateau/core/index.py b/plateau/core/index.py
index 6a8631fc..53aa904a 100644
--- a/plateau/core/index.py
+++ b/plateau/core/index.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+from packaging import version
 from toolz.itertoolz import partition_all

 import plateau.core._time
@@ -37,6 +38,8 @@
     "PartitionIndex",
 )

+PYARROW_LT_13 = version.parse(pa.__version__) < version.parse("13")
+

 class IndexBase(CopyMixin):
     """Initialize an IndexBase.
@@ -136,11 +139,21 @@ def __repr__(self) -> str:
             class_=type(self).__name__, attrs=", ".join(repr_str)
         )

-    def observed_values(self, date_as_object=True) -> np.ndarray:
+    def observed_values(
+        self, date_as_object=True, coerce_temporal_nanoseconds=True
+    ) -> np.ndarray:
         """Return an array of all observed values."""
         keys = np.array(list(self.index_dct.keys()))
         labeled_array = pa.array(keys, type=self.dtype)
-        return np.array(labeled_array.to_pandas(date_as_object=date_as_object))
+
+        # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+        # as it was introduced for backwards compatibility with pandas 1.x
+        _coerce = {}
+        if not PYARROW_LT_13:
+            _coerce["coerce_temporal_nanoseconds"] = coerce_temporal_nanoseconds
+        return np.array(
+            labeled_array.to_pandas(date_as_object=date_as_object, **_coerce)
+        )

     @staticmethod
     def normalize_value(dtype: pa.DataType, value: Any) -> Any:
@@ -476,7 +489,10 @@ def as_flat_series(
         table = _index_dct_to_table(
             self.index_dct, column=self.column, dtype=self.dtype
         )
-        df = table.to_pandas(date_as_object=date_as_object)
+        # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+        # as it was introduced for backwards compatibility with pandas 1.x
+        _coerce = {} if PYARROW_LT_13 else {"coerce_temporal_nanoseconds": True}
+        df = table.to_pandas(date_as_object=date_as_object, **_coerce)

         if predicates is not None:
             # If there is a conjunction without any reference to the index
@@ -862,7 +878,10 @@ def _parquet_bytes_to_dict(column: str, index_buffer: bytes):
     if column_type == pa.timestamp("us"):
         column_type = pa.timestamp("ns")

-    df = table.to_pandas()
+    # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+    # as it was introduced for backwards compatibility with pandas 1.x
+    _coerce = {} if PYARROW_LT_13 else {"coerce_temporal_nanoseconds": True}
+    df = table.to_pandas(**_coerce)

     index_dct = dict(
         zip(df[column].values, (list(x) for x in df[_PARTITION_COLUMN_NAME].values))
diff --git a/plateau/io/dask/compression.py b/plateau/io/dask/compression.py
index e179ea41..eab81b05 100644
--- a/plateau/io/dask/compression.py
+++ b/plateau/io/dask/compression.py
@@ -2,6 +2,7 @@
 from functools import partial
 from typing import List, Union

+import dask
 import dask.dataframe as dd
 import pandas as pd

@@ -109,7 +110,8 @@ def pack_payload(df: dd.DataFrame, group_key: Union[List[str], str]) -> dd.DataF

     _pack_payload = partial(pack_payload_pandas, group_key=group_key)

-    return df.map_partitions(_pack_payload, meta=packed_meta)
+    with dask.config.set({"dataframe.convert-string": False}):
+        return df.map_partitions(_pack_payload, meta=packed_meta)


 def unpack_payload_pandas(
@@ -154,6 +156,7 @@ def unpack_payload(df: dd.DataFrame, unpack_meta: pd.DataFrame) -> dd.DataFrame:
         )
         return df

-    return df.map_partitions(
-        unpack_payload_pandas, unpack_meta=unpack_meta, meta=unpack_meta
-    )
+    with dask.config.set({"dataframe.convert-string": False}):
+        return df.map_partitions(
+            unpack_payload_pandas, unpack_meta=unpack_meta, meta=unpack_meta
+        )
diff --git a/plateau/io/dask/dataframe.py b/plateau/io/dask/dataframe.py
index 36fda4be..e0aea8ef 100644
--- a/plateau/io/dask/dataframe.py
+++ b/plateau/io/dask/dataframe.py
@@ -150,18 +150,20 @@ def read_dataset_as_ddf(
         divisions.sort()
         divisions_lst = list(divisions)
         divisions_lst.append(divisions[-1])
-    ddf = from_map(
-        ReadPlateauPartition(columns=columns),
-        mps,
-        meta=meta,
-        label="read-plateau",
-        divisions=divisions_lst,
-        store=ds_factory.store_factory,
-        categoricals=categoricals,
-        predicate_pushdown_to_io=predicate_pushdown_to_io,
-        dates_as_object=dates_as_object,
-        predicates=predicates,
-    )
+
+    with dask.config.set({"dataframe.convert-string": False}):
+        ddf = from_map(
+            ReadPlateauPartition(columns=columns),
+            mps,
+            meta=meta,
+            label="read-plateau",
+            divisions=divisions_lst,
+            store=ds_factory.store_factory,
+            categoricals=categoricals,
+            predicate_pushdown_to_io=predicate_pushdown_to_io,
+            dates_as_object=dates_as_object,
+            predicates=predicates,
+        )
     if dask_index_on:
         return ddf.set_index(dask_index_on, divisions=divisions_lst, sorted=True)
     else:
@@ -329,21 +331,24 @@ def store_dataset_from_ddf(
     if not overwrite:
         raise_if_dataset_exists(dataset_uuid=dataset_uuid, store=store)

-    mp_ser = _write_dataframe_partitions(
-        ddf=ddf,
-        store=ds_factory.store_factory,
-        dataset_uuid=dataset_uuid,
-        table=table,
-        secondary_indices=secondary_indices,
-        shuffle=shuffle,
-        repartition_ratio=repartition_ratio,
-        num_buckets=num_buckets,
-        sort_partitions_by=sort_partitions_by,
-        df_serializer=df_serializer,
-        metadata_version=metadata_version,
-        partition_on=partition_on,
-        bucket_by=bucket_by,
-    )
+
+    with dask.config.set({"dataframe.convert-string": False}):
+        mp_ser = _write_dataframe_partitions(
+            ddf=ddf,
+            store=ds_factory.store_factory,
+            dataset_uuid=dataset_uuid,
+            table=table,
+            secondary_indices=secondary_indices,
+            shuffle=shuffle,
+            repartition_ratio=repartition_ratio,
+            num_buckets=num_buckets,
+            sort_partitions_by=sort_partitions_by,
+            df_serializer=df_serializer,
+            metadata_version=metadata_version,
+            partition_on=partition_on,
+            bucket_by=bucket_by,
+        )
+
     return mp_ser.reduction(
         chunk=_id,
         aggregate=_commit_store_from_reduction,
@@ -471,21 +476,22 @@ def update_dataset_from_ddf(
     inferred_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
     del secondary_indices

-    mp_ser = _write_dataframe_partitions(
-        ddf=ddf,
-        store=ds_factory.store_factory if ds_factory else store,
-        dataset_uuid=dataset_uuid or ds_factory.dataset_uuid,
-        table=table,
-        secondary_indices=inferred_indices,
-        shuffle=shuffle,
-        repartition_ratio=repartition_ratio,
-        num_buckets=num_buckets,
-        sort_partitions_by=sort_partitions_by,
-        df_serializer=df_serializer,
-        metadata_version=metadata_version,
-        partition_on=cast(List[str], partition_on),
-        bucket_by=bucket_by,
-    )
+    with dask.config.set({"dataframe.convert-string": False}):
+        mp_ser = _write_dataframe_partitions(
+            ddf=ddf,
+            store=ds_factory.store_factory if ds_factory else store,
+            dataset_uuid=dataset_uuid or ds_factory.dataset_uuid,
+            table=table,
+            secondary_indices=inferred_indices,
+            shuffle=shuffle,
+            repartition_ratio=repartition_ratio,
+            num_buckets=num_buckets,
+            sort_partitions_by=sort_partitions_by,
+            df_serializer=df_serializer,
+            metadata_version=metadata_version,
+            partition_on=cast(List[str], partition_on),
+            bucket_by=bucket_by,
+        )

     return mp_ser.reduction(
         chunk=_id,
@@ -567,24 +573,26 @@ def collect_dataset_metadata(
     mps = list(
         dispatch_metapartitions_from_factory(dataset_factory, predicates=predicates)
     )
-    if mps:
-        random.shuffle(mps)
-        # ensure that even with sampling at least one metapartition is returned
-        cutoff_index = max(1, int(len(mps) * frac))
-        mps = mps[:cutoff_index]
-        ddf = dd.from_delayed(
-            [
-                dask.delayed(MetaPartition.get_parquet_metadata)(
-                    mp, store=dataset_factory.store_factory
-                )
-                for mp in mps
-            ],
-            meta=_METADATA_SCHEMA,
-        )
-    else:
-        df = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
-        df = df.astype(_METADATA_SCHEMA)
-        ddf = dd.from_pandas(df, npartitions=1)
+
+    with dask.config.set({"dataframe.convert-string": False}):
+        if mps:
+            random.shuffle(mps)
+            # ensure that even with sampling at least one metapartition is returned
+            cutoff_index = max(1, int(len(mps) * frac))
+            mps = mps[:cutoff_index]
+            ddf = dd.from_delayed(
+                [
+                    dask.delayed(MetaPartition.get_parquet_metadata)(
+                        mp, store=dataset_factory.store_factory
+                    )
+                    for mp in mps
+                ],
+                meta=_METADATA_SCHEMA,
+            )
+        else:
+            df = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
+            df = df.astype(_METADATA_SCHEMA)
+            ddf = dd.from_pandas(df, npartitions=1)

     return ddf

@@ -651,12 +659,15 @@ def hash_dataset(
         columns=columns,
         dates_as_object=True,
     )
-    if not group_key:
-        return ddf.map_partitions(_hash_partition, meta="uint64").astype("uint64")
-    else:
-        ddf2 = pack_payload(ddf, group_key=group_key)
-        return (
-            ddf2.groupby(group_key)
-            .apply(_unpack_hash, unpack_meta=ddf._meta, subset=subset, meta="uint64")
-            .astype("uint64")
-        )
+    with dask.config.set({"dataframe.convert-string": False}):
+        if not group_key:
+            return ddf.map_partitions(_hash_partition, meta="uint64").astype("uint64")
+        else:
+            ddf2 = pack_payload(ddf, group_key=group_key)
+            return (
+                ddf2.groupby(group_key)
+                .apply(
+                    _unpack_hash, unpack_meta=ddf._meta, subset=subset, meta="uint64"
+                )
+                .astype("uint64")
+            )
diff --git a/plateau/serialization/_csv.py b/plateau/serialization/_csv.py
index 5560d4b9..ac4926cc 100644
--- a/plateau/serialization/_csv.py
+++ b/plateau/serialization/_csv.py
@@ -8,6 +8,7 @@
 import pandas as pd
 import pyarrow as pa
 from minimalkv import KeyValueStore
+from packaging import version
 from pandas.errors import EmptyDataError

 from ._generic import (
@@ -18,6 +19,8 @@
     filter_df_from_predicates,
 )

+PYARROW_LT_13 = version.parse(pa.__version__) < version.parse("13")
+

 class CsvSerializer(DataFrameSerializer):
     def __init__(self, compress=True):
@@ -85,7 +88,11 @@ def restore_dataframe(

     def store(self, store, key_prefix, df):
         if isinstance(df, pa.Table):
-            df = df.to_pandas()
+            # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+            # as it was introduced for backwards compatibility with pandas 1.x
+            _coerce = {} if PYARROW_LT_13 else {"coerce_temporal_nanoseconds": True}
+            df = df.to_pandas(**_coerce)
+
         key = f"{key_prefix}.csv"
         result_stream = BytesIO()
         iostream: BufferedIOBase
diff --git a/plateau/serialization/_parquet.py b/plateau/serialization/_parquet.py
index 953571e3..b7391d27 100644
--- a/plateau/serialization/_parquet.py
+++ b/plateau/serialization/_parquet.py
@@ -23,7 +23,7 @@
     filter_df_from_predicates,
 )
 from ._io_buffer import BlockBuffer
-from ._util import ensure_unicode_string_type
+from ._util import ensure_unicode_string_type, schema_metadata_bytes_to_object

 try:
     # Only check for BotoStore instance if boto is really installed
@@ -41,6 +41,7 @@
 BACKOFF_TIME = 0.01  # 10 ms
 PYARROW_LT_6 = version.parse(pa.__version__) < version.parse("6")
 PYARROW_LT_8 = version.parse(pa.__version__) < version.parse("8")
+PYARROW_LT_13 = version.parse(pa.__version__) < version.parse("13")

 # Since pyarrow 6, the Parquet version/features can be selected more granular.
 # Version 2.0 is equal to 2.4 but 2.4 doesn't trigger deprecation warnings.
@@ -255,11 +256,16 @@ def _restore_dataframe(
         else:
             # ARROW-5139 Column projection with empty columns returns a table w/out index
             if columns == []:
+                # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+                # as it was introduced for backwards compatibility with pandas 1.x
+                _coerce = {}
+                if not PYARROW_LT_13:
+                    _coerce["coerce_temporal_nanoseconds"] = True
                 # Create an arrow table with expected index length.
                 df = (
                     parquet_file.schema.to_arrow_schema()
                     .empty_table()
-                    .to_pandas(date_as_object=date_as_object)
+                    .to_pandas(date_as_object=date_as_object, **_coerce)
                 )
                 index = pd.Index(
                     pd.RangeIndex(start=0, stop=parquet_file.metadata.num_rows),
@@ -284,7 +290,13 @@ def _restore_dataframe(

             table = _reset_dictionary_columns(table, exclude=categories)

-            df = table.to_pandas(date_as_object=date_as_object)
+            # HACK: Cast bytes to object in metadata until Pandas bug is fixed: https://github.com/pandas-dev/pandas/issues/50127
+            table = table.cast(schema_metadata_bytes_to_object(table.schema))
+
+            # Prior to pyarrow 13.0.0 coerce_temporal_nanoseconds didn't exist
+            # as it was introduced for backwards compatibility with pandas 1.x
+            _coerce = {} if PYARROW_LT_13 else {"coerce_temporal_nanoseconds": True}
+            df = table.to_pandas(date_as_object=date_as_object, **_coerce)

             # XXX: Patch until Pyarrow bug is resolved: https://issues.apache.org/jira/browse/ARROW-18099?filter=-2
             if categories:
diff --git a/plateau/serialization/_util.py b/plateau/serialization/_util.py
index f556c981..223eca6d 100644
--- a/plateau/serialization/_util.py
+++ b/plateau/serialization/_util.py
@@ -1,3 +1,6 @@
+from pyarrow import Schema
+
+
 def _check_contains_null(val):
     if isinstance(val, bytes):
         for byte in val:
@@ -16,3 +19,8 @@ def ensure_unicode_string_type(obj):
         return obj.decode("utf8")
     else:
         return str(obj)
+
+
+def schema_metadata_bytes_to_object(schema: Schema) -> Schema:
+    meta = schema.metadata[b"pandas"].decode().replace("bytes", "object").encode()
+    return schema.with_metadata({b"pandas": meta})
diff --git a/plateau/serialization/testing.py b/plateau/serialization/testing.py
index b3cd33f2..ad1bf995 100644
--- a/plateau/serialization/testing.py
+++ b/plateau/serialization/testing.py
@@ -36,16 +36,16 @@ def get_dataframe_not_nested(n):
             "bool": pd.Series(
                 [1] * int(np.floor(n / 2)) + [0] * int(np.ceil(n / 2)), dtype=np.bool_
             ),
-            "int8": pd.Series(range(n), dtype=np.int8),
-            "int16": pd.Series(range(n), dtype=np.int16),
-            "int32": pd.Series(range(n), dtype=np.int32),
-            "int64": pd.Series(range(n), dtype=np.int64),
-            "uint8": pd.Series(range(n), dtype=np.uint8),
-            "uint16": pd.Series(range(n), dtype=np.uint16),
-            "uint32": pd.Series(range(n), dtype=np.uint32),
-            "uint64": pd.Series(range(n), dtype=np.uint64),
-            "float32": pd.Series([float(x) for x in range(n)], dtype=np.float32),
-            "float64": pd.Series([float(x) for x in range(n)], dtype=np.float64),
+            "int8": pd.Series(range(n)).astype(np.int8),
+            "int16": pd.Series(range(n)).astype(np.int16),
+            "int32": pd.Series(range(n)).astype(np.int32),
+            "int64": pd.Series(range(n)).astype(np.int64),
+            "uint8": pd.Series(range(n)).astype(np.uint8),
+            "uint16": pd.Series(range(n)).astype(np.uint16),
+            "uint32": pd.Series(range(n)).astype(np.uint32),
+            "uint64": pd.Series(range(n)).astype(np.uint64),
+            "float32": pd.Series([float(x) for x in range(n)]).astype(np.float32),
+            "float64": pd.Series([float(x) for x in range(n)]).astype(np.float64),
             "date": pd.Series(
                 [date(2018, 1, x % 31 + 1) for x in range(1, n + 1)], dtype=object
             ),
diff --git a/reference-data/arrow-compat/12.0.0.parquet b/reference-data/arrow-compat/12.0.0.parquet
new file mode 100644
index 00000000..33c0b3b4
Binary files /dev/null and b/reference-data/arrow-compat/12.0.0.parquet differ
diff --git a/reference-data/arrow-compat/13.0.0.parquet b/reference-data/arrow-compat/13.0.0.parquet
new file mode 100644
index 00000000..2c97acf9
Binary files /dev/null and b/reference-data/arrow-compat/13.0.0.parquet differ
diff --git a/setup.cfg b/setup.cfg
index 8c54f0e8..c3581723 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -13,13 +13,13 @@ classifiers =
 [options]
 include_package_data = true
 install_requires =
-    dask[dataframe]!=2021.5.1,!=2021.6.0 # gh475 - 2021.5.1 and 2021.6.0 broke ci, omit those versions
+    dask[dataframe]!=2021.5.1,!=2021.6.0,<2023.9.2 # gh475 - 2021.5.1 and 2021.6.0 broke ci, omit those versions
     decorator
     msgpack>=0.5.2
     # Currently dask and numpy==1.16.0 clash
     numpy!=1.15.0,!=1.16.0
-    pandas>=0.23.0, !=1.0.0
-    pyarrow>=0.17.1,!=1.0.0
+    pandas>=0.23.0,!=1.0.0,<2.1.0
+    pyarrow>=4
     simplejson
     minimalkv>=1.4.2
    toolz
diff --git a/tests/io/dask/dataframe/test_compression.py b/tests/io/dask/dataframe/test_compression.py
index c18fcb98..2ca358cd 100644
--- a/tests/io/dask/dataframe/test_compression.py
+++ b/tests/io/dask/dataframe/test_compression.py
@@ -1,3 +1,4 @@
+import dask
 import dask.dataframe as dd
 import pandas as pd
 import pandas.testing as pdt
@@ -13,9 +14,10 @@ def test_pack_payload(df_all_types):
     # For a single row dataframe the packing actually has a few more bytes
-    df = dd.from_pandas(
-        pd.concat([df_all_types] * 10, ignore_index=True), npartitions=3
-    )
+    with dask.config.set({"dataframe.convert-string": False}):
+        df = dd.from_pandas(
+            pd.concat([df_all_types] * 10, ignore_index=True), npartitions=3
+        )

     size_before = df.memory_usage(deep=True).sum()

     packed_df = pack_payload(df, group_key=list(df.columns[-2:]))
@@ -66,7 +68,8 @@ def test_pack_payload_pandas_empty(df_all_types):
 @pytest.mark.parametrize("num_group_cols", [1, 4])
 def test_pack_payload_roundtrip(df_all_types, num_group_cols):
     group_key = list(df_all_types.columns[-num_group_cols:])
-    df_all_types = dd.from_pandas(df_all_types, npartitions=2)
+    with dask.config.set({"dataframe.convert-string": False}):
+        df_all_types = dd.from_pandas(df_all_types, npartitions=2)
     pdt.assert_frame_equal(
         df_all_types.compute(),
         unpack_payload(
diff --git a/tests/io/dask/dataframe/test_read.py b/tests/io/dask/dataframe/test_read.py
index 348e091f..e6480d50 100644
--- a/tests/io/dask/dataframe/test_read.py
+++ b/tests/io/dask/dataframe/test_read.py
@@ -277,3 +277,43 @@ def restore_dataframe(cls, store, key, filter_query, columns, *args, **kwargs):
         )["colA"]
     assert_dask_eq(ddf_auto, ddf_manual)
     assert fake_called
+
+
+def test_dask_index_on_non_string_raises(store_factory):
+    dataset_uuid = "dataset_uuid"
+    colA = 1
+    df1 = pd.DataFrame({colA: [1, 2]})
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid=dataset_uuid, dfs=[df1]
+    )
+    with pytest.raises(
+        TypeError,
+        match=f"The paramter `dask_index_on` must be a string but got {type(colA)}",
+    ):
+        read_dataset_as_ddf(
+            dataset_uuid=dataset_uuid,
+            store=store_factory,
+            table="table",
+            dask_index_on=colA,
+        )
+
+
+def test_dask_dispatch_by_raises_if_index_on_not_none(store_factory):
+    dataset_uuid = "dataset_uuid"
+    colA = "ColumnA"
+    df1 = pd.DataFrame({colA: [1, 2]})
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid=dataset_uuid, dfs=[df1]
+    )
+    with pytest.raises(
+        ValueError,
+        match="`read_dataset_as_ddf` got parameters `dask_index_on` and `dispatch_by`. "
+        "Note that `dispatch_by` can only be used if `dask_index_on` is None.",
+    ):
+        read_dataset_as_ddf(
+            dataset_uuid=dataset_uuid,
+            store=store_factory,
+            table="table",
+            dask_index_on=colA,
+            dispatch_by=[colA],
+        )
diff --git a/tests/io/dask/dataframe/test_update.py b/tests/io/dask/dataframe/test_update.py
index 3bb7c827..fe0c190f 100644
--- a/tests/io/dask/dataframe/test_update.py
+++ b/tests/io/dask/dataframe/test_update.py
@@ -24,13 +24,14 @@ def _id(part):

 def _update_dataset(partitions, *args, **kwargs):
     # TODO: Simplify once parse_input_to_metapartition is removed / obsolete
-    if isinstance(partitions, pd.DataFrame):
-        partitions = dd.from_pandas(partitions, npartitions=1)
-    elif partitions is not None:
-        delayed_partitions = [dask.delayed(_id)(part) for part in partitions]
-        partitions = dd.from_delayed(delayed_partitions)
-    else:
-        partitions = None
+    with dask.config.set({"dataframe.convert-string": False}):
+        if isinstance(partitions, pd.DataFrame):
+            partitions = dd.from_pandas(partitions, npartitions=1)
+        elif partitions is not None:
+            delayed_partitions = [dask.delayed(_id)(part) for part in partitions]
+            partitions = dd.from_delayed(delayed_partitions)
+        else:
+            partitions = None

     # Replace `table_name` with `table` keyword argument to enable shared test code
     # via `bound_update_dataset` fixture
diff --git a/tests/serialization/test_arrow_compat.py b/tests/serialization/test_arrow_compat.py
index f5a99467..087d64ca 100644
--- a/tests/serialization/test_arrow_compat.py
+++ b/tests/serialization/test_arrow_compat.py
@@ -27,6 +27,8 @@
     "9.0.0",
     "10.0.1",
    "11.0.0",
+    "12.0.0",
+    "13.0.0",
 ]
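Note (reviewer addition, not part of the patch): most of the serialization hunks above repeat one pattern — only pass `coerce_temporal_nanoseconds` to `Table.to_pandas()` when pyarrow is new enough to know the keyword. A minimal standalone sketch of that pattern follows; the helper name `table_to_pandas` and the sample table are illustrative assumptions, not plateau code.

```python
import pandas as pd
import pyarrow as pa
from packaging import version

# coerce_temporal_nanoseconds was added to Table.to_pandas() in pyarrow 13.0.0
# to preserve the pandas 1.x behaviour of nanosecond-resolution temporal columns.
PYARROW_LT_13 = version.parse(pa.__version__) < version.parse("13")


def table_to_pandas(table: pa.Table, date_as_object: bool = False) -> pd.DataFrame:
    # Older pyarrow releases raise TypeError for the unknown keyword,
    # so only forward it when it is actually supported.
    kwargs = {} if PYARROW_LT_13 else {"coerce_temporal_nanoseconds": True}
    return table.to_pandas(date_as_object=date_as_object, **kwargs)


if __name__ == "__main__":
    tbl = pa.table({"ts": pa.array([0, 1_000_000], type=pa.timestamp("us"))})
    print(table_to_pandas(tbl).dtypes)
```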
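The other recurring change wraps dask graph construction in `dask.config.set({"dataframe.convert-string": False})`, because recent dask releases can convert object columns to Arrow-backed strings and thereby change the dtypes plateau round-trips. A small sketch under the assumption that a recent dask with pyarrow is installed; the DataFrame contents are made up:

```python
import dask
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"key": ["a", "b"], "value": [1, 2]})

# With "dataframe.convert-string" enabled (which newer dask releases may do by
# default), dd.from_pandas would turn the "key" column into string[pyarrow];
# disabling it for the enclosed block keeps the original object dtype.
with dask.config.set({"dataframe.convert-string": False}):
    ddf = dd.from_pandas(pdf, npartitions=1)

print(ddf.dtypes)  # "key" stays object, matching the pandas input
```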