with_columns for Pandas #1209
Conversation
❌ Changes requested. Reviewed everything up to 0c41502 in 50 seconds
More details:
- Looked at 755 lines of code in 7 files
- Skipped 2 files when reviewing
- Skipped posting 4 drafted comments based on config settings
1. tests/function_modifiers/test_recursive.py:778
   Draft comment: Typo in function name. Consider renaming `substract_1_from_2` to `subtract_1_from_2` for clarity and correctness. This applies to other occurrences as well.
   Reason this comment was not posted: marked as duplicate.
2. hamilton/function_modifiers/recursive.py:778
   Draft comment: Typo in function name. Consider renaming `substract_1_from_2` to `subtract_1_from_2` for clarity and correctness. This applies to other occurrences as well.
   Reason this comment was not posted: marked as duplicate.
3. tests/resources/with_columns.py:10
   Draft comment: Typo in function name `substract_1_from_2`. It should be `subtract_1_from_2`. This typo is present in multiple places, including the function definition and its usage in decorators.
   Reason this comment was not posted: marked as duplicate.
4. tests/function_modifiers/test_recursive.py:778
   Draft comment: Typo in function name `substract_1_from_2`. It should be `subtract_1_from_2`. This typo is present in multiple places, including the function definition and its usage in decorators.
   Reason this comment was not posted: marked as duplicate.
Workflow ID: wflow_tdIW9EERoU9TNVSK
Want Ellipsis to fix these issues? Tag @ellipsis-dev in a comment. You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.
tests/resources/with_columns.py (outdated)

```python
return pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [11, 12, 13, 14], "col_3": [1, 1, 1, 1]})


def substract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
```
Typo in function name. Consider renaming `substract_1_from_2` to `subtract_1_from_2` for clarity and correctness.
```diff
-def substract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
+def subtract_1_from_2(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
```
```diff
 import pytest

-from hamilton import ad_hoc_utils, graph
+from hamilton import ad_hoc_utils, driver, graph, node
```
Could use `from hamilton import ..., node as hamilton_node` because the name `node` is overridden elsewhere (e.g., line 187).
0c41502 to 4691398 (compare)
Looking good! Main comments about how to do the assign-style operations. Definitely on the right track!
```diff
@@ -88,6 +88,7 @@

 subdag = recursive.subdag
 parameterized_subdag = recursive.parameterized_subdag
+with_columns = recursive.with_columns
```
This should probably live in a pandas extension -- while pandas has, historically, been the dependency for Hamilton, we want to move it out (and this is very pandas-specific logic)
```python
# TODO: Copied here from h_spark, needs refactoring
def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> List[node.Node]:
```
Yep, would make this live here and have pandas/spark refer to it
```python
@staticmethod
def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
    "Ensures that we don't run into name clashing of columns and group operations."
```
Might be worth expanding the docstring a bit more about what this means -- I'm getting a little lost in the details (e.g., is this a user error? What's the remediation?). More for internal docs.
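For instance, the expanded docstring could look something like the sketch below; the wording is a guess at the intended semantics, not the author's text:

```python
@staticmethod
def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
    """Ensures that we don't run into name clashing of columns and group operations.

    Extracted columns and the nodes generated by the subdag share one
    namespace, so two nodes with the same name would silently shadow each
    other. Returns True if a clash is found. (Internal docs should spell out
    whether this is a user error and what the remediation is.)
    """
```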
```python
*load_from: Union[Callable, ModuleType],
columns_to_pass: Union[str, List[str]] = None,
pass_dataframe_as: str = None,
select: Union[str, List[str]] = None,
```
So I think we should keep this consistent with the h_spark `with_columns` -- maybe we add a single-string ability for that? Or we remove it for this?
Will make it consistent with spark -- in case this becomes a pain point can always add it later on.
```python
:param namespace: The namespace of the nodes, so they don't clash with the global namespace
    and so this can be reused. If it's left out, there will be no namespace (in which case you'll want
    to be careful about repeating it/reusing the nodes in other parts of the DAG.)
:param config_required: the list of config keys that are required to resolve any functions. Pass in None\
```
This should solve the problem of config wiring through...
```python
self.namespace = namespace
# This never gets used within the class, but pyspark had it so keeping it here in case we
# need to access it from somewhere outside
self.upstream_dependency = pd.DataFrame
```
Let's remove this for now? Should also be able to remove it from h_spark -- it's an artifact and was left in erroneously.
```python
out = []
for column in self.initial_schema:

    def extractor_fn(
```
Ooh, we should be able to share logic with extract_columns... Might be good to leave this as a TODO note.
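For reference, the per-column extractor boils down to something like the sketch below; the real extract_columns decorator also handles more (which is why sharing its logic makes sense). `make_extractor` is a hypothetical helper:

```python
import pandas as pd

def make_extractor(column: str):
    # Hypothetical helper: each generated node pulls one series out of the upstream frame.
    def extractor_fn(df: pd.DataFrame) -> pd.Series:
        return df[column]

    extractor_fn.__name__ = column
    return extractor_fn
```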
```python
else:
    # If we don't have a specified dataframe we assume it's the first argument
    sig = inspect.signature(fn)
    inject_parameter = list(sig.parameters.values())[0].name
```
Should be able to validate the type for the parameter as well?
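A minimal sketch of what that validation could look like, assuming the injected parameter should be annotated as a pd.DataFrame (the helper name is illustrative):

```python
import inspect
from typing import Callable

import pandas as pd

def validate_first_parameter(fn: Callable) -> str:
    # Illustrative helper: take the first parameter and, if it carries an
    # annotation, check that it is a pd.DataFrame before injecting into it.
    first = list(inspect.signature(fn).parameters.values())[0]
    if first.annotation is not inspect.Parameter.empty and first.annotation is not pd.DataFrame:
        raise ValueError(
            f"Expected the first parameter of {fn.__name__} to be annotated "
            f"as pd.DataFrame, got {first.annotation}."
        )
    return first.name
```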
```python
def new_callable(**kwargs) -> Any:
    df = kwargs[upstream_node]
    for column in self.select:
        df[column] = kwargs[column]
```
Hmm, we're mutating this... We should be careful, as that breaks assumptions.
The nice thing about pandas is that dataframes shouldn't mess with memory if you create new ones; the series are what take up the memory. So we should be able to continually modify it by creating new ones (I think...).
Might be worth using `assign` or something like that... Looking here (you mentioned similar things in your comments on the code): https://stackoverflow.com/questions/72291290/how-to-create-new-column-dynamically-in-pandas-like-we-do-in-pyspark-withcolumn
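A sketch of the non-mutating version using DataFrame.assign, which returns a new frame and leaves the upstream one untouched (function and variable names are illustrative, mirroring the snippet above):

```python
from typing import Dict, List

import pandas as pd

def append_selected(df: pd.DataFrame, computed: Dict[str, pd.Series], select: List[str]) -> pd.DataFrame:
    # assign creates a new dataframe; the upstream frame passed in via kwargs
    # is never written to, so downstream consumers see it unchanged.
    return df.assign(**{column: computed[column] for column in select})
```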
Looking at the way Narwhals implements the Polars `.with_columns()` for other backends can also be informative.
I looked a bit deeper:
- `assign` does a copy of the data frame. (It should be shallow unless the flag is set to `copy="deep"`.) From this thread my impression is that assign was created for chaining operations together and leaving the original data frame intact.
- Narwhals uses `concat(dfs, axis=1, **extra_kwargs)` under the hood and this does have a warning: "concat() makes a full copy of the data, and iteratively reusing concat() can create unnecessary copies. Collect all DataFrame or Series objects in a list before using concat()." However, for our use case I guess we can do this only once, so it should also be ok.
And tested it -- seems like `concat` is the way to go in terms of performance:
[EDIT] Concat is annoying because you would need to filter out individual columns that have changed / are provided as `selected` by the user, and add a safeguard, since `concat` would just duplicate them in the data frame. `.assign()` seems to do this automatically (probably where the discrepancy is coming from).
```
%timeit pd.concat([df:(10000, 5000), cols:2000], axis=1)
280 ms ± 22.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit df:(10000, 5000).assign(**cols:2000)
4.44 s ± 765 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%memit pd.concat([df:(10000, 5000), cols:2000], axis=1)
peak memory: 2107.36 MiB, increment: 0.20 MiB

%memit df:(10000, 5000).assign(**cols:2000)
peak memory: 2525.92 MiB, increment: 37.08 MiB
```
```python
%load_ext memory_profiler
import warnings

import numpy as np
import pandas as pd

warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)

pd_size = 10000
df = pd.DataFrame(np.random.randint(0, pd_size, size=(pd_size, pd_size // 2)))  # , columns=list('ABCD'))

cols = []
for i in range(pd_size // 10):
    cols.append(np.random.rand(pd_size))

concat_cols = [pd.Series(col) for col in cols]
assign_cols = {f"{pd_size + i + 1}": col for i, col in enumerate(cols)}

print(f'\n%timeit pd.concat([df:{df.shape}, cols:{len(concat_cols)}], axis=1)')
temp_df = [df.copy()]
temp_df.extend(concat_cols)
%timeit l = pd.concat(temp_df, axis=1)

print(f'\n%timeit df:{df.shape}.assign(**cols:{len(assign_cols)})')
temp_df = df.copy()
%timeit temp_df.assign(**assign_cols)

print(f'\n%memit pd.concat([df:{df.shape}, cols:{len(concat_cols)}], axis=1)')
temp_df = [df.copy()]
temp_df.extend(concat_cols)
%memit l = pd.concat(temp_df, axis=1)

print(f'\n%memit df:{df.shape}.assign(**cols:{len(assign_cols)})')
temp_df = df.copy()
%memit temp_df.assign(**assign_cols)
```
```python
pd.testing.assert_frame_equal(result, expected_df)


def test_end_to_end_with_columns_pass_dataframe():
```
Other tests you may want to consider:
- Inputs from outside the subdag
- Inputs from external stuff
- Config
b83f57e to fcf911f (compare)
This looks good -- let's rebase. I'll take one more look tomorrow morning, then we can merge and release!
fcf911f to 6d3ff4a (compare)
This functionality will be shared between pandas and polars, putting it central.
Extended with_columns to be used on pandas dataframes. Pandas does not have a native implementation of with_columns. This builds one by using the existing extract_columns decorator to create a node for each specified column in the dataframe; then a subdag gets built the usual way, and at the end the selected end nodes get appended to a copy of the original dataframe using pandas.assign -- which creates a shallow copy of the original dataframe. In case columns with the same name are selected as end nodes, it overrides the existing columns.
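For illustration, usage would look roughly like the sketch below, based on the signature discussed above (function and column names are made up):

```python
import pandas as pd

from hamilton.function_modifiers import with_columns

def col_3(col_1: pd.Series, col_2: pd.Series) -> pd.Series:
    # Computed inside the subdag from the extracted columns
    return col_2 - col_1

@with_columns(col_3, columns_to_pass=["col_1", "col_2"], select=["col_3"])
def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    # Receives the original frame with the selected columns appended
    return initial_df
```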
6d3ff4a to af0a0ed (compare)
Creating the option to use `with_columns` on pandas data frames. Partially addressing #1158.
Changes
How I tested this
Notes
Known issues: the `with_columns` subdag.

The other thing is, at the moment, this extracts the relevant columns from the df and then appends them. This is then an eager execution.
Polars and PySpark can do `with_columns` lazily and use their optimizers under the hood. Pandas, to my knowledge, doesn't have that, but there are two ways we could mimic laziness (see the sketch below):
- `eval`
- `assign`

My reasoning so far is that anybody who is concerned with latency will not be using pandas in the first place, so this seems like overkill.
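For illustration, both mechanisms defer the column computation to call time, which is about as close to laziness as pandas gets (a sketch; column names are made up):

```python
import pandas as pd

df = pd.DataFrame({"col_1": [1, 2, 3], "col_2": [11, 12, 13]})

# eval: the expression is a string that is only parsed and executed when eval runs
via_eval = df.eval("col_3 = col_2 - col_1")

# assign: the callable is only invoked when assign runs, against the intermediate frame
via_assign = df.assign(col_3=lambda d: d["col_2"] - d["col_1"])
```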
Checklist