Skip to content

Commit

Permalink
Adds function prefix to dataframe column concatenation (#322)
Browse files Browse the repository at this point in the history
That way we know how to map an output back to the function.
We decided this was a better approach for now. We don't want to
lose the linkage between output and function, and for dataframe
flattening, this is the best approach.

It does mean that the columns cease being valid python identifiers,
but that's fine because we're in build_result. If people complain
we can figure out a way to change it.

Upside is that we have less code to check/manage with this approach.
  • Loading branch information
skrawcz authored Feb 20, 2023
1 parent 915efdd commit 01087f0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 44 deletions.
24 changes: 13 additions & 11 deletions hamilton/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,25 @@ def build_dataframe_with_dataframes(outputs: Dict[str, Any]) -> pd.DataFrame:
:param outputs: The outputs to build the dataframe from.
:return: A dataframe with the outputs.
"""

def get_output_name(output_name: str, column_name: str) -> str:
"""Add function prefix to columns.
Note this means that they stop being valid python identifiers due to the `.` in the string.
"""
return f"{output_name}.{column_name}"

flattened_outputs = {}
for name, output in outputs.items():
if isinstance(output, pd.DataFrame):
df_columns = list(output.columns)
column_intersection = [
column for column in df_columns if column in flattened_outputs
]
if column_intersection:
raise ValueError(
f"Dataframe {name} contains columns {column_intersection} that already exist in the output. "
f"Please rename the columns in {name} to avoid this error."
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Unpacking dataframe {name} into dict of series with columns {df_columns}."
f"Unpacking dataframe {name} into dict of series with columns {list(output.columns)}."
)
df_dict = output.to_dict(orient="series")

df_dict = {
get_output_name(name, col_name): col_value
for col_name, col_value in output.to_dict(orient="series").items()
}
flattened_outputs.update(df_dict)
elif isinstance(output, pd.Series):
if name in flattened_outputs:
Expand Down
54 changes: 21 additions & 33 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,19 @@ def __eq__(self, other: typing.Any) -> bool:
"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}),
"b": pd.DataFrame({"c": [1, 3, 5], "d": [14, 15, 16]}),
},
pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13], "c": [1, 3, 5], "d": [14, 15, 16]}),
pd.DataFrame(
{"a.a": [1, 2, 3], "a.b": [11, 12, 13], "b.c": [1, 3, 5], "b.d": [14, 15, 16]}
),
),
(
{
"a": pd.Series([1, 2, 3]),
"b": pd.Series([11, 12, 13]),
"c": pd.DataFrame({"d": [0, 0, 0]}),
},
pd.DataFrame({"a": pd.Series([1, 2, 3]), "b": pd.Series([11, 12, 13]), "d": [0, 0, 0]}),
pd.DataFrame(
{"a": pd.Series([1, 2, 3]), "b": pd.Series([11, 12, 13]), "c.d": [0, 0, 0]}
),
),
],
ids=[
Expand Down Expand Up @@ -284,7 +288,13 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
"d": [8, 9, 10],
},
pd.DataFrame(
{"a": [1, 2, 3], "z": [0, 0, 0], "b": [4, 5, 6], "c": [7, 7, 7], "d": [8, 9, 10]}
{
"a.a": [1, 2, 3],
"a.z": [0, 0, 0],
"b": [4, 5, 6],
"c": [7, 7, 7],
"d": [8, 9, 10],
}
),
),
(
Expand All @@ -294,10 +304,10 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
},
pd.DataFrame(
{
"a": [1, 2, 3, None, None, None],
"b": [11, 12, 13, None, None, None],
"c": [None, None, None, 1, 3, 5],
"d": [None, None, None, 14, 15, 16],
"a.a": [1, 2, 3, None, None, None],
"a.b": [11, 12, 13, None, None, None],
"b.c": [None, None, None, 1, 3, 5],
"b.d": [None, None, None, 14, 15, 16],
},
index=[0, 1, 2, 3, 4, 5],
),
Expand All @@ -312,11 +322,11 @@ def test_PandasDataFrameResult_build_result_errors(outputs):
pd.DataFrame(
{
"a": [None, 1, 2, 3],
"d": [0, 0, 0, None],
"e": [1, 1, 1, None],
"c.d": [0, 0, 0, None],
"c.e": [1, 1, 1, None],
"b": [11, 12, 13, None],
"g": [None, 2, 2, 2],
"h": [None, 3, 3, 3],
"f.g": [None, 2, 2, 2],
"f.h": [None, 3, 3, 3],
},
index=[0, 1, 2, 3],
),
Expand All @@ -335,28 +345,6 @@ def test_PandasDataFrameResult_build_dataframe_with_dataframes(outputs, expected
pd.testing.assert_frame_equal(actual, expected_result)


@pytest.mark.parametrize(
"outputs",
[
{"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}), "b": pd.Series([4, 5, 6])},
{"b": pd.Series([4, 5, 6]), "a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]})},
{"a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]}), "b": 7},
{"b": 7, "a": pd.DataFrame({"a": [1, 2, 3], "b": [11, 12, 13]})},
],
ids=[
"test-df-series-duplicate",
"test-series-df-duplicate",
"test-df-scalar-duplicate",
"test-scalar-df-duplicate",
],
)
def test_PandasDataFrameResult_build_dataframe_with_dataframes_error(outputs):
"""Tests build_dataframe_with_dataframes works as expected"""
pdfr = base.PandasDataFrameResult()
with pytest.raises(ValueError):
pdfr.build_dataframe_with_dataframes(outputs)


@pytest.mark.parametrize(
"outputs,expected_result",
[
Expand Down

0 comments on commit 01087f0

Please sign in to comment.