Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove termtree dependency #11416

Merged
merged 3 commits into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ tokio = { workspace = true }
[dev-dependencies]
rstest = { workspace = true }
rstest_reuse = "0.7.0"
termtree = "0.5.0"
tokio = { workspace = true, features = [
"rt-multi-thread",
"fs",
Expand Down
86 changes: 55 additions & 31 deletions datafusion/physical-plan/src/aggregates/topk/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_common::Result;
use datafusion_physical_expr::aggregate::utils::adjust_output_array;
use half::f16;
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

/// A custom version of `Ord` that only exists to we can implement it for the Values in our heap
Expand Down Expand Up @@ -323,29 +323,53 @@ impl<VAL: ValueType> TopKHeap<VAL> {
}
}

#[cfg(test)]
fn _tree_print(&self, idx: usize) -> Option<termtree::Tree<String>> {
let hi = self.heap.get(idx)?;
match hi {
None => None,
Some(hi) => {
let label =
format!("val={:?} idx={}, bucket={}", hi.val, idx, hi.map_idx);
let left = self._tree_print(idx * 2 + 1);
let right = self._tree_print(idx * 2 + 2);
let children = left.into_iter().chain(right);
let me = termtree::Tree::new(label).with_leaves(children);
Some(me)
fn _tree_print(
&self,
idx: usize,
prefix: String,
is_tail: bool,
output: &mut String,
) {
if let Some(Some(hi)) = self.heap.get(idx) {
let connector = if idx != 0 {
if is_tail {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

"└── "
} else {
"├── "
}
} else {
""
};
output.push_str(&format!(
"{}{}val={:?} idx={}, bucket={}\n",
prefix, connector, hi.val, idx, hi.map_idx
));
let new_prefix = if is_tail { "" } else { "│ " };
let child_prefix = format!("{}{}", prefix, new_prefix);

let left_idx = idx * 2 + 1;
let right_idx = idx * 2 + 2;

let left_exists = left_idx < self.len;
let right_exists = right_idx < self.len;

if left_exists {
self._tree_print(left_idx, child_prefix.clone(), !right_exists, output);
}
if right_exists {
self._tree_print(right_idx, child_prefix, true, output);
}
}
}
}

#[cfg(test)]
fn tree_print(&self) -> String {
match self._tree_print(0) {
None => "".to_string(),
Some(root) => format!("{}", root),
impl<VAL: ValueType> Display for TopKHeap<VAL> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut output = String::new();
if self.heap.first().is_some() {
self._tree_print(0, String::new(), true, &mut output);
}
write!(f, "{}", output)
}
}

Expand All @@ -361,9 +385,9 @@ impl<VAL: ValueType> HeapItem<VAL> {
impl<VAL: ValueType> Debug for HeapItem<VAL> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("bucket=")?;
self.map_idx.fmt(f)?;
Debug::fmt(&self.map_idx, f)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. If it's not changed, the compiler will fail and report fmt being ambiguous

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense -- thank you

f.write_str(" val=")?;
self.val.fmt(f)?;
Debug::fmt(&self.val, f)?;
f.write_str("\n")?;
Ok(())
}
Expand Down Expand Up @@ -462,7 +486,7 @@ mod tests {
let mut heap = TopKHeap::new(10, false);
heap.append_or_replace(1, 1, &mut map);

let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=1 idx=0, bucket=1
"#;
Expand All @@ -482,7 +506,7 @@ val=1 idx=0, bucket=1
heap.append_or_replace(2, 2, &mut map);
assert_eq!(map, vec![(2, 0), (1, 1)]);

let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=2 idx=0, bucket=2
└── val=1 idx=1, bucket=1
Expand All @@ -500,7 +524,7 @@ val=2 idx=0, bucket=2
heap.append_or_replace(1, 1, &mut map);
heap.append_or_replace(2, 2, &mut map);
heap.append_or_replace(3, 3, &mut map);
let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=3 idx=0, bucket=3
├── val=1 idx=1, bucket=1
Expand All @@ -510,7 +534,7 @@ val=3 idx=0, bucket=3

let mut map = vec![];
heap.append_or_replace(0, 0, &mut map);
let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=2 idx=0, bucket=2
├── val=1 idx=1, bucket=1
Expand All @@ -531,7 +555,7 @@ val=2 idx=0, bucket=2
heap.append_or_replace(2, 2, &mut map);
heap.append_or_replace(3, 3, &mut map);
heap.append_or_replace(4, 4, &mut map);
let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=4 idx=0, bucket=4
├── val=3 idx=1, bucket=3
Expand All @@ -542,7 +566,7 @@ val=4 idx=0, bucket=4

let mut map = vec![];
heap.replace_if_better(1, 0, &mut map);
let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=4 idx=0, bucket=4
├── val=1 idx=1, bucket=1
Expand All @@ -563,7 +587,7 @@ val=4 idx=0, bucket=4
heap.append_or_replace(1, 1, &mut map);
heap.append_or_replace(2, 2, &mut map);

let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=2 idx=0, bucket=2
└── val=1 idx=1, bucket=1
Expand All @@ -584,7 +608,7 @@ val=2 idx=0, bucket=2
heap.append_or_replace(1, 1, &mut map);
heap.append_or_replace(2, 2, &mut map);

let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=2 idx=0, bucket=2
└── val=1 idx=1, bucket=1
Expand All @@ -607,7 +631,7 @@ val=2 idx=0, bucket=2
heap.append_or_replace(1, 1, &mut map);
heap.append_or_replace(2, 2, &mut map);

let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=2 idx=0, bucket=2
└── val=1 idx=1, bucket=1
Expand All @@ -616,7 +640,7 @@ val=2 idx=0, bucket=2

let numbers = vec![(0, 1), (1, 2)];
heap.renumber(numbers.as_slice());
let actual = heap.tree_print();
let actual = heap.to_string();
let expected = r#"
val=2 idx=0, bucket=1
└── val=1 idx=1, bucket=2
Expand Down