Refactor with_columns to with_columns_factory
Adds a central with_columns_factory from which dataframe-specific with_columns implementations inherit.

Changes:
- Use with_columns_factory to inherit from: a plugin only needs to supply the correct
  dataframe types and a create_merge_node method (see the sketch below).
- Refactored h_pandas.with_columns and h_polars.with_columns to inherit
  from it.
- Added async support for pandas (not sure if it makes sense for polars).
- select=None appends all sink nodes with column types -- same behavior as
  h_spark.with_columns.
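
A rough sketch of the new extension pattern (illustrative only -- the plugin name and the merge body are assumptions, not the actual plugin code):

import pandas as pd

from hamilton import node
from hamilton.function_modifiers.recursive import with_columns_factory


class with_columns(with_columns_factory):
    def __init__(self, *load_from, **kwargs):
        # The factory owns the shared subdag wiring; a plugin only declares which
        # dataframe types it accepts...
        super().__init__(*load_from, dataframe_types=pd.DataFrame, **kwargs)

    def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
        # ...and implements the dataframe-specific merge of the selected columns
        # back into the original dataframe (e.g. via df.assign for pandas).
        ...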
jernejfrank committed Nov 18, 2024
1 parent 801e5ce commit c373b5d
Showing 10 changed files with 1,810 additions and 1,603 deletions.
863 changes: 430 additions & 433 deletions examples/pandas/with_columns/notebook.ipynb

Large diffs are not rendered by default.

240 changes: 121 additions & 119 deletions examples/polars/notebook.ipynb

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions examples/polars/with_columns/README
@@ -1,7 +1,8 @@
-# Using with_columns with Pandas
+# Using with_columns with Polars
 
 We show the ability to use the familiar `with_columns` from `polars`. Supported for both `pl.DataFrame` and `pl.LazyFrame`.
 
 To see the example look at the notebook.
 
-![image info](./dag.png)
+![image info](./DAG_DataFrame.png)
+![image info](./DAG_lazy.png)
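
For reference, a minimal usage sketch (function and column names are illustrative):

import polars as pl

from hamilton.plugins import h_polars


def col_b_doubled(col_b: pl.Series) -> pl.Series:
    # A map operation defined over an extracted column.
    return col_b * 2


@h_polars.with_columns(
    col_b_doubled,
    columns_to_pass=["col_b"],
    select=["col_b_doubled"],
)
def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
    return initial_df

Annotating the decorated function with `pl.LazyFrame` instead should give the lazy variant.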
1,364 changes: 673 additions & 691 deletions examples/polars/with_columns/notebook.ipynb

Large diffs are not rendered by default.

240 changes: 239 additions & 1 deletion hamilton/function_modifiers/recursive.py
@@ -1,5 +1,8 @@
+import abc
 import inspect
 import sys
+import typing
+from collections import defaultdict
 from types import ModuleType
 from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TypedDict, Union
 
@@ -11,17 +14,21 @@
 else:
     from typing import NotRequired
 
+from pandas import DataFrame as PandasDataFrame
+from polars import DataFrame as PolarsDataFrame
+from polars import LazyFrame as PolarsLazyFrame
 
 # Copied this over from function_graph
 # TODO -- determine the best place to put this code
-from hamilton import graph_utils, node
+from hamilton import graph_utils, node, registry
 from hamilton.function_modifiers import base, dependencies
 from hamilton.function_modifiers.base import InvalidDecoratorException, NodeTransformer
 from hamilton.function_modifiers.dependencies import (
     LiteralDependency,
     ParametrizedDependency,
     UpstreamDependency,
 )
+from hamilton.function_modifiers.expanders import extract_columns
@@ -626,3 +633,234 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
                stack.append(dep_node)
                seen_nodes.add(dep)
    return output


SUPPORTED_DATAFRAME_TYPES = [PandasDataFrame, PolarsDataFrame, PolarsLazyFrame]


class with_columns_factory(base.NodeInjector, abc.ABC):
    """Performs a with_columns operation on a dataframe. This is a special case of NodeInjector
    that applies only to dataframes. For now it can be used with:

      - Pandas
      - Polars

    This is used when you want to extract some columns out of the dataframe, perform operations
    on them and then append them to the original dataframe, for example:

        def processed_data(data: pd.DataFrame) -> pd.DataFrame:
            ...

    In this case we would build a subdag out of the node ``data`` and append selected nodes back to
    the original dataframe before feeding it into ``processed_data``.
    """

    # TODO: if we rename the column nodes into something smarter this can be avoided and
    # we can also modify columns in place
    @staticmethod
    def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
        """Ensures that we don't run into name clashing of columns and group operations.

        In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
        to safeguard against name clashing with functions that are passed into ``with_columns`` - i.e.
        there are no functions that have the same name as the columns. This effectively means that
        using ``columns_to_pass`` will only append new columns to the dataframe and for changing
        existing columns ``pass_dataframe_as`` needs to be used.
        """
        node_counter = defaultdict(int)
        for node_ in nodes_:
            node_counter[node_.name] += 1
            if node_counter[node_.name] > 1:
                return True
        return False
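
    # For example (illustrative): with ``columns_to_pass=["col_a"]`` we create an extraction
    # node named ``col_a``, so a subdag function also named ``col_a`` would yield two nodes
    # with the same name -- ``_check_for_duplicates`` flags this and ``inject_nodes`` raises.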

    def validate_dataframe_type(self):
        if not set(self.allowed_dataframe_types).issubset(SUPPORTED_DATAFRAME_TYPES):
            raise InvalidDecoratorException(
                f"The provided dataframe types: {self.allowed_dataframe_types} are currently not supported "
                "to be used in `with_columns`. Please reach out if you need it. "
                f"We currently only support: {SUPPORTED_DATAFRAME_TYPES}."
            )

    def __init__(
        self,
        *load_from: Union[Callable, ModuleType],
        columns_to_pass: List[str] = None,
        pass_dataframe_as: str = None,
        select: List[str] = None,
        namespace: str = None,
        config_required: List[str] = None,
        dataframe_types: Collection[Type] = None,
    ):
        """Instantiates a ``@with_columns`` decorator.

        :param load_from: The functions or modules that will be used to generate the group of map operations.
        :param columns_to_pass: The initial schema of the dataframe. This is used to determine which
            upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is
            left empty (and external_inputs is as well), we will assume that all dependencies come
            from the dataframe. This cannot be used in conjunction with pass_dataframe_as.
        :param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag.
            If you pass this in, you are responsible for extracting columns out. If not provided, you have
            to pass columns_to_pass in, and we will extract the columns out for you.
        :param select: The end nodes that represent columns to be appended to the original dataframe
            via with_columns. Existing columns will be overridden.
        :param namespace: The namespace of the nodes, so they don't clash with the global namespace
            and so this can be reused. If it's left out, there will be no namespace (in which case you'll
            want to be careful about repeating it/reusing the nodes in other parts of the DAG.)
        :param config_required: the list of config keys that are required to resolve any functions. Pass in
            None if you want the functions/modules to have access to all possible config.
        """
        if dataframe_types is None:
            raise ValueError("You need to specify which dataframe types it will be applied to.")
        if isinstance(dataframe_types, Type):
            dataframe_types = [dataframe_types]
        self.allowed_dataframe_types = dataframe_types
        self.validate_dataframe_type()

        self.subdag_functions = subdag.collect_functions(load_from)
        self.select = select

        if (pass_dataframe_as is not None and columns_to_pass is not None) or (
            pass_dataframe_as is None and columns_to_pass is None
        ):
            raise ValueError(
                "You must specify only one of columns_to_pass and "
                "pass_dataframe_as. "
                "This is because specifying pass_dataframe_as injects into "
                "the set of columns, allowing you to perform your own extraction "
                "from the dataframe. We then execute all columns in the subdag "
                "in order, passing in that initial dataframe. If you want "
                "to reference columns in your code, you'll have to specify "
                "the set of initial columns, and allow the subdag decorator "
                "to inject the dataframe through. The initial columns tell "
                "us which parameters to take from that dataframe, so we can "
                "feed the right data into the right columns."
            )

        self.initial_schema = columns_to_pass
        self.dataframe_subdag_param = pass_dataframe_as
        self.namespace = namespace
        self.config_required = config_required

    def required_config(self) -> List[str]:
        return self.config_required

    def _create_column_nodes(
        self, inject_parameter: str, params: Dict[str, Type[Type]]
    ) -> List[node.Node]:
        output_type = params[inject_parameter]

        if self.is_async:

            async def temp_fn(**kwargs) -> Any:
                return kwargs[inject_parameter]
        else:

            def temp_fn(**kwargs) -> Any:
                return kwargs[inject_parameter]

        # We recreate the df node to use extract columns
        temp_node = node.Node(
            name=inject_parameter,
            typ=output_type,
            callabl=temp_fn,
            input_types={inject_parameter: output_type},
        )

        extract_columns_decorator = extract_columns(*self.initial_schema)

        out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn)
        return out_nodes[1:]
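
    # (``extract_columns.transform_node`` returns the re-created dataframe node first, followed
    # by one node per extracted column, hence ``out_nodes[1:]`` keeps only the column nodes.)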

    def _get_initial_nodes(
        self, fn: Callable, params: Dict[str, Type[Type]]
    ) -> Tuple[str, Collection[node.Node]]:
        """Selects the correct dataframe and optionally extracts out columns."""
        sig = inspect.signature(fn)
        input_types = typing.get_type_hints(fn)

        if self.dataframe_subdag_param is not None:
            inject_parameter = self.dataframe_subdag_param
        else:
            # If we don't have a specified dataframe we assume it's the first argument
            inject_parameter = list(sig.parameters.values())[0].name

        if inject_parameter not in params:
            raise base.InvalidDecoratorException(
                f"Function: {fn.__name__} does not have the parameter {inject_parameter} as a dependency. "
                f"@with_columns requires the parameter names to match the function parameters. "
                f"If you do not wish to use the first argument, please use the `pass_dataframe_as` option. "
                f"It might not be compatible with some other decorators."
            )

        if input_types[inject_parameter] not in self.allowed_dataframe_types:
            raise ValueError(
                f"The dataframe must be one of the allowed types: {self.allowed_dataframe_types}."
            )
        self.dataframe_type = input_types[inject_parameter]

        initial_nodes = (
            []
            if self.dataframe_subdag_param is not None
            else self._create_column_nodes(inject_parameter=inject_parameter, params=params)
        )

        return inject_parameter, initial_nodes

    @abc.abstractmethod
    def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
        """Creates a node that merges the results back into the original dataframe.

        The merge node adds to / overrides columns in the original dataframe based on the
        selected output. This is platform specific, see the Pandas and Polars plugins for
        implementations.
        """
        pass
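
    # A pandas-flavoured implementation might look roughly like this (an illustrative
    # sketch, not the actual h_pandas code):
    #
    #     def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
    #         def new_callable(**kwargs) -> pd.DataFrame:
    #             df = kwargs[upstream_node]
    #             return df.assign(**{col: kwargs[col] for col in self.select})
    #
    #         input_types = {upstream_node: self.dataframe_type}
    #         input_types.update({col: pd.Series for col in self.select})
    #         return node.Node(
    #             name=node_name,
    #             typ=self.dataframe_type,
    #             callabl=new_callable,
    #             input_types=input_types,
    #         )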

    def inject_nodes(
        self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable
    ) -> Tuple[List[node.Node], Dict[str, str]]:
        self.is_async = inspect.iscoroutinefunction(fn)
        namespace = fn.__name__ if self.namespace is None else self.namespace

        inject_parameter, initial_nodes = self._get_initial_nodes(fn=fn, params=params)

        subdag_nodes = subdag.collect_nodes(config, self.subdag_functions)

        # TODO: for now we restrict that if the user wants to change columns that already exist,
        # they need to pass the dataframe and extract them themselves. If we add namespaces to
        # initial nodes and rewire the initial node names with the ongoing ones that have a column
        # argument, we can also allow in-place changes when using columns_to_pass.
        if with_columns_factory._check_for_duplicates(initial_nodes + subdag_nodes):
            raise ValueError(
                "You can only specify columns once. You used `columns_to_pass` and we "
                "extract the columns for you. In this case they cannot be overwritten -- only new columns get "
                "appended. If you want to modify columns in place, pass in a dataframe and "
                "extract + modify the columns and afterwards select them."
            )

        pruned_nodes = prune_nodes(subdag_nodes, self.select)
        if len(pruned_nodes) == 0:
            raise ValueError(
                f"No nodes found upstream from select columns: {self.select} for function: "
                f"{fn.__qualname__}"
            )
        # In case no node is selected we append all possible sink nodes that have a column type
        # matching what the dataframe expects
        if self.select is None:
            self.select = [
                sink_node.name
                for sink_node in pruned_nodes
                if sink_node.type == registry.get_column_type_from_df_type(self.dataframe_type)
            ]

        merge_node = self.create_merge_node(inject_parameter, node_name="__append")

        output_nodes = initial_nodes + pruned_nodes + [merge_node]
        output_nodes = subdag.add_namespace(output_nodes, namespace)
        return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)}

    def validate(self, fn: Callable):
        self.validate_dataframe_type()
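
For end users the decorator surface is unchanged. A minimal pandas sketch (column and function names are illustrative):

import pandas as pd

from hamilton.plugins import h_pandas


def col_c(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    # Map operation over columns extracted from the injected dataframe.
    return col_a + col_b


@h_pandas.with_columns(
    col_c,
    columns_to_pass=["col_a", "col_b"],
    select=["col_c"],  # with select=None, all sink nodes with a column type get appended
)
def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    return initial_df

``col_a`` and ``col_b`` are extracted from ``initial_df``, feed ``col_c``, and the merge node appends ``col_c`` to the dataframe before ``final_df`` runs.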