diff --git a/python/vegafusion/vegafusion/runtime.py b/python/vegafusion/vegafusion/runtime.py index e4140fa96..997562a88 100644 --- a/python/vegafusion/vegafusion/runtime.py +++ b/python/vegafusion/vegafusion/runtime.py @@ -176,6 +176,52 @@ def _import_or_register_inline_datasets(self, inline_datasets=None): return imported_inline_datasets + def build_pre_transform_spec_plan( + self, + spec, + preserve_interactivity=True, + keep_signals=None, + keep_datasets=None, + ): + """ + Diagnostic function that returns the plan used by the pre_transform_spec method + + :param spec: A Vega specification dict or JSON string + :param preserve_interactivity: If True (default) then the interactive behavior of + the chart will pre preserved. This requires that all the data that participates + in interactions be included in the resulting spec rather than being pre-transformed. + If False, then all possible data transformations are applied even if they break + the original interactive behavior of the chart. + :param keep_signals: Signals from the input spec that must be included in the + pre-transformed spec. A list with elements that are either: + - The name of a top-level signal as a string + - A two-element tuple where the first element is the name of a signal as a string + and the second element is the nested scope of the dataset as a list of integers + :param keep_datasets: Datasets from the input spec that must be included in the + pre-transformed spec. A list with elements that are either: + - The name of a top-level dataset as a string + - A two-element tuple where the first element is the name of a dataset as a string + and the second element is the nested scope of the dataset as a list of integers + :return: + dict with keys: + - "client_spec": Planned client spec + - "server_spec: Planned server spec + - "comm_plan": Communication plan + - "warnings": List of planner warnings + """ + if self._grpc_channel: + raise ValueError("build_pre_transform_spec_plan not yet supported over gRPC") + else: + # Parse input keep signals and datasets + keep_signals = parse_variables(keep_signals) + keep_datasets = parse_variables(keep_datasets) + return self.embedded_runtime.build_pre_transform_spec_plan( + spec, + preserve_interactivity=preserve_interactivity, + keep_signals=keep_signals, + keep_datasets=keep_datasets, + ) + def pre_transform_spec( self, spec, diff --git a/vegafusion-core/src/planning/dependency_graph.rs b/vegafusion-core/src/planning/dependency_graph.rs index b66f36d76..9005544f1 100644 --- a/vegafusion-core/src/planning/dependency_graph.rs +++ b/vegafusion-core/src/planning/dependency_graph.rs @@ -16,6 +16,19 @@ use petgraph::prelude::{DiGraph, EdgeRef, NodeIndex}; use petgraph::Incoming; use std::collections::{HashMap, HashSet}; +pub fn toposort_dependency_graph( + data_graph: &DiGraph<(ScopedVariable, DependencyNodeSupported), ()>, +) -> Result> { + Ok(match toposort(&data_graph, None) { + Ok(v) => v, + Err(err) => { + return Err(VegaFusionError::internal(format!( + "Failed to sort datasets topologically: {err:?}" + ))) + } + }) +} + /// get HashSet of all data variables with fully supported parents that are themselves fully or /// partially supported pub fn get_supported_data_variables( @@ -24,14 +37,7 @@ pub fn get_supported_data_variables( ) -> Result> { let data_graph = build_dependency_graph(chart_spec, config)?; // Sort dataset nodes topologically - let nodes: Vec = match toposort(&data_graph, None) { - Ok(v) => v, - Err(err) => { - return Err(VegaFusionError::internal(format!( - "Failed to sort datasets topologically: {err:?}" - ))) - } - }; + let nodes: Vec = toposort_dependency_graph(&data_graph)?; // Traverse nodes and save those to supported_vars that are supported with all supported // parents diff --git a/vegafusion-core/src/planning/fuse.rs b/vegafusion-core/src/planning/fuse.rs new file mode 100644 index 000000000..ccb187238 --- /dev/null +++ b/vegafusion-core/src/planning/fuse.rs @@ -0,0 +1,386 @@ +use crate::planning::dependency_graph::{build_dependency_graph, toposort_dependency_graph}; +use crate::proto::gen::tasks::VariableNamespace; +use crate::spec::chart::ChartSpec; +use crate::task_graph::graph::ScopedVariable; +use petgraph::prelude::{EdgeRef, NodeIndex}; +use petgraph::Direction; +use vegafusion_common::error::Result; + +/// Optimize the server_spec by fusing datasets. +/// Currently, this function fuses parent datasets into their children when the parent dataset +/// does not include an aggregation or has exactly one child. This has the effect of pushing +/// transforms up through at least the first aggregation into the root dataset's transform +/// pipeline. +pub fn fuse_datasets(server_spec: &mut ChartSpec, do_not_fuse: &[ScopedVariable]) -> Result<()> { + let data_graph = build_dependency_graph(server_spec, &Default::default())?; + let nodes: Vec = toposort_dependency_graph(&data_graph)?; + + 'outer: for node_index in &nodes { + let (parent_var, _) = data_graph.node_weight(*node_index).unwrap(); + let Ok(parent_dataset) = server_spec.get_nested_data(&parent_var.1, &parent_var.0.name).cloned() else { continue }; + + // Don't push down datasets that are required by client + if do_not_fuse.contains(parent_var) { + continue; + } + + // Extract child variables + let edges = data_graph.edges_directed(*node_index, Direction::Outgoing); + let child_vars = edges + .into_iter() + .map(|edge| { + let target_index = edge.target(); + let (child_var, _) = data_graph.node_weight(target_index).unwrap(); + child_var + }) + .collect::>(); + + if child_vars.is_empty() { + continue; + } + + // Don't fuse down datasets with aggregate transforms and multiple children + if parent_dataset.has_aggregate() && child_vars.len() > 1 { + continue; + } + + // Make sure all children are datasets + let all_children_data = child_vars + .iter() + .all(|v| v.0.ns() == VariableNamespace::Data); + if !all_children_data { + continue; + } + + // Make sure all children have this named dataset as source + // (it's possible for a dataset's transform pipeline to reference this dataset, in which + // case we can't fuse.) + for child_var in &child_vars { + let Ok(child_dataset) = server_spec.get_nested_data(&child_var.1, &child_var.0.name) else { continue 'outer}; + if child_dataset.source.as_ref() != Some(&parent_dataset.name) { + continue 'outer; + } + } + + // Extract child datasets + for child_var in &child_vars { + let Ok(child_dataset) = server_spec.get_nested_data_mut(&child_var.1, &child_var.0.name) else { continue 'outer}; + parent_dataset.fuse_into(child_dataset)?; + } + + // Convert parent dataset into a stub + let Ok(parent_dataset) = server_spec.get_nested_data_mut(&parent_var.1, &parent_var.0.name) else { continue }; + parent_dataset.transform = Default::default(); + parent_dataset.source = Default::default(); + parent_dataset.values = Default::default(); + parent_dataset.url = Default::default(); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::planning::fuse::fuse_datasets; + use crate::proto::gen::tasks::Variable; + use crate::spec::chart::ChartSpec; + use serde_json::json; + + #[test] + fn test_simple_fuse() { + let chart: ChartSpec = serde_json::from_value(json!( + { + "data": [ + { + "name": "data_0", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + } + ] + }, + { + "name": "data_1", + "source": "data_0", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 2", + "as": "B" + } + ] + }, + { + "name": "data_2", + "source": "data_0", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 3", + "as": "C" + } + ] + }, + { + "name": "data_3", + "source": "data_1", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 4", + "as": "C" + } + ] + }, + ] + } + )) + .unwrap(); + + // try fuse + let mut new_chart = chart.clone(); + fuse_datasets( + &mut new_chart, + &[ + (Variable::new_data("data_2"), vec![]), + (Variable::new_data("data_3"), vec![]), + ], + ) + .unwrap(); + let pretty_output = serde_json::to_string_pretty(&new_chart).unwrap(); + println!("{}", pretty_output); + assert_eq!( + pretty_output, + r#"{ + "$schema": "https://vega.github.io/schema/vega/v5.json", + "data": [ + { + "name": "data_0" + }, + { + "name": "data_1" + }, + { + "name": "data_2", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + }, + { + "type": "formula", + "expr": "datum.A * 3", + "as": "C" + } + ] + }, + { + "name": "data_3", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + }, + { + "type": "formula", + "expr": "datum.A * 2", + "as": "B" + }, + { + "type": "formula", + "expr": "datum.A * 4", + "as": "C" + } + ] + } + ] +}"# + ); + // Check that fuse is prevented if parent dataset is needed by client + let mut new_chart = chart.clone(); + fuse_datasets(&mut new_chart, &[(Variable::new_data("data_0"), vec![])]).unwrap(); + let pretty_output = serde_json::to_string_pretty(&new_chart).unwrap(); + println!("{}", pretty_output); + assert_eq!( + pretty_output, + r#"{ + "$schema": "https://vega.github.io/schema/vega/v5.json", + "data": [ + { + "name": "data_0", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + } + ] + }, + { + "name": "data_1" + }, + { + "name": "data_2", + "source": "data_0", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 3", + "as": "C" + } + ] + }, + { + "name": "data_3", + "source": "data_0", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 2", + "as": "B" + }, + { + "type": "formula", + "expr": "datum.A * 4", + "as": "C" + } + ] + } + ] +}"# + ); + } + + #[test] + fn test_simple_fuse_with_aggregate() { + let chart: ChartSpec = serde_json::from_value(json!( + { + "data": [ + { + "name": "data_0", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + } + ] + }, + { + "name": "data_1", + "source": "data_0", + "transform": [ + { + "type": "aggregate", + "fields": ["foo", "bar", "baz"], + "ops": ["valid", "sum", "median"], + "groupby": [] + } + ] + }, + { + "name": "data_2", + "source": "data_0", + "transform": [ + { + "type": "formula", + "expr": "datum.bar * 3", + "as": "C" + } + ] + }, + { + "name": "data_3", + "source": "data_1", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 4", + "as": "C" + } + ] + }, + ] + } + )) + .unwrap(); + + // try fuse + let mut new_chart = chart.clone(); + fuse_datasets( + &mut new_chart, + &[ + (Variable::new_data("data_2"), vec![]), + (Variable::new_data("data_3"), vec![]), + ], + ) + .unwrap(); + let pretty_output = serde_json::to_string_pretty(&new_chart).unwrap(); + println!("{}", pretty_output); + assert_eq!( + pretty_output, + r#"{ + "$schema": "https://vega.github.io/schema/vega/v5.json", + "data": [ + { + "name": "data_0" + }, + { + "name": "data_1", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + }, + { + "type": "aggregate", + "groupby": [], + "fields": [ + "foo", + "bar", + "baz" + ], + "ops": [ + "valid", + "sum", + "median" + ] + } + ] + }, + { + "name": "data_2", + "url": "path/to/data.json", + "transform": [ + { + "type": "filter", + "expr": "datum.A > 0" + }, + { + "type": "formula", + "expr": "datum.bar * 3", + "as": "C" + } + ] + }, + { + "name": "data_3", + "source": "data_1", + "transform": [ + { + "type": "formula", + "expr": "datum.A * 4", + "as": "C" + } + ] + } + ] +}"# + ); + } +} diff --git a/vegafusion-core/src/planning/mod.rs b/vegafusion-core/src/planning/mod.rs index 08fa30b80..4919db681 100644 --- a/vegafusion-core/src/planning/mod.rs +++ b/vegafusion-core/src/planning/mod.rs @@ -1,5 +1,6 @@ pub mod dependency_graph; pub mod extract; +pub mod fuse; pub mod optimize_server; pub mod plan; pub mod projection_pushdown; diff --git a/vegafusion-core/src/planning/plan.rs b/vegafusion-core/src/planning/plan.rs index 49910c983..6305c6ef9 100644 --- a/vegafusion-core/src/planning/plan.rs +++ b/vegafusion-core/src/planning/plan.rs @@ -1,5 +1,6 @@ use crate::error::Result; use crate::planning::extract::extract_server_data; +use crate::planning::fuse::fuse_datasets; use crate::planning::optimize_server::split_data_url_nodes; use crate::planning::projection_pushdown::projection_pushdown; use crate::planning::split_domain_data::split_domain_data; @@ -7,7 +8,7 @@ use crate::planning::stitch::{stitch_specs, CommPlan}; use crate::planning::stringify_local_datetimes::stringify_local_datetimes; use crate::planning::unsupported_data_warning::add_unsupported_data_warnings; use crate::proto::gen::pretransform::{ - pre_transform_spec_warning::WarningType, PreTransformSpecWarning, + pre_transform_spec_warning::WarningType, PlannerWarning, PreTransformSpecWarning, }; use crate::spec::chart::ChartSpec; use crate::task_graph::graph::ScopedVariable; @@ -51,7 +52,17 @@ impl From<&PreTransformSpecWarning> for PreTransformSpecWarningSpec { } } -#[derive(Clone, Debug)] +impl From<&PlannerWarning> for PreTransformSpecWarning { + fn from(value: &PlannerWarning) -> Self { + PreTransformSpecWarning { + warning_type: Some(WarningType::Planner(PlannerWarning { + message: value.message.clone(), + })), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum PlannerWarnings { StringifyDatetimeMixedUsage(String), UnsupportedTransforms(String), @@ -75,6 +86,7 @@ pub struct PlannerConfig { pub extract_inline_data: bool, pub extract_server_data: bool, pub allow_client_to_server_comms: bool, + pub fuse_datasets: bool, pub client_only_vars: Vec, pub keep_variables: Vec, } @@ -89,12 +101,28 @@ impl Default for PlannerConfig { extract_inline_data: false, extract_server_data: true, allow_client_to_server_comms: true, + fuse_datasets: true, client_only_vars: Default::default(), keep_variables: Default::default(), } } } +impl PlannerConfig { + pub fn pre_transformed_spec_config( + preserve_interactivity: bool, + keep_variables: Vec, + ) -> PlannerConfig { + PlannerConfig { + stringify_local_datetimes: true, + extract_inline_data: true, + allow_client_to_server_comms: !preserve_interactivity, + keep_variables, + ..Default::default() + } + } +} + #[derive(Debug)] pub struct SpecPlan { pub server_spec: ChartSpec, @@ -166,6 +194,12 @@ impl SpecPlan { split_data_url_nodes(&mut server_spec)?; } + if config.fuse_datasets { + let mut do_not_fuse = config.keep_variables.clone(); + do_not_fuse.extend(comm_plan.server_to_client.clone()); + fuse_datasets(&mut server_spec, do_not_fuse.as_slice())?; + } + if config.stringify_local_datetimes { stringify_local_datetimes( &mut server_spec, diff --git a/vegafusion-core/src/spec/data.rs b/vegafusion-core/src/spec/data.rs index 731a55e42..6202ab670 100644 --- a/vegafusion-core/src/spec/data.rs +++ b/vegafusion-core/src/spec/data.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::{HashMap, HashSet}; use vegafusion_common::data::table::VegaFusionTable; +use vegafusion_common::error::VegaFusionError; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct DataSpec { @@ -142,6 +143,40 @@ impl DataSpec { Ok(output_local_datetime_columns) } + + /// Fuse this dataset into a child dataset. This mutates the child to include this dataset's + /// source data and transforms. The name of the child is preserved. + pub fn fuse_into(&self, child: &mut DataSpec) -> Result<()> { + if Some(&self.name) != child.source.as_ref() { + return Err(VegaFusionError::internal(format!( + "Incompatible fuse dataset names {:?} and {:?}", + self.name, child.source + ))); + } + if self.on.is_some() { + return Err(VegaFusionError::internal( + "Cannot fuse dataset with \"on\" trigger", + )); + } + + // Copy over source dataset info + child.source = self.source.clone(); + child.url = self.url.clone(); + child.format = self.format.clone(); + child.values = self.values.clone(); + + // Prepend this dataset's transforms + let mut new_transforms = self.transform.clone(); + new_transforms.extend(child.transform.clone()); + child.transform = new_transforms; + Ok(()) + } + + pub fn has_aggregate(&self) -> bool { + self.transform + .iter() + .any(|tx| matches!(tx, &TransformSpec::Aggregate(_))) + } } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)] diff --git a/vegafusion-python-embed/src/lib.rs b/vegafusion-python-embed/src/lib.rs index 5fdb7e0dd..485699744 100644 --- a/vegafusion-python-embed/src/lib.rs +++ b/vegafusion-python-embed/src/lib.rs @@ -16,9 +16,11 @@ use crate::connection::{PySqlConnection, PySqlDataset}; use crate::dataframe::PyDataFrame; use env_logger::{Builder, Target}; use pythonize::depythonize; +use serde_json::json; use vegafusion_common::data::table::VegaFusionTable; use vegafusion_core::patch::patch_pre_transformed_spec; -use vegafusion_core::planning::plan::PreTransformSpecWarningSpec; +use vegafusion_core::planning::plan::{PlannerConfig, PreTransformSpecWarningSpec, SpecPlan}; +use vegafusion_core::planning::watch::WatchPlan; use vegafusion_core::proto::gen::tasks::Variable; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::task_graph::graph::ScopedVariable; @@ -159,6 +161,45 @@ impl PyVegaFusionRuntime { Ok(PyBytes::new(py, &response_bytes).into()) } + #[allow(clippy::too_many_arguments)] + pub fn build_pre_transform_spec_plan( + &self, + spec: PyObject, + preserve_interactivity: Option, + keep_signals: Option)>>, + keep_datasets: Option)>>, + ) -> PyResult { + let spec = parse_json_spec(spec)?; + let preserve_interactivity = preserve_interactivity.unwrap_or(false); + + // Build keep_variables + let mut keep_variables: Vec = Vec::new(); + for (name, scope) in keep_signals.unwrap_or_default() { + keep_variables.push((Variable::new_signal(&name), scope)) + } + for (name, scope) in keep_datasets.unwrap_or_default() { + keep_variables.push((Variable::new_data(&name), scope)) + } + + let plan = SpecPlan::try_new( + &spec, + &PlannerConfig::pre_transformed_spec_config(preserve_interactivity, keep_variables), + )?; + let watch_plan = WatchPlan::from(plan.comm_plan); + + let json_plan = json!({ + "server_spec": plan.server_spec, + "client_spec": plan.client_spec, + "comm_plan": watch_plan, + "warnings": plan.warnings, + }); + + Python::with_gil(|py| -> PyResult { + let py_plan = pythonize::pythonize(py, &json_plan)?; + Ok(py_plan) + }) + } + #[allow(clippy::too_many_arguments)] pub fn pre_transform_spec( &self, diff --git a/vegafusion-runtime/src/task_graph/runtime.rs b/vegafusion-runtime/src/task_graph/runtime.rs index 9395d84ca..87ad58001 100644 --- a/vegafusion-runtime/src/task_graph/runtime.rs +++ b/vegafusion-runtime/src/task_graph/runtime.rs @@ -498,6 +498,7 @@ impl VegaFusionRuntime { split_domain_data: false, projection_pushdown: false, allow_client_to_server_comms: true, + keep_variables: Vec::from(variables), ..Default::default() }, )?; @@ -752,13 +753,7 @@ impl VegaFusionRuntime { // Create spec plan let plan = SpecPlan::try_new( spec, - &PlannerConfig { - stringify_local_datetimes: true, - extract_inline_data: true, - allow_client_to_server_comms: !preserve_interactivity, - keep_variables, - ..Default::default() - }, + &PlannerConfig::pre_transformed_spec_config(preserve_interactivity, keep_variables), )?; // println!("pre transform client_spec: {}", serde_json::to_string_pretty(&plan.client_spec).unwrap()); // println!("pre transform server_spec: {}", serde_json::to_string_pretty(&plan.server_spec).unwrap()); diff --git a/vegafusion-runtime/tests/test_projection_pushdown.rs b/vegafusion-runtime/tests/test_projection_pushdown.rs index 69727fc81..96584e597 100644 --- a/vegafusion-runtime/tests/test_projection_pushdown.rs +++ b/vegafusion-runtime/tests/test_projection_pushdown.rs @@ -35,6 +35,7 @@ mod test_custom_specs { let planner_config = PlannerConfig { projection_pushdown: true, + fuse_datasets: false, ..Default::default() }; let spec_plan = SpecPlan::try_new(&spec, &planner_config).unwrap();