From c373b5d4e7ac98535ec4ab8d8ab251f6cc34f509 Mon Sep 17 00:00:00 2001 From: jernejfrank Date: Mon, 18 Nov 2024 16:13:49 +0800 Subject: [PATCH] Refactor with_columns to with_columns_factory Central with_columns_factory from which dataframe libraries inherit. Changes: - Use with_columns_factory to inherit from and only need to add correct dataframe types and create_merge_node method. - Refactored h_pandas.with_columns and h_polars.with_columns to inherit from it. - Added async support for pandas (not sure if it makes sense for polars) - Select=None appends all sink nodes with column types -- same as h_spark.with_columns --- examples/pandas/with_columns/notebook.ipynb | 863 ++++++------ examples/polars/notebook.ipynb | 240 ++-- examples/polars/with_columns/README | 5 +- examples/polars/with_columns/notebook.ipynb | 1364 +++++++++---------- hamilton/function_modifiers/recursive.py | 240 +++- hamilton/plugins/h_pandas.py | 184 +-- hamilton/plugins/h_polars.py | 166 +-- plugin_tests/h_pandas/test_with_columns.py | 74 +- plugin_tests/h_polars/test_with_columns.py | 199 ++- tests/function_modifiers/test_recursive.py | 78 ++ 10 files changed, 1810 insertions(+), 1603 deletions(-) diff --git a/examples/pandas/with_columns/notebook.ipynb b/examples/pandas/with_columns/notebook.ipynb index 49eca8ed7..8b9ba9de9 100644 --- a/examples/pandas/with_columns/notebook.ipynb +++ b/examples/pandas/with_columns/notebook.ipynb @@ -22,7 +22,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -59,334 +59,228 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "thousands\n", + "\n", + "\n", + "\n", + "case\n", + "thousands\n", "\n", - "\n", + "\n", "\n", - "initial_df\n", - "\n", - "initial_df\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "final_df.signups\n", - "\n", - "final_df.signups\n", - "Series\n", - "\n", - "\n", - "\n", - "initial_df->final_df.signups\n", - "\n", - "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.__append\n", - "\n", - "final_df.__append\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend\n", - "\n", - "final_df.spend\n", - "Series\n", + "\n", + "final_df.__append\n", + "DataFrame\n", "\n", - "\n", - "\n", - "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance->final_df.__append\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Series\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.signups->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Series\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", "\n", "\n", - "\n", + "\n", 
"final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Series\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Series\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df\n", - "\n", - "final_df\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", + "\n", + "final_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", - "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", "\n", "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", - "\n", - "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_per_signup->final_df.__append\n", - "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
signupsspendavg_3wk_spendspend_per_signupspend_zero_mean_unit_variance
0110000000.0NaN10000000.0-1.064405
11010000000.0NaN1000000.0-1.064405
25020000000.013333.333333400000.0-0.483821
310040000000.023333.333333400000.00.677349
420040000000.033333.333333200000.00.677349
540050000000.043333.333333125000.01.257934
\n", - "
" - ], - "text/plain": [ - " signups spend avg_3wk_spend spend_per_signup \\\n", - "0 1 10000000.0 NaN 10000000.0 \n", - "1 10 10000000.0 NaN 1000000.0 \n", - "2 50 20000000.0 13333.333333 400000.0 \n", - "3 100 40000000.0 23333.333333 400000.0 \n", - "4 200 40000000.0 33333.333333 200000.0 \n", - "5 400 50000000.0 43333.333333 125000.0 \n", - "\n", - " spend_zero_mean_unit_variance \n", - "0 -1.064405 \n", - "1 -1.064405 \n", - "2 -0.483821 \n", - "3 0.677349 \n", - "4 0.677349 \n", - "5 1.257934 " + "" ] }, "metadata": {}, @@ -419,7 +313,7 @@ "@with_columns(\n", " *[my_functions],\n", " columns_to_pass=[\"spend\", \"signups\"], # The columns to select from the dataframe\n", - " select=output_columns, # The columns to append to the dataframe\n", + " # select=output_columns, # The columns to append to the dataframe\n", " # config_required = [\"a\"]\n", ")\n", "def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:\n", @@ -435,13 +329,13 @@ "name": "stdout", "output_type": "stream", "text": [ - " signups spend avg_3wk_spend spend_per_signup \\\n", - "0 1 10000000.0 NaN 10000000.0 \n", - "1 10 10000000.0 NaN 1000000.0 \n", - "2 50 20000000.0 13.333333 400000.0 \n", - "3 100 40000000.0 23.333333 400000.0 \n", - "4 200 40000000.0 33.333333 200000.0 \n", - "5 400 50000000.0 43.333333 125000.0 \n", + " signups spend avg_3wk_spend spend_per_signup spend_zero_mean \\\n", + "0 1 10000000.0 NaN 10000000.0 -1.833333e+07 \n", + "1 10 10000000.0 NaN 1000000.0 -1.833333e+07 \n", + "2 50 20000000.0 13.333333 400000.0 -8.333333e+06 \n", + "3 100 40000000.0 23.333333 400000.0 1.166667e+07 \n", + "4 200 40000000.0 33.333333 200000.0 1.166667e+07 \n", + "5 400 50000000.0 43.333333 125000.0 2.166667e+07 \n", "\n", " spend_zero_mean_unit_variance \n", "0 -1.064405 \n", @@ -461,232 +355,228 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "millions\n", + "\n", + "\n", + "\n", + "case\n", + "millions\n", "\n", - "\n", + "\n", "\n", - "initial_df\n", - "\n", - "initial_df\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "final_df.signups\n", - "\n", - "final_df.signups\n", - "Series\n", - "\n", - "\n", - "\n", - "initial_df->final_df.signups\n", - "\n", - "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.__append\n", - "\n", - "final_df.__append\n", - "DataFrame\n", + "\n", + "final_df.__append\n", + "DataFrame\n", "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend\n", - "\n", - "final_df.spend\n", - "Series\n", - "\n", - "\n", - "\n", - "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance->final_df.__append\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Series\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.signups->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + 
"Series\n", "\n", "\n", - "\n", + "\n", "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Series\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Series\n", + "final_df.avg_3wk_spend\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Series\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df\n", - "\n", - "final_df\n", - "DataFrame\n", + "\n", + "final_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", - "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", "\n", "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", - "\n", - "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_per_signup->final_df.__append\n", - "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "initial_df->final_df.signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 3, @@ -701,6 +591,113 @@ 
"dr.visualize_execution(final_vars=[\"final_df\"])\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# We can also run it async" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n", + " warnings.warn(\n", + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "%reload_ext hamilton.plugins.jupyter_magic" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "%%cell_to_module with_columns_async\n", + "\n", + "import asyncio\n", + "import pandas as pd\n", + "from hamilton.plugins.h_pandas import with_columns\n", + "\n", + "async def data_input() -> pd.DataFrame:\n", + " await asyncio.sleep(0.0001)\n", + " return pd.DataFrame({\n", + " \"a\": [1, 2, 3],\n", + " \"b\": [4, 5, 6],\n", + " \"c\": [7, 8, 9]\n", + " })\n", + "\n", + "\n", + "async def multiply_a(a: pd.Series) -> pd.Series:\n", + " await asyncio.sleep(0.0001)\n", + " return a * 10\n", + "\n", + "\n", + "async def mean_b(b: pd.Series) -> pd.Series:\n", + " await asyncio.sleep(0.0001)\n", + " return b.mean()\n", + "\n", + "async def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:\n", + " await asyncio.sleep(0.0001)\n", + " return a + b\n", + "\n", + "async def multiply_a_plus_mean_b(multiply_a: pd.Series, mean_b: pd.Series) -> pd.Series:\n", + " await asyncio.sleep(0.0001)\n", + " return multiply_a + mean_b\n", + "\n", + "\n", + "@with_columns(\n", + " multiply_a,mean_b,a_plus_b, multiply_a_plus_mean_b,\n", + " columns_to_pass=[\"a\", \"b\"]\n", + ")\n", + "def final_df(data_input: pd.DataFrame) -> pd.DataFrame:\n", + " return data_input" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " a b c multiply_a mean_b a_plus_b multiply_a_plus_mean_b\n", + "0 1 4 7 10 5.0 5 15.0\n", + "1 2 5 8 20 5.0 7 25.0\n", + "2 3 6 9 30 5.0 9 35.0\n" + ] + } + ], + "source": [ + "import asyncio\n", + "from hamilton import async_driver\n", + "import with_columns_async\n", + "\n", + "async def main():\n", + " await asyncio.sleep(2)\n", + " dr = (await async_driver.Builder()\n", + " .with_modules(with_columns_async)\n", + " .with_config({\"case\":\"millions\"})\n", + " .build())\n", + " results = await dr.execute([\"final_df\"])\n", + " print(results[\"final_df\"])\n", + "\n", + "await main()\n" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/examples/polars/notebook.ipynb b/examples/polars/notebook.ipynb index c8cad7e44..c81678590 100644 --- a/examples/polars/notebook.ipynb +++ b/examples/polars/notebook.ipynb @@ -38,8 +38,10 @@ "name": "stderr", "output_type": "stream", "text": [ - 
"/Users/stefankrawczyk/.pyenv/versions/knowledge_retrieval-py39/lib/python3.9/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n", - " warnings.warn(\n" + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n", + " warnings.warn(\n", + "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" ] } ], @@ -70,177 +72,177 @@ "\n", "\n", - "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", - "\n", + "\n", "\n", + "spend_zero_mean\n", + "\n", + "spend_zero_mean\n", + "Series\n", + "\n", + "\n", + "\n", + "spend_zero_mean_unit_variance\n", + "\n", + "spend_zero_mean_unit_variance\n", + "Series\n", + "\n", + "\n", + "\n", + "spend_zero_mean->spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_mean\n", + "\n", + "spend_mean\n", + "float\n", + "\n", + "\n", + "\n", + "spend_mean->spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", "spend_per_signup\n", "\n", "spend_per_signup\n", "Series\n", "\n", "\n", - "\n", + "\n", "avg_3wk_spend\n", - "\n", - "avg_3wk_spend\n", - "Series\n", + "\n", + "avg_3wk_spend\n", + "Series\n", "\n", - "\n", - "\n", - "spend\n", - "\n", - "spend\n", - "Series\n", + "\n", + "\n", + "base_df\n", + "\n", + "base_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "spend->spend_per_signup\n", - "\n", - "\n", + "\n", + "\n", + "signups\n", + "\n", + "signups\n", + "Series\n", "\n", - "\n", - "\n", - "spend->avg_3wk_spend\n", - "\n", - "\n", + "\n", + "\n", + "base_df->signups\n", + "\n", + "\n", "\n", - "\n", - "\n", - "spend_mean\n", - "\n", - "spend_mean\n", - "float\n", + "\n", + "\n", + "spend\n", + "\n", + "spend\n", + "Series\n", "\n", - "\n", - "\n", - "spend->spend_mean\n", - "\n", - "\n", + "\n", + "\n", + "base_df->spend\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "spend_std_dev\n", "\n", "spend_std_dev\n", "float\n", "\n", - "\n", + "\n", "\n", - "spend->spend_std_dev\n", - "\n", - "\n", + "spend_std_dev->spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "spend_zero_mean\n", - "\n", - "spend_zero_mean\n", - "Series\n", + "\n", + "\n", + "signups->spend_per_signup\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "spend->spend_zero_mean\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "spend_mean->spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "base_df\n", - "\n", - "base_df\n", - "DataFrame\n", + "\n", + "\n", + "spend->spend_mean\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "base_df->spend\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "signups\n", - "\n", - "signups\n", - "Series\n", + "spend->spend_per_signup\n", + "\n", + "\n", "\n", - "\n", 
+ "\n", "\n", - "base_df->signups\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "spend_zero_mean_unit_variance\n", - "\n", - "spend_zero_mean_unit_variance\n", - "Series\n", - "\n", - "\n", - "\n", - "spend_std_dev->spend_zero_mean_unit_variance\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "signups->spend_per_signup\n", - "\n", - "\n", + "spend->avg_3wk_spend\n", + "\n", + "\n", "\n", - "\n", - "\n", - "spend_zero_mean->spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "spend->spend_std_dev\n", + "\n", + "\n", "\n", "\n", "\n", "_base_df_inputs\n", - "\n", - "base_df_location\n", - "str\n", + "\n", + "base_df_location\n", + "str\n", "\n", "\n", - "\n", + "\n", "_base_df_inputs->base_df\n", - "\n", - "\n", + "\n", + "\n", "\n", "\n", "\n", "input\n", - "\n", - "input\n", + "\n", + "input\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -777,7 +779,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "hamilton", "language": "python", "name": "python3" }, @@ -791,7 +793,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.9" + "version": "3.10.14" } }, "nbformat": 4, diff --git a/examples/polars/with_columns/README b/examples/polars/with_columns/README index aaa19f4fe..86db77204 100644 --- a/examples/polars/with_columns/README +++ b/examples/polars/with_columns/README @@ -1,7 +1,8 @@ -# Using with_columns with Pandas +# Using with_columns with Polars We show the ability to use the familiar `with_columns` from `polars`. Supported for both `pl.DataFrame` and `pl.LazyFrame`. To see the example look at the notebook. -![image info](./dag.png) +![image info](./DAG_DataFrame.png) +![image info](./DAG_lazy.png) diff --git a/examples/polars/with_columns/notebook.ipynb b/examples/polars/with_columns/notebook.ipynb index 1314c88d0..99b19178d 100644 --- a/examples/polars/with_columns/notebook.ipynb +++ b/examples/polars/with_columns/notebook.ipynb @@ -59,233 +59,228 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "thousands\n", + "\n", + "\n", + "\n", + "case\n", + "thousands\n", "\n", - "\n", + "\n", "\n", - "final_df.signups\n", - "\n", - "final_df.signups\n", - "Series\n", - "\n", - "\n", - "\n", - "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Series\n", - "\n", - "\n", - "\n", - "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.__append\n", - "\n", - "final_df.__append\n", - "DataFrame\n", + "\n", + "final_df.__append\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.signups->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_per_signup->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend\n", - "\n", - "final_df.spend\n", - "Series\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", + "\n", + "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", 
+ "float\n", + "\n", + "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Series\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Series\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Series\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df\n", - "\n", - "final_df\n", - "DataFrame\n", - "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df\n", - "\n", - "initial_df\n", - "DataFrame\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Series\n", "\n", "\n", - "\n", + "\n", "initial_df->final_df.signups\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + 
"\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -318,7 +313,7 @@ "@with_columns(\n", " *[my_functions],\n", " columns_to_pass=[\"spend\", \"signups\"], # The columns to select from the dataframe\n", - " select=output_columns, # The columns to append to the dataframe\n", + " # select=output_columns, # The columns to append to the dataframe\n", " # config_required = [\"a\"]\n", ")\n", "def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:\n", @@ -334,19 +329,20 @@ "name": "stdout", "output_type": "stream", "text": [ - "shape: (6, 5)\n", - "┌─────────┬───────┬───────────────┬──────────────────┬───────────────────────────────┐\n", - "│ signups ┆ spend ┆ avg_3wk_spend ┆ spend_per_signup ┆ spend_zero_mean_unit_variance │\n", - "│ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n", - "│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 │\n", - "╞═════════╪═══════╪═══════════════╪══════════════════╪═══════════════════════════════╡\n", - "│ 1 ┆ 1e7 ┆ null ┆ 1e7 ┆ -1.064405 │\n", - "│ 10 ┆ 1e7 ┆ null ┆ 1e6 ┆ -1.064405 │\n", - "│ 50 ┆ 2e7 ┆ 13.333333 ┆ 400000.0 ┆ -0.483821 │\n", - "│ 100 ┆ 4e7 ┆ 23.333333 ┆ 400000.0 ┆ 0.677349 │\n", - "│ 200 ┆ 4e7 ┆ 33.333333 ┆ 200000.0 ┆ 0.677349 │\n", - "│ 400 ┆ 5e7 ┆ 43.333333 ┆ 125000.0 ┆ 1.257934 │\n", - "└─────────┴───────┴───────────────┴──────────────────┴───────────────────────────────┘\n" + "shape: (6, 6)\n", + "┌─────────┬───────┬───────────────┬──────────────────┬─────────────────┬───────────────────────────┐\n", + "│ signups ┆ spend ┆ avg_3wk_spend ┆ spend_per_signup ┆ spend_zero_mean ┆ spend_zero_mean_unit_vari │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ ance │\n", + "│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ --- │\n", + "│ ┆ ┆ ┆ ┆ ┆ f64 │\n", + "╞═════════╪═══════╪═══════════════╪══════════════════╪═════════════════╪═══════════════════════════╡\n", + "│ 1 ┆ 1e7 ┆ null ┆ 1e7 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 10 ┆ 1e7 ┆ null ┆ 1e6 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 50 ┆ 2e7 ┆ 13.333333 ┆ 400000.0 ┆ -8.3333e6 ┆ -0.483821 │\n", + "│ 100 ┆ 4e7 ┆ 23.333333 ┆ 400000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 200 ┆ 4e7 ┆ 33.333333 ┆ 200000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 400 ┆ 5e7 ┆ 43.333333 ┆ 125000.0 ┆ 2.1667e7 ┆ 1.257934 │\n", + "└─────────┴───────┴───────────────┴──────────────────┴─────────────────┴───────────────────────────┘\n" ] }, { @@ -358,233 +354,228 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "millions\n", + "\n", + "\n", + "\n", + "case\n", + "millions\n", "\n", - "\n", + "\n", "\n", - "final_df.signups\n", - "\n", - "final_df.signups\n", - "Series\n", - "\n", - "\n", - "\n", - "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Series\n", - "\n", - "\n", - "\n", - "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.__append\n", - "\n", - "final_df.__append\n", - "DataFrame\n", + "\n", + "final_df.__append\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.signups->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - 
"final_df.spend_per_signup->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Series\n", "\n", - "\n", - "\n", - "final_df.spend\n", - "\n", - "final_df.spend\n", - "Series\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", + "\n", + "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Series\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Series\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Series\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Series\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Series\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df\n", - "\n", - "final_df\n", - "DataFrame\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "DataFrame\n", "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "initial_df\n", - "\n", - "initial_df\n", - "DataFrame\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Series\n", "\n", "\n", - "\n", + "\n", 
"initial_df->final_df.signups\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 3, @@ -625,7 +616,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -637,233 +628,228 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "thousands\n", + "\n", + "\n", + "\n", + "case\n", + "thousands\n", "\n", - "\n", + "\n", "\n", - "final_df.signups\n", - "\n", - "final_df.signups\n", - "Expr\n", - "\n", - "\n", - "\n", - "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Expr\n", - "\n", - "\n", - "\n", - "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Expr\n", "\n", "\n", - "\n", + "\n", "final_df.__append\n", - "\n", - "final_df.__append\n", - "LazyFrame\n", + "\n", + "final_df.__append\n", + "LazyFrame\n", "\n", - "\n", - "\n", - "final_df.signups->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_per_signup->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Expr\n", "\n", - "\n", - "\n", - "final_df.spend\n", - "\n", - "final_df.spend\n", - "Expr\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", + "\n", + "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - 
"final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Expr\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Expr\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Expr\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Expr\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df\n", - "\n", - "final_df\n", - "LazyFrame\n", - "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "LazyFrame\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df\n", - "\n", - "initial_df\n", - "LazyFrame\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Expr\n", "\n", "\n", - "\n", + "\n", "initial_df->final_df.signups\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -896,7 +882,7 @@ "@with_columns(\n", " *[my_functions_lazy],\n", " columns_to_pass=[\"spend\", \"signups\"], # The columns to select from the dataframe\n", - " select=output_columns, # The columns to append to the dataframe\n", + " # select=output_columns, # The columns to append to the dataframe\n", " # config_required = [\"a\"]\n", ")\n", "def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:\n", @@ -905,26 +891,27 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "shape: (6, 5)\n", - "┌─────────┬───────┬───────────────┬──────────────────┬───────────────────────────────┐\n", - "│ signups ┆ spend ┆ avg_3wk_spend ┆ spend_per_signup ┆ spend_zero_mean_unit_variance │\n", - "│ --- 
┆ --- ┆ --- ┆ --- ┆ --- │\n", - "│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 │\n", - "╞═════════╪═══════╪═══════════════╪══════════════════╪═══════════════════════════════╡\n", - "│ 1 ┆ 1e7 ┆ null ┆ 1e7 ┆ -1.064405 │\n", - "│ 10 ┆ 1e7 ┆ null ┆ 1e6 ┆ -1.064405 │\n", - "│ 50 ┆ 2e7 ┆ 13.333333 ┆ 400000.0 ┆ -0.483821 │\n", - "│ 100 ┆ 4e7 ┆ 23.333333 ┆ 400000.0 ┆ 0.677349 │\n", - "│ 200 ┆ 4e7 ┆ 33.333333 ┆ 200000.0 ┆ 0.677349 │\n", - "│ 400 ┆ 5e7 ┆ 43.333333 ┆ 125000.0 ┆ 1.257934 │\n", - "└─────────┴───────┴───────────────┴──────────────────┴───────────────────────────────┘\n" + "shape: (6, 6)\n", + "┌─────────┬───────┬───────────────┬──────────────────┬─────────────────┬───────────────────────────┐\n", + "│ signups ┆ spend ┆ avg_3wk_spend ┆ spend_per_signup ┆ spend_zero_mean ┆ spend_zero_mean_unit_vari │\n", + "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ ance │\n", + "│ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ --- │\n", + "│ ┆ ┆ ┆ ┆ ┆ f64 │\n", + "╞═════════╪═══════╪═══════════════╪══════════════════╪═════════════════╪═══════════════════════════╡\n", + "│ 1 ┆ 1e7 ┆ null ┆ 1e7 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 10 ┆ 1e7 ┆ null ┆ 1e6 ┆ -1.8333e7 ┆ -1.064405 │\n", + "│ 50 ┆ 2e7 ┆ 13.333333 ┆ 400000.0 ┆ -8.3333e6 ┆ -0.483821 │\n", + "│ 100 ┆ 4e7 ┆ 23.333333 ┆ 400000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 200 ┆ 4e7 ┆ 33.333333 ┆ 200000.0 ┆ 1.1667e7 ┆ 0.677349 │\n", + "│ 400 ┆ 5e7 ┆ 43.333333 ┆ 125000.0 ┆ 2.1667e7 ┆ 1.257934 │\n", + "└─────────┴───────┴───────────────┴──────────────────┴─────────────────┴───────────────────────────┘\n" ] }, { @@ -936,236 +923,231 @@ "\n", "\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", "case\n", - "\n", - "\n", - "\n", - "case\n", - "millions\n", + "\n", + "\n", + "\n", + "case\n", + "millions\n", "\n", - "\n", + "\n", "\n", - "final_df.signups\n", - "\n", - "final_df.signups\n", - "Expr\n", - "\n", - "\n", - "\n", - "final_df.spend_per_signup\n", - "\n", - "final_df.spend_per_signup\n", - "Expr\n", - "\n", - "\n", - "\n", - "final_df.signups->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_zero_mean\n", + "\n", + "final_df.spend_zero_mean\n", + "Expr\n", "\n", "\n", - "\n", + "\n", "final_df.__append\n", - "\n", - "final_df.__append\n", - "LazyFrame\n", + "\n", + "final_df.__append\n", + "LazyFrame\n", "\n", - "\n", - "\n", - "final_df.signups->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_per_signup->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "\n", + "final_df.spend_zero_mean_unit_variance\n", + "Expr\n", "\n", - "\n", - "\n", - "final_df.spend\n", - "\n", - "final_df.spend\n", - "Expr\n", + "\n", + "\n", + "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", + "\n", + "\n", + "final_df.spend_mean\n", + "\n", + "final_df.spend_mean\n", + "float\n", + "\n", + "\n", "\n", - "final_df.spend->final_df.spend_per_signup\n", - "\n", - "\n", + "final_df.spend_mean->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.__append\n", - "\n", + "\n", + "\n", + "final_df\n", + "\n", + "final_df\n", + "LazyFrame\n", + "\n", + "\n", + "\n", + "final_df.__append->final_df\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_std_dev\n", - "\n", - "final_df.spend_std_dev\n", - "float\n", + "\n", + "final_df.spend_std_dev\n", + "float\n", "\n", - "\n", - "\n", - 
"final_df.spend->final_df.spend_std_dev\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean\n", - "\n", - "final_df.spend_mean\n", - "float\n", + "\n", + "\n", + "final_df.spend\n", + "\n", + "final_df.spend\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_zero_mean\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.spend_mean\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean\n", - "\n", - "final_df.spend_zero_mean\n", - "Expr\n", + "\n", + "\n", + "final_df.spend->final_df.spend_std_dev\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup\n", + "\n", + "final_df.spend_per_signup\n", + "Expr\n", + "\n", + "\n", + "\n", + "final_df.spend->final_df.spend_per_signup\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.avg_3wk_spend\n", - "\n", - "final_df.avg_3wk_spend: case\n", - "Expr\n", + "\n", + "final_df.avg_3wk_spend: case\n", + "Expr\n", "\n", "\n", - "\n", + "\n", "final_df.spend->final_df.avg_3wk_spend\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "\n", - "final_df.spend_zero_mean_unit_variance\n", - "Expr\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "final_df.spend_zero_mean_unit_variance->final_df.__append\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df\n", - "\n", - "final_df\n", - "LazyFrame\n", - "\n", - "\n", - "\n", - "final_df.__append->final_df\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "final_df.spend_std_dev->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "final_df.spend_mean->final_df.spend_zero_mean\n", - "\n", - "\n", + "\n", + "\n", + "initial_df\n", + "\n", + "initial_df\n", + "LazyFrame\n", "\n", - "\n", - "\n", - "final_df.spend_zero_mean->final_df.spend_zero_mean_unit_variance\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.__append\n", + "\n", "\n", - "\n", - "\n", - "final_df.avg_3wk_spend->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "initial_df->final_df.spend\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df\n", - "\n", - "initial_df\n", - "LazyFrame\n", + "\n", + "\n", + "final_df.signups\n", + "\n", + "final_df.signups\n", + "Expr\n", "\n", "\n", - "\n", + "\n", "initial_df->final_df.signups\n", - "\n", - "\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.spend\n", - "\n", - "\n", + "\n", + "\n", + "final_df.signups->final_df.spend_per_signup\n", + "\n", + "\n", "\n", - "\n", - "\n", - "initial_df->final_df.__append\n", - "\n", - "\n", - "\n", + "\n", + "\n", + "final_df.spend_per_signup->final_df.__append\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_df.avg_3wk_spend->final_df.__append\n", + "\n", + "\n", + "\n", "\n", "\n", "\n", "config\n", - "\n", - "\n", - "\n", - "config\n", + "\n", + "\n", + "\n", + "config\n", "\n", "\n", "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n", "output\n", - "\n", - "output\n", + "\n", + "output\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, - "execution_count": 6, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } diff --git a/hamilton/function_modifiers/recursive.py b/hamilton/function_modifiers/recursive.py index 744d222e2..35576b891 100644 --- 
a/hamilton/function_modifiers/recursive.py
+++ b/hamilton/function_modifiers/recursive.py
@@ -1,5 +1,8 @@
+import abc
 import inspect
 import sys
+import typing
+from collections import defaultdict
 from types import ModuleType
 from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TypedDict, Union

@@ -11,10 +14,13 @@
 else:
     from typing import NotRequired

+from pandas import DataFrame as PandasDataFrame
+from polars import DataFrame as PolarsDataFrame
+from polars import LazyFrame as PolarsLazyFrame

 # Copied this over from function_graph
 # TODO -- determine the best place to put this code
-from hamilton import graph_utils, node
+from hamilton import graph_utils, node, registry
 from hamilton.function_modifiers import base, dependencies
 from hamilton.function_modifiers.base import InvalidDecoratorException, NodeTransformer
 from hamilton.function_modifiers.dependencies import (
@@ -22,6 +28,7 @@
     ParametrizedDependency,
     UpstreamDependency,
 )
+from hamilton.function_modifiers.expanders import extract_columns


 def assign_namespace(node_name: str, namespace: str) -> str:
@@ -626,3 +633,234 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
         stack.append(dep_node)
         seen_nodes.add(dep)
     return output
+
+
+SUPPORTED_DATAFRAME_TYPES = [PandasDataFrame, PolarsDataFrame, PolarsLazyFrame]
+
+
+class with_columns_factory(base.NodeInjector, abc.ABC):
+    """Performs a with_columns operation on a dataframe. This is a special case of NodeInjector
+    that applies only to dataframes. For now it can be used with:
+
+    - Pandas
+    - Polars
+
+    This is used when you want to extract some columns out of the dataframe, perform operations
+    on them, and then append the results to the original dataframe.
+
+    def processed_data(data: pd.DataFrame) -> pd.DataFrame:
+        ...
+
+    In this case we would build a subdag out of the node ``data`` and append selected nodes back to
+    the original dataframe before feeding it into ``processed_data``.
+    """
+
+    # TODO: if we rename the column nodes into something smarter this can be avoided and
+    # we can also modify columns in place
+    @staticmethod
+    def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
+        """Ensures that we don't run into name clashing of columns and group operations.
+
+        In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
+        to safeguard against name clashing with functions that are passed into ``with_columns`` - i.e.
+        there are no functions that have the same name as the columns. This effectively means that
+        using ``columns_to_pass`` will only append new columns to the dataframe; for changing
+        existing columns, ``pass_dataframe_as`` needs to be used.
+        """
+        node_counter = defaultdict(int)
+        for node_ in nodes_:
+            node_counter[node_.name] += 1
+            if node_counter[node_.name] > 1:
+                return True
+        return False
+
+    def validate_dataframe_type(self):
+        if not set(self.allowed_dataframe_types).issubset(list(SUPPORTED_DATAFRAME_TYPES)):
+            raise InvalidDecoratorException(
+                f"The provided dataframe types: {self.allowed_dataframe_types} are currently not supported "
+                "to be used in `with_columns`. Please reach out if you need it. "
+                f"We currently only support: {SUPPORTED_DATAFRAME_TYPES}."
+            )
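+
+    # Note: subclasses (see the pandas and polars plugins) declare which dataframe
+    # types they support via the ``dataframe_types`` argument of ``__init__`` below.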
+
+    def __init__(
+        self,
+        *load_from: Union[Callable, ModuleType],
+        columns_to_pass: List[str] = None,
+        pass_dataframe_as: str = None,
+        select: List[str] = None,
+        namespace: str = None,
+        config_required: List[str] = None,
+        dataframe_types: Collection[Type] = None,
+    ):
+        """Instantiates a ``@with_columns`` decorator.
+
+        :param load_from: The functions or modules that will be used to generate the group of map operations.
+        :param columns_to_pass: The initial schema of the dataframe. This is used to determine which
+            upstream inputs should be taken from the dataframe, and which shouldn't. Note that, if this is
+            left empty (and external_inputs is as well), we will assume that all dependencies come
+            from the dataframe. This cannot be used in conjunction with pass_dataframe_as.
+        :param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag.
+            If you pass this in, you are responsible for extracting columns out. If not provided, you have
+            to pass columns_to_pass in, and we will extract the columns out for you.
+        :param select: The end nodes that represent columns to be appended to the original dataframe
+            via with_columns. Existing columns will be overridden.
+        :param namespace: The namespace of the nodes, so they don't clash with the global namespace
+            and so this can be reused. If it's left out, there will be no namespace (in which case you'll want
+            to be careful about repeating it/reusing the nodes in other parts of the DAG.)
+        :param config_required: the list of config keys that are required to resolve any functions. Pass in None\
+            if you want the functions/modules to have access to all possible config.
+        """
+
+        if dataframe_types is None:
+            raise ValueError("You need to specify which dataframe types it will be applied to.")
+        else:
+            if isinstance(dataframe_types, Type):
+                dataframe_types = [dataframe_types]
+            self.allowed_dataframe_types = dataframe_types
+        self.validate_dataframe_type()
+
+        self.subdag_functions = subdag.collect_functions(load_from)
+        self.select = select
+
+        if (pass_dataframe_as is not None and columns_to_pass is not None) or (
+            pass_dataframe_as is None and columns_to_pass is None
+        ):
+            raise ValueError(
+                "You must specify only one of columns_to_pass and "
+                "pass_dataframe_as. "
+                "This is because specifying pass_dataframe_as injects into "
+                "the set of columns, allowing you to perform your own extraction "
+                "from the dataframe. We then execute all columns in the subdag "
+                "in order, passing in that initial dataframe. If you want "
+                "to reference columns in your code, you'll have to specify "
+                "the set of initial columns, and allow the subdag decorator "
+                "to inject the dataframe through. The initial columns tell "
+                "us which parameters to take from that dataframe, so we can "
+                "feed the right data into the right columns."
+            )
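+
+        # At this point exactly one mode is configured: either we extract the named
+        # columns for the user (``columns_to_pass``), or the user extracts columns
+        # themselves from the injected dataframe (``pass_dataframe_as``).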
+
+        self.initial_schema = columns_to_pass
+        self.dataframe_subdag_param = pass_dataframe_as
+        self.namespace = namespace
+        self.config_required = config_required
+
+    def required_config(self) -> List[str]:
+        return self.config_required
+
+    def _create_column_nodes(
+        self, inject_parameter: str, params: Dict[str, Type[Type]]
+    ) -> List[node.Node]:
+        output_type = params[inject_parameter]
+
+        if self.is_async:
+
+            async def temp_fn(**kwargs) -> Any:
+                return kwargs[inject_parameter]
+        else:
+
+            def temp_fn(**kwargs) -> Any:
+                return kwargs[inject_parameter]
+
+        # We recreate the df node to use extract columns
+        temp_node = node.Node(
+            name=inject_parameter,
+            typ=output_type,
+            callabl=temp_fn,
+            input_types={inject_parameter: output_type},
+        )
+
+        extract_columns_decorator = extract_columns(*self.initial_schema)
+
+        out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn)
+        return out_nodes[1:]
+
+    def _get_initial_nodes(
+        self, fn: Callable, params: Dict[str, Type[Type]]
+    ) -> Tuple[str, Collection[node.Node]]:
+        """Selects the correct dataframe and optionally extracts out columns."""
+        initial_nodes = []
+        sig = inspect.signature(fn)
+        input_types = typing.get_type_hints(fn)
+
+        if self.dataframe_subdag_param is not None:
+            inject_parameter = self.dataframe_subdag_param
+        else:
+            # If we don't have a specified dataframe we assume it's the first argument
+            inject_parameter = list(sig.parameters.values())[0].name
+
+        if inject_parameter not in params:
+            raise base.InvalidDecoratorException(
+                f"Function: {fn.__name__} does not have the parameter {inject_parameter} as a dependency. "
+                f"@with_columns requires the parameter names to match the function parameters. "
+                f"If you do not wish to use the first argument, please use the `pass_dataframe_as` option. "
+                f"It might not be compatible with some other decorators."
+            )
+
+        if input_types[inject_parameter] not in self.allowed_dataframe_types:
+            raise ValueError(
+                f"Dataframe has to be one of the following types: {self.allowed_dataframe_types}."
+            )
+        else:
+            self.dataframe_type = input_types[inject_parameter]
+
+        initial_nodes = (
+            []
+            if self.dataframe_subdag_param is not None
+            else self._create_column_nodes(inject_parameter=inject_parameter, params=params)
+        )
+
+        return inject_parameter, initial_nodes
+
+    @abc.abstractmethod
+    def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
+        """Should create a node that merges the results back into the original dataframe.
+
+        The created node adds to / overrides columns of the original dataframe based on the
+        selected output.
+
+        This will be platform specific, see Pandas and Polars plugins for implementation.
+        """
+        pass
+
+    def inject_nodes(
+        self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable
+    ) -> Tuple[List[node.Node], Dict[str, str]]:
+        self.is_async = inspect.iscoroutinefunction(fn)
+        namespace = fn.__name__ if self.namespace is None else self.namespace
+
+        inject_parameter, initial_nodes = self._get_initial_nodes(fn=fn, params=params)
+
+        subdag_nodes = subdag.collect_nodes(config, self.subdag_functions)
+
+        # TODO: for now we restrict that if the user wants to change columns that already exist, they need to
+        # pass the dataframe and extract the columns themselves. 
pass in the dataframe and extract the columns themselves. If we add a namespace to the initial
+        # nodes and rewire the initial node names to the downstream ones that take a column argument, we could
+        # also allow in-place changes when using columns_to_pass.
+        if with_columns_factory._check_for_duplicates(initial_nodes + subdag_nodes):
+            raise ValueError(
+                "You can only specify columns once. You used `columns_to_pass` and we "
+                "extract the columns for you. In this case they cannot be overwritten -- only new columns get "
+                "appended. If you want to modify existing columns in place, pass in a dataframe and "
+                "extract + modify the columns and afterwards select them."
+            )
+
+        pruned_nodes = prune_nodes(subdag_nodes, self.select)
+        if len(pruned_nodes) == 0:
+            raise ValueError(
+                f"No nodes found upstream from select columns: {self.select} for function: "
+                f"{fn.__qualname__}"
+            )
+        # If no columns were selected, we append all sink nodes whose column type matches
+        # the column type of the dataframe
+        if self.select is None:
+            self.select = [
+                sink_node.name
+                for sink_node in pruned_nodes
+                if sink_node.type == registry.get_column_type_from_df_type(self.dataframe_type)
+            ]
+
+        merge_node = self.create_merge_node(inject_parameter, node_name="__append")
+
+        output_nodes = initial_nodes + pruned_nodes + [merge_node]
+        output_nodes = subdag.add_namespace(output_nodes, namespace)
+        return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)}
+
+    def validate(self, fn: Callable):
+        self.validate_dataframe_type()
diff --git a/hamilton/plugins/h_pandas.py b/hamilton/plugins/h_pandas.py
index 149db960b..df6a147b6 100644
--- a/hamilton/plugins/h_pandas.py
+++ b/hamilton/plugins/h_pandas.py
@@ -1,9 +1,6 @@
-import inspect
 import sys
-import typing
-from collections import defaultdict
 from types import ModuleType
-from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union
+from typing import Any, Callable, List, Union

 _sys_version_info = sys.version_info
 _version_tuple = (_sys_version_info.major, _sys_version_info.minor, _sys_version_info.micro)
@@ -17,13 +14,13 @@
 # Copied this over from function_graph
 # TODO -- determine the best place to put this code
-from hamilton import node
-from hamilton.function_modifiers import base
-from hamilton.function_modifiers.expanders import extract_columns
-from hamilton.function_modifiers.recursive import assign_namespace, prune_nodes, subdag
+from hamilton import node, registry
+from hamilton.function_modifiers.recursive import (
+    with_columns_factory,
+)


-class with_columns(base.NodeInjector):
+class with_columns(with_columns_factory):
     """Initializes a with_columns decorator for pandas. This allows you to efficiently run
     groups of map operations on a dataframe. Here's an example of calling it -- if you've seen
     ``@subdag``, you should be familiar with
@@ -108,23 +105,6 @@ def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
     overwritten.
     """

-    @staticmethod
-    def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
-        """Ensures that we don't run into name clashing of columns and group operations.
-
-        In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
-        to safeguard against nameclashing with functions that are passed into ``with_columns`` - i.e.
-        there are no functions that have the same name as the columns. This effectively means that
-        using ``columns_to_pass`` will only append new columns to the dataframe and for changing
-        existing columns ``pass_dataframe_as`` needs to be used.
- """ - node_counter = defaultdict(int) - for node_ in nodes_: - node_counter[node_.name] += 1 - if node_counter[node_.name] > 1: - return True - return False - def __init__( self, *load_from: Union[Callable, ModuleType], @@ -153,142 +133,44 @@ def __init__( if you want the functions/modules to have access to all possible config. """ - self.subdag_functions = subdag.collect_functions(load_from) - - # TODO: select none should append all nodes like h_spark - if select is None: - raise ValueError("Please specify at least one column to append or update.") - else: - self.select = select - - if (pass_dataframe_as is not None and columns_to_pass is not None) or ( - pass_dataframe_as is None and columns_to_pass is None - ): - raise ValueError( - "You must specify only one of columns_to_pass and " - "pass_dataframe_as. " - "This is because specifying pass_dataframe_as injects into " - "the set of columns, allowing you to perform your own extraction" - "from the dataframe. We then execute all columns in the sbudag" - "in order, passing in that initial dataframe. If you want" - "to reference columns in your code, you'll have to specify " - "the set of initial columns, and allow the subdag decorator " - "to inject the dataframe through. The initial columns tell " - "us which parameters to take from that dataframe, so we can" - "feed the right data into the right columns." - ) - - self.initial_schema = columns_to_pass - self.dataframe_subdag_param = pass_dataframe_as - self.namespace = namespace - self.config_required = config_required - - def required_config(self) -> List[str]: - return self.config_required - - def _create_column_nodes( - self, inject_parameter: str, params: Dict[str, Type[Type]] - ) -> List[node.Node]: - output_type = params[inject_parameter] - - def temp_fn(**kwargs) -> pd.DataFrame: - return kwargs[inject_parameter] - - # We recreate the df node to use extract columns - temp_node = node.Node( - name=inject_parameter, - typ=output_type, - callabl=temp_fn, - input_types={inject_parameter: output_type}, + super().__init__( + *load_from, + columns_to_pass=columns_to_pass, + pass_dataframe_as=pass_dataframe_as, + select=select, + namespace=namespace, + config_required=config_required, + dataframe_types=pd.DataFrame, ) - extract_columns_decorator = extract_columns(*self.initial_schema) + def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node: + "Node that adds to / overrides columns for the original dataframe based on selected output." + if self.is_async: - out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn) - return out_nodes[1:] + async def new_callable(**kwargs) -> Any: + df = kwargs[upstream_node] + columns_to_append = {} + for column in self.select: + columns_to_append[column] = kwargs[column] - def _get_inital_nodes( - self, fn: Callable, params: Dict[str, Type[Type]] - ) -> Tuple[str, Collection[node.Node]]: - """Selects the correct dataframe and optionally extracts out columns.""" - initial_nodes = [] - if self.dataframe_subdag_param is not None: - inject_parameter = self.dataframe_subdag_param + return df.assign(**columns_to_append) else: - # If we don't have a specified dataframe we assume it's the first argument - sig = inspect.signature(fn) - inject_parameter = list(sig.parameters.values())[0].name - input_types = typing.get_type_hints(fn) - - if not input_types[inject_parameter] == pd.DataFrame: - raise ValueError( - "First argument has to be a pandas DataFrame. 
If you wish to use a " - "different argument, please use `pass_dataframe_as` option." - ) - - initial_nodes.extend( - self._create_column_nodes(inject_parameter=inject_parameter, params=params) - ) - - if inject_parameter not in params: - raise base.InvalidDecoratorException( - f"Function: {fn.__name__} has a first parameter that is not a dependency. " - f"@with_columns requires the parameter names to match the function parameters. " - f"Thus it might not be compatible with some other decorators" - ) - - return inject_parameter, initial_nodes - - def _create_merge_node(self, upstream_node: str, node_name: str) -> node.Node: - "Node that adds to / overrides columns for the original dataframe based on selected output." - def new_callable(**kwargs) -> Any: - df = kwargs[upstream_node] - columns_to_append = {} - for column in self.select: - columns_to_append[column] = kwargs[column] + def new_callable(**kwargs) -> Any: + df = kwargs[upstream_node] + columns_to_append = {} + for column in self.select: + columns_to_append[column] = kwargs[column] - return df.assign(**columns_to_append) + return df.assign(**columns_to_append) - input_map = {column: pd.Series for column in self.select} - input_map[upstream_node] = pd.DataFrame + column_type = registry.get_column_type_from_df_type(self.dataframe_type) + input_map = {column: column_type for column in self.select} + input_map[upstream_node] = self.dataframe_type return node.Node( name=node_name, - typ=pd.DataFrame, + typ=self.dataframe_type, callabl=new_callable, input_types=input_map, ) - - def inject_nodes( - self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable - ) -> Tuple[List[node.Node], Dict[str, str]]: - namespace = fn.__name__ if self.namespace is None else self.namespace - - inject_parameter, initial_nodes = self._get_inital_nodes(fn=fn, params=params) - - subdag_nodes = subdag.collect_nodes(config, self.subdag_functions) - - if with_columns._check_for_duplicates(initial_nodes + subdag_nodes): - raise ValueError( - "You can only specify columns once. You used `columns_to_pass` and we " - "extract the columns for you. In this case they cannot be overwritten -- only new columns get " - "appended. If you want to modify in-place columns pass in a dataframe and " - "extract + modify the columns and afterwards select them." 
- ) - - pruned_nodes = prune_nodes(subdag_nodes, self.select) - if len(pruned_nodes) == 0: - raise ValueError( - f"No nodes found upstream from select columns: {self.select} for function: " - f"{fn.__qualname__}" - ) - - merge_node = self._create_merge_node(inject_parameter, node_name="__append") - - output_nodes = initial_nodes + pruned_nodes + [merge_node] - output_nodes = subdag.add_namespace(output_nodes, namespace) - return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)} - - def validate(self, fn: Callable): - pass diff --git a/hamilton/plugins/h_polars.py b/hamilton/plugins/h_polars.py index 4ee93aaa9..ec0ad2686 100644 --- a/hamilton/plugins/h_polars.py +++ b/hamilton/plugins/h_polars.py @@ -1,9 +1,6 @@ -import inspect import sys -import typing -from collections import defaultdict from types import ModuleType -from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union +from typing import Any, Callable, Dict, List, Type, Union import polars as pl @@ -17,10 +14,10 @@ # Copied this over from function_graph # TODO -- determine the best place to put this code -from hamilton import base, node -from hamilton.function_modifiers.base import InvalidDecoratorException, NodeInjector -from hamilton.function_modifiers.expanders import extract_columns -from hamilton.function_modifiers.recursive import assign_namespace, prune_nodes, subdag +from hamilton import base, node, registry +from hamilton.function_modifiers.recursive import ( + with_columns_factory, +) class PolarsDataFrameResult(base.ResultMixin): @@ -74,10 +71,7 @@ def output_type(self) -> Type: return pl.DataFrame -dataframe_columns_type_mapping = {pl.DataFrame: pl.Series, pl.LazyFrame: pl.Expr} - - -class with_columns(NodeInjector): +class with_columns(with_columns_factory): """Initializes a with_columns decorator for polars. This allows you to efficiently run groups of map operations on a dataframe. We support @@ -186,23 +180,6 @@ def final_df(initial_df: pl.DataFrame) -> pl.DataFrame: overwritten. """ - @staticmethod - def _check_for_duplicates(nodes_: List[node.Node]) -> bool: - """Ensures that we don't run into name clashing of columns and group operations. - - In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want - to safeguard against nameclashing with functions that are passed into ``with_columns`` - i.e. - there are no functions that have the same name as the columns. This effectively means that - using ``columns_to_pass`` will only append new columns to the dataframe and for changing - existing columns ``pass_dataframe_as`` needs to be used. - """ - node_counter = defaultdict(int) - for node_ in nodes_: - node_counter[node_.name] += 1 - if node_counter[node_.name] > 1: - return True - return False - def __init__( self, *load_from: Union[Callable, ModuleType], @@ -232,95 +209,17 @@ def __init__( if you want the functions/modules to have access to all possible config. """ - self.subdag_functions = subdag.collect_functions(load_from) - - # TODO: select none should append all nodes like h_spark - if select is None: - raise ValueError("Please specify at least one column to append or update.") - else: - self.select = select - - if (pass_dataframe_as is not None and columns_to_pass is not None) or ( - pass_dataframe_as is None and columns_to_pass is None - ): - raise ValueError( - "You must specify only one of columns_to_pass and " - "pass_dataframe_as. 
" - "This is because specifying pass_dataframe_as injects into " - "the set of columns, allowing you to perform your own extraction" - "from the dataframe. We then execute all columns in the sbudag" - "in order, passing in that initial dataframe. If you want" - "to reference columns in your code, you'll have to specify " - "the set of initial columns, and allow the subdag decorator " - "to inject the dataframe through. The initial columns tell " - "us which parameters to take from that dataframe, so we can" - "feed the right data into the right columns." - ) - - self.initial_schema = columns_to_pass - self.dataframe_subdag_param = pass_dataframe_as - self.namespace = namespace - self.config_required = config_required - - def required_config(self) -> List[str]: - return self.config_required - - def _create_column_nodes( - self, inject_parameter: str, params: Dict[str, Type[Type]] - ) -> List[node.Node]: - output_type = params[inject_parameter] - - def temp_fn(**kwargs) -> Any: - return kwargs[inject_parameter] - - # We recreate the df node to use extract columns - temp_node = node.Node( - name=inject_parameter, - typ=output_type, - callabl=temp_fn, - input_types={inject_parameter: output_type}, + super().__init__( + *load_from, + columns_to_pass=columns_to_pass, + pass_dataframe_as=pass_dataframe_as, + select=select, + namespace=namespace, + config_required=config_required, + dataframe_types=[pl.DataFrame, pl.LazyFrame], ) - extract_columns_decorator = extract_columns(*self.initial_schema) - - out_nodes = extract_columns_decorator.transform_node(temp_node, config={}, fn=temp_fn) - return out_nodes[1:] - - def _get_inital_nodes( - self, fn: Callable, params: Dict[str, Type[Type]] - ) -> Tuple[str, Collection[node.Node]]: - """Selects the correct dataframe and optionally extracts out columns.""" - initial_nodes = [] - sig = inspect.signature(fn) - - if self.dataframe_subdag_param is not None: - inject_parameter = self.dataframe_subdag_param - else: - # If we don't have a specified dataframe we assume it's the first argument - inject_parameter = list(sig.parameters.values())[0].name - input_types = typing.get_type_hints(fn) - - if input_types[inject_parameter] not in [pl.DataFrame, pl.LazyFrame]: - raise ValueError( - "First argument has to be a pandas DataFrame. If you wish to use a " - "different argument, please use `pass_dataframe_as` option." - ) - - initial_nodes.extend( - self._create_column_nodes(inject_parameter=inject_parameter, params=params) - ) - - self.dataframe_type = sig.parameters[inject_parameter].annotation - if inject_parameter not in params: - raise InvalidDecoratorException( - f"Function: {fn.__name__} has a first parameter that is not a dependency. " - f"@with_columns requires the parameter names to match the function parameters. " - f"Thus it might not be compatible with some other decorators" - ) - - return inject_parameter, initial_nodes - - def _create_merge_node(self, upstream_node: str, node_name: str) -> node.Node: + def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node: "Node that adds to / overrides columns for the original dataframe based on selected output." 
def new_callable(**kwargs) -> Any: @@ -331,7 +230,7 @@ def new_callable(**kwargs) -> Any: return df.with_columns(**columns_to_append) - column_type = dataframe_columns_type_mapping[self.dataframe_type] + column_type = registry.get_column_type_from_df_type(self.dataframe_type) input_map = {column: column_type for column in self.select} input_map[upstream_node] = self.dataframe_type @@ -341,36 +240,3 @@ def new_callable(**kwargs) -> Any: callabl=new_callable, input_types=input_map, ) - - def inject_nodes( - self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable - ) -> Tuple[List[node.Node], Dict[str, str]]: - namespace = fn.__name__ if self.namespace is None else self.namespace - - inject_parameter, initial_nodes = self._get_inital_nodes(fn=fn, params=params) - - subdag_nodes = subdag.collect_nodes(config, self.subdag_functions) - - if with_columns._check_for_duplicates(initial_nodes + subdag_nodes): - raise ValueError( - "You can only specify columns once. You used `columns_to_pass` and we " - "extract the columns for you. In this case they cannot be overwritten -- only new columns get " - "appended. If you want to modify in-place columns pass in a dataframe and " - "extract + modify the columns and afterwards select them." - ) - - pruned_nodes = prune_nodes(subdag_nodes, self.select) - if len(pruned_nodes) == 0: - raise ValueError( - f"No nodes found upstream from select columns: {self.select} for function: " - f"{fn.__qualname__}" - ) - - merge_node = self._create_merge_node(inject_parameter, node_name="__append") - - output_nodes = initial_nodes + pruned_nodes + [merge_node] - output_nodes = subdag.add_namespace(output_nodes, namespace) - return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)} - - def validate(self, fn: Callable): - pass diff --git a/plugin_tests/h_pandas/test_with_columns.py b/plugin_tests/h_pandas/test_with_columns.py index b3a84d666..e56c72ecf 100644 --- a/plugin_tests/h_pandas/test_with_columns.py +++ b/plugin_tests/h_pandas/test_with_columns.py @@ -1,3 +1,5 @@ +import inspect + import pandas as pd import pytest @@ -12,54 +14,7 @@ def dummy_fn_with_columns(col_1: pd.Series) -> pd.Series: return col_1 + 100 -def test_detect_duplicate_nodes(): - node_a = node.Node.from_fn(dummy_fn_with_columns, name="a") - node_b = node.Node.from_fn(dummy_fn_with_columns, name="a") - node_c = node.Node.from_fn(dummy_fn_with_columns, name="c") - - if not with_columns._check_for_duplicates([node_a, node_b, node_c]): - raise (AssertionError) - - if with_columns._check_for_duplicates([node_a, node_c]): - raise (AssertionError) - - -def test_select_not_empty(): - error_message = "Please specify at least one column to append or update." - - with pytest.raises(ValueError) as e: - with_columns(dummy_fn_with_columns) - assert str(e.value) == error_message - - -def test_columns_to_pass_and_pass_dataframe_as_raises_error(): - error_message = ( - "You must specify only one of columns_to_pass and " - "pass_dataframe_as. " - "This is because specifying pass_dataframe_as injects into " - "the set of columns, allowing you to perform your own extraction" - "from the dataframe. We then execute all columns in the sbudag" - "in order, passing in that initial dataframe. If you want" - "to reference columns in your code, you'll have to specify " - "the set of initial columns, and allow the subdag decorator " - "to inject the dataframe through. 
The initial columns tell "
-        "us which parameters to take from that dataframe, so we can"
-        "feed the right data into the right columns."
-    )
-
-    with pytest.raises(ValueError) as e:
-        with_columns(
-            dummy_fn_with_columns, columns_to_pass=["a"], pass_dataframe_as="a", select=["a"]
-        )
-    assert str(e.value) == error_message
-
-
 def test_first_parameter_is_dataframe():
-    error_message = (
-        "First argument has to be a pandas DataFrame. If you wish to use a "
-        "different argument, please use `pass_dataframe_as` option."
-    )
-
     def target_fn(upstream_df: int) -> pd.DataFrame:
         return upstream_df

@@ -70,11 +25,10 @@ def target_fn(upstream_df: int) -> pd.DataFrame:
     )
     injectable_params = NodeInjector.find_injectable_params([dummy_node])

-    with pytest.raises(ValueError) as e:
+    # Raises an error because the first argument is not a pandas DataFrame
+    with pytest.raises(ValueError):
         decorator._get_inital_nodes(fn=target_fn, params=injectable_params)

-    assert str(e.value) == error_message
-

 def test_create_column_nodes_pass_dataframe():
     def target_fn(some_var: int, upstream_df: pd.DataFrame) -> pd.DataFrame:
@@ -107,7 +61,7 @@ def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
         dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"]
     )
     injectable_params = NodeInjector.find_injectable_params([dummy_node])
-
+    decorator.is_async = inspect.iscoroutinefunction(target_fn)
     inject_parameter, initial_nodes = decorator._get_inital_nodes(
         fn=target_fn, params=injectable_params
     )
@@ -136,7 +90,7 @@ def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
         dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"]
     )
     injectable_params = NodeInjector.find_injectable_params([dummy_node])
-
+    decorator.is_async = inspect.iscoroutinefunction(target_fn)
     inject_parameter, initial_nodes = decorator._get_inital_nodes(
         fn=target_fn, params=injectable_params
     )
@@ -188,10 +142,15 @@ def dummy_df() -> pd.DataFrame:
     def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
         return upstream_df

+    dummy_node = node.Node.from_fn(target_fn)
+
     decorator = with_columns(
         dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"]
     )
-    merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node")
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+    decorator.is_async = inspect.iscoroutinefunction(target_fn)
+    _ = decorator._get_inital_nodes(fn=target_fn, params=injectable_params)
+    merge_node = decorator.create_merge_node(upstream_node="upstream_df", node_name="merge_node")

     output_df = merge_node.callable(
         upstream_df=dummy_df(),
@@ -219,8 +178,13 @@ def target_fn(upstream_df: pd.DataFrame) -> pd.DataFrame:
     def col_1() -> pd.Series:
         return pd.Series([0, 3, 5, 7])

-    decorator = with_columns(col_1, pass_dataframe_as=["upstream_df"], select=["col_1"])
-    merge_node = decorator._create_merge_node(upstream_node="upstream_df", node_name="merge_node")
+    dummy_node = node.Node.from_fn(target_fn)
+
+    decorator = with_columns(col_1, pass_dataframe_as="upstream_df", select=["col_1"])
+    injectable_params = NodeInjector.find_injectable_params([dummy_node])
+    decorator.is_async = inspect.iscoroutinefunction(target_fn)
+    _ = decorator._get_inital_nodes(fn=target_fn, params=injectable_params)
+    merge_node = decorator.create_merge_node(upstream_node="upstream_df", node_name="merge_node")

     output_df = merge_node.callable(upstream_df=dummy_df(), col_1=col_1())
     assert merge_node.name == "merge_node"
diff --git
a/plugin_tests/h_polars/test_with_columns.py b/plugin_tests/h_polars/test_with_columns.py index dc4055cd4..d9fde4be7 100644 --- a/plugin_tests/h_polars/test_with_columns.py +++ b/plugin_tests/h_polars/test_with_columns.py @@ -1,11 +1,208 @@ +import inspect + import polars as pl +import pytest from polars.testing import assert_frame_equal -from hamilton import driver +from hamilton import driver, node +from hamilton.function_modifiers.base import NodeInjector +from hamilton.plugins.h_polars import with_columns from .resources import with_columns_end_to_end, with_columns_end_to_end_lazy +def dummy_fn_with_columns(col_1: pl.Series) -> pl.Series: + return col_1 + 100 + + +def test_create_column_nodes_pass_dataframe(): + def target_fn(some_var: int, upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, pass_dataframe_as="upstream_df", select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + inject_parameter, initial_nodes = decorator._get_inital_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 0 + + +def test_create_column_nodes_extract_single_columns(): + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + decorator.is_async = inspect.iscoroutinefunction(target_fn) + inject_parameter, initial_nodes = decorator._get_inital_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 1 + assert initial_nodes[0].name == "col_1" + assert initial_nodes[0].type == pl.Series + pl.testing.assert_series_equal( + initial_nodes[0].callable(upstream_df=dummy_df()), + pl.Series([1, 2, 3, 4]), + check_names=False, + ) + + +def test_create_column_nodes_extract_multiple_columns(): + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + decorator.is_async = inspect.iscoroutinefunction(target_fn) + inject_parameter, initial_nodes = decorator._get_inital_nodes( + fn=target_fn, params=injectable_params + ) + + assert inject_parameter == "upstream_df" + assert len(initial_nodes) == 2 + assert initial_nodes[0].name == "col_1" + assert initial_nodes[1].name == "col_2" + assert initial_nodes[0].type == pl.Series + assert initial_nodes[1].type == pl.Series + pl.testing.assert_series_equal( + initial_nodes[0].callable(upstream_df=dummy_df()), + pl.Series([1, 2, 3, 4]), + check_names=False, + ) + pl.testing.assert_series_equal( + initial_nodes[1].callable(upstream_df=dummy_df()), + pl.Series([11, 12, 13, 14]), + check_names=False, + ) + + +def test_no_matching_select_column_error(): + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + 
dummy_node = node.Node.from_fn(target_fn) + select = "wrong_column" + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=select + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + + error_message = ( + f"No nodes found upstream from select columns: {select} for function: " + f"{target_fn.__qualname__}" + ) + with pytest.raises(ValueError) as e: + decorator.inject_nodes(injectable_params, {}, fn=target_fn) + + assert str(e.value) == error_message + + +def test_append_into_original_df(): + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns( + dummy_fn_with_columns, columns_to_pass=["col_1", "col_2"], select=["dummy_fn_with_columns"] + ) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + decorator.is_async = inspect.iscoroutinefunction(target_fn) + _ = decorator._get_inital_nodes(fn=target_fn, params=injectable_params) + merge_node = decorator.create_merge_node(upstream_node="upstream_df", node_name="merge_node") + + output_df = merge_node.callable( + upstream_df=dummy_df(), + dummy_fn_with_columns=dummy_fn_with_columns(col_1=pl.Series([1, 2, 3, 4])), + ) + assert merge_node.name == "merge_node" + assert merge_node.type == pl.DataFrame + + pl.testing.assert_series_equal(output_df["col_1"], pl.Series([1, 2, 3, 4]), check_names=False) + pl.testing.assert_series_equal( + output_df["col_2"], pl.Series([11, 12, 13, 14]), check_names=False + ) + pl.testing.assert_series_equal( + output_df["dummy_fn_with_columns"], pl.Series([101, 102, 103, 104]), check_names=False + ) + + +def test_override_original_column_in_df(): + def dummy_df() -> pl.DataFrame: + return pl.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14]}) + + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + def col_1() -> pl.Series: + return pl.col("col_1") * 100 + + dummy_node = node.Node.from_fn(target_fn) + + decorator = with_columns(col_1, pass_dataframe_as="upstream_df", select=["col_1"]) + injectable_params = NodeInjector.find_injectable_params([dummy_node]) + _ = decorator._get_inital_nodes(fn=target_fn, params=injectable_params) + + merge_node = decorator.create_merge_node(upstream_node="upstream_df", node_name="merge_node") + + output_df = merge_node.callable(upstream_df=dummy_df(), col_1=col_1()) + assert merge_node.name == "merge_node" + assert merge_node.type == pl.DataFrame + + pl.testing.assert_series_equal( + output_df["col_1"], pl.Series([100, 200, 300, 400]), check_names=False + ) + pl.testing.assert_series_equal( + output_df["col_2"], pl.Series([11, 12, 13, 14]), check_names=False + ) + + +def test_assign_custom_namespace_with_columns(): + def target_fn(upstream_df: pl.DataFrame) -> pl.DataFrame: + return upstream_df + + dummy_node = node.Node.from_fn(target_fn) + decorator = with_columns( + dummy_fn_with_columns, + columns_to_pass=["col_1", "col_2"], + select=["dummy_fn_with_columns"], + namespace="dummy_namespace", + ) + nodes_ = decorator.transform_dag([dummy_node], {}, target_fn) + + assert nodes_[0].name == "target_fn" + assert nodes_[1].name == "dummy_namespace.col_1" + assert nodes_[2].name == "dummy_namespace.col_2" + assert nodes_[3].name == "dummy_namespace.dummy_fn_with_columns" + assert nodes_[4].name == "dummy_namespace.__append" + + def 
test_end_to_end_with_columns_automatic_extract():
     config_5 = {
         "factor": 5,
diff --git a/tests/function_modifiers/test_recursive.py b/tests/function_modifiers/test_recursive.py
index 7b3ae7a91..a16118109 100644
--- a/tests/function_modifiers/test_recursive.py
+++ b/tests/function_modifiers/test_recursive.py
@@ -539,3 +539,81 @@ def test_recursive_validate_config_inputs_happy(config, inputs):
 def test_recursive_validate_config_inputs_sad(config, inputs):
     with pytest.raises(InvalidDecoratorException):
         _validate_config_inputs(config, inputs)
+
+
+from pandas import DataFrame as PandasDataFrame
+from polars import DataFrame as PolarsDataFrame
+from polars import LazyFrame as PolarsLazyFrame
+
+from hamilton import node
+from hamilton.function_modifiers.recursive import with_columns_factory
+
+
+class TestWithColumnsFactory(with_columns_factory):
+    def create_merge_node(self, upstream_node, node_name):
+        pass
+
+
+def dummy_fn_with_columns(col_1: int) -> int:
+    return col_1 + 100
+
+
+def test_detect_duplicate_nodes():
+    node_a = node.Node.from_fn(dummy_fn_with_columns, name="a")
+    node_b = node.Node.from_fn(dummy_fn_with_columns, name="a")
+    node_c = node.Node.from_fn(dummy_fn_with_columns, name="c")
+
+    if not with_columns_factory._check_for_duplicates([node_a, node_b, node_c]):
+        raise AssertionError
+
+    if with_columns_factory._check_for_duplicates([node_a, node_c]):
+        raise AssertionError
+
+
+@pytest.mark.parametrize(
+    "dataframe",
+    [
+        (PandasDataFrame),
+        ([PolarsLazyFrame, PolarsDataFrame]),
+    ],
+)
+def test_init_requirements(dataframe):
+    # Missing dataframe_types raises a ValueError
+    with pytest.raises(ValueError):
+        TestWithColumnsFactory(dummy_fn_with_columns)
+
+    # Invalid dataframe_types (int is not a dataframe type)
+    with pytest.raises(InvalidDecoratorException):
+        TestWithColumnsFactory(dummy_fn_with_columns, dataframe_types=int)
+
+    # Invalid dataframe_types still raises, even with columns_to_pass and select provided
+    with pytest.raises(InvalidDecoratorException):
+        TestWithColumnsFactory(
+            dummy_fn_with_columns,
+            dataframe_types=int,
+            columns_to_pass="some_col",
+            select="some_col",
+        )
+
+    # Valid dataframe_types, but columns_to_pass and pass_dataframe_as are mutually exclusive
+    with pytest.raises(ValueError):
+        TestWithColumnsFactory(
+            dummy_fn_with_columns,
+            dataframe_types=dataframe,
+            select="some_col",
+            columns_to_pass="some_col",
+            pass_dataframe_as="some_df",
+        )
+
+    valid_config = TestWithColumnsFactory(  # noqa:F841
+        dummy_fn_with_columns,
+        dataframe_types=dataframe,
+        columns_to_pass="some_col",
+        select="some_col",
+    )
+    valid_config = TestWithColumnsFactory(  # noqa:F841
+        dummy_fn_with_columns,
+        dataframe_types=dataframe,
+        pass_dataframe_as="some_df",
+        select="some_col",
+    )
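
Note for reviewers: a short usage sketch of the select=None behavior introduced above. When select is omitted, every sink node whose type matches the dataframe's column type is appended, mirroring h_spark.with_columns. The function and column names below are illustrative and not taken from this patch.

    import pandas as pd

    from hamilton.plugins.h_pandas import with_columns


    # Illustrative map operations; in practice these would live in a module
    # passed to with_columns.
    def col_3(col_1: pd.Series) -> pd.Series:
        return col_1 + 1


    def col_4(col_3: pd.Series) -> pd.Series:
        return col_3 * 2


    @with_columns(col_3, col_4, columns_to_pass=["col_1", "col_2"])  # no select given
    def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
        # col_3 and col_4 are both pd.Series sink nodes, so with select=None
        # both are appended to initial_df before this function receives it.
        return initial_df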
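
Similarly, a minimal sketch of the async support added for pandas: inject_nodes detects that the decorated function is a coroutine and builds async extract/append nodes. Actually executing this requires an async-capable driver, which is out of scope here; the names are again illustrative.

    import pandas as pd

    from hamilton.plugins.h_pandas import with_columns


    def col_3(col_1: pd.Series) -> pd.Series:
        return col_1 + 1


    @with_columns(col_3, columns_to_pass=["col_1"], select=["col_3"])
    async def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
        # inspect.iscoroutinefunction(final_df) is True, so with_columns builds
        # coroutine temp_fn/new_callable nodes and the subdag can be awaited.
        return initial_df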
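
Finally, for anyone extending this to another dataframe library, a sketch of the minimal subclass the new factory expects: pass the allowed dataframe type(s) to super().__init__ and implement create_merge_node. This mirrors the pandas plugin above; the class name is hypothetical, and pd.DataFrame stands in for the new library's type, which would need to be registered with hamilton.registry so that get_column_type_from_df_type can resolve its column type.

    from types import ModuleType
    from typing import Any, Callable, List, Union

    import pandas as pd  # stand-in for the new library's dataframe type

    from hamilton import node, registry
    from hamilton.function_modifiers.recursive import with_columns_factory


    class with_columns_sketch(with_columns_factory):
        def __init__(
            self,
            *load_from: Union[Callable, ModuleType],
            columns_to_pass: List[str] = None,
            pass_dataframe_as: str = None,
            select: List[str] = None,
            namespace: str = None,
            config_required: List[str] = None,
        ):
            super().__init__(
                *load_from,
                columns_to_pass=columns_to_pass,
                pass_dataframe_as=pass_dataframe_as,
                select=select,
                namespace=namespace,
                config_required=config_required,
                # The only library-specific piece of __init__: the permitted type(s).
                dataframe_types=pd.DataFrame,
            )

        def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
            "Merges the selected columns back into the original dataframe."

            def new_callable(**kwargs) -> Any:
                df = kwargs[upstream_node]
                # .assign is pandas-specific; each library supplies its own merge,
                # e.g. polars uses .with_columns instead.
                return df.assign(**{column: kwargs[column] for column in self.select})

            column_type = registry.get_column_type_from_df_type(self.dataframe_type)
            input_map = {column: column_type for column in self.select}
            input_map[upstream_node] = self.dataframe_type
            return node.Node(
                name=node_name,
                typ=self.dataframe_type,
                callabl=new_callable,
                input_types=input_map,
            )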