Skip to content

Commit

Permalink
Lift aggregate transforms out of facets
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease committed Nov 2, 2023
1 parent 849ffb3 commit 86ce9ca
Show file tree
Hide file tree
Showing 19 changed files with 1,250 additions and 28 deletions.
2 changes: 1 addition & 1 deletion vegafusion-core/src/patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn arrayify_int_key_objects(obj: &Value) -> Value {
}
}
Value::Array(arr) => {
Value::Array(arr.iter().map(|v| arrayify_int_key_objects(v)).collect())
Value::Array(arr.iter().map(arrayify_int_key_objects).collect())
}
_ => obj.clone(),
}
Expand Down
118 changes: 100 additions & 18 deletions vegafusion-core/src/planning/dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use petgraph::prelude::{DiGraph, EdgeRef, NodeIndex};
use petgraph::Incoming;
use std::collections::{HashMap, HashSet};

pub fn toposort_dependency_graph(
data_graph: &DiGraph<(ScopedVariable, DependencyNodeSupported), ()>,
) -> Result<Vec<NodeIndex>> {
/// Directed graph used for dependency planning. Each node weight pairs a
/// `ScopedVariable` with its `DependencyNodeSupported` value; edges carry no weight.
pub type DependencyGraph = DiGraph<(ScopedVariable, DependencyNodeSupported), ()>;

pub fn toposort_dependency_graph(data_graph: &DependencyGraph) -> Result<Vec<NodeIndex>> {
Ok(match toposort(&data_graph, None) {
Ok(v) => v,
Err(err) => {
Expand All @@ -35,7 +35,7 @@ pub fn get_supported_data_variables(
chart_spec: &ChartSpec,
config: &PlannerConfig,
) -> Result<HashMap<ScopedVariable, DependencyNodeSupported>> {
let data_graph = build_dependency_graph(chart_spec, config)?;
let (data_graph, _) = build_dependency_graph(chart_spec, config)?;
// Sort dataset nodes topologically
let nodes: Vec<NodeIndex> = toposort_dependency_graph(&data_graph)?;

Expand Down Expand Up @@ -129,7 +129,7 @@ pub fn get_supported_data_variables(
pub fn build_dependency_graph(
chart_spec: &ChartSpec,
config: &PlannerConfig,
) -> Result<DiGraph<(ScopedVariable, DependencyNodeSupported), ()>> {
) -> Result<(DependencyGraph, HashMap<ScopedVariable, NodeIndex>)> {
let task_scope = chart_spec.to_task_scope()?;

// Initialize graph with nodes
Expand All @@ -144,16 +144,17 @@ pub fn build_dependency_graph(
);
chart_spec.walk(&mut edges_visitor)?;

Ok(nodes_visitor.dependency_graph)
Ok((nodes_visitor.dependency_graph, nodes_visitor.node_indexes))
}

/// Visitor to initialize directed graph with nodes for each dataset (no edges yet)
#[derive(Debug)]
pub struct AddDependencyNodesVisitor<'a> {
pub dependency_graph: DiGraph<(ScopedVariable, DependencyNodeSupported), ()>,
pub dependency_graph: DependencyGraph,
pub node_indexes: HashMap<ScopedVariable, NodeIndex>,
planner_config: &'a PlannerConfig,
task_scope: &'a TaskScope,
mark_index: u32,
}

impl<'a> AddDependencyNodesVisitor<'a> {
Expand All @@ -174,6 +175,7 @@ impl<'a> AddDependencyNodesVisitor<'a> {
node_indexes,
planner_config,
task_scope,
mark_index: 0,
}
}
}
Expand Down Expand Up @@ -223,14 +225,19 @@ impl<'a> ChartVisitor for AddDependencyNodesVisitor<'a> {
}

fn visit_non_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
// Named non-group marks can serve as datasets
if let Some(name) = &mark.name {
let scoped_var = (Variable::new_data(name), Vec::from(scope));
let node_index = self
.dependency_graph
.add_node((scoped_var.clone(), DependencyNodeSupported::Unsupported));
self.node_indexes.insert(scoped_var, node_index);
}
// non-group marks can serve as datasets
let name = mark
.name
.clone()
.unwrap_or_else(|| format!("unnamed_mark_{}", self.mark_index));
self.mark_index += 1;

let scoped_var = (Variable::new_data(&name), Vec::from(scope));
let node_index = self
.dependency_graph
.add_node((scoped_var.clone(), DependencyNodeSupported::Unsupported));
self.node_indexes.insert(scoped_var, node_index);

Ok(())
}

Expand All @@ -247,7 +254,7 @@ impl<'a> ChartVisitor for AddDependencyNodesVisitor<'a> {

// Named group marks can serve as datasets
if let Some(name) = &mark.name {
let parent_scope = Vec::from(&scope[..scope.len()]);
let parent_scope = Vec::from(&scope[..(scope.len() - 1)]);
let scoped_var = (Variable::new_data(name), parent_scope);
let node_index = self
.dependency_graph
Expand All @@ -262,21 +269,23 @@ impl<'a> ChartVisitor for AddDependencyNodesVisitor<'a> {
/// Visitor to add directed edges to graph with data nodes
#[derive(Debug)]
pub struct AddDependencyEdgesVisitor<'a> {
pub dependency_graph: &'a mut DiGraph<(ScopedVariable, DependencyNodeSupported), ()>,
pub dependency_graph: &'a mut DependencyGraph,
node_indexes: &'a HashMap<ScopedVariable, NodeIndex>,
task_scope: &'a TaskScope,
mark_index: u32,
}

impl<'a> AddDependencyEdgesVisitor<'a> {
pub fn new(
dependency_graph: &'a mut DiGraph<(ScopedVariable, DependencyNodeSupported), ()>,
dependency_graph: &'a mut DependencyGraph,
node_indexes: &'a HashMap<ScopedVariable, NodeIndex>,
task_scope: &'a TaskScope,
) -> Self {
Self {
dependency_graph,
node_indexes,
task_scope,
mark_index: 0,
}
}
}
Expand Down Expand Up @@ -338,6 +347,79 @@ impl<'a> ChartVisitor for AddDependencyEdgesVisitor<'a> {
Ok(())
}

/// Add an edge from a facet's source dataset into the facet dataset of a group mark.
///
/// Group marks without a `from.facet` entry contribute no edges. An error is returned
/// when the facet dataset or its resolved source dataset has no entry in
/// `node_indexes`, or when the task scope cannot resolve the source dataset.
fn visit_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
    // Only group marks that declare a facet have a parent dataset
    let Some(facet) = mark.from.as_ref().and_then(|from| from.facet.as_ref()) else {
        return Ok(());
    };

    // The facet dataset is keyed by the group's own scope
    let facet_var = (Variable::new_data(&facet.name), Vec::from(scope));
    let facet_node = *self
        .node_indexes
        .get(&facet_var)
        .with_context(|| format!("Missing data node: {facet_var:?}"))?;

    // The facet's source is resolved starting one level up, because it must be
    // defined in the parent rather than as a dataset inside this group mark
    let parent_scope = &scope[..(scope.len() - 1)];
    let resolved = self
        .task_scope
        .resolve_scope(&Variable::new_data(&facet.data), parent_scope)?;
    let parent_var = (resolved.var, resolved.scope);
    let parent_node = *self
        .node_indexes
        .get(&parent_var)
        .with_context(|| format!("Missing data node: {parent_var:?}"))?;

    // Directed edge: source dataset -> facet dataset
    self.dependency_graph.add_edge(parent_node, facet_node, ());

    Ok(())
}

/// Add an edge from a non-group mark's source dataset into the mark's own data node.
///
/// Every visited non-group mark advances `mark_index`, whether or not it has a
/// `from.data` source, so the name generated for an unnamed mark
/// (`unnamed_mark_<index>`) follows the same numbering that
/// `AddDependencyNodesVisitor::visit_non_group_mark` uses when registering nodes.
/// Marks without a `from.data` source contribute no edges.
///
/// An error is returned when the mark's data node or its resolved source dataset has
/// no entry in `node_indexes`, or when the task scope cannot resolve the source.
fn visit_non_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
    // Reserve this mark's index before any early exit so the counter still
    // advances for marks that have no source dataset
    let mark_index = self.mark_index;
    self.mark_index += 1;

    let Some(from) = &mark.from else { return Ok(()) };
    let Some(source) = &from.data else { return Ok(()) };

    // Build the name only once it is needed; unnamed marks get a positional name
    let name = match &mark.name {
        Some(name) => name.clone(),
        None => format!("unnamed_mark_{mark_index}"),
    };

    // Scoped var for the dataset represented by this mark
    let scoped_var = (Variable::new_data(&name), Vec::from(scope));

    let node_index = self
        .node_indexes
        .get(&scoped_var)
        .with_context(|| format!("Missing data node: {scoped_var:?}"))?;

    // Resolve the source dataset starting from the scope this mark was visited with
    let source_var = Variable::new_data(source);
    let resolved = self.task_scope.resolve_scope(&source_var, scope)?;
    let scoped_source_var = (resolved.var, resolved.scope);

    let source_node_index = self
        .node_indexes
        .get(&scoped_source_var)
        .with_context(|| format!("Missing data node: {scoped_source_var:?}"))?;

    // Directed edge: source dataset -> mark dataset
    self.dependency_graph
        .add_edge(*source_node_index, *node_index, ());

    Ok(())
}

/// Add edges into a signal node
fn visit_signal(&mut self, signal: &SignalSpec, scope: &[u32]) -> Result<()> {
// Scoped var for this node
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/planning/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vegafusion_common::error::Result;
/// transforms up through at least the first aggregation into the root dataset's transform
/// pipeline.
pub fn fuse_datasets(server_spec: &mut ChartSpec, do_not_fuse: &[ScopedVariable]) -> Result<()> {
let data_graph = build_dependency_graph(server_spec, &Default::default())?;
let (data_graph, _) = build_dependency_graph(server_spec, &Default::default())?;
let nodes: Vec<NodeIndex> = toposort_dependency_graph(&data_graph)?;

'outer: for node_index in &nodes {
Expand Down
Loading

0 comments on commit 86ce9ca

Please sign in to comment.