Perf optimization -- debugging tracker latency #1028

Draft: wants to merge 4 commits into main
175 changes: 175 additions & 0 deletions examples/materialization/using_types/notebook.ipynb

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions examples/materialization/using_types/run.py
@@ -0,0 +1,28 @@
import logging

import simple_etl
from hamilton_sdk import adapters

from hamilton import driver
from hamilton.log_setup import setup_logging

setup_logging(logging.DEBUG)

tracker = adapters.HamiltonTracker(
project_id=15, # modify this as needed
username="[email protected]",
dag_name="my_version_of_the_dag",
tags={"environment": "DEV", "team": "MY_TEAM", "version": "X"},
) # note: this slows down execution because there are 60 columns.
# 30 columns add about 1 second of tracking overhead,
# so 60 columns add roughly 2 seconds.

dr = driver.Builder().with_config({}).with_modules(simple_etl).with_adapters(tracker).build()
dr.display_all_functions("simple_etl.png")

import time

start = time.time()
print(start)
dr.execute(["saved_data"], inputs={"filepath": "data.csv"})
print(time.time() - start)
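A quick way to isolate the tracker's share of the runtime is to execute the same DAG without the adapter and compare. A minimal sketch, assuming the same simple_etl module and data.csv input as above:

import time

import simple_etl

from hamilton import driver

# Baseline driver: identical DAG, but no HamiltonTracker attached.
dr_plain = driver.Builder().with_config({}).with_modules(simple_etl).build()

start = time.time()
dr_plain.execute(["saved_data"], inputs={"filepath": "data.csv"})
print(f"without tracker: {time.time() - start:.2f}s")

The gap between this number and the tracked run above approximates the per-run tracking overhead.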
45 changes: 45 additions & 0 deletions examples/materialization/using_types/simple_etl.py
@@ -0,0 +1,45 @@
from hamilton.telemetry import disable_telemetry

disable_telemetry()
import logging

import pandas as pd
from sklearn import datasets

from hamilton import node
from hamilton.function_modifiers import loader, saver
from hamilton.io import utils as io_utils
from hamilton.log_setup import setup_logging

setup_logging(logging.INFO)


@loader()
def raw_data() -> tuple[pd.DataFrame, dict]:
data = datasets.load_digits()
df = pd.DataFrame(data.data, columns=[f"feature_{i}" for i in range(data.data.shape[1])])
metadata = io_utils.get_dataframe_metadata(df)
return df, metadata


def transformed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
return raw_data


@saver()
def saved_data(transformed_data: pd.DataFrame, filepath: str) -> dict:
transformed_data.to_csv(filepath)
metadata = io_utils.get_file_and_dataframe_metadata(filepath, transformed_data)
return metadata


if __name__ == "__main__":
import time

from hamilton_sdk.tracking import runs

df, metadata = raw_data()
t1 = time.time()
stats = runs.process_result(df, node.Node.from_fn(raw_data))
t2 = time.time()
print(t2 - t1)
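Since the block above points at runs.process_result as the hot path, profiling it directly shows where the time goes. A sketch using the standard-library profiler, assuming it runs in this same module so raw_data is in scope:

import cProfile
import pstats

from hamilton import node
from hamilton_sdk.tracking import runs

df, _ = raw_data()
profiler = cProfile.Profile()
profiler.enable()
runs.process_result(df, node.Node.from_fn(raw_data))
profiler.disable()
# Print the 20 most expensive calls by cumulative time.
pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)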
1 change: 0 additions & 1 deletion hamilton/execution/graph_functions.py
@@ -218,7 +218,6 @@ def dfs_traverse(
except Exception as e:
pre_node_execute_errored = True
raise e

if adapter.does_method("do_node_execute", is_async=False):
result = adapter.call_lifecycle_method_sync(
"do_node_execute",
2 changes: 2 additions & 0 deletions hamilton/function_modifiers/__init__.py
@@ -92,3 +92,5 @@
# materialization stuff
load_from = adapters.load_from
save_to = adapters.save_to
loader = macros.loader
saver = macros.saver
1 change: 0 additions & 1 deletion hamilton/function_modifiers/adapters.py
@@ -265,7 +265,6 @@ def filter_function(_inject_parameter=inject_parameter, **kwargs):
def inject_nodes(
self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable
) -> Tuple[Collection[node.Node], Dict[str, str]]:
pass
"""Generates two nodes:
1. A node that loads the data from the data source, and returns that + metadata
2. A node that takes the data from the data source, injects it into, and runs, the function.
105 changes: 105 additions & 0 deletions hamilton/function_modifiers/macros.py
@@ -5,13 +5,15 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

import pandas as pd
import typing_inspect

from hamilton import models, node
from hamilton.dev_utils.deprecation import deprecated
from hamilton.function_modifiers import base
from hamilton.function_modifiers.configuration import ConfigResolver
from hamilton.function_modifiers.delayed import resolve as delayed_resolve
from hamilton.function_modifiers.dependencies import (
InvalidDecoratorException,
LiteralDependency,
SingleDependency,
UpstreamDependency,
@@ -870,3 +872,106 @@ def optional_config(self) -> Dict[str, Any]:
#
# def __init__(self, *transforms: Applicable, collapse=False):
# super(flow, self).__init__(*transforms, collapse=collapse, _chain=False)


class loader(base.NodeCreator):
"""Class to capture metadata."""

# def __init__(self, og_function: Callable):
# self.og_function = og_function
# super(loader,self).__init__()

def validate(self, fn: Callable):
print("called validate loader")
return_annotation = inspect.signature(fn).return_annotation
if return_annotation is inspect.Signature.empty:
raise InvalidDecoratorException(
f"Function: {fn.__qualname__} must have a return annotation."
)
# check that the type is a tuple[TYPE, dict]:
if not typing_inspect.is_tuple_type(return_annotation):
raise InvalidDecoratorException(f"Function: {fn.__qualname__} must return a tuple.")
if len(typing_inspect.get_args(return_annotation)) != 2:
raise InvalidDecoratorException(
f"Function: {fn.__qualname__} must return a tuple of length 2."
)
if not typing_inspect.get_args(return_annotation)[1] == dict:
raise InvalidDecoratorException(
f"Function: {fn.__qualname__} must return a tuple of type (SOME_TYPE, dict)."
)

def generate_nodes(self, fn: Callable, config) -> List[node.Node]:
"""
Generates two nodes:
1. The original function, renamed to "<fn_name>.loader", which returns its (data, metadata) tuple.
2. A node with the original function's name that returns only the data element of that tuple.
Both nodes are tagged so downstream tooling can identify them as data loaders.
:param fn:
:param config:
:return:
"""
_name = "loader"
og_node = node.Node.from_fn(fn, name=_name)
new_tags = og_node.tags.copy()
new_tags.update(
{
"hamilton.data_loader": True,
"hamilton.data_loader.has_metadata": True,
"hamilton.data_loader.source": f"{fn.__name__}",
"hamilton.data_loader.classname": f"{fn.__name__}()",
"hamilton.data_loader.node": _name,
}
)

def filter_function(**kwargs):
return kwargs[f"{fn.__name__}.{_name}"][0]

filter_node = node.Node(
name=fn.__name__, # use original function name
callabl=filter_function,
typ=typing_inspect.get_args(og_node.type)[0],
input_types={f"{fn.__name__}.{_name}": og_node.type},
tags={
"hamilton.data_loader": True,
"hamilton.data_loader.has_metadata": False,
"hamilton.data_loader.source": f"{fn.__name__}",
"hamilton.data_loader.classname": f"{fn.__name__}()",
"hamilton.data_loader.node": fn.__name__,
},
)

return [og_node.copy_with(tags=new_tags, namespace=(fn.__name__,)), filter_node]


class saver(base.NodeCreator):
"""Class to capture metadata."""

def validate(self, fn: Callable):
print("called validate")
return_annotation = inspect.signature(fn).return_annotation
if return_annotation is inspect.Signature.empty:
raise InvalidDecoratorException(
f"Function: {fn.__qualname__} must have a return annotation."
)
# check that the return type is a dict
if return_annotation not in (dict, Dict):
raise InvalidDecoratorException(f"Function: {fn.__qualname__} must return a dict.")

def generate_nodes(self, fn: Callable, config) -> List[node.Node]:
"""
Returns the original node unchanged except for added data-saver tags.
:param fn:
:param config:
:return:
"""
og_node = node.Node.from_fn(fn)
new_tags = og_node.tags.copy()
new_tags.update(
{
"hamilton.data_saver": True,
"hamilton.data_saver.sink": f"{og_node.name}",
"hamilton.data_saver.classname": f"{fn.__name__}()",
}
)
return [og_node.copy_with(tags=new_tags)]
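The net effect of the two decorators is easiest to see by generating the nodes directly and inspecting their tags. A sketch, reusing raw_data from the simple_etl example above; calling validate and generate_nodes by hand is purely illustrative, since the framework normally invokes them during graph construction:

import simple_etl

from hamilton.function_modifiers import macros

dec = macros.loader()
dec.validate(simple_etl.raw_data)
# Expect the namespaced loader node (data + metadata) plus a "raw_data"
# node that returns just the DataFrame half of the tuple.
for n in dec.generate_nodes(simple_etl.raw_data, config={}):
    print(n.name, dict(n.tags))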
4 changes: 3 additions & 1 deletion hamilton/graph.py
@@ -500,7 +500,9 @@ def _get_legend(
node_style.update(**modifier_style)
seen_node_types.add("materializer")

if n.tags.get("hamilton.data_loader") and "load_data." in n.name:
if n.tags.get("hamilton.data_loader") and (
"load_data." in n.name or "loader" == n.tags.get("hamilton.data_loader.node")
):
materializer_type = n.tags["hamilton.data_loader.classname"]
label = _get_node_label(n, type_string=materializer_type)
modifier_style = _get_function_modifier_style("materializer")
6 changes: 4 additions & 2 deletions hamilton/node.py
@@ -266,6 +266,9 @@ def from_fn(fn: Callable, name: str = None) -> "Node":
return_type = typing.get_type_hints(fn, **type_hint_kwargs).get("return")
if return_type is None:
raise ValueError(f"Missing type hint for return value in function {fn.__qualname__}.")
module = inspect.getmodule(fn).__name__
tags = {"module": module}

node_source = NodeType.STANDARD
# TODO - extract this into a function + clean up!
if typing_inspect.is_generic_type(return_type):
@@ -277,8 +280,7 @@ def from_fn(fn: Callable, name: str = None) -> "Node":
if typing_inspect.get_origin(hint) == Collect:
node_source = NodeType.COLLECT
break
module = inspect.getmodule(fn).__name__
tags = {"module": module}

if hasattr(fn, "__config_decorated__"):
tags["hamilton.config"] = ",".join(fn.__config_decorated__)
return Node(
16 changes: 3 additions & 13 deletions ui/sdk/src/hamilton_sdk/tracking/pandas_col_stats.py
@@ -1,5 +1,6 @@
from typing import Dict, List, Union

import numpy as np
import pandas as pd
from hamilton_sdk.tracking import dataframe_stats as dfs

@@ -45,19 +46,8 @@ def quantiles(col: pd.Series, quantile_cuts: List[float]) -> Dict[float, float]:


def histogram(col: pd.Series, num_hist_bins: int = 10) -> Dict[str, int]:
try:
hist_counts = (
col.value_counts(
bins=num_hist_bins,
)
.sort_index()
.to_dict()
)
except ValueError:
return {}
except AttributeError:
return {}
return {str(interval): interval_value for interval, interval_value in hist_counts.items()}
hist, bins = np.histogram(col, bins=num_hist_bins)
return {str(interval): interval_value for interval, interval_value in zip(bins, hist)}


def numeric_column_stats(
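One behavioral note on the rewrite above: np.histogram returns num_hist_bins counts but num_hist_bins + 1 bin edges, so the zip pairs each count with the left edge of its bin and silently drops the final edge; keys become stringified edges rather than the pandas Interval strings that value_counts produced. The old try/except also swallowed non-numeric columns, which will now surface as numpy errors. A small sketch of the shape difference:

import numpy as np
import pandas as pd

col = pd.Series(range(100))
hist, bins = np.histogram(col, bins=10)
print(len(hist), len(bins))  # 10 11 -- zip() stops at the shorter sequence
print({str(edge): int(count) for edge, count in zip(bins, hist)})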
12 changes: 11 additions & 1 deletion ui/sdk/src/hamilton_sdk/tracking/pandas_stats.py
@@ -1,3 +1,4 @@
import logging
from typing import Any, Dict, Union

import pandas as pd
@@ -12,7 +13,11 @@
- for object types we should :shrug:
"""


dr = driver.Builder().with_modules(pcs).with_config({"config_key": "config_value"}).build()
logger = logging.getLogger(__name__)

import time


def _compute_stats(df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
@@ -44,11 +49,16 @@ def _compute_stats(df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
def execute_col(
target_output: str, col: pd.Series, name: Union[str, int], position: int
) -> Dict[str, Any]:
"""Get stats on a column."""
"""Get stats on a column.
TODO: profile this and see where we can speed things up.
"""
try:
t1 = time.time()
res = dr.execute(
[target_output], inputs={"col": col, "name": name, "position": position}
)

logger.info(f"Computed stats for column {name}, time taken was {time.time() - t1}")
res = res[target_output].to_dict()
except Exception:
# minimum that we want -- ideally we have hamilton handle errors and do best effort.
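The per-column log line above makes the scaling explicit: every column pays a full dr.execute call, which is consistent with the roughly 1 second per 30 columns noted in run.py. A rough aggregation sketch, assuming the same execute_col defined in this module ("col_stats" is a placeholder target output, not necessarily the module's real node name):

import time

import pandas as pd

def profile_columns(df: pd.DataFrame) -> None:
    # Time each column's stats pass to spot outlier columns.
    timings = {}
    for position, (name, col) in enumerate(df.items()):
        t1 = time.time()
        execute_col("col_stats", col, name, position)  # placeholder target
        timings[name] = time.time() - t1
    print(f"total: {sum(timings.values()):.2f}s over {len(timings)} columns")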
10 changes: 8 additions & 2 deletions ui/sdk/src/hamilton_sdk/tracking/stats.py
@@ -100,10 +100,16 @@ def compute_stats_tuple(result: tuple, node_name: str, node_tags: dict) -> Stats
if isinstance(result[1], dict):
try:
# double check that it's JSON serializable
raw_data = json.dumps(result[1])
raw_data = (
json.dumps(result[1])
if isinstance(result[1], dict)
else json.dumps(result[1].to_dict())
)
_metadata = json.loads(raw_data)
except Exception:
_metadata = str(result[1])
_metadata = (
str(result[1]) if isinstance(result[1], dict) else str(result[1].to_dict())
)
if len(_metadata) > 1000:
_metadata = _metadata[:1000] + "..."
else: