From 8a0234ab98a7427b459646a6b90a811db1cbec3f Mon Sep 17 00:00:00 2001
From: jernejfrank
Date: Mon, 4 Nov 2024 15:07:47 +0800
Subject: [PATCH 1/3] Refactor prune_nodes out of h_spark

This functionality will be shared with pandas and polars, so it is moved
into a central location.
---
 hamilton/function_modifiers/recursive.py   | 32 ++++++++++++++++++++--
 hamilton/plugins/h_spark.py                | 31 +--------------------
 tests/function_modifiers/test_recursive.py |  2 +-
 3 files changed, 31 insertions(+), 34 deletions(-)

diff --git a/hamilton/function_modifiers/recursive.py b/hamilton/function_modifiers/recursive.py
index 791bc83c1..744d222e2 100644
--- a/hamilton/function_modifiers/recursive.py
+++ b/hamilton/function_modifiers/recursive.py
@@ -1,7 +1,7 @@
 import inspect
 import sys
 from types import ModuleType
-from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union
+from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TypedDict, Union

 _sys_version_info = sys.version_info
 _version_tuple = (_sys_version_info.major, _sys_version_info.minor, _sys_version_info.micro)
@@ -12,8 +12,6 @@
     from typing import NotRequired

-from typing import TypedDict
-

 # Copied this over from function_graph
 # TODO -- determine the best place to put this code
 from hamilton import graph_utils, node
@@ -600,3 +598,31 @@ def required_config(self) -> Optional[List[str]]:
         :return: Any required config items.
         """
         return None
+
+
+def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> List[node.Node]:
+    """Prunes the nodes to only include those upstream from the select columns.
+    Conducts a depth-first search using the nodes `input_types` field.
+
+    If select is None, we just assume all nodes should be included.
+
+    :param nodes: Full set of nodes
+    :param select: Columns to select
+    :return: Pruned set of nodes
+    """
+    if select is None:
+        return nodes
+
+    node_name_map = {node_.name: node_ for node_ in nodes}
+    seen_nodes = set(select)
+    stack = list({node_name_map[col] for col in select if col in node_name_map})
+    output = []
+    while len(stack) > 0:
+        node_ = stack.pop()
+        output.append(node_)
+        for dep in node_.input_types:
+            if dep not in seen_nodes and dep in node_name_map:
+                dep_node = node_name_map[dep]
+                stack.append(dep_node)
+                seen_nodes.add(dep)
+    return output
diff --git a/hamilton/plugins/h_spark.py b/hamilton/plugins/h_spark.py
index 07353c4cf..b60eb7908 100644
--- a/hamilton/plugins/h_spark.py
+++ b/hamilton/plugins/h_spark.py
@@ -20,7 +20,7 @@
 from hamilton.execution import graph_functions
 from hamilton.function_modifiers import base as fm_base
 from hamilton.function_modifiers import subdag
-from hamilton.function_modifiers.recursive import assign_namespace
+from hamilton.function_modifiers.recursive import assign_namespace, prune_nodes
 from hamilton.htypes import custom_subclass_check
 from hamilton.lifecycle import base as lifecycle_base

@@ -700,34 +700,6 @@ def derive_dataframe_parameter_from_node(node_: node.Node, requested_parameter:
     return derive_dataframe_parameter(types_, requested_parameter, originating_function_name)


-def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> List[node.Node]:
-    """Prunes the nodes to only include those upstream from the select columns.
-    Conducts a depth-first search using the nodes `input_types` field.
-
-    If select is None, we just assume all nodes should be included.
-
-    :param nodes: Full set of nodes
-    :param select: Columns to select
-    :return: Pruned set of nodes
-    """
-    if select is None:
-        return nodes
-
-    node_name_map = {node_.name: node_ for node_ in nodes}
-    seen_nodes = set(select)
-    stack = list({node_name_map[col] for col in select if col in node_name_map})
-    output = []
-    while len(stack) > 0:
-        node_ = stack.pop()
-        output.append(node_)
-        for dep in node_.input_types:
-            if dep not in seen_nodes and dep in node_name_map:
-                dep_node = node_name_map[dep]
-                stack.append(dep_node)
-                seen_nodes.add(dep)
-    return output
-
-
 class require_columns(fm_base.NodeTransformer):
     """Decorator for spark that allows for the specification of columns to transform.
     These are columns within a specific node in a decorator, enabling the user to make use of pyspark
@@ -1041,7 +1013,6 @@ def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
         )
         self.dataframe_subdag_param = pass_dataframe_as
         self.namespace = namespace
-        self.upstream_dependency = dataframe
         self.mode = mode
         self.config_required = config_required

diff --git a/tests/function_modifiers/test_recursive.py b/tests/function_modifiers/test_recursive.py
index fa2482e2b..7b3ae7a91 100644
--- a/tests/function_modifiers/test_recursive.py
+++ b/tests/function_modifiers/test_recursive.py
@@ -181,7 +181,7 @@ def test_reuse_subdag_end_to_end():
     fg = graph.FunctionGraph.from_modules(tests.resources.reuse_subdag, config={"op": "subtract"})
     prefixless_nodes = []
     prefixed_nodes = collections.defaultdict(list)
-    for name, node in fg.nodes.items():
+    for name, node in fg.nodes.items():  # noqa:F402
         name_split = name.split(".")
         if len(name_split) == 1:
             prefixless_nodes.append(node)

From 2e87361fc48b721a096c259397b99db6fda1ac5c Mon Sep 17 00:00:00 2001
From: jernejfrank
Date: Mon, 4 Nov 2024 15:10:16 +0800
Subject: [PATCH 2/3] Add with_columns for pandas

Extended with_columns so it can be used on a pandas dataframe. Pandas does
not have a native implementation of with_columns, so this builds one: the
existing extract_columns decorator is used to create a node for each
specified column of the dataframe, a subdag is then built the usual way,
and at the end the selected end nodes are appended to a copy of the
original dataframe using pandas.assign -- which creates a shallow copy of
the original dataframe. In case columns with the same name are selected as
end nodes, they override the existing columns.
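
As a rough usage sketch (the function and column names here -- col_1,
col_2, col_sum, final_df -- are illustrative only, not part of this patch):

    import pandas as pd
    from hamilton.plugins.h_pandas import with_columns

    def col_sum(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
        # a column-level map operation; becomes a node in the subdag
        return col_1 + col_2

    @with_columns(
        col_sum,                             # functions (or modules) defining the operations
        columns_to_pass=["col_1", "col_2"],  # columns extracted from the decorated function's dataframe
        select=["col_sum"],                  # selected results are appended via DataFrame.assign
    )
    def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
        return initial_df

Executing final_df through a Hamilton driver then returns initial_df with a
col_sum column appended.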
--- docs/reference/decorators/with_columns.rst | 14 +- hamilton/plugins/h_pandas.py | 291 ++++++++++++++++ plugin_tests/h_pandas/__init__.py | 0 plugin_tests/h_pandas/conftest.py | 4 + plugin_tests/h_pandas/resources/__init__.py | 0 .../resources/with_columns_end_to_end.py | 66 ++++ plugin_tests/h_pandas/test_with_columns.py | 310 ++++++++++++++++++ 7 files changed, 684 insertions(+), 1 deletion(-) create mode 100644 hamilton/plugins/h_pandas.py create mode 100644 plugin_tests/h_pandas/__init__.py create mode 100644 plugin_tests/h_pandas/conftest.py create mode 100644 plugin_tests/h_pandas/resources/__init__.py create mode 100644 plugin_tests/h_pandas/resources/with_columns_end_to_end.py create mode 100644 plugin_tests/h_pandas/test_with_columns.py diff --git a/docs/reference/decorators/with_columns.rst b/docs/reference/decorators/with_columns.rst index 1cc03d892..9dbbb7b1f 100644 --- a/docs/reference/decorators/with_columns.rst +++ b/docs/reference/decorators/with_columns.rst @@ -2,7 +2,19 @@ with_columns ======================= -** Overview ** +Pandas +-------------- + +We have a ``with_columns`` option to run operations on columns of a Pandas dataframe and append the results as new columns. + +**Reference Documentation** + +.. autoclass:: hamilton.plugins.h_pandas.with_columns + :special-members: __init__ + + +PySpark +-------------- This is part of the hamilton pyspark integration. To install, run: diff --git a/hamilton/plugins/h_pandas.py b/hamilton/plugins/h_pandas.py new file mode 100644 index 000000000..722896dcf --- /dev/null +++ b/hamilton/plugins/h_pandas.py @@ -0,0 +1,291 @@ +import inspect +import sys +import typing +from collections import defaultdict +from types import ModuleType +from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union + +_sys_version_info = sys.version_info +_version_tuple = (_sys_version_info.major, _sys_version_info.minor, _sys_version_info.micro) + +if _version_tuple < (3, 11, 0): + pass +else: + pass + +import pandas as pd + +# Copied this over from function_graph +# TODO -- determine the best place to put this code +from hamilton import node +from hamilton.function_modifiers import base +from hamilton.function_modifiers.expanders import extract_columns +from hamilton.function_modifiers.recursive import assign_namespace, prune_nodes, subdag + + +class with_columns(base.NodeInjector): + """Initializes a with_columns decorator for pandas. This allows you to efficiently run groups of map operations on a dataframe. + + Here's an example of calling it -- if you've seen ``@subdag``, you should be familiar with + the concepts: + + .. code-block:: python + + # my_module.py + def a(a_from_df: pd.Series) -> pd.Series: + return _process(a) + + def b(b_from_df: pd.Series) -> pd.Series: + return _process(b) + + def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series: + return (a_from_df + b_from_df) / 2 + + + .. code-block:: python + + # with_columns_module.py + def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series: + return a + b + + + # the with_columns call + @with_columns( + *[my_module], # Load from any module + *[a_plus_b], # or list operations directly + columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to + # the subdag + select=["a", "b", "a_plus_b", "a_b_average"], # The columns to select from the dataframe + ) + def final_df(initial_df: pd.DataFrame) -> pd.DataFrame: + # process, or just return unprocessed + ... 
+ + In this instance the ``initial_df`` would get two columns added: ``a_plus_b`` and ``a_b_average``. + + The operations are applied in topological order. This allows you to + express the operations individually, making it easy to unit-test and reuse. + + Note that the operation is "append", meaning that the columns that are selected are appended + onto the dataframe. + + If the function takes multiple dataframes, the dataframe input to process will always be + the first argument. This will be passed to the subdag, transformed, and passed back to the function. + This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code + above, the dataframe that is passed to the subdag is `initial_df`. That is transformed + by the subdag, and then returned as the final dataframe. + + You can read it as: + + "final_df is a function that transforms the upstream dataframe initial_df, running the transformations + from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns + a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it." + + In case you need more flexibility you can alternatively use ``pass_dataframe_as``, for example, + + .. code-block:: python + + # with_columns_module.py + def a_from_df(initial_df: pd.Series) -> pd.Series: + return initial_df["a_from_df"] / 100 + + def b_from_df(initial_df: pd.Series) -> pd.Series: + return initial_df["b_from_df"] / 100 + + + # the with_columns call + @with_columns( + *[my_module], + *[a_from_df], + columns_to_pass=["a_from_df", "b_from_df"], + select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"], + ) + def final_df(initial_df: pd.DataFrame) -> pd.DataFrame: + # process, or just return unprocessed + ... + + the above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get + overwritten. + """ + + @staticmethod + def _check_for_duplicates(nodes_: List[node.Node]) -> bool: + """Ensures that we don't run into name clashing of columns and group operations. + + In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want + to safeguard against nameclashing with functions that are passed into ``with_columns`` - i.e. + there are no functions that have the same name as the columns. This effectively means that + using ``columns_to_pass`` will only append new columns to the dataframe and for changing + existing columns ``pass_dataframe_as`` needs to be used. + """ + node_counter = defaultdict(int) + for node_ in nodes_: + node_counter[node_.name] += 1 + if node_counter[node_.name] > 1: + return True + return False + + def __init__( + self, + *load_from: Union[Callable, ModuleType], + columns_to_pass: List[str] = None, + pass_dataframe_as: str = None, + select: List[str] = None, + namespace: str = None, + config_required: List[str] = None, + ): + """Instantiates a ``@with_column`` decorator. + + :param load_from: The functions or modules that will be used to generate the group of map operations. + :param columns_to_pass: The initial schema of the dataframe. This is used to determine which + upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is + left empty (and external_inputs is as well), we will assume that all dependencies come + from the dataframe. This cannot be used in conjunction with pass_dataframe_as. + :param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag. 
+ If you pass this in, you are responsible for extracting columns out. If not provided, you have + to pass columns_to_pass in, and we will extract the columns out for you. + :param namespace: The namespace of the nodes, so they don't clash with the global namespace + and so this can be reused. If its left out, there will be no namespace (in which case you'll want + to be careful about repeating it/reusing the nodes in other parts of the DAG.) + :param config_required: the list of config keys that are required to resolve any functions. Pass in None\ + if you want the functions/modules to have access to all possible config. + """ + + self.subdag_functions = subdag.collect_functions(load_from) + + if select is None: + raise ValueError("Please specify at least one column to append or update.") + else: + self.select = select + + if (pass_dataframe_as is not None and columns_to_pass is not None) or ( + pass_dataframe_as is None and columns_to_pass is None + ): + raise ValueError( + "You must specify only one of columns_to_pass and " + "pass_dataframe_as. " + "This is because specifying pass_dataframe_as injects into " + "the set of columns, allowing you to perform your own extraction" + "from the dataframe. We then execute all columns in the sbudag" + "in order, passing in that initial dataframe. If you want" + "to reference columns in your code, you'll have to specify " + "the set of initial columns, and allow the subdag decorator " + "to inject the dataframe through. The initial columns tell " + "us which parameters to take from that dataframe, so we can" + "feed the right data into the right columns." + ) + + self.initial_schema = columns_to_pass + self.dataframe_subdag_param = pass_dataframe_as + self.namespace = namespace + self.config_required = config_required + + def required_config(self) -> List[str]: + return self.config_required + + def _create_column_nodes( + self, inject_parameter: str, params: Dict[str, Type[Type]] + ) -> List[node.Node]: + output_type = params[inject_parameter] + + def temp_fn(**kwargs) -> pd.DataFrame: + return kwargs[inject_parameter] + + # We recreate the df node to use extract columns + temp_node = node.Node( + name=inject_parameter, + typ=output_type, + callabl=temp_fn, + input_types={inject_parameter: output_type}, + ) + + extract_columns_decorator = extract_columns(*self.initial_schema) + + out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn) + return out_nodes[1:] + + def _get_inital_nodes( + self, fn: Callable, params: Dict[str, Type[Type]] + ) -> Tuple[str, Collection[node.Node]]: + """Selects the correct dataframe and optionally extracts out columns.""" + initial_nodes = [] + if self.dataframe_subdag_param is not None: + inject_parameter = self.dataframe_subdag_param + else: + # If we don't have a specified dataframe we assume it's the first argument + sig = inspect.signature(fn) + inject_parameter = list(sig.parameters.values())[0].name + input_types = typing.get_type_hints(fn) + + if not input_types[inject_parameter] == pd.DataFrame: + raise ValueError( + "First argument has to be a pandas DataFrame. If you wish to use a " + "different argument, please use `pass_dataframe_as` option." + ) + + initial_nodes.extend( + self._create_column_nodes(inject_parameter=inject_parameter, params=params) + ) + + if inject_parameter not in params: + raise base.InvalidDecoratorException( + f"Function: {fn.__name__} has a first parameter that is not a dependency. 
" + f"@with_columns requires the parameter names to match the function parameters. " + f"Thus it might not be compatible with some other decorators" + ) + + return inject_parameter, initial_nodes + + def _create_merge_node(self, upstream_node: str, node_name: str) -> node.Node: + "Node that adds to / overrides columns for the original dataframe based on selected output." + + def new_callable(**kwargs) -> Any: + df = kwargs[upstream_node] + columns_to_append = {} + for column in self.select: + columns_to_append[column] = kwargs[column] + + return df.assign(**columns_to_append) + + input_map = {column: pd.Series for column in self.select} + input_map[upstream_node] = pd.DataFrame + + return node.Node( + name=node_name, + typ=pd.DataFrame, + callabl=new_callable, + input_types=input_map, + ) + + def inject_nodes( + self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable + ) -> Tuple[List[node.Node], Dict[str, str]]: + namespace = fn.__name__ if self.namespace is None else self.namespace + + inject_parameter, initial_nodes = self._get_inital_nodes(fn=fn, params=params) + + subdag_nodes = subdag.collect_nodes(config, self.subdag_functions) + + if with_columns._check_for_duplicates(initial_nodes + subdag_nodes): + raise ValueError( + "You can only specify columns once. You used `columns_to_pass` and we " + "extract the columns for you. In this case they cannot be overwritten -- only new columns get " + "appended. If you want to modify in-place columns pass in a dataframe and " + "extract + modify the columns and afterwards select them." + ) + + pruned_nodes = prune_nodes(subdag_nodes, self.select) + if len(pruned_nodes) == 0: + raise ValueError( + f"No nodes found upstream from select columns: {self.select} for function: " + f"{fn.__qualname__}" + ) + + merge_node = self._create_merge_node(inject_parameter, node_name="__append") + + output_nodes = initial_nodes + pruned_nodes + [merge_node] + output_nodes = subdag.add_namespace(output_nodes, namespace) + return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)} + + def validate(self, fn: Callable): + pass diff --git a/plugin_tests/h_pandas/__init__.py b/plugin_tests/h_pandas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/plugin_tests/h_pandas/conftest.py b/plugin_tests/h_pandas/conftest.py new file mode 100644 index 000000000..bc5ef5b5a --- /dev/null +++ b/plugin_tests/h_pandas/conftest.py @@ -0,0 +1,4 @@ +from hamilton import telemetry + +# disable telemetry for all tests! 
+telemetry.disable_telemetry() diff --git a/plugin_tests/h_pandas/resources/__init__.py b/plugin_tests/h_pandas/resources/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/plugin_tests/h_pandas/resources/with_columns_end_to_end.py b/plugin_tests/h_pandas/resources/with_columns_end_to_end.py new file mode 100644 index 000000000..16d493e46 --- /dev/null +++ b/plugin_tests/h_pandas/resources/with_columns_end_to_end.py @@ -0,0 +1,66 @@ +import pandas as pd + +from hamilton.function_modifiers import config +from hamilton.plugins.h_pandas import with_columns + + +def upstream_factor() -> int: + return 3 + + +def initial_df() -> pd.DataFrame: + return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]}) + + +def subtract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series: + return col_2 - col_1 + + +@config.when(factor=5) +def multiply_3__by_5(col_3: pd.Series) -> pd.Series: + return col_3 * 5 + + +@config.when(factor=7) +def multiply_3__by_7(col_3: pd.Series) -> pd.Series: + return col_3 * 7 + + +def add_1_by_user_adjustment_factor(col_1: pd.Series, user_factor: int) -> pd.Series: + return col_1 + user_factor + + +def multiply_2_by_upstream_3(col_2: pd.Series, upstream_factor: int) -> pd.Series: + return col_2 * upstream_factor + + +@with_columns( + subtract_1_from_2, + multiply_3__by_5, + multiply_3__by_7, + add_1_by_user_adjustment_factor, + multiply_2_by_upstream_3, + columns_to_pass=["col_1", "col_2", "col_3"], + select=[ + "subtract_1_from_2", + "multiply_3", + "add_1_by_user_adjustment_factor", + "multiply_2_by_upstream_3", + ], + namespace="some_subdag", +) +def final_df(initial_df: pd.DataFrame) -> pd.DataFrame: + return initial_df + + +def col_3(initial_df: pd.DataFrame) -> pd.Series: + return pd.Series([0, 2, 4, 6]) + + +@with_columns( + col_3, + pass_dataframe_as="initial_df", + select=["col_3", "multiply_3"], +) +def final_df_2(initial_df: pd.DataFrame) -> pd.DataFrame: + return initial_df diff --git a/plugin_tests/h_pandas/test_with_columns.py b/plugin_tests/h_pandas/test_with_columns.py new file mode 100644 index 000000000..b3a84d666 --- /dev/null +++ b/plugin_tests/h_pandas/test_with_columns.py @@ -0,0 +1,310 @@ +import pandas as pd +import pytest + +from hamilton import driver, node +from hamilton.function_modifiers.base import NodeInjector +from hamilton.plugins.h_pandas import with_columns + +from .resources import with_columns_end_to_end + + +def dummy_fn_with_columns(col_1: pd.Series) -> pd.Series: + return col_1 + 100 + + +def test_detect_duplicate_nodes(): + node_a = node.Node.from_fn(dummy_fn_with_columns, name="a") + node_b = node.Node.from_fn(dummy_fn_with_columns, name="a") + node_c = node.Node.from_fn(dummy_fn_with_columns, name="c") + + if not with_columns._check_for_duplicates([node_a, node_b, node_c]): + raise (AssertionError) + + if with_columns._check_for_duplicates([node_a, node_c]): + raise (AssertionError) + + +def test_select_not_empty(): + error_message = "Please specify at least one column to append or update." + + with pytest.raises(ValueError) as e: + with_columns(dummy_fn_with_columns) + assert str(e.value) == error_message + + +def test_columns_to_pass_and_pass_dataframe_as_raises_error(): + error_message = ( + "You must specify only one of columns_to_pass and " + "pass_dataframe_as. " + "This is because specifying pass_dataframe_as injects into " + "the set of columns, allowing you to perform your own extraction" + "from the dataframe. 
We then execute all columns in the sbudag" + "in order, passing in that initial dataframe. If you want" + "to reference columns in your code, you'll have to specify " + "the set of initial columns, and allow the subdag decorator " + "to inject the dataframe through. The initial columns tell " + "us which parameters to take from that dataframe, so we can" + "feed the right data into the right columns." + ) + + with pytest.raises(ValueError) as e: + with_columns( + dummy_fn_with_columns, columns_to_pass=["a"], pass_dataframe_as="a", select=["a"] + ) + assert str(e.value) == error_message + + +def test_first_parameter_is_dataframe(): + error_message = ( + "First argument has to be a pandas DataFrame. If you wish to use a " + "different argument, please use `pass_dataframe_as` option." + ) + + def target_fn(upstream_df: int) -> pd.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + with pytest.raises(ValueError) as e: + decorator._get_inital_nodes(fn=target_fn, params=injectable_params) + + assert str(e.value) == error_message + + +def test_create_column_nodes_pass_dataframe(): + def target_fn(some_var: int, upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, pass_dataframe_as="upstream_df", select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + inject_parameter, initial_nodes = decorator._get_inital_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 0 + + +def test_create_column_nodes_extract_single_columns(): + def dummy_df() -> pd.DataFrame: + return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + inject_parameter, initial_nodes = decorator._get_inital_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 1 + assert initial_nodes[0].name == "col_1" + assert initial_nodes[0].type == pd.Series + pd.testing.assert_series_equal( + initial_nodes[0].callable(upstream_df=dummy_df()), + pd.Series([1, 2, 3, 4]), + check_names=False, + ) + + +def test_create_column_nodes_extract_multiple_columns(): + def dummy_df() -> pd.DataFrame: + return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + inject_parameter, initial_nodes = decorator._get_inital_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 2 + assert initial_nodes[0].name == "col_1" + assert initial_nodes[1].name == "col_2" + assert initial_nodes[0].type == 
pd.Series + assert initial_nodes[1].type == pd.Series + pd.testing.assert_series_equal( + initial_nodes[0].callable(upstream_df=dummy_df()), + pd.Series([1, 2, 3, 4]), + check_names=False, + ) + pd.testing.assert_series_equal( + initial_nodes[1].callable(upstream_df=dummy_df()), + pd.Series([11, 12, 13, 14]), + check_names=False, + ) + + +def test_no_matching_select_column_error(): + def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + select = "wrong_column" + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=select + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + error_message = ( + f"No nodes found upstream from select columns: {select} for function: " + f"{target_fn.__qualname__}" + ) + with pytest.raises(ValueError) as e: + decorator.inject_nodes(injectable_params, {}, fn=target_fn) + + assert str(e.value) == error_message + + +def test_append_into_original_df(): + def dummy_df() -> pd.DataFrame: + return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"] + ) + merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node") + + output_df = merge_node.callable( + upstream_df=dummy_df(), + dummy_fn_with_columns=dummy_fn_with_columns(col_1=pd.Series([1, 2, 3, 4])), + ) + assert merge_node.name == "merge_node" + assert merge_node.type == pd.DataFrame + + pd.testing.assert_series_equal(output_df["col_1"], pd.Series([1, 2, 3, 4]), check_names=False) + pd.testing.assert_series_equal( + output_df["col_2"], pd.Series([11, 12, 13, 14]), check_names=False + ) + pd.testing.assert_series_equal( + output_df["dummy_fn_with_columns"], pd.Series([101, 102, 103, 104]), check_names=False + ) + + +def test_override_original_column_in_df(): + def dummy_df() -> pd.DataFrame: + return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + def col_1() -> pd.Series: + return pd.Series([0, 3, 5, 7]) + + decorator = with_columns(col_1, pass_dataframe_as=["upstream_df"], select=["col_1"]) + merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node") + + output_df = merge_node.callable(upstream_df=dummy_df(), col_1=col_1()) + assert merge_node.name == "merge_node" + assert merge_node.type == pd.DataFrame + + pd.testing.assert_series_equal(output_df["col_1"], pd.Series([0, 3, 5, 7]), check_names=False) + pd.testing.assert_series_equal( + output_df["col_2"], pd.Series([11, 12, 13, 14]), check_names=False + ) + + +def test_assign_custom_namespace_with_columns(): + def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + decorator = with_columns( + dummy_fn_with_columns, + columns_to_pass=["col_1", "col_2"], + select=["dummy_fn_with_columns"], + namespace="dummy_namespace", + ) + nodes_ = decorator.transform_dag([dummy_node], {}, target_fn) + + assert nodes_[0].name == "target_fn" + assert nodes_[1].name == "dummy_namespace.col_1" + assert nodes_[2].name == "dummy_namespace.col_2" + assert nodes_[3].name == "dummy_namespace.dummy_fn_with_columns" + assert nodes_[4].name == "dummy_namespace.__append" + + +def 
test_end_to_end_with_columns_automatic_extract(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_5).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pd.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [5, 5, 5, 5], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + pd.testing.assert_frame_equal(result, expected_df) + + config_7 = { + "factor": 7, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_7).build() + result = dr.execute(final_vars=["final_df"], inputs={"user_factor": 1000})["final_df"] + + expected_df = pd.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [1, 1, 1, 1], + "subtract_1_from_2": [10, 10, 10, 10], + "multiply_3": [7, 7, 7, 7], + "add_1_by_user_adjustment_factor": [1001, 1002, 1003, 1004], + "multiply_2_by_upstream_3": [33, 36, 39, 42], + } + ) + pd.testing.assert_frame_equal(result, expected_df) + + +def test_end_to_end_with_columns_pass_dataframe(): + config_5 = { + "factor": 5, + } + dr = driver.Builder().with_modules(with_columns_end_to_end).with_config(config_5).build() + + result = dr.execute(final_vars=["final_df_2"])["final_df_2"] + expected_df = pd.DataFrame( + { + "col_1": [1, 2, 3, 4], + "col_2": [11, 12, 13, 14], + "col_3": [0, 2, 4, 6], + "multiply_3": [0, 10, 20, 30], + } + ) + pd.testing.assert_frame_equal(result, expected_df) From af0a0ed914abb9a3900da2b49a66beafc294c48a Mon Sep 17 00:00:00 2001 From: jernejfrank Date: Mon, 4 Nov 2024 15:15:17 +0800 Subject: [PATCH 3/3] Add with_columns for pandas hello_world example --- examples/pandas/with_columns/DAG.png | Bin 0 -> 111327 bytes examples/pandas/with_columns/README | 7 + examples/pandas/with_columns/my_functions.py | 47 ++ examples/pandas/with_columns/notebook.ipynb | 723 +++++++++++++++++++ 4 files changed, 777 insertions(+) create mode 100644 examples/pandas/with_columns/DAG.png create mode 100644 examples/pandas/with_columns/README create mode 100644 examples/pandas/with_columns/my_functions.py create mode 100644 examples/pandas/with_columns/notebook.ipynb diff --git a/examples/pandas/with_columns/DAG.png b/examples/pandas/with_columns/DAG.png new file mode 100644 index 0000000000000000000000000000000000000000..9621e973728f39667ba51d76e992013af0ff0c7b GIT binary patch literal 111327 zcmd?RbySvV^e&7#iX#exsFZ*~NJ}@EAl+Tk-Q75fC@C!DvONa?vyFzdU4Gry@;EQKcXlR$}(9q6zUcLlB zG2PKFf&X376c%`fc8dJ-xjZ=t4ebt^;Iqdv_MetV929zVCQsH6ztHD4P5ynE4qua? z=)*MTYQaZs=|zPI7iE-Xr(uMlabtx`Q$#4f$>33lbgY3v(%)W(fkoaJbVYxizmogs z!i22GDVa16U!==rGP@jD$NxKi75Tn=P2-xk*9MC$v`lnf zowHpTOq`z9{_iDraEVBurj}MnaIg%a;!Af*>7)l_WVQ2nXUot>q~~+)_qQrKiT|x% z`ehgjG-@unY;_1UHZ(ABHrnJ^&54z7wsKKDc<_)DF{OV#k6vp@*Yt`siiwed;|33*+|!Ld2mTA(Iat41g3*7D~` zig7&<+#Pkw{`&Q6cbY6U&yT7LRJ6327Hj@|Cn?{)J$wG_*#ll)wYBoyXqsD1+xxm_ zgLx#zk6uJ9_K7`~n8)z}J-xVAolk(jzkev5YMMsP+l3akvF-%HAX33}`Rv}$k6%wu zOvJvrfZo{AA?eSv^D%ADjokqmqXhqP8%VNTQm~(_C>43bHpZt z`GPUd8}F}RV6d2tatlfs8m7IyihqxdjqMr