Skip to content

Commit

Permalink
Rust optimizer improvements (#1199)
Browse files Browse the repository at this point in the history
* create optimizer_config

* add join_reorder configs

* add verbose_optimizer

* add test_dynamic_partition_pruning

* skip 3.8 tests

* remove dpp default

* style

* temporarily remove rust changes

* temp remove sql.rs changes

* readd rust changes

* check optimized_plan is Ok

* sql.dynamic_partition_pruning.verbose

* edit yaml style

* edit

* sql.optimizer.verbose

* lowercase

* set object type

* Update test_config.py

* update version

* Unpin dask/distributed for development

---------

Co-authored-by: Charles Blackmon-Luca <[email protected]>
  • Loading branch information
sarahyurick and charlesbluca authored Jan 31, 2024
1 parent d6bbc14 commit db7931d
Show file tree
Hide file tree
Showing 20 changed files with 244 additions and 62 deletions.
2 changes: 1 addition & 1 deletion continuous_integration/docker/conda.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
python>=3.9
dask>=2022.3.0,<=2024.1.1
dask>=2022.3.0
pandas>=1.4.0
jpype1>=1.0.2
openjdk>=8
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/docker/main.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ RUN mamba install -y \
# build requirements
"maturin>=1.3,<1.4" \
# core dependencies
"dask>=2022.3.0,<=2024.1.1" \
"dask>=2022.3.0" \
"pandas>=1.4.0" \
"fastapi>=0.92.0" \
"httpx>=0.24.1" \
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- conda-forge
dependencies:
- c-compiler
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- conda-forge
dependencies:
- c-compiler
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.12.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- conda-forge
dependencies:
- c-compiler
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ channels:
dependencies:
- c-compiler
- zlib
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ channels:
dependencies:
- c-compiler
- zlib
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ requirements:
- xz # [linux64]
run:
- python
- dask >=2022.3.0,<=2024.1.1
- dask >=2022.3.0
- pandas >=1.4.0
- fastapi >=0.92.0
- httpx >=0.24.1
Expand Down
33 changes: 26 additions & 7 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dask_sql._datafusion_lib import (
DaskSchema,
DaskSQLContext,
DaskSQLOptimizerConfig,
DaskTable,
DFOptimizationException,
DFParsingException,
Expand Down Expand Up @@ -98,13 +99,20 @@ def __init__(self, logging_level=logging.INFO):
# A started SQL server (useful for jupyter notebooks)
self.sql_server = None

# Create the `DaskSQLContext` Rust context
self.context = DaskSQLContext(self.catalog_name, self.schema_name)
self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))
# Create the `DaskSQLOptimizerConfig` Rust context
optimizer_config = DaskSQLOptimizerConfig(
dask_config.get("sql.dynamic_partition_pruning"),
dask_config.get("sql.fact_dimension_ratio"),
dask_config.get("sql.max_fact_tables"),
dask_config.get("sql.preserve_user_order"),
dask_config.get("sql.filter_selectivity"),
)

self.context.apply_dynamic_partition_pruning(
dask_config.get("sql.dynamic_partition_pruning")
# Create the `DaskSQLContext` Rust context
self.context = DaskSQLContext(
self.catalog_name, self.schema_name, optimizer_config
)
self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))

