Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/polars with columns , async with_columns pandas #1234

Merged
merged 6 commits into from
Nov 27, 2024

Conversation

jernejfrank
Copy link
Contributor

@jernejfrank jernejfrank commented Nov 18, 2024

Please let me know if scope creep too big and I can cut some things out into a new PR.

Changes

  • extract_columns for polars LazyFrame (converts columns into Expressions that just need to be collected at the end)
  • with_columns for polars (supported eager and lazy execution -- that's why the above change)
  • refactored with_columns by using registry single dispatch method and enabled it for Pandas / Polars for now.
  • async with_columns for pandas
  • target=None selects all viable sink nodes and appends to data frame

How I tested this

  • unit
  • e2e
  • examples

Notes

The one thing I didn't touch is the spark extension (zero experience with pyspark), because the implementation so far is different / not sure if we can use extract_columns as straightforward as with pandas/polars, but happy to tackle that as well in case its possible.

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Copy link
Contributor

@ellipsis-dev ellipsis-dev bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Changes requested. Reviewed everything up to 1818302 in 1 minute and 22 seconds

More details
  • Looked at 1618 lines of code in 16 files
  • Skipped 5 files when reviewing.
  • Skipped posting 9 drafted comments based on config settings.
1. docs/reference/decorators/with_columns.rst:19
  • Draft comment:
    Consider adding more details specific to Polars, such as its support for both eager and lazy execution modes, to differentiate it from the Pandas with_columns decorator.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable:
    The comment is suggesting an enhancement to the documentation by providing more specific details about Polars. However, the comment does not point out a necessary code change or a clear issue with the current documentation. It seems to be more of a suggestion for improvement rather than a required change.
    The comment does not indicate a clear issue with the current documentation that needs to be addressed. It is more of a suggestion for additional information, which may not be necessary for the current scope of the documentation.
    While the suggestion could enhance the documentation, it is not addressing a critical issue or a necessary change. The current documentation already provides a basic description of the with_columns decorator for Polars.
    The comment should be deleted as it does not point out a necessary change or issue with the current documentation. It is more of a suggestion for additional information.
2. hamilton/function_modifiers/recursive.py:680
  • Draft comment:
    Consider including the list of supported dataframe types directly in the error message for clarity.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The with_columns_factory class in recursive.py is designed to handle different dataframe types, but the error message in validate_dataframe_type could be more informative by listing the supported types directly.
3. hamilton/plugins/h_polars.py:225
  • Draft comment:
    Consider adding support for async functions in the with_columns decorator for Polars, similar to the implementation in the Pandas version.
  • Reason this comment was not posted:
    Marked as duplicate.
4. plugin_tests/h_polars/test_with_columns.py:18
  • Draft comment:
    Consider adding tests for lazy execution mode in Polars to ensure that the with_columns decorator works correctly in both eager and lazy modes.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable:
    The comment seems redundant because the file already includes tests for lazy execution mode. The presence of these tests indicates that the functionality is already being verified.
    I might be missing the specific context or intention behind the comment. Perhaps the comment is suggesting a different aspect of lazy execution that is not covered.
    The existing tests seem comprehensive for lazy execution mode, covering both automatic extraction and passing of dataframes.
    The comment is redundant as the file already contains tests for lazy execution mode. It should be removed.
5. examples/polars/with_columns/README:1
  • Draft comment:
    The title mentions Pandas, but this example is for Polars. Consider updating the title to reflect that this is a Polars example.
  • Reason this comment was not posted:
    Marked as duplicate.
6. hamilton/plugins/h_pandas.py:23
  • Draft comment:
    The with_columns class has similar docstrings and constructor parameters as the one in h_polars.py. Consider refactoring to avoid repetition and adhere to the DRY principle.
  • Reason this comment was not posted:
    Marked as duplicate.
7. hamilton/plugins/h_polars.py:74
  • Draft comment:
    The with_columns class has similar docstrings and constructor parameters as the one in h_pandas.py. Consider refactoring to avoid repetition and adhere to the DRY principle.
  • Reason this comment was not posted:
    Marked as duplicate.
8. examples/polars/with_columns/my_functions.py:14
  • Draft comment:
    This function duplicates existing functionality for calculating a rolling mean with a window size of 3. Consider extending the existing code to handle different scaling factors.

  • function rolling_mean (my_functions_lazy.py)

  • function rolling_mean (my_functions.py)

  • Reason this comment was not posted:
    Comment looked like it was already resolved.

9. examples/polars/with_columns/my_functions.py:22
  • Draft comment:
    This function duplicates existing functionality in my_functions_lazy.py. Consider using the existing function to avoid duplication.

  • function avg_3wk_spend__thousands (my_functions_lazy.py)

  • Reason this comment was not posted:
    Marked as duplicate.

Workflow ID: wflow_IDbe3sT5aVexYnLm


Want Ellipsis to fix these issues? Tag @ellipsis-dev in a comment. You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

examples/polars/with_columns/README Outdated Show resolved Hide resolved
hamilton/function_modifiers/recursive.py Outdated Show resolved Hide resolved
@jernejfrank jernejfrank force-pushed the feat/polars_with_columns branch from 1818302 to c373b5d Compare November 18, 2024 09:59
Copy link
Collaborator

@zilto zilto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall every excited about this feature! All of my comments are related to the plugins system and I didn't check the specific logic of the function modifier.

I think for now, you can use the registry or abstract backend approach. A larger cleanup and unification of the plugin system is planned for Hamilton 2.0.

An open question is whether we should support narwhals. I would be in favor given polars is already a Hamilton plugin.

@@ -11,17 +14,21 @@
else:
from typing import NotRequired

from pandas import DataFrame as PandasDataFrame
from polars import DataFrame as PolarsDataFrame
from polars import LazyFrame as PolarsLazyFrame
Copy link
Collaborator

@zilto zilto Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can depend on polars here. We a few options here:

  1. hamilton.registry. Older approach used by most extensions, but involves many moving pieces.
  2. hamilton.experimental.h_databackends. Newer, but less used. It powers fingerprinting for caching and schema tracking. I think it's good for internal applications.
  3. Use narwhals, which has 0 external dependencies (see comment Generic @with_columns dataframe support #1158). We should have native support for pandas since it's a Hamilton dependency and for Spark, since it's already implemented. Otherwise, implementing @with_columns using Narwhals could support backends: cuDF, Modin, pandas, polars, pyarrow, dask, ibis, duckdb, Vaex, Spark

Registry approach:

Here are the broad steps, you can use @extract_columns as a reference

  • The hamilton.plugins.pandas_extensions registers a module-level constants DATAFRAME_TYPE and COLUMN_TYPE
  • Loading the hamilton.registry, which stores types from loaded extensions under DF_TYPE_AND_COLUMN_TYPES
  • The hamilton.plugins.pandas_extensions also implements @singledispatch implementations of hamilton.registry.fill_with_scalar() and others
  • The decorator uses the single dispatch implementations via hamilton.registry.fill_with_scalar().

This way @extract_columns is always a defined operation, but it will fail at runtime when encountering a polars.DataFrame if there is no registered implementations for this type (or earlier with more checks). Implementations are decentralized, each one is under its own plugins.XX_extensions with relevant 3rd party imports.

databackends

You can use schema tracking as an example.

  1. Define abstract types in hamilton.experimental.h_databackends. The benefit is that you can use @singledispatch and isinstance()/issubclass() checks against these types without having to import 3rd party libraries.
  2. In hamilton.plugins.schema, you can define AbstractVaexColumn a single dispatch implementation for get_arrow_schema() without importing Vaex. If you need the package, you can import inside the singledispatch implementation because you know that matchin the AbstractVaexColumn type means Vaex is available in the current environment
  3. It has its own registry of abstract types available at hamilton.experimental.h_databackends.DATAFRAME_TYPES and ...h_databackends.COLUMN_TYPES

This allows centralized implementations definitions that might be easier to manage.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should use the registry approach if possible to keep this consistent. E.G> look at the dataframe types we have registered and use those.

That said, we may want to add a with_columns registration that registers per dataframe type... As we don't support it. Databackends could solve it as well if it's simple but I'd worry about a bifurcation in approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the feedback @zilto!

Happy to go ahead with registry-approach since it seems a straightforward change / keeps it consistent with extract_columns that is used under the hood.

Could you confirm the following just to double-check that I understand correctly:
The change necessary is that we move away from a factory pattern with subclassing the different implementations from and changing the now abstract method create_merge_node into a @singledispatch method with the plugin extensions h_pandas and h_polars implementing it, the same as in extract_columns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Use narwhals, which has 0 external dependencies (see comment Generic @with_columns dataframe support #1158). We should have native support for pandas since it's a Hamilton dependency and for Spark, since it's already implemented. Otherwise, implementing @with_columns using Narwhals could support backends: cuDF, Modin, pandas, polars, pyarrow, dask, ibis, duckdb, Vaex, Spark

Regarding Narwhals, I haven't used it (yet), but from a quick look at the docs it looks like a great library. I am in favor of implementing it as a plugin, but I would be vary using it indirectly to implement internals since I don't think we should depend on them (you never know if / when they stop supporting it, API changes, etc.).

Copy link
Collaborator

@zilto zilto Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you confirm the following just to double-check that I understand correctly:
The change necessary is that we move away from a factory pattern with subclassing the different implementations from and changing the now abstract method create_merge_node into a @singledispatch method with the plugin extensions h_pandas and h_polars implementing it, the same as in extract_columns.

The immediate concern is avoiding polars import outside hamilton.plugins.

Thinking about "how would someone implement support for another library (e.g., Ibis)", I see two options to write this base class:

  1. A new implementation needs to subclass with_columns
  • The validate_dataframe_type should rely on hamilton.registry to avoid importing polars (see @extract_columns)
  • In ibis_extensions, with_columns is subclassed and create_merge_node is implemented
  1. (preferred) add create_merge_node() has a @singledispatch in hamilton.registry
  • In ibis_extensions, only the create_merge_node() needs to be implemented. The with_columns class leverages the registry for all implementations and won't require subclassing

Copy link
Collaborator

@elijahbenizzy elijahbenizzy Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_merge_node I don't think is the right way to do this, but close. Perhaps the atomic operation is with_columns as @jernejfrank has it, right?

I might be wrong about this, but the problem here is that we have not really defined an algebra for doing the atomic operations for with_columns cross-dataframe and it's not actually clear to me it's the same. We might be close enough, but I worry that we're assuming a less leaky abstraction.

I like the way you have it with the subclassing -- only thing I'm concerned about is getting us into a bind with an overoptimized interface. The subclassing hides logic for this, self-contained in the factory class... We can always implement a specific version per dataframe if needed.

So yeah, value in unifying (as you have it), but also the possibility of over-unifying.

hamilton/function_modifiers/recursive.py Outdated Show resolved Hide resolved
hamilton/function_modifiers/recursive.py Outdated Show resolved Hide resolved
return inject_parameter, initial_nodes

@abc.abstractmethod
def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a hack, we could directly reuse ResultBuilder implementations, which have a .build_result() method (example)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this is appropriate here, the with_columns call could be somewhere in the middle of the DAG and the user doesn't want to collect a LazyFrame just yet for example.

This node is mostly meant to implement the different with_columns flavours -- for example in Polars its straightforward API matching since a with_columns method already exists, Pandas we needed to manually implement it ourselves.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also a hamilton.plugins.h_polars_lazyframe with a result builder for lazy frames.

From my understanding, the create_merge_node() serves the same purpose as ResultBuilder.build_result(), which we have implementations for pandas, polars, polars lazy frame, vaex, ibis, and more (IIRC). Wanted to point it out to save you development time. I think there are two paths:

  • duplicate code: copy-paste logic from ResultBuilder into this create_merge_node method, but make sure to leave a # TODO noting that this is the same logic as result builders
  • refactor to extract logic: the .build_result() or create_merge_node() should eventually end up as a method under hamilton.registry with @singledispatch. Then, the same implementation could be called by @with_columns and ResultBuilder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite. The ResultBuilder will take all the end nodes (final_vars) and try to build a dataframe out of it. There are two cases and an edge case:

  1. If the end node is a dataframe, it adopts that.
  2. If the end nodes are a list of series, it combines them into a dataframe.
  3. Edge case: In the case the end nodes are mixed the result is funky and definetly not something useful.

In our case we want to keep the original dataframe and append columns to it, so we need to insert all original columns + new columns into the ResultsBuilder. In the case a selected column already exists in the dataframe we need to prefer the selected over the existing.

Now, when the user initiates the with_columns decorator he passes in a subset of the existing columns / will most likely extract himself a subset of existing columns and not all. So, for us to generate the whole frame we will need to essentially extract all the columns in the background to be able to use the ResultBuilder logic.

It can be done, but I find the amount of gluing necessary to get what we need a bit much; I find we need a better reason to justify it than just code reusability. Mind you, we will still need to register the merge_node in the registry anyways and there we can levarage library-specific with_columns implementations / implementing it for libraries that don't have it (e.g. Pandas -- literally one line of code) in a more succinct way.

So all in all, I am not a fan of using ResultBuilder here unless there is a good argument about the overall design structure.

Copy link
Collaborator

@zilto zilto Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per my initial comment, code duplication is totally reasonable and desirable here, but we should add clear comments in @with_columns source code AND in the docstrings, docs, and user guides

Code reuse is not a goal in itself. Rather, it's about limiting the number of "concepts/abstraction" we introduce in Hamilton, which impacts downstream maintenance, documentation, user onboarding, etc. A frequent challenge mentioned with Hamilton is that 5 ways to do a single thing are presented to users, which can lead to heterogeneous coding practices within a team.

You highlighted good points that the Reference docs explain well:

  • ResultBuilder().build_result(): merge columns and scalars into a dataframe
  • @with_columns merge operation: subdag that assigns new columns to a dataframe, e.g., merge a dataframe and columns

But then likely scenarios:

User A has code using ResultBuilder. Workflow gets complex, they want to use @subdag. I tell them you can't pass a result builder to a @subdag. Then, @with_columns should solve the problem, but it doesn't have the same behavior for handling scalars (I'm not fond of it personally)

The main challenge to unify both is that ResultBuilder need backwards compatibility for handling scalar. However, as you're writing logic to handle merging of dataframes + columns, it seems a reasonable TODO to update ResultBuilder to handle dataframes + columns + scalars in a backwards compatible (or non-backwards in a future major release).

Then, we have a much simpler story for users "result builder merges X, Y, Z, and @with_columns is @subdag + ResultBuilder" without other caveats. Part of the documentation effort is also orienting users, making informative suggestion and comparisons, and writing in red bold letters what problem it solves / when it works / when it won't work.

Copy link
Collaborator

@elijahbenizzy elijahbenizzy Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above -- I think we don't have the right general pattern yet, and unifying is more trouble than it's worth. Bringing in the ResultBuilder construct is, IMO, a little more complexity than needed. It's also a concat operation -- slightly different, and we don't always want to use the result builder to join.

If we keep with the factory class (which is cool), then we'll need a path to bypass it. E.G. to register a per-dataframe version, which I think this allows (we can define it at h_polars and h_pandas level if needed, or they can just point centrally).

Copy link
Collaborator

@zilto zilto Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a call with Jernej in the meantime yesterday. The main notes:

  • my intent with this particular thread was to point that out that currently @with_columns != @subdag + ResultBuilder. My impression from the docs and the code is that it should be the end goal.
  • the current implementations work, are useful, and should be merged. We just have to add mentions in docs and # TODO where appropriate. I think it's reasonable to think that ResultBuilder users will be @with_columns users. The current differences between the two should be clearly spelled
  • the current @with_columns doesn't fill scalars like the ResultBuilder. But also, Polars .with_columns() does fill scalars, but PySpark .withColumns() doesn't (see below)
import polars as pl

df = pl.DataFrame({"a": [10, 11], "b": ["aa", "bb"]})
df.with_columns(1)

abliteral
i64stri32
10"aa"1
11"bb"1

import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(10, 11), ("aa", "bb")], schema=["a", "b"])
df.withColumns({"c": 1}).show()

63 jcol = _create_column_from_name(col)
64 else:
---> 65 raise PySparkTypeError(
66 error_class="NOT_COLUMN_OR_STR",
67 message_parameters={"arg_name": "col", "arg_type": type(col).name},
68 )
69 return jcol
PySparkTypeError: [NOT_COLUMN_OR_STR] Argument col should be a Column or str, got int.

hamilton/plugins/h_pandas.py Show resolved Hide resolved
tests/function_modifiers/test_recursive.py Outdated Show resolved Hide resolved
@jernejfrank
Copy link
Contributor Author

I left with_columns in h_pandas for backwards compatibility and have something similar in h_polars to be consistent, but IMO we should deprecate h_pandas (super short lived plugin lol), remove it from h_polars and keep it central in recursive.py.

@jernejfrank jernejfrank force-pushed the feat/polars_with_columns branch from 1856d9c to 343b5af Compare November 19, 2024 13:35
f"No nodes found upstream from select columns: {self.select} for function: "
f"{fn.__qualname__}"
)
# In case no node is selected we append all possible nodes that have a column type matching
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding here another comment for future reference:

At the moment ResultBuilder will create a column out of a scalar, native Polars with_columns does the same.

In our with_columns implementation this results in an error since we enforce only matching series type can be appended into the dataframe.

I am torn how we want to handle this: on the one hand we should try to mimick the same behaviour as external libraries (since that is what users are used to / easier adoption with legacy code), on the other hand enabling this can lead to nasty bugs because it will fail silently.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To circle back, I thnk we're removing result builder to reduce complexity, right?

Copy link
Collaborator

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, what you did here is super cool. This took me a while to wrap my head around.

I think I'm on board with the general approach, but have the following concerns (that I'm still grokikng). Thanks @zilto for pointing these out:

  1. The level of commonality between implementations -- should the goal be to make it easy to add a new implementation (E.G. by using the registry), or is that premature optimization?
  2. How it's packaged -- what's the import/usage? Should the user care that there's a generic with_columns? Or not?
  3. Related to (2) registry + import structure

