Skip to content

Commit

Permalink
Change graph iteration impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Dec 25, 2023
1 parent 479bdc4 commit c7e59bc
Showing 1 changed file with 113 additions and 21 deletions.
134 changes: 113 additions & 21 deletions datafusion/optimizer/src/generate_join_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use datafusion_expr::{CrossJoin, Expr, Filter, LogicalPlan, LogicalPlanBuilder};
/// forward_mapping: vec![HashSet{0, 1}, HashSet{0}, HashSet{1}],
/// backward_mapping: vec![HashSet{0, 1}, HashSet{0, 2}]
/// }
#[derive(Debug)]
#[derive(Debug, PartialEq)]
struct BipartiteGraph {
forward_mapping: Vec<HashSet<usize>>,
backward_mapping: Vec<HashSet<usize>>,
Expand Down Expand Up @@ -204,23 +204,30 @@ impl BipartiteGraph {
/// ];
/// ```
fn split_bipartite_graph(&self) -> Result<Vec<BipartiteGraph>> {
// Splits the graph into sub-graphs: a traversal is seeded at the first index that
// is still missing from `indices_covered`, the group of indices it yields is
// recorded, and the loop repeats until nothing is missing. Each recorded group is
// then projected into its own `BipartiteGraph`; the first projection error, if any,
// is returned to the caller.
//
// NOTE(review): this listing appears to carry both the pre-change and the
// post-change statements of the commit next to each other (two `all_indices`
// bindings, two traversal calls, two pushes/extends, two projection `map`s).
// Confirm which variant is the intended one before relying on this body.
let all_indices = (0..self.forward_mapping.len()).collect::<Vec<_>>();
// Shadows the binding above: the index universe is taken from `backward_mapping`.
let all_indices = (0..self.backward_mapping.len()).collect::<Vec<_>>();
let mut indices_covered = vec![];
let mut distinct_indices = vec![];
// Presumably the entries of `all_indices` that are absent from `indices_covered`
// (semantics of `set_difference_vec` are not visible here; verify against its definition).
let mut missing_indices = set_difference_vec(&all_indices, &indices_covered);
// Debug output to stdout.
println!("self:{:?}", self);
while !missing_indices.is_empty() {
// `missing_indices` is non-empty here, so indexing its 0th element cannot panic.
let indices = self.generate_join_orderings_from_idx(missing_indices[0]);
// Shadows `indices`; additionally returns the set of indices visited by the traversal.
let (indices, visited_indices) = self.generate_join_orderings_from_idx2(missing_indices[0]);
// Turn the visited set into a vector so it can be stored and used for projection.
let visited_indices = visited_indices.into_iter().collect::<Vec<_>>();
// Debug output to stdout.
println!("indices: {:?}", indices);
println!("visited_indices: {:?}", visited_indices);
// All of the vectors inside `indices` consist of the same values (they are
// permutations of each other), hence the 0th one can represent the group.
distinct_indices.push(indices[0].clone());
indices_covered.extend(indices[0].clone());
// Record the visited group and mark its members as covered.
distinct_indices.push(visited_indices.clone());
indices_covered.extend(visited_indices);
// Recompute what is still uncovered; the loop ends once this is empty.
missing_indices = set_difference_vec(&all_indices, &indices_covered);

}
// Debug output to stdout.
println!("distinct_indices: {:?}", distinct_indices);
// Project the graph onto every recorded group, stopping at the first error.
let graphs = distinct_indices
.into_iter()
.map(|proj_indices| self.project(&proj_indices))
.map(|proj_indices| self.project2(&proj_indices))
.collect::<Result<Vec<_>>>()?;
// Debug output to stdout.
println!("graphs: {:?}", graphs);
Ok(graphs)
}

Expand Down Expand Up @@ -266,6 +273,44 @@ impl BipartiteGraph {
))
}

