Skip to content

Commit

Permalink
Implements the pipe decorator
Browse files Browse the repository at this point in the history
`pipe` allows for function chaining/redefinition. It has a few caveats
(see the documentation), but they're all placeholders -- we will have
the chance to improve it over time, as more people use it.

We also place in, but don't turn on/expose the `@flow` decorator, which
is equivalent to `pipe`, but doesn't pass the result into the next
function. Thus it can be used to build a procedural subdag as a
component of a declarative DAG.
  • Loading branch information
elijahbenizzy committed Oct 16, 2023
1 parent 11f816c commit 0af987d
Show file tree
Hide file tree
Showing 10 changed files with 1,062 additions and 252 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ This paradigm makes modifications easy to build and track, ensures code is self-
✅ Model a dataflow -- If you can model your problem as a DAG in python, Hamilton is the cleanest way to build it.<br/>
✅ Unmaintainable spaghetti code -- Hamilton dataflows are unit testable, self-documenting, and provide lineage.<br/>
✅ Long iteration/experimentation cycles -- Hamilton provides a clear, quick, and methodical path to debugging/modifying/extending your code.<br/>
✅ Reusing code across contexts -- Hamilton encourages code that is independent of infrastructure and can run regardless of execution setting.
✅ Reusing code across contexts -- Hamilton encourages code that is independent of infrastructure and can run regardless of execution setting.

## Problems Hamilton Does not Solve
❌ Provisioning infrastructure -- you want a macro-orchestration system (see airflow, kubeflow, sagemaker, etc...).<br/>
Expand Down
366 changes: 145 additions & 221 deletions examples/parallelism/star_counting/notebook.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions hamilton/function_modifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@

# does decorator
does = macros.does
pipe = macros.pipe
apply = macros.apply

