Skip to content

Commit

Permalink
Rewrite all other methods in DynTreeNode to be iterative
Browse files Browse the repository at this point in the history
  • Loading branch information
blaginin committed Oct 29, 2024
1 parent c26e07f commit 6a4dd2c
Showing 1 changed file with 123 additions and 93 deletions.
216 changes: 123 additions & 93 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,104 @@ macro_rules! map_until_stop_and_collect {
}}
}

macro_rules! rewrite_recursive {
($START:ident, $NAME:ident, $TRANSFORM_UP:expr, $TRANSFORM_DOWN:expr) => {
let mut queue = vec![ProcessingState::NotStarted($START)];

while let Some(item) = queue.pop() {
match item {
ProcessingState::NotStarted($NAME) => {
let node = $TRANSFORM_DOWN?;

queue.push(match node.tnr {
TreeNodeRecursion::Continue => {
ProcessingState::ProcessingChildren {
non_processed_children: node
.data
.arc_children()
.into_iter()
.cloned()
.rev()
.collect(),
item: node,
processed_children: vec![],
}
}
TreeNodeRecursion::Jump => ProcessingState::ProcessedAllChildren(
node.with_tnr(TreeNodeRecursion::Continue),
),
TreeNodeRecursion::Stop => {
ProcessingState::ProcessedAllChildren(node)
}
})
}
ProcessingState::ProcessingChildren {
mut item,
mut non_processed_children,
mut processed_children,
} => match item.tnr {
TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
if let Some(non_processed_item) = non_processed_children.pop() {
queue.push(ProcessingState::ProcessingChildren {
item,
non_processed_children,
processed_children,
});
queue.push(ProcessingState::NotStarted(non_processed_item));
} else {
item.transformed |=
processed_children.iter().any(|item| item.transformed);
item.data = item.data.with_new_arc_children(
processed_children.into_iter().map(|c| c.data).collect(),
)?;
queue.push(ProcessingState::ProcessedAllChildren(item))
}
}
TreeNodeRecursion::Stop => {
processed_children.extend(
non_processed_children
.into_iter()
.rev()
.map(Transformed::no),
);
item.transformed |=
processed_children.iter().any(|item| item.transformed);
item.data = item.data.with_new_arc_children(
processed_children.into_iter().map(|c| c.data).collect(),
)?;
queue.push(ProcessingState::ProcessedAllChildren(item));
}
},
ProcessingState::ProcessedAllChildren(node) => {
let node = node.transform_parent(|$NAME| $TRANSFORM_UP)?;

if let Some(ProcessingState::ProcessingChildren {
item: mut parent_node,
non_processed_children,
mut processed_children,
..
}) = queue.pop()
{
parent_node.tnr = node.tnr;
processed_children.push(node);

queue.push(ProcessingState::ProcessingChildren {
item: parent_node,
non_processed_children,
processed_children,
})
} else {
debug_assert_eq!(queue.len(), 0);
return Ok(node);
}
}
}
}

unreachable!();
};
}

/// Transformation helper to access [`Transformed`] fields in a [`Result`] easily.
///
/// # Example
Expand Down Expand Up @@ -999,103 +1097,35 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
}
}

fn rewrite<R: TreeNodeRewriter<Node = Self>>(
fn transform_down_up<
FD: FnMut(Self) -> Result<Transformed<Self>>,
FU: FnMut(Self) -> Result<Transformed<Self>>,
>(
self,
rewriter: &mut R,
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
let mut queue = vec![ProcessingState::NotStarted(self)];

while let Some(item) = queue.pop() {
match item {
ProcessingState::NotStarted(node) => {
let node = rewriter.f_down(node)?;

queue.push(match node.tnr {
TreeNodeRecursion::Continue => {
ProcessingState::ProcessingChildren {
non_processed_children: node
.data
.arc_children()
.into_iter()
.cloned()
.rev()
.collect(),
item: node,
processed_children: vec![],
}
}
TreeNodeRecursion::Jump => ProcessingState::ProcessedAllChildren(
node.with_tnr(TreeNodeRecursion::Continue),
),
TreeNodeRecursion::Stop => {
ProcessingState::ProcessedAllChildren(node)
}
})
}
ProcessingState::ProcessingChildren {
mut item,
mut non_processed_children,
mut processed_children,
} => match item.tnr {
TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => {
if let Some(non_processed_item) = non_processed_children.pop() {
queue.push(ProcessingState::ProcessingChildren {
item,
non_processed_children,
processed_children,
});
queue.push(ProcessingState::NotStarted(non_processed_item));
} else {
item.transformed =
processed_children.iter().any(|item| item.transformed);
item.data = item.data.with_new_arc_children(
processed_children.into_iter().map(|c| c.data).collect(),
)?;
queue.push(ProcessingState::ProcessedAllChildren(item))
}
}
TreeNodeRecursion::Stop => {
processed_children.extend(
non_processed_children
.into_iter()
.rev()
.map(Transformed::no),
);
item.transformed =
processed_children.iter().any(|item| item.transformed);
item.data = item.data.with_new_arc_children(
processed_children.into_iter().map(|c| c.data).collect(),
)?;
queue.push(ProcessingState::ProcessedAllChildren(item));
}
},
ProcessingState::ProcessedAllChildren(node) => {
let node = node.transform_parent(|n| rewriter.f_up(n))?;

if let Some(ProcessingState::ProcessingChildren {
item: mut parent_node,
non_processed_children,
mut processed_children,
..
}) = queue.pop()
{
parent_node.tnr = node.tnr;
processed_children.push(node);
rewrite_recursive!(self, node, f_up(node), f_down(node));
}

queue.push(ProcessingState::ProcessingChildren {
item: parent_node,
non_processed_children,
processed_children,
})
} else {
debug_assert_eq!(queue.len(), 0);
return Ok(node);
}
}
}
}
fn transform_down<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: F,
) -> Result<Transformed<Self>> {
self.transform_down_up(f, |node| Ok(Transformed::no(node)))
}

unreachable!();
fn transform_up<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: F,
) -> Result<Transformed<Self>> {
self.transform_down_up(|node| Ok(Transformed::no(node)), f)
}
fn rewrite<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
) -> Result<Transformed<Self>> {
rewrite_recursive!(self, node, rewriter.f_up(node), rewriter.f_down(node));
}

fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
Expand Down

0 comments on commit 6a4dd2c

Please sign in to comment.