/// Collects the not-yet-used indices reachable from `backward_indices`.
///
/// Every index collection produced by `indices_pointed_backward` for
/// `backward_indices` is reduced by `used_indices` (via `set_difference`);
/// collections that end up empty are dropped, the rest are returned in the
/// order they were produced.
fn get_dependencies(&self, backward_indices: &HashSet<usize>, used_indices: &[usize]) -> Vec<HashSet<usize>> {
    let mut remaining = Vec::new();
    for pointed in self.indices_pointed_backward(&backward_indices) {
        let unused = set_difference(&pointed, used_indices);
        // A group with nothing new contributes no further dependency.
        if !unused.is_empty() {
            remaining.push(unused);
        }
    }
    remaining
}

fn add_dependencies_as_suffix(&self, result: Vec<Vec<usize>>, dependencies: &[HashSet<usize>]) -> Vec<Vec<usize>> {
let n_branch = dependencies.len();
let new_suffixes = dependencies.iter().permutations(n_branch).map(|deps| {
let suffixes = deps.into_iter().flat_map(|dep| dep.into_iter().cloned().sorted().unique().collect::<Vec<_>>()).collect::<Vec<_>>();
suffixes
}).collect::<Vec<_>>();
if result.is_empty(){
new_suffixes
} else {
result.into_iter().flat_map(|result|{
new_suffixes.iter().map(|new_suffix| {
let mut new_result = result.clone();
new_result.extend(new_suffix.to_vec());
new_result.into_iter().unique().collect()
}).collect::<Vec<_>>()
}).collect::<Vec<_>>()
}
}
/// Builds the candidate orderings obtained by starting from the entry `idx`
/// of `backward_mapping` and repeatedly following the graph until no new
/// dependency is found.
///
/// Returns a pair of:
/// * the orderings accumulated by `add_dependencies_as_suffix`, and
/// * the set containing `idx` plus every index gathered from
///   `indices_pointed_forward` during the walk.
///
/// Indexing `backward_mapping` with `idx` panics if `idx` is out of bounds.
fn generate_join_orderings_from_idx2(&self, idx: usize) -> (Vec<Vec<usize>>, HashSet<usize>) {
    let mut orderings: Vec<Vec<usize>> = Vec::new();
    let mut visited: HashSet<usize> = HashSet::from([idx]);
    let mut pending = vec![self.backward_mapping[idx].clone()];

    while !pending.is_empty() {
        // Extend every ordering found so far with the pending dependency groups.
        orderings = self.add_dependencies_as_suffix(orderings, &pending);

        // Union of all indices in the pending groups.
        let frontier: HashSet<usize> = pending.iter().flatten().copied().collect();

        // Everything the frontier points to, merged into a single set.
        let mut reached: HashSet<usize> = HashSet::new();
        for pointed in self.indices_pointed_forward(&frontier) {
            reached.extend(pointed);
        }

        // The first ordering stands in for the set of indices already used.
        pending = self.get_dependencies(&reached, &orderings[0]);
        visited.extend(reached);
    }

    (orderings, visited)
}

