Enable mutation of the output of nodes in a linear fashion via decorators #922

Closed
skrawcz opened this issue May 27, 2024 · 9 comments

@skrawcz
Collaborator

skrawcz commented May 27, 2024

Is your feature request related to a problem? Please describe.
This is similar to #701 -- but a distributed version.

People don't want to change column/dataframe/artifact names at each transform step. This conflicts with Hamilton, where each function defines a uniquely named node. Is there some API we can support?

Describe the solution you'd like
One idea is that you pipe transforms together and have the framework group things so that there isn't a renaming issue.

E.g.

def data_set(...) -> pd.DataFrame:
  ...
  return df

@mutate
def _data_set(data_set: pd.DataFrame) -> pd.DataFrame:
  # some mutation
  return df

@mutate
def _data_set(data_set: pd.DataFrame) -> pd.DataFrame:
  # some other mutation
  return df

Notes:

  1. Python modules can only expose one function per name -- only the last definition survives.
  2. This means anything we want to use downstream can only be defined once.
  3. The mutating functions above are prefixed with _, which is reserved for private functions. That's fine, I think, because these transform functions aren't useful by themselves and shouldn't be exposed directly. It also gets around the naming issue in (1) -- the decorator can register and capture these. Open decision as to what "declares" the connection to the function: the first argument name? the name of the function? or something else?
  4. Order matters. The idea is that the decorator builds up an ordered list of transforms. This allows one to experiment with commenting out functions, etc., as they're developing.
  5. When Hamilton inspects this module, it pulls data_set and then checks what was registered against it via @mutate. One initial constraint we can have is that @mutate has to be in the same module; we should design for multi-module, but as a first pass constrain to the same module.
  6. Hamilton would then render this correctly, exposing those nodes in the graph, and expose data_set as the result of applying all those transforms in order (see the sketch below).
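
For intuition, here is a minimal sketch of the resolution step in (6) -- the resolve helper is hypothetical, not existing Hamilton API; it just chains the registered transforms in order on top of the original function's output:

from typing import Callable, List

def resolve(target_fn: Callable, transforms: List[Callable]) -> Callable:
    """Hypothetical sketch: expose target_fn with all @mutate transforms applied."""
    def resolved(*args, **kwargs):
        result = target_fn(*args, **kwargs)
        for transform in transforms:  # registration order == application order (note 4)
            result = transform(result)
        return result
    return resolved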

Describe alternatives you've considered
Alternative / additional decorator use could be:

@mutate("data_set")
def _some_func_name(arg1: pd.DataFrame) -> pd.DataFrame:
  # assumes arg1 maps to data_set ? 
  # some other mutation
  return df

To enable one to have function names that don't have to match.

This could then help one write mutations like this -- which I think is a potential vote for allowing multi-module registration, and for using module order to imply the order in which transforms are applied.

for helper_func in helper_funcs:
    mutate.register("data_set", helper_func)

Additional context

Here's some code that proves you can intercept and register the functions in a mutate decorator:

import collections
import functools

# maps "module.function_name" -> ordered list of registered wrappers
function_registry = collections.defaultdict(list)

def mutate(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    qualified_key = func.__module__ + "." + func.__name__
    # Suffix the name with the current registration count so repeated
    # definitions of the same name stay distinguishable (this intentionally
    # overrides the __name__ that functools.wraps just copied).
    func.__name__ = func.__name__ + str(len(function_registry[qualified_key]))
    wrapper.__name__ = func.__name__

    # Register the function, preserving definition order
    function_registry[qualified_key].append(wrapper)

    return wrapper

Then:

from decorator import mutate
import pandas as pd

def my_function(input: pd.Series) -> pd.Series:
    return input

@mutate
def _my_function(my_function: pd.Series) -> pd.Series:
    return my_function + 1

@mutate
def _my_function(my_function: pd.Series) -> pd.Series:
    return my_function * 2
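
After importing the module above, the registry holds both wrapped functions under the original qualified key, with the index-suffixed names reflecting registration order (the module name mymodule below is assumed for illustration):

from decorator import function_registry

print(function_registry)
# defaultdict(<class 'list'>,
#             {'mymodule._my_function': [<function _my_function0 at 0x...>,
#                                        <function _my_function1 at 0x...>]})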
@skrawcz added the enhancement and decorators labels May 27, 2024
@jernejfrank
Contributor

This would be a really cool functionality to have!

I'm considering the following scenario:
I already built a DAG that gives me baseline functionality or produced a deliverable I was chasing a deadline for. Now I want to experiment by tweaking some nodes, but I would prefer it to be non-invasive and easy to turn on or off -- something like A/B testing, for example, but without needing to decorate the code with config.when. If my transforms work, I can make them more permanent; otherwise I just delete the one function in one place without needing to do anything else.

We could even add something like turning mutations on or off at build time to test how mutations affect the result.

--
I prefer the alternative implementation for the following reasons:

  1. I think we should leave the original functions alone, i.e. not add any decorators to them or impose additional naming on them (otherwise the functionality is already there via other decorators like pipe_*, or by wiring the DAG in the canonical way).
  2. That restriction makes it hard to map a mutating transformation to more than one original function, and we lose the ability to apply the same transformation to multiple nodes.
  3. The alternative solution allows us to pass in a list of function names that would get the same transformation, to pass in value() arguments, etc. (see the sketch below). (I guess this flips the original idea: you only need to comment out the particular function name from the list to cancel the mutation on it, instead of commenting out a whole function.)
  4. We still need to think about order, but the naming issue goes away completely since the transforms can be named anything and the decorator binds them.
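
A sketch of what (3) could look like -- value and source are existing Hamilton constructs, but binding them through mutate like this is an assumption about the proposed API:

import pandas as pd
from hamilton.function_modifiers import source, value

@mutate(functions=["data_set", "other_data_set"], multiplier=value(2))
def _scale(input_function: pd.DataFrame, multiplier: int) -> pd.DataFrame:
    # multiplier is bound to the literal 2 via value(); source("some_node")
    # could bind another node's output instead
    return input_function * multiplier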

Let me know if I misunderstood or missed something from the original idea.

@skrawcz
Collaborator Author

skrawcz commented Sep 25, 2024

Do you have a code example of what it would look like and the updates you'd need to make?

I think the key here is determining which workflows we want this to work best for, along with showing what the current friction points are. E.g. is it notebook dev? Is it reusing transforms across multiple data sets? Something else? What does the current code look like to achieve that outcome?

This will then help guide how one registers these additional functions.

Now that I think about it, this also seems pretty close to what @with_columns does; note we haven't made a generic with_columns that works over any dataframe (#1158).

@jernejfrank
Contributor

Here is a very rough sketch of what I had in mind:

# sample_module.py

import pandas as pd
from dev import mutate


def my_function(input: pd.Series) -> pd.Series:
    return input

def other_function(input: pd.Series) -> pd.Series:
    return input

def no_mutations(input: pd.Series) -> pd.Series:
    return input

@mutate(functions=["my_function", "other_function"])
def _transform_1(input_function: pd.Series) -> pd.Series:
    return input_function + 1


@mutate(functions=["other_function"])
def _transform_2(input_function: pd.Series) -> pd.Series:
    return input_function * 2

Here is a very crude implementation, but the essential idea is that we find the right functions and build a pipe_output for each one in the background (I used pipe_output here to fast-jump to a working solution; there should be a better way of implementing this).

# I think we need to touch only two places

import collections
import functools
import inspect
from types import ModuleType
from typing import Callable, List, Tuple

from hamilton.function_modifiers.macros import pipe_output, step
from hamilton.graph_utils import find_functions, is_submodule  # is_submodule is used by find_mutations below

import sample_module


# the actual decorator #############################################################################
def mutate(functions):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapped_fn(*args, **kwargs):
            return fn(*args, **kwargs)
        # We make all mutable functions hidden
        if not fn.__name__.startswith("_"):
            # Probably change __qualname__ as well?
            wrapped_fn.__name__ = f"_mutate_{fn.__name__}"
        else:
            wrapped_fn.__name__ = f"_mutate_{fn.__name__[1:]}"
        wrapped_fn.functions_to_mutate = functions
        return wrapped_fn
    return decorator

# somewhere in graph.py ############################################################################
def find_mutations(function_module: ModuleType) -> List[Tuple[str, Callable]]:
    """Function to determine the set of functions we want to build a graph from.

    This iterates through the function module and grabs all function definitions.
    :return: list of tuples of (func_name, function).
    """

    def valid_fn(fn):
        return (
            inspect.isfunction(fn)
            and "_mutate_" in fn.__name__
            and is_submodule(inspect.getmodule(fn), function_module)
        )

    return [f for f in inspect.getmembers(function_module, predicate=valid_fn)]

def create_mutation_pipeline(functions, mutations):
    # Map each target function name to the ordered list of mutations hitting it.
    pipelines = collections.defaultdict(list)
    for mutation in mutations:
        f = mutation[1]
        for target_function in f.__dict__["functions_to_mutate"]:
            pipelines[target_function].append(f)
    return pipelines


def attach_pipelines(functions, pipelines):
    new_functions = []
    for f in functions:
        if f[0] not in pipelines:
            new_functions.append((f[0], f[1]))
            continue

        steps = []
        for transform in pipelines[f[0]]:
            # place to bind additional args and stuff you can put into step
            steps.append(step(transform))

        new_functions.append((f[0], pipe_output(*steps)(f[1])))
    return new_functions

if __name__ == "__main__":
    functions = sum([find_functions(module) for module in [sample_module]], [])
    mutations = sum([find_mutations(module) for module in [sample_module]], [])
    print(functions)
    print(mutations)
    pipelines = create_mutation_pipeline(functions=functions, mutations=mutations)
    print(pipelines)
    new_functions = attach_pipelines(functions=functions, pipelines=pipelines)
    print(new_functions)
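
For reference, running this against sample_module would produce roughly the following intermediate structures (illustrative; exact reprs elided). One caveat: inspect.getmembers returns members sorted alphabetically by attribute name, so the transform order here only matches definition order because of the _transform_1/_transform_2 names -- a real implementation would need to track definition order explicitly.

# mutations     -> [("_transform_1", <function _mutate_transform_1>),
#                   ("_transform_2", <function _mutate_transform_2>)]
# pipelines     -> {"my_function":    [<_mutate_transform_1>],
#                   "other_function": [<_mutate_transform_1>, <_mutate_transform_2>]}
# new_functions -> the same (name, fn) pairs, with mutated targets wrapped via
#                  pipe_output(step(...), ...) applied to the original function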

@jernejfrank
Contributor

jernejfrank commented Sep 25, 2024

re: with_columns -- I'll take a look tomorrow, I wasn't aware of it!

@skrawcz
Collaborator Author

skrawcz commented Sep 26, 2024

Another use case sketch - I want to pull data but keep it the same name...

def cust_data(...) -> pd.DataFrame:
  ... pull data

@mutate(cust_data)
@check_output(...)
def _filter(cust_data: pd.DataFrame) -> pd.DataFrame:
   ... filter it

@mutate(cust_data) # do we assume target is first argument?
def _join(cust_data: pd.DataFrame, foo_dep: pd.DataFrame) -> pd.DataFrame:
    ... join ; we would allow other dag dependencies I think

def aggregate_data(cust_data: pd.DataFrame) -> pd.DataFrame:
    ... this depends on final cust_data node -- so would be a caveat for users that the functions below still apply.
    ... (maybe we would have to have a linter to sort files with @mutate)

@mutate(cust_data, some_other_data) # some_other_data is another function;  doesn't have to be function pointers, but could be strings.
def _sort(generic_df: pd.DataFrame) -> pd.DataFrame:
    ... this is a generic helper that is applied to two functions...

@mutate(cust_data, target_="some_data") # required for combined decorator usage?
@with_columns(...)
def _features(some_data: pd.DataFrame) -> pd.DataFrame:  
   ... with columns add features...

Things to think about:

  1. unit testing -- it is possible this way (see the sketch below).
  2. is the first argument name meaningful? or not?
  3. do we allow inter-module or only intra-module? We like forcing people to curate code. Inter-module could make that tricky... it could also let people "break encapsulation" -- not sure that's a good thing (it would require good error messages and introspection).
  4. this makes it fuzzy to find what is upstream of a node -- with a function pointer in the decorator it is simpler; if it were a string name, a little harder...
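
On (1), a quick illustration of why unit testing stays straightforward: the mutating functions are plain functions, so they can be called directly. Hypothetical test; it assumes the _filter sketch above drops rows with negative amounts, which its elided body doesn't actually specify:

import pandas as pd

def test_filter():
    df = pd.DataFrame({"amount": [10, -5, 3]})
    result = _filter(df)  # plain function call; no framework machinery needed
    assert (result["amount"] >= 0).all()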

@jernejfrank
Contributor

Nice, that makes a strong point for the use case that we shouldn't limit @mutate to impact a single function.

  1. I would bind it the same way as pipe: the first argument is the output of the function you want to mutate; other arguments can be passed in via source/value (maybe even a dict in the case of multiple functions so we can do different values? but this sounds complicated...)
  2. yes; on the other hand it would allow a single module with only mutate functions, as a central place to check out all the data-processing stuff -- not sure which is better in practice. I guess that boils down to the debate of who the end user is: data scientists who need to transform something, or their engineering counterparts who want them to ship somewhat organised code?
  3. agreed, but this depends on the decision around 3. If we allow inter-module, then you would need to import all the functions, which becomes a hassle compared to just writing a list of strings. Can't we assume upstream to be from the "mother" node that gets transformed? (If we are wiring them together internally, we should be able to force sharing the namespace?)

Will work on it more tomorrow to get something concrete out.

@skrawcz
Collaborator Author

skrawcz commented Sep 27, 2024

Some random thoughts:

Mutate should only be able to work with a select few other decorators:

  • check_output & check_output_custom
  • subdag-like ones, e.g. subdag & with_columns -- i.e. ones that return a single output.

Other decorators I don't think make much sense.

Things to think through: the tricky part is that we have a "step" that turns into a "node", and that's what we want to potentially apply the check_output and subdag operators on...

We could constrain this initial feature to not enable this, so the project doesn't get too big -- but it should be taken into account.

@skrawcz
Collaborator Author

skrawcz commented Sep 27, 2024

The other way to think about this is what code it replaces:

def raw_cust_data(...) -> pd.DataFrame:
  ... pull data

@check_output(...)
def cust_data_filtered(raw_cust_data: pd.DataFrame) -> pd.DataFrame:
   ... filter it

def cust_data_joined(cust_data_filtered: pd.DataFrame, foo_dep: pd.DataFrame) -> pd.DataFrame:
    ... join

def aggregate_data(cust_data_features: pd.DataFrame) -> pd.DataFrame:
    .... aggregate

def cust_data_sorted(cust_data_filtered: pd.DataFrame) -> pd.DataFrame:
    ... sorted

def some_data_sorted(some_data: pd.DataFrame) -> pd.DataFrame:
    ...

@with_columns(...)
def cust_data_features(cust_data_sorted: pd.DataFrame) -> pd.DataFrame:  
   ... with columns add features...

So it replaces:

  1. having to have unique names, and then changing the wiring if you want to add/remove/replace something.
  2. enabling more verb-like names on functions.
  3. potentially simpler "reuse" of transform functions across DAG paths...

@jernejfrank mentioned this issue Sep 28, 2024
@zilto
Collaborator

zilto commented Nov 5, 2024

Closing since #1160 was merged

@zilto closed this as completed Nov 5, 2024