Let's hop on a call at some point? Or discuss asynchronously?

@@ -26,6 +26,15 @@ def fill_with_scalar_vaex(
return df


@registry.with_columns.register(vaex.dataframe.DataFrame)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be registering it as NotImplemented? Or should the fact that we don't register it mean it's not implemented...



# Do we need this here?
class with_columns(with_columns_factory):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So yeah, I think this should live individually -- the contract shouldb e per-dataframe-type (for now). Then we can add a central one, but this enables separate implementations and the fact that we're using the same class to be an implementation detail.

hamilton/plugins/h_polars.py Show resolved Hide resolved
@@ -79,6 +79,9 @@ def load_extension(plugin_module: ExtensionName):
assert hasattr(
mod, f"fill_with_scalar_{plugin_module}"
), f"Error extension missing fill_with_scalar_{plugin_module}"
assert hasattr(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AHh, I see this is the answer to the above.Makes sense leaving it unimplemented in a few!

@@ -626,3 +629,314 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
stack.append(dep_node)
seen_nodes.add(dep)
return output


class with_columns(base.NodeInjector, abc.ABC):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is abstract -- where are the functions that are left as abstract? E.G. what does a subclass have to implement?

@jernejfrank
Copy link
Contributor Author

Ok, so here is my line of thinking with regards to the changes:

The with_columns consists of three parts:

  1. We need the input node. This can be a full dataframe if pass_datafame_as is used or we need to extract columns into nodes if columns_to_pass is used. Given that some data frames are supported in Hamilton's extract_columns and some are not, this should be implemented on a per library basis.
  2. We need the subtag nodes. Again, we can re-use Hamilton's subdag functionality, but some libraries again will need more (see h_spark) and is therefore again to be implemented on a per library basis.
  3. Last is combining eveything into a single dataframe again to be implemented on a per library basis.

So what I decided is to leave three abstract methods:

  1. get_initial_nodes
  2. get_subdag_nodes
  3. create_merge_node

that should create enough flexibility to implement any dataframe library, but is also concrete enough to wire together everything in inject_nodes from NodeInjector.

Now, every plugin library, h_pandas, h_polars, and h_polars_lazyframe inherits from this class and in the their initialisation calls out to the parent factory init, but passes in the required dataframe type (e.g. pd.DataFrame, pl.DataFrame, or pl.LazyFrame) which is in turn derived from the extension modules. So in effect we use the registry approach without hard-binding us to needing to implementat any functionality in there.

Since that part of the API is private, should we want to switch to registry, the refactoring is straightforward and shouldn't get us into trouble down the road.

Copy link
Collaborator

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! Happy with the architecture choices. One thing that would be helpful -- let's add a page on the docs (E.G. a README in function_modifiers) that addresses architectural decisions. A lot here on how we did it, and it'll be really helpful for picking up later.

Left a few comments, can approve and ship soon!

hamilton/function_modifiers/recursive.py Outdated Show resolved Hide resolved
hamilton/function_modifiers/recursive.py Outdated Show resolved Hide resolved
return kwargs[inject_parameter]
if inspect.iscoroutinefunction(fn):

async def temp_fn(**kwargs) -> Any:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why this is necessary -- we should be able to have it work with both sync and async? E.G. this function doesn't need to be awaited? Or is it that we end up awaiting this so we have to use an async function (even though its the same)? Comment would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked, it is not.. I thought since extract_columns is checking for a co-routine, we need to wire it through, but turns out it works without it as well. Same for the create_merge_node.

f"It might not be compatible with some other decorators."
)

if input_types[inject_parameter] != required_type:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to use a subclass check instead of equality

# TODO: if we rename the column nodes into something smarter this can be avoided and
# can also modify columns in place
@staticmethod
def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit -- _contains_duplicates might be better. Also, probably best not to have an underscore as we use it externally.

f"No nodes found upstream from select columns: {self.select} for function: "
f"{fn.__qualname__}"
)
# In case no node is selected we append all possible nodes that have a column type matching
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To circle back, I thnk we're removing result builder to reduce complexity, right?