fn generate_join_orderings_from_idx(&self, idx: usize) -> Vec<Vec<usize>> {
let mut join_child_multi_indices = vec![];
let mut results = vec![vec![idx]];
Expand Down Expand Up @@ -332,10 +377,10 @@ impl BipartiteGraph {
/// The outer vector contains all possible join orderings
fn generate_possible_join_orderings_helper(&self) -> Vec<Vec<usize>> {
let mut join_child_multi_indices = self
.forward_mapping
.backward_mapping
.iter()
.enumerate()
.map(|(idx, _group)| self.generate_join_orderings_from_idx(idx))
.map(|(idx, _group)| self.generate_join_orderings_from_idx2(idx).0)
.collect::<Vec<_>>();
// Sort by alternative path length
// If there are fewer alternative paths, dependencies are deconstructed better.
Expand Down Expand Up @@ -374,7 +419,7 @@ impl BipartiteGraph {
let mut offset = 0;
elems.iter_mut().for_each(|items| {
items.iter_mut().for_each(|item| *item += offset);
offset += items.len();
// offset += items.len();
});
// Concat indices of each group.
elems.into_iter().flatten().collect::<Vec<_>>()
Expand All @@ -398,15 +443,35 @@ impl BipartiteGraph {
/// A `Result` containing a new `BipartiteGraph` instance that represents the projection of the
/// current graph onto the selected vertices. If successful, the result will contain the projected
/// graph; otherwise, an error is returned.
fn project(&self, indices: &[usize]) -> Result<Self> {
// fn project(&self, indices: &[usize]) -> Result<Self> {
// // Sort indices, this is not necessary. However, it makes result deterministic.
// let indices = indices.iter().cloned().sorted().collect::<Vec<_>>();
// let new_forward_mapping = get_at_indices(&self.forward_mapping, &indices)?;
// let new_backward_mapping = self
// .backward_mapping
// .iter()
// .map(|right_indices| {
// right_indices
// .iter()
// .filter_map(|item| indices.iter().position(|elem| elem == item))
// .collect()
// })
// .collect();
// Ok(Self {
// forward_mapping: new_forward_mapping,
// backward_mapping: new_backward_mapping,
// })
// }

fn project2(&self, indices: &[usize]) -> Result<Self> {
// Sort indices, this is not necessary. However, it makes result deterministic.
let indices = indices.iter().cloned().sorted().collect::<Vec<_>>();
let new_forward_mapping = get_at_indices(&self.forward_mapping, &indices)?;
let new_backward_mapping = self
.backward_mapping
let new_backward_mapping = get_at_indices(&self.backward_mapping, &indices)?;
let new_forward_mapping = self
.forward_mapping
.iter()
.map(|right_indices| {
right_indices
.map(|left_indices| {
left_indices
.iter()
.filter_map(|item| indices.iter().position(|elem| elem == item))
.collect()
Expand Down Expand Up @@ -632,6 +697,39 @@ mod tests {
use datafusion_common::Result;
use std::collections::HashSet;

#[test]
fn test_project() -> Result<()> {
    // Turns plain adjacency lists into the set-based mapping the graph constructor takes.
    let to_sets = |mapping: Vec<Vec<usize>>| -> Vec<HashSet<usize>> {
        mapping
            .into_iter()
            .map(|row| row.into_iter().collect::<HashSet<_>>())
            .collect()
    };

    // Each case: (forward mapping of the input graph,
    //             forward mappings of the graphs expected after splitting).
    let test_cases = vec![
        // ---------- TEST CASE 1 ------------
        (
            vec![vec![0], vec![0], vec![1], vec![1]],
            vec![
                vec![vec![0], vec![0], vec![], vec![]],
                vec![vec![], vec![], vec![0], vec![0]],
            ],
        ),
    ];

    for (forward_mapping, expected) in test_cases {
        let graph = BipartiteGraph::try_new_from_forward_mapping(to_sets(forward_mapping));
        let split_graphs = graph.split_bipartite_graph()?;

        let expected_graphs = expected
            .into_iter()
            .map(|mapping| BipartiteGraph::try_new_from_forward_mapping(to_sets(mapping)))
            .collect::<Vec<_>>();

        assert_eq!(split_graphs, expected_graphs);
    }
    Ok(())
}

#[test]
fn test_from_forward_mapping() -> Result<()> {
let test_cases = vec![
Expand Down Expand Up @@ -669,10 +767,7 @@ mod tests {
// expected iteration indices
vec![
vec![0, 1, 2, 3],
vec![1, 0, 2, 3],
vec![2, 0, 1, 3],
vec![2, 3, 0, 1],
vec![3, 2, 0, 1],
],
),
// ---------- TEST CASE 2 ------------
Expand All @@ -682,9 +777,6 @@ mod tests {
// expected iteration indices
vec![
vec![0, 1, 2, 3],
vec![0, 1, 3, 2],
vec![1, 0, 2, 3],
vec![1, 0, 3, 2],
],
),
];
Expand Down

0 comments on commit c7e59bc

Please sign in to comment.