Unpin dask/distributed for development #1319

Merged: 39 commits, merged Apr 15, 2024 (changes shown from 26 commits)

Commits
61513d3 - Unpin dask/distributed for development (charlesbluca, Mar 19, 2024)
0fb485d - First pass at unblocking dask-expr issues - replace _Frame usage (charlesbluca, Mar 22, 2024)
a83be6d - Merge remote-tracking branch 'upstream/main' into unpin-dask-distributed (charlesbluca, Mar 26, 2024)
920cd53 - First pass at unblocking pytest errors (charlesbluca, Mar 26, 2024)
7f952d8 - Disable predicate pushdown & its tests if dask-expr enabled (charlesbluca, Apr 3, 2024)
fc4a248 - Make sure partition_borders is computed in limit map_partitions func (charlesbluca, Apr 3, 2024)
c3d8efe - Skip intake tests for now (charlesbluca, Apr 3, 2024)
4f1747c - Simplify cross join logic to avoid internal graph manipulation (charlesbluca, Apr 5, 2024)
e50e30b - Round trip timeseries fixture to pandas to avoid dask-expr bug (charlesbluca, Apr 5, 2024)
2d5c8c7 - Fix skipif_dask_expr_enabled marker (charlesbluca, Apr 5, 2024)
2fe2e3c - Ignore warnings around mismatched dtypes in joins (charlesbluca, Apr 5, 2024)
a795526 - Add handling for dask-expr to test_broadcast_join (charlesbluca, Apr 5, 2024)
8938a4a - Skip parquet stats tests for now (charlesbluca, Apr 9, 2024)
6305a17 - Skip DPP tests on dask-expr for now (charlesbluca, Apr 9, 2024)
76b55e3 - Pass ddf object as meta for test_describe_model (charlesbluca, Apr 9, 2024)
876d282 - Add dask-expr handling to test_sort_topk (charlesbluca, Apr 9, 2024)
1e72f33 - Avoid using Dask graph internals for random functions (charlesbluca, Apr 9, 2024)
fddf52f - Skip over window count tests for now (charlesbluca, Apr 10, 2024)
72dd6ab - Skip test_over_calls and test_over_with_windows (charlesbluca, Apr 10, 2024)
93bc5dc - Update timeseries fixture comment to acknowledge fix (charlesbluca, Apr 10, 2024)
5cc7cc5 - More detailed messages for window test skips (charlesbluca, Apr 10, 2024)
9ba3f0a - Skip test_join_alias_w_projection for now (charlesbluca, Apr 10, 2024)
ffd695c - Un-xfail test_xgboost_training_prediction on win32 (charlesbluca, Apr 10, 2024)
73d32a2 - Windows failures are still intermittent (charlesbluca, Apr 10, 2024)
35aa225 - Bump rust to 1.73 to circumvent conda sha256 errors (charlesbluca, Apr 10, 2024)
8e6dc05 - Disable query planning in GPU CI for now (charlesbluca, Apr 10, 2024)
9563b89 - Revert "Bump rust to 1.73 to circumvent conda sha256 errors" (charlesbluca, Apr 10, 2024)
b7ebab9 - Use older conda-build version to try and resolve build issues (charlesbluca, Apr 10, 2024)
9f32482 - Pin to an older version of conda-build and boa (charlesbluca, Apr 10, 2024)
207803b - Skip deadlocking xgboost test on GPU (charlesbluca, Apr 11, 2024)
6cbe5a9 - Add subset of testing with query planning disabled (charlesbluca, Apr 12, 2024)
5a279b8 - Add query-planning to job names (charlesbluca, Apr 12, 2024)
7ce1cbf - Fix syntax errors (charlesbluca, Apr 12, 2024)
6d762a5 - Add dask-expr to CI environments, bump to pandas 2 (charlesbluca, Apr 15, 2024)
536d156 - Bump dask/dask-expr to 2024.2.1/0.5 to get around aggregation bug (charlesbluca, Apr 15, 2024)
53c6af5 - Bump dask / dask-expr to 2024.3.1 / 1.0.5 to resolve drop bug (charlesbluca, Apr 15, 2024)
8d1d107 - Bump dask / dask-expr to 2024.4.1 / 1.0.11 to resolve head bug (charlesbluca, Apr 15, 2024)
4a64319 - Remove dask-expr workaround from timeseries fixture (charlesbluca, Apr 15, 2024)
99a5745 - Unpin sqlalchemy in python 3.9 CI environment (charlesbluca, Apr 15, 2024)
Files changed
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -6,7 +6,7 @@ description = "Bindings for DataFusion used by Dask-SQL"
 readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
-rust-version = "1.72"
+rust-version = "1.73"

Collaborator: All for bumping! Just for my own knowledge, was this a requirement, or are we just doing it because it has been a while? Either way is fine and this blocks nothing; just want to know.

Collaborator (author): I actually just did this to try and triage some issues we've been seeing in the aarch64 builds around hash mismatches:

https://github.com/dask-contrib/dask-sql/actions/runs/8437291358/job/23106757782

The errors in that run seemed to imply that something was wrong with the rust 1.72 packages, but it actually turns out to be an issue with newer versions of conda-build (and/or boa); it looks like 9f32482 unblocked the builds.

 include = ["/src", "/dask_sql", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]

 [dependencies]
2 changes: 1 addition & 1 deletion continuous_integration/docker/conda.txt
@@ -1,5 +1,5 @@
 python>=3.9
-dask==2024.1.1
+dask>=2024.1.1
 pandas>=1.4.0
 jpype1>=1.0.2
 openjdk>=8
2 changes: 1 addition & 1 deletion continuous_integration/docker/main.dockerfile
@@ -16,7 +16,7 @@ RUN mamba install -y \
     # build requirements
     "maturin>=1.3,<1.4" \
     # core dependencies
-    "dask==2024.1.1" \
+    "dask>=2024.1.1" \
     "pandas>=1.4.0" \
     "fastapi>=0.92.0" \
     "httpx>=0.24.1" \
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
@@ -3,7 +3,7 @@ channels:
   - conda-forge
 dependencies:
   - c-compiler
-  - dask==2024.1.1
+  - dask>=2024.1.1
   - fastapi>=0.92.0
   - fugue>=0.7.3
   - httpx>=0.24.1
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.11.yaml
@@ -3,7 +3,7 @@ channels:
   - conda-forge
 dependencies:
   - c-compiler
-  - dask==2024.1.1
+  - dask>=2024.1.1
   - fastapi>=0.92.0
   - fugue>=0.7.3
   - httpx>=0.24.1
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.12.yaml
@@ -3,7 +3,7 @@ channels:
   - conda-forge
 dependencies:
   - c-compiler
-  - dask==2024.1.1
+  - dask>=2024.1.1
   - fastapi>=0.92.0
   - fugue>=0.7.3
   - httpx>=0.24.1
4 changes: 3 additions & 1 deletion continuous_integration/gpuci/build.sh
@@ -23,6 +23,8 @@ cd "$WORKSPACE"
 # Determine CUDA release version
 export CUDA_REL=${CUDA_VERSION%.*}

+export DASK_DATAFRAME__QUERY_PLANNING=false
+
 ################################################################################
 # SETUP - Check environment
 ################################################################################
@@ -61,4 +63,4 @@ conda config --show-sources
 conda list --show-channel-urls

 rapids-logger "Python py.test for dask-sql"
-py.test $WORKSPACE -n 4 -v -m gpu --runqueries --rungpu --junitxml="$WORKSPACE/junit-dask-sql.xml" --cov-config="$WORKSPACE/.coveragerc" --cov=dask_sql --cov-report=xml:"$WORKSPACE/dask-sql-coverage.xml" --cov-report term
+py.test $WORKSPACE -n $PARALLEL_LEVEL -v -m gpu --runqueries --rungpu --junitxml="$WORKSPACE/junit-dask-sql.xml" --cov-config="$WORKSPACE/.coveragerc" --cov=dask_sql --cov-report=xml:"$WORKSPACE/dask-sql-coverage.xml" --cov-report term

Collaborator: Nice, good change.
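
The DASK_DATAFRAME__QUERY_PLANNING=false export added in build.sh above corresponds to dask's dataframe.query-planning config option. As a hedged illustration (assuming a dask 2024.x release), the same toggle from Python looks roughly like this; the flag has to be set before dask.dataframe is first imported, since that is when the backend is chosen:

```python
import dask

# Disable the dask-expr backend; this must happen before the first
# `import dask.dataframe`, which is when the backend is selected.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # noqa: E402

print(dd._dask_expr_enabled())  # expected: False
```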

2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.10.yaml
@@ -9,7 +9,7 @@ channels:
 dependencies:
   - c-compiler
   - zlib
-  - dask==2024.1.1
+  - dask>=2024.1.1
   - fastapi>=0.92.0
   - fugue>=0.7.3
   - httpx>=0.24.1
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.9.yaml
@@ -9,7 +9,7 @@ channels:
 dependencies:
   - c-compiler
   - zlib
-  - dask==2024.1.1
+  - dask>=2024.1.1
   - fastapi>=0.92.0
   - fugue>=0.7.3
   - httpx>=0.24.1
2 changes: 1 addition & 1 deletion continuous_integration/recipe/conda_build_config.yaml
@@ -5,7 +5,7 @@ c_compiler_version:
 rust_compiler:
   - rust
 rust_compiler_version:
-  - '1.72'
+  - '1.73'
 maturin:
   - '1.3'
 xz: # [linux64]
2 changes: 1 addition & 1 deletion continuous_integration/recipe/meta.yaml
@@ -32,7 +32,7 @@ requirements:
     - xz # [linux64]
   run:
     - python
-    - dask ==2024.1.1
+    - dask >=2024.1.1
     - pandas >=1.4.0
     - fastapi >=0.92.0
     - httpx >=0.24.1
16 changes: 12 additions & 4 deletions dask_sql/context.py
@@ -262,15 +262,23 @@ def create_table(
             self.schema[schema_name].filepaths[table_name.lower()] = input_table
         elif hasattr(input_table, "dask") and dd.utils.is_dataframe_like(input_table):
             try:
-                dask_filepath = hlg_layer(
-                    input_table.dask, "read-parquet"
-                ).creation_info["args"][0]
+                if dd._dask_expr_enabled():
+                    from dask_expr.io.parquet import ReadParquet
+
+                    dask_filepath = None
+                    operations = input_table.find_operations(ReadParquet)
+                    for op in operations:
+                        dask_filepath = op._args[0]
+                else:
+                    dask_filepath = hlg_layer(
+                        input_table.dask, "read-parquet"
+                    ).creation_info["args"][0]
                 dc.filepath = dask_filepath
                 self.schema[schema_name].filepaths[table_name.lower()] = dask_filepath
             except KeyError:
                 logger.debug("Expected 'read-parquet' layer")

-        if parquet_statistics and not statistics:
+        if parquet_statistics and not dd._dask_expr_enabled() and not statistics:
             statistics = parquet_statistics(dc.df)
         if statistics:
             row_count = 0
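
For reference, a minimal sketch of the dask-expr branch added to create_table above: the parquet path is recovered by walking the expression graph for ReadParquet operations rather than inspecting HLG layers. The dataset below is hypothetical, and the `_args[0]` access simply mirrors what this PR does:

```python
import pandas as pd
import dask.dataframe as dd

if dd._dask_expr_enabled():
    from dask_expr.io.parquet import ReadParquet

    # Write a tiny hypothetical dataset so read_parquet has something to scan
    pd.DataFrame({"x": [1, 2, 3]}).to_parquet("example.parquet")
    ddf = dd.read_parquet("example.parquet")

    filepath = None
    # Walk the expression tree for ReadParquet nodes and take their path argument
    for op in ddf.find_operations(ReadParquet):
        filepath = op._args[0]
    print(filepath)  # expected: "example.parquet"
```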
6 changes: 3 additions & 3 deletions dask_sql/physical/rel/custom/wrappers.py
@@ -207,7 +207,7 @@ def transform(self, X):
                 estimator=self._postfit_estimator,
                 meta=output_meta,
             )
-        elif isinstance(X, dd._Frame):
+        elif isinstance(X, dd.DataFrame):
             if output_meta is None:
                 output_meta = _transform(X._meta_nonempty, self._postfit_estimator)
             try:
@@ -305,7 +305,7 @@ def predict(self, X):
             )
             return result

-        elif isinstance(X, dd._Frame):
+        elif isinstance(X, dd.DataFrame):
             if output_meta is None:
                 # dask-dataframe relies on dd.core.no_default
                 # for infering meta
@@ -364,7 +364,7 @@ def predict_proba(self, X):
                 meta=output_meta,
                 chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
             )
-        elif isinstance(X, dd._Frame):
+        elif isinstance(X, dd.DataFrame):
             if output_meta is None:
                 # dask-dataframe relies on dd.core.no_default
                 # for infering meta
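
The `dd._Frame` base class is not available when query planning (dask-expr) is enabled, hence the switch to the public collection classes above. A small sketch of the difference, with hypothetical data; note that `dd.DataFrame` does not match Series objects, so code that previously relied on `_Frame` for both may need a separate `dd.Series` check:

```python
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=1)

print(isinstance(ddf, dd.DataFrame))       # True
print(isinstance(ddf["x"], dd.DataFrame))  # False: a Series is not a DataFrame
print(isinstance(ddf["x"], dd.Series))     # True
```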
3 changes: 2 additions & 1 deletion dask_sql/physical/rel/logical/filter.py
@@ -38,7 +38,8 @@ def filter_or_scalar(
     # In SQL, a NULL in a boolean is False on filtering
     filter_condition = filter_condition.fillna(False)
     out = df[filter_condition]
-    if dask_config.get("sql.predicate_pushdown"):
+    # dask-expr should implicitly handle predicate pushdown
+    if dask_config.get("sql.predicate_pushdown") and not dd._dask_expr_enabled():
         return attempt_predicate_pushdown(out, add_filters=add_filters)
     else:
         return out
42 changes: 5 additions & 37 deletions dask_sql/physical/rel/logical/join.py
@@ -6,8 +6,6 @@

 import dask.dataframe as dd
 from dask import config as dask_config
-from dask.base import tokenize
-from dask.highlevelgraph import HighLevelGraph

 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
@@ -132,41 +130,11 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
             # TODO: we should implement a shortcut
             # for filter conditions that are always false

-            def merge_single_partitions(lhs_partition, rhs_partition):
-                # Do a cross join with the two partitions
-                # TODO: it would be nice to apply the filter already here
-                # problem: this would mean we need to ship the rex to the
-                # workers (as this is executed on the workers),
-                # which is definitely not possible (java dependency, JVM start...)
-                lhs_partition = lhs_partition.assign(common=1)
-                rhs_partition = rhs_partition.assign(common=1)
-
-                return lhs_partition.merge(rhs_partition, on="common").drop(
-                    columns="common"
-                )
-
-            # Iterate nested over all partitions from lhs and rhs and merge them
-            name = "cross-join-" + tokenize(df_lhs_renamed, df_rhs_renamed)
-            dsk = {
-                (name, i * df_rhs_renamed.npartitions + j): (
-                    merge_single_partitions,
-                    (df_lhs_renamed._name, i),
-                    (df_rhs_renamed._name, j),
-                )
-                for i in range(df_lhs_renamed.npartitions)
-                for j in range(df_rhs_renamed.npartitions)
-            }
-
-            graph = HighLevelGraph.from_collections(
-                name, dsk, dependencies=[df_lhs_renamed, df_rhs_renamed]
-            )
-
-            meta = dd.dispatch.concat(
-                [df_lhs_renamed._meta_nonempty, df_rhs_renamed._meta_nonempty], axis=1
-            )
-            # TODO: Do we know the divisions in any way here?
-            divisions = [None] * (len(dsk) + 1)
-            df = dd.DataFrame(graph, name, meta=meta, divisions=divisions)
+            df = dd.merge(
+                df_lhs_renamed.assign(common=1),
+                df_rhs_renamed.assign(common=1),
+                on="common",
+            ).drop(columns="common")

             warnings.warn(
                 "Need to do a cross-join, which is typically very resource heavy",
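
The replacement above relies on the standard constant-key trick for cross joins instead of hand-building a HighLevelGraph, which keeps the logic inside the public dd.merge API and therefore works on both the legacy and dask-expr backends. A minimal sketch with hypothetical data:

```python
import pandas as pd
import dask.dataframe as dd

lhs = dd.from_pandas(pd.DataFrame({"a": [1, 2]}), npartitions=1)
rhs = dd.from_pandas(pd.DataFrame({"b": ["x", "y"]}), npartitions=1)

# Cross join: tag both sides with a constant key, merge on it, drop the key
cross = dd.merge(
    lhs.assign(common=1),
    rhs.assign(common=1),
    on="common",
).drop(columns="common")

print(cross.compute())  # 2 x 2 = 4 rows
```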
5 changes: 5 additions & 0 deletions dask_sql/physical/rel/logical/limit.py
@@ -58,6 +58,7 @@ def _apply_limit(self, df: dd.DataFrame, limit: int, offset: int) -> dd.DataFrame:
         # check if the first partition contains our desired window
         if (
             dask_config.get("sql.limit.check-first-partition")
+            and not dd._dask_expr_enabled()
             and all(
                 [
                     isinstance(
@@ -79,6 +80,10 @@ def _apply_limit(self, df: dd.DataFrame, limit: int, offset: int) -> dd.DataFrame:
         def limit_partition_func(df, partition_borders, partition_info=None):
             """Limit the partition to values contained within the specified window, returning an empty dataframe if there are none"""

+            # with dask-expr we may need to explicitly compute here
+            if hasattr(partition_borders, "compute"):
+                partition_borders = partition_borders.compute()
+
             # TODO: remove the `cumsum` call here when dask#9067 is resolved
             partition_borders = partition_borders.cumsum().to_dict()
             partition_index = (
11 changes: 7 additions & 4 deletions dask_sql/physical/rel/logical/table_scan.py
@@ -3,6 +3,7 @@
 from functools import reduce
 from typing import TYPE_CHECKING

+from dask.dataframe import _dask_expr_enabled
 from dask.utils_test import hlg_layer

 from dask_sql.datacontainer import DataContainer
@@ -108,9 +109,11 @@ def _apply_filters(self, table_scan, rel, dc, context):
                 ],
             )
             df = filter_or_scalar(df, df_condition)
-            try:
-                logger.debug(hlg_layer(df.dask, "read-parquet").creation_info)
-            except KeyError:
-                pass
+
+            if not _dask_expr_enabled():
+                try:
+                    logger.debug(hlg_layer(df.dask, "read-parquet").creation_info)
+                except KeyError:
+                    pass

         return DataContainer(df, cc)
38 changes: 13 additions & 25 deletions dask_sql/physical/rex/core/call.py
@@ -11,9 +11,6 @@
 import dask.dataframe as dd
 import numpy as np
 import pandas as pd
-from dask.base import tokenize
-from dask.dataframe.core import Series
-from dask.highlevelgraph import HighLevelGraph
 from dask.utils import random_state_data

 from dask_sql._datafusion_lib import SqlTypeName
@@ -828,37 +825,28 @@ def random_function(self, partition, random_state, kwargs):

     def random_frame(self, seed: int, dc: DataContainer, **kwargs) -> dd.Series:
         """This function - in contrast to others in this module - will only ever be called on data frames"""
-
-        random_state = np.random.RandomState(seed=seed)
-
         # Idea taken from dask.DataFrame.sample:
         # initialize a random state for each of the partitions
        # separately and then create a random series
         # for each partition
         df = dc.df
-        name = "sample-" + tokenize(df, random_state)

-        state_data = random_state_data(df.npartitions, random_state)
-        dsk = {
-            (name, i): (
-                self.random_function,
-                (df._name, i),
-                np.random.RandomState(state),
-                kwargs,
-            )
-            for i, state in enumerate(state_data)
-        }
+        state_data = random_state_data(df.npartitions, np.random.RandomState(seed=seed))

-        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[df])
-        random_series = Series(graph, name, ("random", "float64"), df.divisions)
+        def random_partition_func(df, state_data, partition_info=None):
+            """Create a random number for each partition"""
+            partition_index = (
+                partition_info["number"] if partition_info is not None else 0
+            )
+            state = np.random.RandomState(state_data[partition_index])
+            return self.random_function(df, state, kwargs)

-        # This part seems to be stupid, but helps us do a very simple
-        # task without going into the (private) internals of Dask:
-        # copy all meta information from the original input dataframe
-        # This is important so that the returned series looks
-        # exactly like coming from the input dataframe
-        return_df = df.assign(random=random_series)["random"]
-        return return_df
+        random_series = df.map_partitions(
+            random_partition_func, state_data, meta=("random", "float64")
+        )
+
+        return df.assign(random=random_series)["random"]


 class RandOperation(BaseRandomOperation):
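
The rewrite above swaps the hand-built graph for map_partitions with partition_info, so each partition gets its own deterministic seed without touching Dask internals. A standalone sketch of that pattern with hypothetical data (the per-row random series here stands in for dask-sql's random_function):

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.utils import random_state_data

ddf = dd.from_pandas(pd.DataFrame({"x": range(6)}), npartitions=3)

# One independent seed per partition, derived from a single parent state
state_data = random_state_data(ddf.npartitions, np.random.RandomState(seed=42))

def random_partition(df, state_data, partition_info=None):
    """Return one random float per row, seeded by this partition's own state."""
    i = partition_info["number"] if partition_info is not None else 0
    state = np.random.RandomState(state_data[i])
    return pd.Series(state.random_sample(len(df)), index=df.index)

random_series = ddf.map_partitions(
    random_partition, state_data, meta=("random", "float64")
)
print(ddf.assign(random=random_series)["random"].compute())
```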
14 changes: 7 additions & 7 deletions dask_sql/physical/utils/filter.py
@@ -304,10 +304,10 @@ def combine(self, other: DNF | _And | _Or | list | tuple | None) -> DNF:
 # Specify functions that must be generated with
 # a different API at the dataframe-collection level
 _special_op_mappings = {
-    M.fillna: dd._Frame.fillna,
-    M.isin: dd._Frame.isin,
-    M.isna: dd._Frame.isna,
-    M.astype: dd._Frame.astype,
+    M.fillna: dd.DataFrame.fillna,
+    M.isin: dd.DataFrame.isin,
+    M.isna: dd.DataFrame.isna,
+    M.astype: dd.DataFrame.astype,
 }

 # Convert _pass_through_ops to respect "special" mappings
@@ -316,7 +316,7 @@

 def _preprocess_layers(input_layers):
     # NOTE: This is a Layer-specific work-around to deal with
-    # the fact that `dd._Frame.isin(values)` will add a distinct
+    # the fact that `dd.DataFrame.isin(values)` will add a distinct
     # `MaterializedLayer` for the `values` argument.
     # See: https://github.com/dask-contrib/dask-sql/issues/607
     skip = set()
@@ -418,9 +418,9 @@ def _dnf_filter_expression(self, dsk):
             func = _blockwise_logical_dnf
         elif op == operator.getitem:
             func = _blockwise_getitem_dnf
-        elif op == dd._Frame.isin:
+        elif op == dd.DataFrame.isin:
             func = _blockwise_isin_dnf
-        elif op == dd._Frame.isna:
+        elif op == dd.DataFrame.isna:
             func = _blockwise_isna_dnf
         elif op == operator.inv:
             func = _blockwise_inv_dnf
4 changes: 2 additions & 2 deletions docs/environment.yml
@@ -6,7 +6,7 @@ dependencies:
   - sphinx>=4.0.0
   - sphinx-tabs
   - dask-sphinx-theme>=2.0.3
-  - dask==2024.1.1
+  - dask>=2024.1.1
   - pandas>=1.4.0
   - fugue>=0.7.3
   # FIXME: https://github.com/fugue-project/fugue/issues/526
@@ -19,4 +19,4 @@ dependencies:
   - pygments>=2.7.1
   - tabulate
   - ucx-proc=*=cpu
-  - rust>=1.72
+  - rust>=1.73
2 changes: 1 addition & 1 deletion docs/requirements-docs.txt
@@ -1,7 +1,7 @@
 sphinx>=4.0.0
 sphinx-tabs
 dask-sphinx-theme>=3.0.0
-dask==2024.1.1
+dask>=2024.1.1
 pandas>=1.4.0
 fugue>=0.7.3
 # FIXME: https://github.com/fugue-project/fugue/issues/526