@elijahbenizzy
Copy link
Collaborator

Ok, so here is my line of thinking with regards to the changes:

The with_columns consists of three parts:

  1. We need the input node. This can be a full dataframe if pass_datafame_as is used or we need to extract columns into nodes if columns_to_pass is used. Given that some data frames are supported in Hamilton's extract_columns and some are not, this should be implemented on a per library basis.
  2. We need the subtag nodes. Again, we can re-use Hamilton's subdag functionality, but some libraries again will need more (see h_spark) and is therefore again to be implemented on a per library basis.
  3. Last is combining eveything into a single dataframe again to be implemented on a per library basis.

So what I decided is to leave three abstract methods:

  1. get_initial_nodes
  2. get_subdag_nodes
  3. create_merge_node

that should create enough flexibility to implement any dataframe library, but is also concrete enough to wire together everything in inject_nodes from NodeInjector.

Now, every plugin library, h_pandas, h_polars, and h_polars_lazyframe inherits from this class and in the their initialisation calls out to the parent factory init, but passes in the required dataframe type (e.g. pd.DataFrame, pl.DataFrame, or pl.LazyFrame) which is in turn derived from the extension modules. So in effect we use the registry approach without hard-binding us to needing to implementat any functionality in there.

Since that part of the API is private, should we want to switch to registry, the refactoring is straightforward and shouldn't get us into trouble down the road.

