diff --git a/datafusion/optimizer/src/generate_join_orders.rs b/datafusion/optimizer/src/generate_join_orders.rs index beeb391807f2..aa65711b5cd1 100644 --- a/datafusion/optimizer/src/generate_join_orders.rs +++ b/datafusion/optimizer/src/generate_join_orders.rs @@ -46,7 +46,7 @@ use datafusion_expr::{CrossJoin, Expr, Filter, LogicalPlan, LogicalPlanBuilder}; /// forward_mapping: vec![HashSet{0, 1}, HashSet{0}, HashSet{1}], /// backward_mapping: vec![HashSet{0, 1}, HashSet{0, 2}] /// } -#[derive(Debug)] +#[derive(Debug, PartialEq)] struct BipartiteGraph { forward_mapping: Vec>, backward_mapping: Vec>, @@ -204,23 +204,30 @@ impl BipartiteGraph { /// ]; /// ``` fn split_bipartite_graph(&self) -> Result> { - let all_indices = (0..self.forward_mapping.len()).collect::>(); + let all_indices = (0..self.backward_mapping.len()).collect::>(); let mut indices_covered = vec![]; let mut distinct_indices = vec![]; let mut missing_indices = set_difference_vec(&all_indices, &indices_covered); + println!("self:{:?}", self); while !missing_indices.is_empty() { // Since missing_indices is not empty, we can use 0th index safely - let indices = self.generate_join_orderings_from_idx(missing_indices[0]); + let (indices, visited_indices) = self.generate_join_orderings_from_idx2(missing_indices[0]); + let visited_indices = visited_indices.into_iter().collect::>(); + println!("indices: {:?}", indices); + println!("visited_indices: {:?}", visited_indices); // All of the vectors inside indices consists of same values (they are permutation of each other). 
// Hence we can use 0th index - distinct_indices.push(indices[0].clone()); - indices_covered.extend(indices[0].clone()); + distinct_indices.push(visited_indices.clone()); + indices_covered.extend(visited_indices); missing_indices = set_difference_vec(&all_indices, &indices_covered); + } + println!("distinct_indices: {:?}", distinct_indices); let graphs = distinct_indices .into_iter() - .map(|proj_indices| self.project(&proj_indices)) + .map(|proj_indices| self.project2(&proj_indices)) .collect::>>()?; + println!("graphs: {:?}", graphs); Ok(graphs) } @@ -266,6 +273,44 @@ impl BipartiteGraph { )) } + fn get_dependencies(&self, backward_indices: &HashSet, used_indices: &[usize]) -> Vec> { + self.indices_pointed_backward(&backward_indices).into_iter().map(|indices| set_difference(&indices, used_indices)).filter(|new_indices| !new_indices.is_empty()).collect() + } + + fn add_dependencies_as_suffix(&self, result: Vec>, dependencies: &[HashSet]) -> Vec> { + let n_branch = dependencies.len(); + let new_suffixes = dependencies.iter().permutations(n_branch).map(|deps| { + let suffixes = deps.into_iter().flat_map(|dep| dep.into_iter().cloned().sorted().unique().collect::>()).collect::>(); + suffixes + }).collect::>(); + if result.is_empty(){ + new_suffixes + } else { + result.into_iter().flat_map(|result|{ + new_suffixes.iter().map(|new_suffix| { + let mut new_result = result.clone(); + new_result.extend(new_suffix.to_vec()); + new_result.into_iter().unique().collect() + }).collect::>() + }).collect::>() + } + } + fn generate_join_orderings_from_idx2(&self, idx: usize) -> (Vec>, HashSet) { + let mut join_child_multi_indices = vec![]; + let mut dependencies = vec![self.backward_mapping[idx].clone()]; + let mut visited_conditions = HashSet::new(); + visited_conditions.insert(idx); + while !dependencies.is_empty() { + join_child_multi_indices = self.add_dependencies_as_suffix(join_child_multi_indices, &dependencies); + let used_indices = join_child_multi_indices[0].clone(); + 
let forward_indices = dependencies.iter().flat_map(|elems| elems.clone()).collect::>(); + let backward_indices = self.indices_pointed_forward(&forward_indices).into_iter().flatten().collect::>(); + dependencies = self.get_dependencies(&backward_indices, &used_indices); + visited_conditions.extend(backward_indices); + } + (join_child_multi_indices, visited_conditions) + } + fn generate_join_orderings_from_idx(&self, idx: usize) -> Vec> { let mut join_child_multi_indices = vec![]; let mut results = vec![vec![idx]]; @@ -332,10 +377,10 @@ impl BipartiteGraph { /// The outer vector contains all possible join orderings fn generate_possible_join_orderings_helper(&self) -> Vec> { let mut join_child_multi_indices = self - .forward_mapping + .backward_mapping .iter() .enumerate() - .map(|(idx, _group)| self.generate_join_orderings_from_idx(idx)) + .map(|(idx, _group)| self.generate_join_orderings_from_idx2(idx).0) .collect::>(); // Sort by alternative path length // If there is less alternative paths, dependencies are deconstructed better. @@ -374,7 +419,7 @@ impl BipartiteGraph { let mut offset = 0; elems.iter_mut().for_each(|items| { items.iter_mut().for_each(|item| *item += offset); - offset += items.len(); + // offset += items.len(); }); // Concat indices of each group. elems.into_iter().flatten().collect::>() @@ -398,15 +443,35 @@ impl BipartiteGraph { /// A `Result` containing a new `BipartiteGraph` instance that represents the projection of the /// current graph onto the selected vertices. If successful, the result will contain the projected /// graph; otherwise, an error is returned. - fn project(&self, indices: &[usize]) -> Result { + // fn project(&self, indices: &[usize]) -> Result { + // // Sort indices, this is not necessary. However, it makes result deterministic. 
+ // let indices = indices.iter().cloned().sorted().collect::>(); + // let new_forward_mapping = get_at_indices(&self.forward_mapping, &indices)?; + // let new_backward_mapping = self + // .backward_mapping + // .iter() + // .map(|right_indices| { + // right_indices + // .iter() + // .filter_map(|item| indices.iter().position(|elem| elem == item)) + // .collect() + // }) + // .collect(); + // Ok(Self { + // forward_mapping: new_forward_mapping, + // backward_mapping: new_backward_mapping, + // }) + // } + + fn project2(&self, indices: &[usize]) -> Result { // Sort indices, this is not necessary. However, it makes result deterministic. let indices = indices.iter().cloned().sorted().collect::>(); - let new_forward_mapping = get_at_indices(&self.forward_mapping, &indices)?; - let new_backward_mapping = self - .backward_mapping + let new_backward_mapping = get_at_indices(&self.backward_mapping, &indices)?; + let new_forward_mapping = self + .forward_mapping .iter() - .map(|right_indices| { - right_indices + .map(|left_indices| { + left_indices .iter() .filter_map(|item| indices.iter().position(|elem| elem == item)) .collect() @@ -632,6 +697,39 @@ mod tests { use datafusion_common::Result; use std::collections::HashSet; + #[test] + fn test_project() -> Result<()> { + let test_cases = vec![ + // ---------- TEST CASE 1 ------------ + ( + // forward mapping + vec![vec![0], vec![0], vec![1], vec![1]], + // expected split mapping + vec![vec![vec![0], vec![0], vec![], vec![]], vec![vec![], vec![], vec![0], vec![0]]], + ), + ]; + for (forward_mapping, expected) in test_cases { + let forward_mapping = forward_mapping + .into_iter() + .map(|elems| elems.into_iter().collect::>()) + .collect::>(); + let graph = BipartiteGraph::try_new_from_forward_mapping(forward_mapping); + let split_mapping = graph.split_bipartite_graph()?; + + let mut expected_graph = vec![]; + for expected_forward_mapping in expected{ + let expected_forward_mapping = expected_forward_mapping + .into_iter()
+ .map(|elems| elems.into_iter().collect::>()) + .collect::>(); + let graph = BipartiteGraph::try_new_from_forward_mapping(expected_forward_mapping); + expected_graph.push(graph); + } + assert_eq!(split_mapping, expected_graph); + } + Ok(()) + } + #[test] fn test_from_forward_mapping() -> Result<()> { let test_cases = vec![ @@ -669,10 +767,7 @@ mod tests { // expected iteration indices vec![ vec![0, 1, 2, 3], - vec![1, 0, 2, 3], - vec![2, 0, 1, 3], vec![2, 3, 0, 1], - vec![3, 2, 0, 1], ], ), // ---------- TEST CASE 2 ------------ @@ -682,9 +777,6 @@ mod tests { // expected iteration indices vec![ vec![0, 1, 2, 3], - vec![0, 1, 3, 2], - vec![1, 0, 2, 3], - vec![1, 0, 3, 2], ], ), ];