From db7931d10ebf87b43ee196d22e648beb4100bb6d Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Wed, 31 Jan 2024 10:49:07 -0800
Subject: [PATCH] Rust optimizer improvements (#1199)

* create optimizer_config
* add join_reorder configs
* add verbose_optimizer
* add test_dynamic_partition_pruning
* skip 3.8 tests
* remove dpp default
* style
* temporarily remove rust changes
* temp remove sql.rs changes
* readd rust changes
* check optimized_plan is Ok
* sql.dynamic_partition_pruning.verbose
* edit yaml style
* edit
* sql.optimizer.verbose
* lowercase
* set object type
* Update test_config.py
* update version
* Unpin dask/distributed for development

---------

Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
---
 continuous_integration/docker/conda.txt       |  2 +-
 continuous_integration/docker/main.dockerfile |  2 +-
 continuous_integration/environment-3.10.yaml  |  2 +-
 continuous_integration/environment-3.11.yaml  |  2 +-
 continuous_integration/environment-3.12.yaml  |  2 +-
 .../gpuci/environment-3.10.yaml               |  2 +-
 .../gpuci/environment-3.9.yaml                |  2 +-
 continuous_integration/recipe/meta.yaml       |  2 +-
 dask_sql/context.py                           | 33 ++++++--
 dask_sql/sql-schema.yaml                      | 36 +++++++++
 dask_sql/sql.yaml                             | 11 +++
 docs/environment.yml                          |  2 +-
 docs/requirements-docs.txt                    |  2 +-
 pyproject.toml                                |  4 +-
 src/lib.rs                                    |  1 +
 src/sql.rs                                    | 80 ++++++++++++++-----
 src/sql/optimizer.rs                          | 23 ++++--
 .../optimizer/dynamic_partition_pruning.rs    | 18 +++--
 src/sql/optimizer/join_reorder.rs             | 23 +++---
 tests/unit/test_config.py                     | 57 +++++++++++++
 20 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/continuous_integration/docker/conda.txt b/continuous_integration/docker/conda.txt
index 72ce2f1b9..270c2febd 100644
--- a/continuous_integration/docker/conda.txt
+++ b/continuous_integration/docker/conda.txt
@@ -1,5 +1,5 @@
 python>=3.9
-dask>=2022.3.0,<=2024.1.1
+dask>=2022.3.0
 pandas>=1.4.0
 jpype1>=1.0.2
 openjdk>=8
diff --git a/continuous_integration/docker/main.dockerfile b/continuous_integration/docker/main.dockerfile
index 458735234..78cd46938 100644
--- a/continuous_integration/docker/main.dockerfile
+++ b/continuous_integration/docker/main.dockerfile
@@ -16,7 +16,7 @@ RUN mamba install -y \
     # build requirements
     "maturin>=1.3,<1.4" \
     # core dependencies
-    "dask>=2022.3.0,<=2024.1.1" \
+    "dask>=2022.3.0" \
     "pandas>=1.4.0" \
     "fastapi>=0.92.0" \
     "httpx>=0.24.1" \
diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml
index 54de0ce90..912e2c54e 100644
--- a/continuous_integration/environment-3.10.yaml
+++ b/continuous_integration/environment-3.10.yaml
@@ -3,7 +3,7 @@ channels:
 - conda-forge
 dependencies:
 - c-compiler
-- dask>=2022.3.0,<=2024.1.1
+- dask>=2022.3.0
 - fastapi>=0.92.0
 - fugue>=0.7.3
 - httpx>=0.24.1
diff --git a/continuous_integration/environment-3.11.yaml b/continuous_integration/environment-3.11.yaml
index 882e225e7..cd77ac8d5 100644
--- a/continuous_integration/environment-3.11.yaml
+++ b/continuous_integration/environment-3.11.yaml
@@ -3,7 +3,7 @@ channels:
 - conda-forge
 dependencies:
 - c-compiler
-- dask>=2022.3.0,<=2024.1.1
+- dask>=2022.3.0
 - fastapi>=0.92.0
 - fugue>=0.7.3
 - httpx>=0.24.1
diff --git a/continuous_integration/environment-3.12.yaml b/continuous_integration/environment-3.12.yaml
index 48d56068e..53b52e629 100644
--- a/continuous_integration/environment-3.12.yaml
+++ b/continuous_integration/environment-3.12.yaml
@@ -3,7 +3,7 @@ channels:
 - conda-forge
 dependencies:
 - c-compiler
-- dask>=2022.3.0,<=2024.1.1
+- dask>=2022.3.0
 - fastapi>=0.92.0
 - fugue>=0.7.3
 - httpx>=0.24.1
diff --git a/continuous_integration/gpuci/environment-3.10.yaml b/continuous_integration/gpuci/environment-3.10.yaml
index 3f19cca23..8ad4e3fdf 100644
--- a/continuous_integration/gpuci/environment-3.10.yaml
+++ b/continuous_integration/gpuci/environment-3.10.yaml
@@ -9,7 +9,7 @@ channels:
 dependencies:
 - c-compiler
 - zlib
-- dask>=2022.3.0,<=2024.1.1
+- dask>=2022.3.0
 - fastapi>=0.92.0
 - fugue>=0.7.3
 - httpx>=0.24.1
diff --git a/continuous_integration/gpuci/environment-3.9.yaml b/continuous_integration/gpuci/environment-3.9.yaml
index 703d5465e..96bec123d 100644
--- a/continuous_integration/gpuci/environment-3.9.yaml
+++ b/continuous_integration/gpuci/environment-3.9.yaml
@@ -9,7 +9,7 @@ channels:
 dependencies:
 - c-compiler
 - zlib
-- dask>=2022.3.0,<=2024.1.1
+- dask>=2022.3.0
 - fastapi>=0.92.0
 - fugue>=0.7.3
 - httpx>=0.24.1
diff --git a/continuous_integration/recipe/meta.yaml b/continuous_integration/recipe/meta.yaml
index e30f53efd..60a5aa299 100644
--- a/continuous_integration/recipe/meta.yaml
+++ b/continuous_integration/recipe/meta.yaml
@@ -32,7 +32,7 @@ requirements:
     - xz # [linux64]
   run:
     - python
-    - dask >=2022.3.0,<=2024.1.1
+    - dask >=2022.3.0
     - pandas >=1.4.0
     - fastapi >=0.92.0
     - httpx >=0.24.1
diff --git a/dask_sql/context.py b/dask_sql/context.py
index faab98e90..d20e919e9 100644
--- a/dask_sql/context.py
+++ b/dask_sql/context.py
@@ -13,6 +13,7 @@
 from dask_sql._datafusion_lib import (
     DaskSchema,
     DaskSQLContext,
+    DaskSQLOptimizerConfig,
     DaskTable,
     DFOptimizationException,
     DFParsingException,
@@ -98,13 +99,20 @@ def __init__(self, logging_level=logging.INFO):
         # A started SQL server (useful for jupyter notebooks)
         self.sql_server = None
 
-        # Create the `DaskSQLContext` Rust context
-        self.context = DaskSQLContext(self.catalog_name, self.schema_name)
-        self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))
+        # Create the `DaskSQLOptimizerConfig` Rust config object
+        optimizer_config = DaskSQLOptimizerConfig(
+            dask_config.get("sql.dynamic_partition_pruning"),
+            dask_config.get("sql.optimizer.fact_dimension_ratio"),
+            dask_config.get("sql.optimizer.max_fact_tables"),
+            dask_config.get("sql.optimizer.preserve_user_order"),
+            dask_config.get("sql.optimizer.filter_selectivity"),
+        )
 
