
with_columns for Pandas #1209

Merged
3 commits merged into DAGWorks-Inc:main from feat/with_columns on Nov 6, 2024

Conversation

Contributor

@jernejfrank commented Oct 30, 2024

Creating the option to use with_columns on Pandas data frames.

Partially addressing #1158.

Changes

  • added new decorator

How I tested this

  • unit tests
  • e2e
  • example

Notes

Known issues:

  • config.when does not work for individual nodes within the with_columns subdag.

The other thing is that, at the moment, this extracts the relevant columns from the dataframe and then appends them, so execution is eager.

Polars and PySpark can do with_columns lazily and use their optimizers under the hood. Pandas, to my knowledge, doesn't have that, but there are two ways we could mimic laziness:

  1. using eval:
     df.eval('new_column = f(previous_columns)')
  2. using assign:
     df.assign(new_column=lambda df: f(df[previous_columns]))
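A minimal sketch of both options (the column names here are made up, and f stands for whatever per-column function would be applied):

import pandas as pd

df = pd.DataFrame({"col_1": [1, 2, 3], "col_2": [4, 5, 6]})

# Option 1: eval evaluates an expression string against the frame and returns a new frame.
df_eval = df.eval("new_column = col_1 + col_2")

# Option 2: assign evaluates a callable against the frame, also returning a new frame
# and leaving the original intact.
df_assign = df.assign(new_column=lambda d: d["col_1"] + d["col_2"])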

My reasoning so far is that anybody concerned with latency won't be using Pandas in the first place, so this seems like overkill.

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Contributor

@ellipsis-dev bot left a comment

❌ Changes requested. Reviewed everything up to 0c41502 in 50 seconds

More details
  • Looked at 755 lines of code in 7 files
  • Skipped 2 files when reviewing.
  • Skipped posting 4 drafted comments based on config settings.
1. tests/function_modifiers/test_recursive.py:778
  • Draft comment:
    Typo in function name. Consider renaming substract_1_from_2 to subtract_1_from_2 for clarity and correctness. This applies to other occurrences as well.
  • Reason this comment was not posted:
    Marked as duplicate.
2. hamilton/function_modifiers/recursive.py:778
  • Draft comment:
    Typo in function name. Consider renaming substract_1_from_2 to subtract_1_from_2 for clarity and correctness. This applies to other occurrences as well.
  • Reason this comment was not posted:
    Marked as duplicate.
3. tests/resources/with_columns.py:10
  • Draft comment:
    Typo in function name substract_1_from_2. It should be subtract_1_from_2. This typo is present in multiple places, including the function definition and its usage in decorators.
  • Reason this comment was not posted:
    Marked as duplicate.
4. tests/function_modifiers/test_recursive.py:778
  • Draft comment:
    Typo in function name substract_1_from_2. It should be subtract_1_from_2. This typo is present in multiple places, including the function definition and its usage in decorators.
  • Reason this comment was not posted:
    Marked as duplicate.

Workflow ID: wflow_tdIW9EERoU9TNVSK



return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]})


def substract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
Contributor

Typo in function name. Consider renaming substract_1_from_2 to subtract_1_from_2 for clarity and correctness.

Suggested change
def substract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
def subtract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:


import pytest

from hamilton import ad_hoc_utils, graph
from hamilton import ad_hoc_utils, driver, graph, node
Collaborator

Could use

from hamilton import ..., node as hamilton_node

because the name node is overridden elsewhere (e.g., line 187)

Collaborator

@elijahbenizzy left a comment

Looking good! Main comments about how to do the assign-style operations. Definitely on the right track!

@@ -88,6 +88,7 @@

subdag = recursive.subdag
parameterized_subdag = recursive.parameterized_subdag
with_columns = recursive.with_columns
Collaborator

This should probably live in a pandas extension -- while pandas has, historically, been the dependency for Hamilton, we want to move it out (and this is very pandas-specific logic)



# TODO: Copied here from h_spark, needs refactoring
def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> List[node.Node]:
Collaborator

Yep, would make this live here and have pandas/spark refer to it


@staticmethod
def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
"Ensures that we don't run into name clashing of columns and group operations."
Collaborator

Might be worth expanding the docstring a bit more on what this means -- I'm getting a little lost in the details (e.g., is this a user error? What's the remediation?). More for internal docs.
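For instance, the docstring could spell out the failure mode and the remediation (hypothetical wording, just to illustrate the level of detail):

@staticmethod
def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
    """Ensures that we don't run into name clashing of columns and group operations.

    Returns True if two generated nodes share the same name, which would make the
    final column assignment ambiguous. This indicates a user error -- rename one of
    the colliding functions/columns (or namespace them) to resolve it.
    """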

*load_from: Union[Callable, ModuleType],
columns_to_pass: Union[str, List[str]] = None,
pass_dataframe_as: str = None,
select: Union[str, List[str]] = None,
Collaborator

So I think we should keep this consistent with the h_spark with_columns -- maybe we add a single-string ability for that? Or we remove it for this?

Contributor Author

Will make it consistent with Spark -- in case this becomes a pain point, we can always add it later on.

:param namespace: The namespace of the nodes, so they don't clash with the global namespace
and so this can be reused. If its left out, there will be no namespace (in which case you'll want
to be careful about repeating it/reusing the nodes in other parts of the DAG.)
:param config_required: the list of config keys that are required to resolve any functions. Pass in None\
Collaborator

This should solve the problem of config wiring through...

self.namespace = namespace
# This never gets used within the class, but pyspark had it so keeping it here in case we
# need to access it from somewhere outside
self.upstream_dependency = pd.DataFrame
Collaborator

Let's remove this for now? Should also be able to remove it from h_spark -- it's an artifact and was left in erroneously.

out = []
for column in self.initial_schema:

def extractor_fn(
Collaborator

Ooh we should be able to share logic with extract_columns... Might be good to leave this as a TODO note

else:
# If we don't have a specified dataframe we assume it's the first argument
sig = inspect.signature(fn)
inject_parameter = list(sig.parameters.values())[0].name
Collaborator

Should be able to validate the type of the parameter as well?
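A minimal sketch of such a check (assuming we require the injected parameter to be annotated as pd.DataFrame; fn and inject_parameter follow the snippet above):

import inspect
import pandas as pd

sig = inspect.signature(fn)
first_param = list(sig.parameters.values())[0]
inject_parameter = first_param.name
# Hypothetical guard: fail early if the first parameter isn't typed as a DataFrame.
if first_param.annotation is not pd.DataFrame:
    raise ValueError(
        f"First parameter '{inject_parameter}' of '{fn.__name__}' must be annotated as pd.DataFrame."
    )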

def new_callable(**kwargs) -> Any:
df = kwargs[upstream_node]
for column in self.select:
df[column] = kwargs[column]
Collaborator

Hmm, we're mutating this... We should be careful, as that breaks assumptions.

The nice thing about pandas is that the dataframes shouldn't mess with memory if you create new ones; the series are what take up the memory. So we should be able to continually modify it by creating new ones (I think...)

Collaborator

Might be worth using assign or something like that... Looking here (you mentioned similar things in your comments on the code): https://stackoverflow.com/questions/72291290/how-to-create-new-column-dynamically-in-pandas-like-we-do-in-pyspark-withcolumn
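A non-mutating, assign-based version of the callable could look roughly like this (a sketch only; upstream_node and self.select are assumed to come from the enclosing scope, as in the snippet above):

def new_callable(**kwargs) -> Any:
    df = kwargs[upstream_node]
    # assign returns a new (shallow-copied) frame instead of mutating df in place;
    # existing columns with the same name are overwritten.
    return df.assign(**{column: kwargs[column] for column in self.select})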

Collaborator

Looking at the way Narwhals implements the Polars .with_columns() for other backends can also be informative.

Contributor Author

@jernejfrank Nov 2, 2024

I looked a bit deeper:

  1. assign makes a copy of the data frame (it should be shallow unless the flag is set to copy="deep"). From this thread my impression is that assign was created for chaining operations together while leaving the original data frame intact.
  2. Narwhals uses concat(dfs, axis=1, **extra_kwargs) under the hood, and this does come with a warning: "concat() makes a full copy of the data, and iteratively reusing concat() can create unnecessary copies. Collect all DataFrame or Series objects in a list before using concat()." However, for our use case we should only need to do this once, so that should also be ok.

And I tested it -- concat seems to be the way to go in terms of performance:

[EDIT] concat is annoying because you would need to filter out the individual columns that have changed / are selected by the user and add a safeguard, since concat would otherwise just duplicate them in the data frame (a sketch of that filtering follows the benchmark script below). .assign() seems to handle this automatically (probably where the discrepancy is coming from).

%timeit pd.concat([df:(10000, 5000), cols:2000], axis=1)
280 ms ± 22.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit df:(10000, 5000).assign(**cols:2000)
4.44 s ± 765 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%memit pd.concat([df:(10000, 5000), cols:2000], axis=1)
peak memory: 2107.36 MiB, increment: 0.20 MiB

%memit df:(10000, 5000).assign(**cols:2000)
peak memory: 2525.92 MiB, increment: 37.08 MiB
# Benchmark script used for the numbers above (IPython, with memory_profiler installed).
%load_ext memory_profiler
import numpy as np
import pandas as pd
import warnings

warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)

# Base frame: pd_size rows by pd_size // 2 integer columns.
pd_size = 10000
df = pd.DataFrame(np.random.randint(0, pd_size, size=(pd_size, pd_size // 2)))

# New columns to append: once as a list of Series (for concat)
# and once as a dict keyed by new column names (for assign).
cols = []
for i in range(pd_size // 10):
    cols.append(np.random.rand(pd_size))

concat_cols = [pd.Series(col) for col in cols]
assign_cols = {f"{pd_size + i + 1}": col for i, col in enumerate(cols)}


print(f'\n%timeit pd.concat([df:{df.shape}, cols:{len(concat_cols)}], axis=1)')
temp_df = [df.copy()]
temp_df.extend(concat_cols)
%timeit l=pd.concat(temp_df, axis=1)

print(f'\n%timeit df:{df.shape}.assign(**cols:{len(assign_cols)})')
temp_df = df.copy()
%timeit temp_df.assign(**assign_cols)


print(f'\n%memit pd.concat([df:{df.shape}, cols:{len(concat_cols)}], axis=1)')
temp_df = [df.copy()]
temp_df.extend(concat_cols)
%memit l=pd.concat(temp_df, axis=1)

print(f'\n%memit df:{df.shape}.assign(**cols:{len(assign_cols)})')
temp_df = df.copy()
%memit temp_df.assign(**assign_cols)
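For the duplicate-column issue mentioned in the edit above, the extra filtering that concat would need could look roughly like this (hypothetical helper, not part of the PR):

import pandas as pd

def append_columns(df: pd.DataFrame, new_cols: dict) -> pd.DataFrame:
    # concat would duplicate any column that already exists in df, so drop the
    # overlapping ones first; assign handles this overwrite automatically.
    base = df.drop(columns=[c for c in new_cols if c in df.columns])
    return pd.concat([base, pd.DataFrame(new_cols)], axis=1)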

pd.testing.assert_frame_equal(result, expected_df)


def test_end_to_end_with_columns_pass_dataframe():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other tests you may want to consider

  1. Inputs from outside the subdag
  2. Inputs from external stuff
  3. Config

@jernejfrank force-pushed the feat/with_columns branch 3 times, most recently from b83f57e to fcf911f on November 3, 2024 at 23:11
Collaborator

@elijahbenizzy left a comment

This looks good -- let's rebase, I'll take one more look tomorrow morning, then we can merge and release!

This functionality will be shared with pandas and polars, so putting it
in a central place.
Extended with_columns to be usable on pandas dataframes.

Pandas does not have a native implementation of with_columns. This
builds one by using the existing extract_columns decorator to create
a node for each specified column in the dataframe; a subdag is then
built the usual way, and at the end the selected end nodes get appended
to a copy of the original dataframe using pandas.assign -- which creates
a shallow copy of the original dataframe. If columns with the same name
are selected as end nodes, they override the existing columns.
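Roughly, usage looks like this (a sketch based on the tests and example in this PR; the exact import path may differ by version):

import pandas as pd
from hamilton.function_modifiers import with_columns

# Column-level transform: takes existing columns as pd.Series, returns a new column.
def col_3(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
    return col_1 - col_2

@with_columns(col_3, columns_to_pass=["col_1", "col_2"], select=["col_3"])
def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    # Receives the original dataframe with the selected columns appended.
    return initial_df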
@elijahbenizzy merged commit 5f3ac72 into DAGWorks-Inc:main on Nov 6, 2024
24 checks passed
@zilto mentioned this pull request on Nov 7, 2024
@jernejfrank deleted the feat/with_columns branch on November 12, 2024