Nice, I think this is a good overview. Note there might still be shared stuff between the implementations, in which case you have two options to reduce duplicated code (should you want):

  1. Joint/helper functions
  2. Additional subclasses, E.G. for column-specific ones (polars/pandas)

But I think these will be options for later, and quite possibly over-engineered.

To be clear, this doesn't look like it works with spark, yet? Do you think that's a possibility?

@jernejfrank jernejfrank force-pushed the feat/polars_with_columns branch from 9b9bf89 to 1958784 Compare November 24, 2024 04:29
Adds column type to be pl.Expr and allows using extract_columns with
Polars LazyFrame. The collect() is left to the user.
@jernejfrank jernejfrank force-pushed the feat/polars_with_columns branch 2 times, most recently from f7ff620 to 1d76a7d Compare November 26, 2024 02:10
Copy link
Collaborator

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is way cleaner -- nice work! And more powerful. Will take another look later tonight but just left a few comments -- this is very inline with how we discussed it.


We show the ability to use the familiar `with_columns` from `polars`. Supported for both `pl.DataFrame` and `pl.LazyFrame`.

To see the example look at the notebook.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link to notebook from this - nice to be able to click

@@ -0,0 +1,19 @@
# with_columns_base

Documenting the current design flow for the `with_columns` decorator. It belongs to the `NodeInjector` lifecycle.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth explaining why NodeInjector (E.G. cause it still runs the function as a node but injects the dataframe with columns in)