-        self.context.apply_dynamic_partition_pruning(
-            dask_config.get("sql.dynamic_partition_pruning")
+        # Create the `DaskSQLContext` Rust context
+        self.context = DaskSQLContext(
+            self.catalog_name, self.schema_name, optimizer_config
         )
+        self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))
 
         # Register any default plugins, if nothing was registered before.
         RelConverter.add_plugin_class(logical.DaskAggregatePlugin, replace=False)
@@ -542,11 +550,16 @@ def explain(
             :obj:`str`: a description of the created relational algebra.
 
""" + dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning") + if not dask_config.get("sql.optimizer.verbose"): + dask_config.set({"sql.dynamic_partition_pruning": False}) + if dataframes is not None: for df_name, df in dataframes.items(): self.create_table(df_name, df, gpu=gpu) _, rel_string = self._get_ral(sql) + dask_config.set({"sql.dynamic_partition_pruning": dynamic_partition_pruning}) return rel_string def visualize(self, sql: str, filename="mydask.png") -> None: # pragma: no cover @@ -799,9 +812,15 @@ def _get_ral(self, sql): """Helper function to turn the sql query into a relational algebra and resulting column names""" logger.debug(f"Entering _get_ral('{sql}')") - self.context.apply_dynamic_partition_pruning( - dask_config.get("sql.dynamic_partition_pruning") + + optimizer_config = DaskSQLOptimizerConfig( + dask_config.get("sql.dynamic_partition_pruning"), + dask_config.get("sql.fact_dimension_ratio"), + dask_config.get("sql.max_fact_tables"), + dask_config.get("sql.preserve_user_order"), + dask_config.get("sql.filter_selectivity"), ) + self.context.set_optimizer_config(optimizer_config) # get the schema of what we currently have registered schemas = self._prepare_schemas() diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index eaab6936a..ace253094 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -69,6 +69,42 @@ properties: description: | Whether to apply the dynamic partition pruning optimizer rule. + optimizer: + type: object + properties: + + verbose: + type: boolean + description: | + The dynamic partition pruning optimizer rule can sometimes result in extremely long + c.explain() outputs which are not helpful to the user. Setting this option to true + allows the user to see the entire output, while setting it to false truncates the + output. Default is false. + + fact_dimension_ratio: + type: [number, "null"] + description: | + Ratio of the size of the dimension tables to fact tables. Parameter for dynamic partition + pruning and join reorder optimizer rules. + + max_fact_tables: + type: [integer, "null"] + description: | + Maximum number of fact tables to allow in a join. Parameter for join reorder optimizer + rule. + + preserve_user_order: + type: [boolean, "null"] + description: | + Whether to preserve user-defined order of unfiltered dimensions. Parameter for join + reorder optimizer rule. + + filter_selectivity: + type: [number, "null"] + description: | + Constant to use when determining the number of rows produced by a filtered relation. + Parameter for join reorder optimizer rule. 
+
     sort:
       type: object
       properties:
diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml
index 42434d20d..13082a85d 100644
--- a/dask_sql/sql.yaml
+++ b/dask_sql/sql.yaml
@@ -18,6 +18,17 @@ sql:
 
   dynamic_partition_pruning: True
 
+  optimizer:
+    verbose: False
+
+    fact_dimension_ratio: null
+
+    max_fact_tables: null
+
+    preserve_user_order: null
+
+    filter_selectivity: null
+
   sort:
     topk-nelem-limit: 1000000
 
diff --git a/docs/environment.yml b/docs/environment.yml
index 10ab623d6..2d0e08ba0 100644
--- a/docs/environment.yml
+++ b/docs/environment.yml
@@ -6,7 +6,7 @@ dependencies:
   - sphinx>=4.0.0
   - sphinx-tabs
   - dask-sphinx-theme>=2.0.3
-  - dask>=2022.3.0,<=2024.1.1
+  - dask>=2022.3.0
   - pandas>=1.4.0
   - fugue>=0.7.3
   # FIXME: https://github.com/fugue-project/fugue/issues/526
diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt
index 9fd5d4738..1f2052a92 100644
--- a/docs/requirements-docs.txt
+++ b/docs/requirements-docs.txt
@@ -1,7 +1,7 @@
 sphinx>=4.0.0
 sphinx-tabs
 dask-sphinx-theme>=3.0.0
-dask>=2022.3.0,<=2024.1.1
+dask>=2022.3.0
 pandas>=1.4.0
 fugue>=0.7.3
 # FIXME: https://github.com/fugue-project/fugue/issues/526
diff --git a/pyproject.toml b/pyproject.toml
index 3d2ee4843..75ec4519f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,8 +27,8 @@ classifiers = [
 readme = "README.md"
 requires-python = ">=3.9"
 dependencies = [
-    "dask[dataframe]>=2022.3.0,<=2024.1.1",
-    "distributed>=2022.3.0,<=2024.1.1",
+    "dask[dataframe]>=2022.3.0",
+    "distributed>=2022.3.0",
     "pandas>=1.4.0",
     "fastapi>=0.92.0",
     "httpx>=0.24.1",
diff --git a/src/lib.rs b/src/lib.rs
index 921478973..51b3ecfe4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -29,6 +29,7 @@ fn _datafusion_lib(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<sql::DaskSQLOptimizerConfig>()?;
 
     // Exceptions
     m.add(
diff --git a/src/sql.rs b/src/sql.rs
index 4d7908eee..f0f23d4d2 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -87,7 +87,37 @@ pub struct DaskSQLContext {
     current_schema: String,
     schemas: HashMap<String, schema::DaskSchema>,
     options: ConfigOptions,
+    optimizer_config: DaskSQLOptimizerConfig,
+}
+
+#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_sql", subclass)]
+#[derive(Debug, Clone)]
+pub struct DaskSQLOptimizerConfig {
     dynamic_partition_pruning: bool,
+    fact_dimension_ratio: Option<f64>,
+    max_fact_tables: Option<usize>,
+    preserve_user_order: Option<bool>,
+    filter_selectivity: Option<f64>,
+}
+
+#[pymethods]
+impl DaskSQLOptimizerConfig {
+    #[new]
+    pub fn new(
+        dynamic_partition_pruning: bool,
+        fact_dimension_ratio: Option<f64>,
+        max_fact_tables: Option<usize>,
+        preserve_user_order: Option<bool>,
+        filter_selectivity: Option<f64>,
+    ) -> Self {
+        Self {
+            dynamic_partition_pruning,
+            fact_dimension_ratio,
+            max_fact_tables,
+            preserve_user_order,
+            filter_selectivity,
+        }
+    }
 }
 
 impl ContextProvider for DaskSQLContext {
@@ -478,18 +508,22 @@ impl ContextProvider for DaskSQLContext {
 #[pymethods]
 impl DaskSQLContext {
     #[new]
-    pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self {
+    pub fn new(
+        default_catalog_name: &str,
+        default_schema_name: &str,
+        optimizer_config: DaskSQLOptimizerConfig,
+    ) -> Self {
         Self {
             current_catalog: default_catalog_name.to_owned(),
             current_schema: default_schema_name.to_owned(),
             schemas: HashMap::new(),
             options: ConfigOptions::new(),
-            dynamic_partition_pruning: false,
+            optimizer_config,
         }
     }
 
-    pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> {
-        self.dynamic_partition_pruning = config;
+    pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> {
+        self.optimizer_config = config;
         Ok(())
     }
 
@@ -591,23 +625,29 @@
                     Ok(existing_plan)
                 }
                 _ => {
-                    let optimized_plan = optimizer::DaskSqlOptimizer::new()
-                        .optimize(existing_plan.original_plan)
-                        .map(|k| PyLogicalPlan {
-                            original_plan: k,
-                            current_node: None,
-                        })
-                        .map_err(py_optimization_exp);
-
+                    let optimized_plan = optimizer::DaskSqlOptimizer::new(
+                        self.optimizer_config.fact_dimension_ratio,
+                        self.optimizer_config.max_fact_tables,
+                        self.optimizer_config.preserve_user_order,
+                        self.optimizer_config.filter_selectivity,
+                    )
+                    .optimize(existing_plan.original_plan)
+                    .map(|k| PyLogicalPlan {
+                        original_plan: k,
+                        current_node: None,
+                    })
+                    .map_err(py_optimization_exp);
                     if let Ok(optimized_plan) = optimized_plan {
-                        if self.dynamic_partition_pruning {
-                            optimizer::DaskSqlOptimizer::dynamic_partition_pruner()
-                                .optimize_once(optimized_plan.original_plan)
-                                .map(|k| PyLogicalPlan {
-                                    original_plan: k,
-                                    current_node: None,
-                                })
-                                .map_err(py_optimization_exp)
+                        if self.optimizer_config.dynamic_partition_pruning {
+                            optimizer::DaskSqlOptimizer::dynamic_partition_pruner(
+                                self.optimizer_config.fact_dimension_ratio,
+                            )
+                            .optimize_once(optimized_plan.original_plan)
+                            .map(|k| PyLogicalPlan {
+                                original_plan: k,
+                                current_node: None,
+                            })
+                            .map_err(py_optimization_exp)
                         } else {
                             Ok(optimized_plan)
                         }
diff --git a/src/sql/optimizer.rs b/src/sql/optimizer.rs
index 85f335572..2c8be3a56 100644
--- a/src/sql/optimizer.rs
+++ b/src/sql/optimizer.rs
@@ -42,7 +42,12 @@ pub struct DaskSqlOptimizer {
 impl DaskSqlOptimizer {
     /// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired
     /// optimizers as well as any custom `OptimizerRule` trait impls that might be desired.
-    pub fn new() -> Self {
+    pub fn new(
+        fact_dimension_ratio: Option<f64>,
+        max_fact_tables: Option<usize>,
+        preserve_user_order: Option<bool>,
+        filter_selectivity: Option<f64>,
+    ) -> Self {
         debug!("Creating new instance of DaskSqlOptimizer");
 
         let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
@@ -75,7 +80,12 @@ impl DaskSqlOptimizer {
             Arc::new(PushDownFilter::new()),
             // Arc::new(SingleDistinctToGroupBy::new()),
             // Dask-SQL specific optimizations
-            Arc::new(JoinReorder::default()),
+            Arc::new(JoinReorder::new(
+                fact_dimension_ratio,
+                max_fact_tables,
+                preserve_user_order,
+                filter_selectivity,
+            )),
             // The previous optimizations added expressions and projections,
             // that might benefit from the following rules
             Arc::new(SimplifyExpressions::new()),
@@ -94,9 +104,10 @@ impl DaskSqlOptimizer {
 
     // Create a separate instance of this optimization rule, since we want to ensure that it only
     // runs one time
-    pub fn dynamic_partition_pruner() -> Self {
-        let rule: Vec<Arc<dyn OptimizerRule + Send + Sync>> =
-            vec![Arc::new(DynamicPartitionPruning::new())];
+    pub fn dynamic_partition_pruner(fact_dimension_ratio: Option<f64>) -> Self {
+        let rule: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![Arc::new(
+            DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)),
+        )];
 
         Self {
             optimizer: Optimizer::with_rules(rule),
@@ -170,7 +181,7 @@ mod tests {
         let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();
 
         // optimize the logical plan
-        let optimizer = DaskSqlOptimizer::new();
+        let optimizer = DaskSqlOptimizer::new(None, None, None, None);
         optimizer.optimize(plan)
     }
 
diff --git a/src/sql/optimizer/dynamic_partition_pruning.rs b/src/sql/optimizer/dynamic_partition_pruning.rs
index d94359f3a..b6ab70508 100644
--- a/src/sql/optimizer/dynamic_partition_pruning.rs
+++ b/src/sql/optimizer/dynamic_partition_pruning.rs
@@ -36,11 +36,16 @@ use log::warn;
 
 use crate::sql::table::DaskTableSource;
 
 // Optimizer rule for dynamic partition pruning
-pub struct DynamicPartitionPruning {}
+pub struct DynamicPartitionPruning {
+    /// Ratio of the size of the dimension tables to fact tables
+    fact_dimension_ratio: f64,
+}
 
 impl DynamicPartitionPruning {
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(fact_dimension_ratio: f64) -> Self {
+        Self {
+            fact_dimension_ratio,
+        }
     }
 }
@@ -106,9 +111,6 @@ impl OptimizerRule for DynamicPartitionPruning {
                     (left_table.unwrap(), right_table.unwrap());
                 let (left_field, right_field) = (left_field.unwrap(), right_field.unwrap());
 
-                // TODO: Consider allowing the fact_dimension_ratio to be configured by the
-                // user. See issue: https://github.com/dask-contrib/dask-sql/issues/1121
-                let fact_dimension_ratio = 0.3;
                 let (mut left_filtered_table, mut right_filtered_table) = (None, None);
 
                 // Check if join uses an alias instead of the table name itself. Need to use
@@ -136,7 +138,7 @@ impl OptimizerRule for DynamicPartitionPruning {
                         .size
                         .unwrap_or(largest_size as usize) as f64
                         / largest_size
-                        < fact_dimension_ratio
+                        < self.fact_dimension_ratio
                 {
                     left_filtered_table =
                         read_table(left_table.clone(), left_field.clone(), tables.clone());
@@ -149,7 +151,7 @@ impl OptimizerRule for DynamicPartitionPruning {
                         .size
                         .unwrap_or(largest_size as usize) as f64
                         / largest_size
-                        < fact_dimension_ratio
+                        < self.fact_dimension_ratio
                 {
                     right_filtered_table =
                         read_table(right_table.clone(), right_field.clone(), tables.clone());
diff --git a/src/sql/optimizer/join_reorder.rs b/src/sql/optimizer/join_reorder.rs
index 8997f9d96..163ca2069 100644
--- a/src/sql/optimizer/join_reorder.rs
+++ b/src/sql/optimizer/join_reorder.rs
@@ -13,10 +13,10 @@ use log::warn;
 use crate::sql::table::DaskTableSource;
 
 pub struct JoinReorder {
-    /// Maximum number of fact tables to allow in a join
-    max_fact_tables: usize,
     /// Ratio of the size of the dimension tables to fact tables
     fact_dimension_ratio: f64,
+    /// Maximum number of fact tables to allow in a join
+    max_fact_tables: usize,
     /// Whether to preserve user-defined order of unfiltered dimensions
     preserve_user_order: bool,
     /// Constant to use when determining the number of rows produced by a
@@ -24,14 +24,19 @@ pub struct JoinReorder {
     filter_selectivity: f64,
 }
 
-impl Default for JoinReorder {
-    fn default() -> Self {
+impl JoinReorder {
+    pub fn new(
+        fact_dimension_ratio: Option<f64>,
+        max_fact_tables: Option<usize>,
+        preserve_user_order: Option<bool>,
+        filter_selectivity: Option<f64>,
+    ) -> Self {
         Self {
-            max_fact_tables: 2,
-            // FIXME: fact_dimension_ratio should be 0.3
-            fact_dimension_ratio: 0.7,
-            preserve_user_order: true,
-            filter_selectivity: 1.0,
+            // FIXME: Default value for fact_dimension_ratio should be 0.3, not 0.7
+            fact_dimension_ratio: fact_dimension_ratio.unwrap_or(0.7),
+            max_fact_tables: max_fact_tables.unwrap_or(2),
+            preserve_user_order: preserve_user_order.unwrap_or(true),
+            filter_selectivity: filter_selectivity.unwrap_or(1.0),
         }
     }
 }
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 56406244d..ad4fb2883 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -1,12 +1,16 @@
 import os
+import sys
 from unittest import mock
 
+import dask.dataframe as dd
+import pandas as pd
 import pytest
 import yaml
 from dask import config as dask_config
 
 # Required to instantiate default sql config
 import dask_sql  # noqa: F401
+from dask_sql import Context
 
 
 def test_custom_yaml(tmpdir):
@@ -96,3 +100,56 @@ def test_dask_setconfig():
         assert dask_config.get("sql.foo") == {"bar": 1, "baz": "2"}
"baz": "2"} assert dask_config.get("sql.foo") == {"bar": 1} dask_config.refresh() + + +@pytest.mark.skipif( + sys.version_info < (3, 10), + reason="Writing and reading the Dask DataFrame causes a ProtocolError", +) +def test_dynamic_partition_pruning(tmpdir): + c = Context() + + df1 = pd.DataFrame( + { + "x": [1, 2, 3], + "z": [7, 8, 9], + }, + ) + dd.from_pandas(df1, npartitions=3).to_parquet(os.path.join(tmpdir, "df1")) + df1 = dd.read_parquet(os.path.join(tmpdir, "df1")) + c.create_table("df1", df1) + + df2 = pd.DataFrame( + { + "x": [1, 2, 3] * 1000, + "y": [4, 5, 6] * 1000, + }, + ) + dd.from_pandas(df2, npartitions=3).to_parquet(os.path.join(tmpdir, "df2")) + df2 = dd.read_parquet(os.path.join(tmpdir, "df2")) + c.create_table("df2", df2) + + query = "SELECT * FROM df1, df2 WHERE df1.x = df2.x AND df1.z=7" + inlist_expr = "df2.x IN ([Int64(1)])" + + # Default value is False + dask_config.set({"sql.optimizer.verbose": True}) + + # When DPP is turned off, the explain output will not contain the INLIST expression + dask_config.set({"sql.dynamic_partition_pruning": False}) + explain_string = c.explain(query) + assert inlist_expr not in explain_string + + # When DPP is turned on but sql.optimizer.verbose is off, the explain output will not contain the + # INLIST expression + dask_config.set({"sql.dynamic_partition_pruning": True}) + dask_config.set({"sql.optimizer.verbose": False}) + explain_string = c.explain(query) + assert inlist_expr not in explain_string + + # When both DPP and sql.optimizer.verbose are turned on, the explain output will contain the INLIST + # expression + dask_config.set({"sql.dynamic_partition_pruning": True}) + dask_config.set({"sql.optimizer.verbose": True}) + explain_string = c.explain(query) + assert inlist_expr in explain_string