# # Register any default plugins, if nothing was registered before.
RelConverter.add_plugin_class(logical.DaskAggregatePlugin, replace=False)
Expand Down Expand Up @@ -542,11 +550,16 @@ def explain(
:obj:`str`: a description of the created relational algebra.
"""
dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning")
if not dask_config.get("sql.optimizer.verbose"):
dask_config.set({"sql.dynamic_partition_pruning": False})

if dataframes is not None:
for df_name, df in dataframes.items():
self.create_table(df_name, df, gpu=gpu)

_, rel_string = self._get_ral(sql)
dask_config.set({"sql.dynamic_partition_pruning": dynamic_partition_pruning})
return rel_string

def visualize(self, sql: str, filename="mydask.png") -> None: # pragma: no cover
Expand Down Expand Up @@ -799,9 +812,15 @@ def _get_ral(self, sql):
"""Helper function to turn the sql query into a relational algebra and resulting column names"""

logger.debug(f"Entering _get_ral('{sql}')")
self.context.apply_dynamic_partition_pruning(
dask_config.get("sql.dynamic_partition_pruning")

optimizer_config = DaskSQLOptimizerConfig(
dask_config.get("sql.dynamic_partition_pruning"),
dask_config.get("sql.fact_dimension_ratio"),
dask_config.get("sql.max_fact_tables"),
dask_config.get("sql.preserve_user_order"),
dask_config.get("sql.filter_selectivity"),
)
self.context.set_optimizer_config(optimizer_config)

# get the schema of what we currently have registered
schemas = self._prepare_schemas()
Expand Down
36 changes: 36 additions & 0 deletions dask_sql/sql-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@ properties:
description: |
Whether to apply the dynamic partition pruning optimizer rule.
optimizer:
type: object
properties:

verbose:
type: boolean
description: |
The dynamic partition pruning optimizer rule can sometimes result in extremely long
c.explain() outputs which are not helpful to the user. Setting this option to true
allows the user to see the entire output, while setting it to false truncates the
output. Default is false.
fact_dimension_ratio:
type: [number, "null"]
description: |
Ratio of the size of the dimension tables to fact tables. Parameter for dynamic partition
pruning and join reorder optimizer rules.
max_fact_tables:
type: [integer, "null"]
description: |
Maximum number of fact tables to allow in a join. Parameter for join reorder optimizer
rule.
preserve_user_order:
type: [boolean, "null"]
description: |
Whether to preserve user-defined order of unfiltered dimensions. Parameter for join
reorder optimizer rule.
filter_selectivity:
type: [number, "null"]
description: |
Constant to use when determining the number of rows produced by a filtered relation.
Parameter for join reorder optimizer rule.
sort:
type: object
properties:
Expand Down
11 changes: 11 additions & 0 deletions dask_sql/sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ sql:

dynamic_partition_pruning: True

optimizer:
verbose: False

fact_dimension_ratio: null

max_fact_tables: null

preserve_user_order: null

filter_selectivity: null

sort:
topk-nelem-limit: 1000000

Expand Down
2 changes: 1 addition & 1 deletion docs/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies:
- sphinx>=4.0.0
- sphinx-tabs
- dask-sphinx-theme>=2.0.3
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- pandas>=1.4.0
- fugue>=0.7.3
# FIXME: https://github.com/fugue-project/fugue/issues/526
Expand Down
2 changes: 1 addition & 1 deletion docs/requirements-docs.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
sphinx>=4.0.0
sphinx-tabs
dask-sphinx-theme>=3.0.0
dask>=2022.3.0,<=2024.1.1
dask>=2022.3.0
pandas>=1.4.0
fugue>=0.7.3
# FIXME: https://github.com/fugue-project/fugue/issues/526
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ classifiers = [
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"dask[dataframe]>=2022.3.0,<=2024.1.1",
"distributed>=2022.3.0,<=2024.1.1",
"dask[dataframe]>=2022.3.0",
"distributed>=2022.3.0",
"pandas>=1.4.0",
"fastapi>=0.92.0",
"httpx>=0.24.1",
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn _datafusion_lib(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<sql::function::DaskFunction>()?;
m.add_class::<sql::table::DaskStatistics>()?;
m.add_class::<sql::logical::PyLogicalPlan>()?;
m.add_class::<sql::DaskSQLOptimizerConfig>()?;

// Exceptions
m.add(
Expand Down
80 changes: 60 additions & 20 deletions src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,37 @@ pub struct DaskSQLContext {
current_schema: String,
schemas: HashMap<String, schema::DaskSchema>,
options: ConfigOptions,
optimizer_config: DaskSQLOptimizerConfig,
}

#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_sql", subclass)]
#[derive(Debug, Clone)]
pub struct DaskSQLOptimizerConfig {
dynamic_partition_pruning: bool,
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
}

#[pymethods]
impl DaskSQLOptimizerConfig {
#[new]
pub fn new(
dynamic_partition_pruning: bool,
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
) -> Self {
Self {
dynamic_partition_pruning,
fact_dimension_ratio,
max_fact_tables,
preserve_user_order,
filter_selectivity,
}
}
}

impl ContextProvider for DaskSQLContext {
Expand Down Expand Up @@ -478,18 +508,22 @@ impl ContextProvider for DaskSQLContext {
#[pymethods]
impl DaskSQLContext {
#[new]
pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self {
pub fn new(
default_catalog_name: &str,
default_schema_name: &str,
optimizer_config: DaskSQLOptimizerConfig,
) -> Self {
Self {
current_catalog: default_catalog_name.to_owned(),
current_schema: default_schema_name.to_owned(),
schemas: HashMap::new(),
options: ConfigOptions::new(),
dynamic_partition_pruning: false,
optimizer_config,
}
}

pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> {
self.dynamic_partition_pruning = config;
pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> {
self.optimizer_config = config;
Ok(())
}

Expand Down Expand Up @@ -591,23 +625,29 @@ impl DaskSQLContext {
Ok(existing_plan)
}
_ => {
let optimized_plan = optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);

let optimized_plan = optimizer::DaskSqlOptimizer::new(
self.optimizer_config.fact_dimension_ratio,
self.optimizer_config.max_fact_tables,
self.optimizer_config.preserve_user_order,
self.optimizer_config.filter_selectivity,
)
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);
if let Ok(optimized_plan) = optimized_plan {
if self.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner()
.optimize_once(optimized_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
if self.optimizer_config.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner(
self.optimizer_config.fact_dimension_ratio,
)
.optimize_once(optimized_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
} else {
Ok(optimized_plan)
}
Expand Down
23 changes: 17 additions & 6 deletions src/sql/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ pub struct DaskSqlOptimizer {
impl DaskSqlOptimizer {
/// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired
/// optimizers as well as any custom `OptimizerRule` trait impls that might be desired.
pub fn new() -> Self {
pub fn new(
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
) -> Self {
debug!("Creating new instance of DaskSqlOptimizer");

let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Expand Down Expand Up @@ -75,7 +80,12 @@ impl DaskSqlOptimizer {
Arc::new(PushDownFilter::new()),
// Arc::new(SingleDistinctToGroupBy::new()),
// Dask-SQL specific optimizations
Arc::new(JoinReorder::default()),
Arc::new(JoinReorder::new(
fact_dimension_ratio,
max_fact_tables,
preserve_user_order,
filter_selectivity,
)),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Expand All @@ -94,9 +104,10 @@ impl DaskSqlOptimizer {

// Create a separate instance of this optimization rule, since we want to ensure that it only
// runs one time
pub fn dynamic_partition_pruner() -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> =
vec![Arc::new(DynamicPartitionPruning::new())];
pub fn dynamic_partition_pruner(fact_dimension_ratio: Option<f64>) -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![Arc::new(
DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)),
)];

Self {
optimizer: Optimizer::with_rules(rule),
Expand Down Expand Up @@ -170,7 +181,7 @@ mod tests {
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// optimize the logical plan
let optimizer = DaskSqlOptimizer::new();
let optimizer = DaskSqlOptimizer::new(None, None, None, None);
optimizer.optimize(plan)
}

Expand Down
Loading

0 comments on commit db7931d

Please sign in to comment.