
Commit

Merge remote-tracking branch 'upstream/main'
jdye64 committed May 30, 2023
2 parents 856f7c0 + d2e43ef commit 4b86547
Showing 24 changed files with 300 additions and 90 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/conda.yml
@@ -44,7 +44,7 @@ jobs:
channel-priority: strict
- name: Install dependencies
run: |
mamba install boa conda-verify
mamba install -c conda-forge boa conda-verify
which python
pip list
@@ -66,6 +66,6 @@ jobs:
LABEL: ${{ github.ref == 'refs/heads/datafusion-sql-planner' && 'dev_datafusion' || 'dev' }}
run: |
# install anaconda for upload
mamba install anaconda-client
mamba install -c conda-forge anaconda-client
anaconda upload --label $LABEL linux-64/*.tar.bz2
5 changes: 5 additions & 0 deletions .github/workflows/test-upstream.yml
@@ -112,6 +112,11 @@ jobs:
use-mamba: true
python-version: "3.8"
channel-priority: strict
- name: Install Protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Optionally update upstream cargo dependencies
if: env.which_upstream == 'DataFusion'
env:
2 changes: 2 additions & 0 deletions conftest.py
@@ -15,6 +15,8 @@ def pytest_runtest_setup(item):
pytest.skip("need --rungpu option to run")
# FIXME: P2P shuffle isn't fully supported on GPU, so we must explicitly disable it
dask.config.set({"dataframe.shuffle.algorithm": "tasks"})
# manually enable cudf decimal support
dask.config.set({"sql.mappings.decimal_support": "cudf"})
else:
dask.config.set({"dataframe.shuffle.algorithm": None})
if "queries" in item.keywords and not item.config.getoption("--runqueries"):
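For reference, the two settings toggled above can also be scoped outside the test suite with dask.config.set used as a context manager. The sketch below simply mirrors the values from this diff for a GPU-backed run; it is illustrative and not part of the change itself.

import dask

# Hedged sketch: scope the GPU-oriented settings from conftest.py to one block.
with dask.config.set(
    {
        # P2P shuffle isn't fully supported on GPU, so fall back to task-based shuffle
        "dataframe.shuffle.algorithm": "tasks",
        # opt in to cudf-backed decimal handling in dask-sql's type mappings
        "sql.mappings.decimal_support": "cudf",
    }
):
    ...  # run GPU-backed dask-sql queries here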
1 change: 1 addition & 0 deletions continuous_integration/environment-3.10-dev.yaml
@@ -3,6 +3,7 @@ channels:
- conda-forge
- nodefaults
dependencies:
- c-compiler
- dask>=2022.3.0
# FIXME: handling is needed for httpx-based fastapi>=0.87.0
- fastapi>=0.69.0,<0.87.0
1 change: 1 addition & 0 deletions continuous_integration/environment-3.8-dev.yaml
@@ -3,6 +3,7 @@ channels:
- conda-forge
- nodefaults
dependencies:
- c-compiler
- dask=2022.3.0
- fastapi=0.69.0
- fugue=0.7.3
1 change: 1 addition & 0 deletions continuous_integration/environment-3.9-dev.yaml
@@ -3,6 +3,7 @@ channels:
- conda-forge
- nodefaults
dependencies:
- c-compiler
- dask>=2022.3.0
# FIXME: handling is needed for httpx-based fastapi>=0.87.0
- fastapi>=0.69.0,<0.87.0
14 changes: 7 additions & 7 deletions continuous_integration/gpuci/build.sh
@@ -41,19 +41,19 @@ gpuci_logger "Install awscli"
gpuci_mamba_retry install -y -c conda-forge awscli

gpuci_logger "Download parquet dataset"
gpuci_retry aws s3 cp --only-show-errors "${DASK_SQL_BUCKET_NAME}parquet_2gb/" tests/unit/data/ --recursive
gpuci_retry aws s3 cp --only-show-errors "${DASK_SQL_BUCKET_NAME}parquet_2gb_sorted/" tests/unit/data/ --recursive

gpuci_logger "Download query files"
gpuci_retry aws s3 cp --only-show-errors "${DASK_SQL_BUCKET_NAME}queries/" tests/unit/queries/ --recursive

gpuci_logger "Install dask"
python -m pip install git+https://github.com/dask/dask

gpuci_logger "Install distributed"
python -m pip install git+https://github.com/dask/distributed
# TODO: source install once dask/distributed are unpinned by dask-cuda
# gpuci_logger "Install dask"
# python -m pip install git+https://github.com/dask/dask
# gpuci_logger "Install distributed"
# python -m pip install git+https://github.com/dask/distributed

gpuci_logger "Install dask-sql"
pip install -e ".[dev]"
pip install -e . -vv

gpuci_logger "Check Python version"
python --version
3 changes: 2 additions & 1 deletion continuous_integration/gpuci/environment-3.10.yaml
@@ -2,10 +2,11 @@ name: dask-sql
channels:
- rapidsai
- rapidsai-nightly
- nvidia
- conda-forge
- nvidia
- nodefaults
dependencies:
- c-compiler
- dask>=2022.3.0
# FIXME: handling is needed for httpx-based fastapi>=0.87.0
- fastapi>=0.69.0,<0.87.0
3 changes: 2 additions & 1 deletion continuous_integration/gpuci/environment-3.9.yaml
@@ -2,10 +2,11 @@ name: dask-sql
channels:
- rapidsai
- rapidsai-nightly
- nvidia
- conda-forge
- nvidia
- nodefaults
dependencies:
- c-compiler
- dask>=2022.3.0
# FIXME: handling is needed for httpx-based fastapi>=0.87.0
- fastapi>=0.69.0,<0.87.0
4 changes: 4 additions & 0 deletions continuous_integration/recipe/conda_build_config.yaml
@@ -2,3 +2,7 @@ python:
- 3.8
- 3.9
- 3.10
c_compiler_version:
- 11
rust_compiler_version:
- 1.69
3 changes: 2 additions & 1 deletion continuous_integration/recipe/meta.yaml
@@ -22,7 +22,8 @@ build:

requirements:
build:
- {{ compiler('rust') }} >=1.65.0
- {{ compiler('c') }}
- {{ compiler('rust') }}
- setuptools-rust >=1.5.2
host:
- pip
12 changes: 6 additions & 6 deletions dask_planner/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions dask_planner/src/error.rs
@@ -14,6 +14,7 @@ pub enum DaskPlannerError {
ParserError(ParserError),
TokenizerError(TokenizerError),
Internal(String),
InvalidIOFilter(String),
}

impl Display for DaskPlannerError {
@@ -23,6 +24,7 @@ impl Display for DaskPlannerError {
Self::ParserError(e) => write!(f, "SQL Parser Error: {e}"),
Self::TokenizerError(e) => write!(f, "SQL Tokenizer Error: {e}"),
Self::Internal(e) => write!(f, "Internal Error: {e}"),
Self::InvalidIOFilter(e) => write!(f, "Invalid pyarrow filter: {e} encountered. Defaulting to Dask CPU/GPU bound task operation"),
}
}
}
143 changes: 141 additions & 2 deletions dask_planner/src/sql/logical/table_scan.rs
@@ -1,12 +1,13 @@
use std::sync::Arc;

use datafusion_python::{
datafusion_common::DFSchema,
datafusion_expr::{logical_plan::TableScan, LogicalPlan},
datafusion_common::{DFSchema, ScalarValue},
datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan},
};
use pyo3::prelude::*;

use crate::{
error::DaskPlannerError,
expression::{py_expr_list, PyExpr},
sql::exceptions::py_type_err,
};
@@ -18,6 +19,138 @@ pub struct PyTableScan {
input: Arc<LogicalPlan>,
}

#[pyclass(name = "FilteredResult", module = "dask_planner", subclass)]
#[derive(Debug, Clone)]
pub struct PyFilteredResult {
// Certain Expr(s) do not have supporting logic in pyarrow for IO filtering
// at read time. Those Expr(s) cannot be ignored however. This field stores
// those Expr(s) so that they can be used on the Python side to create
// Dask operations that handle that filtering as an extra task in the graph.
#[pyo3(get)]
pub io_unfilterable_exprs: Vec<PyExpr>,
// Expr(s) that can have their filtering logic performed in the pyarrow IO logic
// are stored here in a DNF format that is expected by pyarrow.
#[pyo3(get)]
pub filtered_exprs: Vec<(String, String, Vec<PyObject>)>,
}

impl PyTableScan {
/// Ensures that a valid Expr variant type is present
fn _valid_expr_type(expr: &[Expr]) -> bool {
expr.iter()
.all(|f| matches!(f, Expr::Column(_) | Expr::Literal(_)))
}

/// Transform the singular Expr instance into its DNF form serialized in a Vec instance. Possibly recursively expanding
/// it as well if needed.
pub fn _expand_dnf_filter(
filter: &Expr,
py: Python,
) -> Result<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError> {
let mut filter_tuple: Vec<(String, String, Vec<PyObject>)> = Vec::new();

match filter {
Expr::InList {
expr,
list,
negated,
} => {
// Only handle simple Expr(s) for InList operations for now
if PyTableScan::_valid_expr_type(list) {
// While ANSI SQL would not allow for anything other than a Column or Literal
// value in this "identifying" `expr` we explicitly check that here just to be sure.
// IF it is something else it is returned to Dask to handle
let ident = match *expr.clone() {
Expr::Column(col) => Ok(col.name),
Expr::Alias(_, name) => Ok(name),
Expr::Literal(val) => Ok(format!("{}", val)),
_ => Err(DaskPlannerError::InvalidIOFilter(format!(
"Invalid InList Expr type `{}`. using in Dask instead",
filter
))),
};

let op = if *negated { "not in" } else { "in" };
let il: Result<Vec<PyObject>, DaskPlannerError> = list
.iter()
.map(|f| match f {
Expr::Column(col) => Ok(col.name.clone().into_py(py)),
Expr::Alias(_, name) => Ok(name.clone().into_py(py)),
Expr::Literal(val) => match val {
ScalarValue::Boolean(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Float32(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Float64(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Int8(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Int16(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Int32(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Int64(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::UInt8(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::UInt16(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::UInt32(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::UInt64(val) => Ok(val.unwrap().into_py(py)),
ScalarValue::Utf8(val) => Ok(val.clone().unwrap().into_py(py)),
ScalarValue::LargeUtf8(val) => Ok(val.clone().unwrap().into_py(py)),
_ => Err(DaskPlannerError::InvalidIOFilter(format!(
"Unsupported ScalarValue `{}` encountered. using in Dask instead",
filter
))),
},
_ => Ok(f.canonical_name().into_py(py)),
})
.collect();

filter_tuple.push((
ident.unwrap_or(expr.canonical_name()),
op.to_string(),
il?,
));
Ok(filter_tuple)
} else {
let er = DaskPlannerError::InvalidIOFilter(format!(
"Invalid identifying column Expr instance `{}`. using in Dask instead",
filter
));
Err::<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError>(er)
}
}
_ => {
let er = DaskPlannerError::InvalidIOFilter(format!(
"Unable to apply filter: `{}` to IO reader, using in Dask instead",
filter
));
Err::<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError>(er)
}
}
}

/// Consume the `TableScan` filters (Expr(s)) and convert them into a PyArrow understandable
/// DNF format that can be directly passed to PyArrow IO readers for Predicate Pushdown. Expr(s)
/// that cannot be converted to correlating PyArrow IO calls will be returned as is and can be
/// used in the Python logic to form Dask tasks for the graph to do computational filtering.
pub fn _expand_dnf_filters(
input: &Arc<LogicalPlan>,
filters: &[Expr],
py: Python,
) -> PyFilteredResult {
let mut filtered_exprs: Vec<(String, String, Vec<PyObject>)> = Vec::new();
let mut unfiltered_exprs: Vec<PyExpr> = Vec::new();

filters
.iter()
.for_each(|f| match PyTableScan::_expand_dnf_filter(f, py) {
Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter),
Err(_e) => {
unfiltered_exprs.push(PyExpr::from(f.clone(), Some(vec![input.clone()])))
}
});

PyFilteredResult {
io_unfilterable_exprs: unfiltered_exprs,
filtered_exprs,
}
}
}

#[pymethods]
impl PyTableScan {
#[pyo3(name = "getTableScanProjects")]
@@ -45,6 +178,12 @@ impl PyTableScan {
fn scan_filters(&self) -> PyResult<Vec<PyExpr>> {
py_expr_list(&self.input, &self.table_scan.filters)
}

#[pyo3(name = "getDNFFilters")]
fn dnf_io_filters(&self, py: Python) -> PyResult<PyFilteredResult> {
let results = PyTableScan::_expand_dnf_filters(&self.input, &self.table_scan.filters, py);
Ok(results)
}
}

impl TryFrom<LogicalPlan> for PyTableScan {
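The new getDNFFilters binding returns each convertible filter as a (column, operator, values) tuple, e.g. ("a", "in", [1, 2, 3]), which matches the DNF form pyarrow accepts for predicate pushdown, while anything it cannot convert comes back as a plain PyExpr for Dask to evaluate. A minimal Python-side sketch of how such a result might be consumed is below; the helper names read_with_pushdown and apply_expr are hypothetical and not part of this change.

import dask.dataframe as dd

def read_with_pushdown(path, filtered_exprs, io_unfilterable_exprs, apply_expr):
    # filtered_exprs holds DNF tuples such as ("a", "in", [1, 2, 3]) that
    # pyarrow can evaluate at read time (predicate pushdown).
    df = dd.read_parquet(path, filters=filtered_exprs or None)
    # Expressions pyarrow could not handle are applied after the read as
    # ordinary Dask boolean masks, i.e. extra tasks in the graph.
    for expr in io_unfilterable_exprs:
        df = df[apply_expr(df, expr)]
    return df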
10 changes: 10 additions & 0 deletions dask_sql/context.py
@@ -8,6 +8,7 @@
import pandas as pd
from dask import config as dask_config
from dask.base import optimize
from dask.utils_test import hlg_layer

from dask_planner.rust import (
DaskSchema,
@@ -247,6 +248,15 @@ def create_table(
if type(input_table) == str:
dc.filepath = input_table
self.schema[schema_name].filepaths[table_name.lower()] = input_table
elif hasattr(input_table, "dask") and dd.utils.is_dataframe_like(input_table):
try:
dask_filepath = hlg_layer(
input_table.dask, "read-parquet"
).creation_info["args"][0]
dc.filepath = dask_filepath
self.schema[schema_name].filepaths[table_name.lower()] = dask_filepath
except KeyError:
logger.debug("Expected 'read-parquet' layer")

if parquet_statistics and not statistics:
statistics = parquet_statistics(dc.df)
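The new create_table branch uses dask.utils_test.hlg_layer to find the "read-parquet" layer in a DataFrame's high-level graph and recover the original path argument from that layer's creation_info, so the filepath can be stored for IO-level filtering later. A minimal sketch of that lookup in isolation; the path "data.parquet" is only illustrative.

import dask.dataframe as dd
from dask.utils_test import hlg_layer

ddf = dd.read_parquet("data.parquet")
try:
    # hlg_layer returns the first graph layer whose name starts with the given
    # prefix; read_parquet layers record their originating call in creation_info.
    layer = hlg_layer(ddf.dask, "read-parquet")
    filepath = layer.creation_info["args"][0]  # the path passed to read_parquet
except KeyError:
    filepath = None  # not a parquet-backed frame; nothing to record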
(Diffs for the remaining changed files are not rendered here.)