with_columns for Pandas #1209

Merged · 3 commits · Nov 6, 2024
14 changes: 13 additions & 1 deletion docs/reference/decorators/with_columns.rst
@@ -2,7 +2,19 @@
with_columns
=======================

**Overview**
Pandas
--------------

We provide ``with_columns`` to run operations on columns of a Pandas dataframe and append the results as new columns, as sketched below.
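
A minimal sketch of how this might look, assuming a module ``my_functions``
that defines the column-level transforms (see the reference documentation
below for the exact signature and parameters):

.. code-block:: python

    import pandas as pd

    from hamilton.plugins.h_pandas import with_columns

    import my_functions  # module containing column-level transform functions

    @with_columns(
        my_functions,
        columns_to_pass=["spend", "signups"],  # columns read from the input dataframe
        select=["spend_per_signup"],  # results appended as new columns
    )
    def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
        # Receives the input dataframe with the selected results appended.
        return initial_df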

**Reference Documentation**

.. autoclass:: hamilton.plugins.h_pandas.with_columns
:special-members: __init__


PySpark
--------------

This is part of the Hamilton PySpark integration. To install, run:

…
Binary file added examples/pandas/with_columns/DAG.png
7 changes: 7 additions & 0 deletions examples/pandas/with_columns/README
@@ -0,0 +1,7 @@
# Using with_columns with Pandas

We show how the familiar `with_columns` pattern from `pyspark` or `polars` can be used on a Pandas dataframe.

See the notebook for the complete example.

![image info](./DAG.png)
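
As a rough sketch of what running it looks like (the notebook is the authoritative version; `pipeline`, `final_df`, and `initial_df` are assumed names for illustration, where `pipeline` is a module that applies `with_columns` to the functions in `my_functions.py`):

```python
import pandas as pd

from hamilton import driver

import pipeline  # hypothetical module that decorates final_df with with_columns

# The config selects between the @config.when variants in my_functions.py.
dr = (
    driver.Builder()
    .with_modules(pipeline)
    .with_config({"case": "millions"})  # resolves avg_3wk_spend__millions
    .build()
)

df = pd.DataFrame({"spend": [10.0, 20.0, 30.0, 40.0], "signups": [1, 2, 3, 4]})
result = dr.execute(["final_df"], inputs={"initial_df": df})
print(result["final_df"])
```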
47 changes: 47 additions & 0 deletions examples/pandas/with_columns/my_functions.py
@@ -0,0 +1,47 @@
import pandas as pd

from hamilton.function_modifiers import config

"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore showcases how you can write something once and not only scale it, but port it
to different frameworks with ease!
"""


@config.when(case="millions")
def avg_3wk_spend__millions(spend: pd.Series) -> pd.Series:
"""Rolling 3 week average spend."""
return spend.rolling(3).mean() / 1e6


@config.when(case="thousands")
def avg_3wk_spend__thousands(spend: pd.Series) -> pd.Series:
"""Rolling 3 week average spend."""
return spend.rolling(3).mean() / 1e3


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
"""The cost per signup in relation to spend."""
return spend / signups


def spend_mean(spend: pd.Series) -> float:
"""Shows function creating a scalar. In this case it computes the mean of the entire column."""
return spend.mean()


def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
"""Shows function that takes a scalar. In this case to zero mean spend."""
return spend - spend_mean


def spend_std_dev(spend: pd.Series) -> float:
"""Function that computes the standard deviation of the spend column."""
return spend.std()


def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
"""Function showing one way to make spend have zero mean and unit variance."""
return spend_zero_mean / spend_std_dev
723 changes: 723 additions & 0 deletions examples/pandas/with_columns/notebook.ipynb

Large diffs are not rendered by default.

32 changes: 29 additions & 3 deletions hamilton/function_modifiers/recursive.py
@@ -1,7 +1,7 @@
import inspect
import sys
from types import ModuleType
-from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union
+from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TypedDict, Union

_sys_version_info = sys.version_info
_version_tuple = (_sys_version_info.major, _sys_version_info.minor, _sys_version_info.micro)
@@ -12,8 +12,6 @@
     from typing import NotRequired


-from typing import TypedDict
-
# Copied this over from function_graph
# TODO -- determine the best place to put this code
from hamilton import graph_utils, node
@@ -600,3 +598,31 @@ def required_config(self) -> Optional[List[str]]:
        :return: Any required config items.
        """
        return None


[Review comment from a Collaborator, on `prune_nodes`]: Yep, would make this live here and have pandas/spark refer to it

def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> List[node.Node]:
    """Prunes the nodes to only include those upstream from the select columns.
    Conducts a depth-first search using the nodes' `input_types` field.

    If select is None, we just assume all nodes should be included.

    :param nodes: Full set of nodes
    :param select: Columns to select
    :return: Pruned set of nodes
    """
    if select is None:
        return nodes

    node_name_map = {node_.name: node_ for node_ in nodes}
    seen_nodes = set(select)
    stack = list({node_name_map[col] for col in select if col in node_name_map})
    output = []
    while len(stack) > 0:
        node_ = stack.pop()
        output.append(node_)
        for dep in node_.input_types:
            if dep not in seen_nodes and dep in node_name_map:
                dep_node = node_name_map[dep]
                stack.append(dep_node)
                seen_nodes.add(dep)
    return output
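
To illustrate the pruning behavior, here is a hypothetical sketch using a minimal stand-in for `node.Node` (real nodes are constructed by the framework, not by hand; only the `name` and `input_types` attributes that `prune_nodes` reads are mocked):

```python
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class FakeNode:
    name: str
    input_types: Dict[str, type] = field(default_factory=dict)

nodes = [
    FakeNode("spend"),
    FakeNode("signups"),
    FakeNode("spend_per_signup", {"spend": object, "signups": object}),
    FakeNode("unrelated"),  # not upstream of the selection, so it gets pruned
]
pruned = prune_nodes(nodes, select=["spend_per_signup"])
# Only the selected column and its upstream dependencies survive.
assert {n.name for n in pruned} == {"spend", "signups", "spend_per_signup"}
```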