# resolve transform/model decorator
dynamic_transform = macros.dynamic_transform
Expand Down
4 changes: 2 additions & 2 deletions hamilton/function_modifiers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def transform_dag(

# TODO -- delete this/replace with the version that will be added by
# https://github.com/DAGWorks-Inc/hamilton/pull/249/ as part of the Node class
def _reassign_input_names(node_: node.Node, input_names: Dict[str, Any]) -> node.Node:
def _reassign_inputs(node_: node.Node, input_names: Dict[str, Any]) -> node.Node:
"""Reassigns the input names of a node. Useful for applying
a node to a separate input if needed. Note that things can get a
little strange if you have multiple inputs with the same name, so
Expand Down Expand Up @@ -309,7 +309,7 @@ def transform_dag(
for node_ in nodes:
# if there's an intersection then we want to rename the input
if set(node_.input_types.keys()) & set(rename_map.keys()):
out.append(_reassign_input_names(node_, rename_map))
out.append(_reassign_inputs(node_, rename_map))
else:
out.append(node_)
out.extend(nodes_to_inject)
Expand Down
122 changes: 101 additions & 21 deletions hamilton/function_modifiers/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,99 @@
replacing if/else/switch statements in standard dataflow definition libraries"""


class ConfigResolver:
def __init__(self, resolves: Callable[[Dict[str, Any]], bool], config_used: List[str]):
self.resolves = resolves
self._config_used = config_used

@property
def optional_config(self) -> Dict[str, Any]:
return {key: None for key in self._config_used}

def __call__(self, config: Dict[str, Any]) -> bool:
return self.resolves(config)

@staticmethod
def when(name=None, **key_value_pairs) -> "ConfigResolver":
"""Yields a decorator that resolves the function if all keys in the config are equal to the corresponding value.
:param key_value_pairs: Keys and corresponding values to look up in the config
:return: a configuration decorator
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(value == configuration.get(key) for key, value in key_value_pairs.items())

return ConfigResolver(resolves, config_used=list(key_value_pairs.keys()))

@staticmethod
def when_not(name=None, **key_value_pairs: Any) -> "ConfigResolver":
"""Yields a decorator that resolves the function if none keys in the config are equal to the corresponding value
``@config.when_not(param=value)`` will be included if the parameter is _not_ equal to the value specified.
:param key_value_pairs: Keys and corresponding values to look up in the config
:return: a configuration decorator
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(value != configuration.get(key) for key, value in key_value_pairs.items())

return ConfigResolver(resolves, config_used=list(key_value_pairs.keys()))

@staticmethod
def when_in(name=None, **key_value_group_pairs: Collection[Any]) -> "ConfigResolver":
"""Yields a decorator that resolves the function if all of the
values corresponding to the config keys are equal to one of items in the list of values.
``@config.when_in(param=[value1, value2, ...])`` Will be included if the parameter is equal to one of the \
specified values.
:param key_value_group_pairs: pairs of key-value mappings where the value is a list of possible values
:return: a configuration decorator
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(
configuration.get(key) in value for key, value in key_value_group_pairs.items()
)

return ConfigResolver(resolves, config_used=list(key_value_group_pairs.keys()))

@staticmethod
def when_not_in(**key_value_group_pairs: Collection[Any]) -> "ConfigResolver":
"""Yields a decorator that resolves the function only if none of the keys are in the list of values.
``@config.when_not_in(param=[value1, value2, ...])`` Will be included if the parameter is not equal to any of \
the specified values.
:param key_value_group_pairs: pairs of key-value mappings where the value is a list of possible values
:return: a configuration decorator
.. code-block:: python
@config.when_not_in(business_line=["mens","kids"], region=["uk"])
def LEAD_LOG_BASS_MODEL_TIMES_TREND(
TREND_BSTS_WOMENS_ACQUISITIONS: pd.Series,
LEAD_LOG_BASS_MODEL_SIGNUPS_NON_REFERRAL: pd.Series) -> pd.Series:
above will resolve for config has `{"business_line": "womens", "region": "us"}`,
but not for configs that have `{"business_line": "mens", "region": "us"}`,
`{"business_line": "kids", "region": "us"}`, or `{"region": "uk"}`.
.. seealso::
:ref:config.when_not
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(
configuration.get(key) not in value for key, value in key_value_group_pairs.items()
)

return ConfigResolver(resolves, config_used=list(key_value_group_pairs.keys()))


class config(base.NodeResolver):
"""Decorator class that determines whether a function should be in the DAG based on some configuration variable.
Expand Down Expand Up @@ -118,11 +211,8 @@ def when(name=None, **key_value_pairs) -> "config":
:param key_value_pairs: Keys and corresponding values to look up in the config
:return: a configuration decorator
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(value == configuration.get(key) for key, value in key_value_pairs.items())

return config(resolves, target_name=name, config_used=list(key_value_pairs.keys()))
resolver = ConfigResolver.when(name=name, **key_value_pairs)
return config(resolver, target_name=name, config_used=list(resolver.optional_config))

@staticmethod
def when_not(name=None, **key_value_pairs: Any) -> "config":
Expand All @@ -134,10 +224,8 @@ def when_not(name=None, **key_value_pairs: Any) -> "config":
:return: a configuration decorator
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(value != configuration.get(key) for key, value in key_value_pairs.items())

return config(resolves, target_name=name, config_used=list(key_value_pairs.keys()))
resolver = ConfigResolver.when_not(name=name, **key_value_pairs)
return config(resolver, target_name=name, config_used=list(resolver.optional_config))

@staticmethod
def when_in(name=None, **key_value_group_pairs: Collection[Any]) -> "config":
Expand All @@ -151,12 +239,8 @@ def when_in(name=None, **key_value_group_pairs: Collection[Any]) -> "config":
:return: a configuration decorator
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(
configuration.get(key) in value for key, value in key_value_group_pairs.items()
)

return config(resolves, target_name=name, config_used=list(key_value_group_pairs.keys()))
resolver = ConfigResolver.when_in(name=name, **key_value_group_pairs)
return config(resolver, target_name=name, config_used=list(resolver.optional_config))

@staticmethod
def when_not_in(**key_value_group_pairs: Collection[Any]) -> "config":
Expand All @@ -183,9 +267,5 @@ def LEAD_LOG_BASS_MODEL_TIMES_TREND(
:ref:config.when_not
"""

def resolves(configuration: Dict[str, Any]) -> bool:
return all(
configuration.get(key) not in value for key, value in key_value_group_pairs.items()
)

return config(resolves, config_used=list(key_value_group_pairs.keys()))
resolver = ConfigResolver.when_not_in(**key_value_group_pairs)
return config(resolver, config_used=list(resolver.optional_config))
Loading

0 comments on commit 0af987d

Please sign in to comment.