inject_parameter = target_dataframe
else:
# If we don't have a specified dataframe we assume it's the first argument
inject_parameter = list(sig.parameters.values())[0].name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably check if the function has a first argument, otherwise error out?

input_map = {column: column_type for column in self.select}
input_map[inject_parameter] = self.dataframe_type
merge_node = node.Node(
name="__append",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we do the __ pattern earlier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to change that, spark has the last node as _select -- I think in our case we should use _append or _with_columns to avoid misunderstanding with the select functionality of polars.

Adds base class to use for implementing with_columns that si dataframe
library specific. See README for design choices.
Inherits from with_columns_base.

Previous implementation is based on NodeGenerator where the last node
had to be implemented. New implementation is based on NodeInjector where
the last node is created later. The tests are adjusted so that they do
not check this node.
Uses with_columns_base and adjusted internally. Tests had to be adjusted
to test pandas specific functionality and internal function naming
changes.
Supporting now using with_columns for both polars eager and lazy
execution.

Inherits from with_columns_base. Some functionality is shared with
h_pandas.with_columns -- TODO: refactor out common logic when
appropriate.
Add docs for new with_columns implementations.
@jernejfrank jernejfrank force-pushed the feat/polars_with_columns branch from 1d76a7d to 40b0f84 Compare November 26, 2024 23:52
@elijahbenizzy elijahbenizzy merged commit a483a07 into DAGWorks-Inc:main Nov 27, 2024
24 checks passed
@jernejfrank jernejfrank deleted the feat/polars_with_columns branch November 28, 2024 01:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants