Support single level dataframes in serialization funcs
milesgranger committed Aug 31, 2019
1 parent dd390e1 commit 0b2b84d
Showing 8 changed files with 123 additions and 84 deletions.
12 changes: 4 additions & 8 deletions gordo_components/client/client.py
@@ -386,8 +386,8 @@ async def _process_post_prediction_task(
         """

         json = {
-            "X": server_utils.multi_lvl_column_dataframe_to_dict(X.iloc[chunk]),
-            "y": server_utils.multi_lvl_column_dataframe_to_dict(y.iloc[chunk])
+            "X": server_utils.dataframe_to_dict(X.iloc[chunk]),
+            "y": server_utils.dataframe_to_dict(y.iloc[chunk])
             if y is not None
             else None,
         }
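
For a plain single-level ``X``, the serialized payload is now just pandas' own ``to_dict`` mapping. A minimal sketch of what the client would send (assuming ``gordo_components`` is importable; the tag names and values are illustrative, not from this commit):

    import pandas as pd
    from gordo_components.server import utils as server_utils

    # Hypothetical single-level sensor frame; "tag-1"/"tag-2" are made-up names.
    X = pd.DataFrame(
        {"tag-1": [0.1, 0.2], "tag-2": [1.1, 1.2]},
        index=pd.date_range("2019-01-01", periods=2),
    )
    json_payload = {"X": server_utils.dataframe_to_dict(X), "y": None}
    # json_payload["X"] ->
    # {"tag-1": {"2019-01-01": 0.1, "2019-01-02": 0.2},
    #  "tag-2": {"2019-01-01": 1.1, "2019-01-02": 1.2}}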
@@ -448,9 +448,7 @@ async def _process_post_prediction_task(
             # Process response and return if no exception
             else:

-                predictions = server_utils.multi_lvl_column_dataframe_from_dict(
-                    resp["data"]
-                )
+                predictions = server_utils.dataframe_from_dict(resp["data"])

                 # Forward predictions to any other consumer if registered.
                 if self.prediction_forwarder is not None:
@@ -550,9 +548,7 @@ async def _process_get_prediction_task(
             )

             logger.info(f"Processing {start} -> {end}")
-            predictions = server_utils.multi_lvl_column_dataframe_from_dict(
-                response["data"]
-            )
+            predictions = server_utils.dataframe_from_dict(response["data"])

             if self.prediction_forwarder is not None:
                 await self.prediction_forwarder(
113 changes: 61 additions & 52 deletions gordo_components/server/utils.py
@@ -25,11 +25,12 @@
 logger = logging.getLogger(__name__)


-def multi_lvl_column_dataframe_to_dict(df: pd.DataFrame) -> dict:
+def dataframe_to_dict(df: pd.DataFrame) -> dict:
     """
-    Convert a dataframe which has a :class:`pandas.MultiIndex` as columns into a dict
+    Convert a dataframe which may have a :class:`pandas.MultiIndex` as columns into a dict
     where each key is the top level column name, and the value is the array
-    of columns under the top level name.
+    of columns under the top level name. If it's a simple dataframe,
+    :meth:`pandas.core.DataFrame.to_dict` will be used.

     This allows :func:`json.dumps` to be performed, where :meth:`pandas.DataFrame.to_dict()`
     would convert such a multi-level column dataframe into keys of ``tuple`` objects, which are
@@ -59,7 +60,7 @@ def multi_lvl_column_dataframe_to_dict(df: pd.DataFrame) -> dict:
                sub-feature-0 sub-feature-1 sub-feature-0 sub-feature-1
     2019-01-01             0             1             2             3
     2019-02-01             4             5             6             7
-    >>> serialized = multi_lvl_column_dataframe_to_dict(df)
+    >>> serialized = dataframe_to_dict(df)
     >>> pprint.pprint(serialized)
     {'feature0': {'sub-feature-0': {'2019-01-01': 0, '2019-02-01': 4},
                   'sub-feature-1': {'2019-01-01': 1, '2019-02-01': 5}},
@@ -71,15 +72,18 @@ def multi_lvl_column_dataframe_to_dict(df: pd.DataFrame) -> dict:
     data = df.copy()
     if isinstance(data.index, pd.DatetimeIndex):
         data.index = data.index.astype(str)
-    return {
-        col: data[col].to_dict()
-        if isinstance(data[col], pd.DataFrame)
-        else pd.DataFrame(data[col]).to_dict()
-        for col in data.columns.get_level_values(0)
-    }
+    if isinstance(df.columns, pd.MultiIndex):
+        return {
+            col: data[col].to_dict()
+            if isinstance(data[col], pd.DataFrame)
+            else pd.DataFrame(data[col]).to_dict()
+            for col in data.columns.get_level_values(0)
+        }
+    else:
+        return data.to_dict()


-def multi_lvl_column_dataframe_from_dict(data: dict) -> pd.DataFrame:
+def dataframe_from_dict(data: dict) -> pd.DataFrame:
     """
     The inverse procedure done by :func:`.dataframe_to_dict`
     Reconstructs a MultiIndex column dataframe from a previously serialized one.
@@ -105,20 +109,27 @@ def multi_lvl_column_dataframe_from_dict(data: dict) -> pd.DataFrame:
     ...     'feature1': {'sub-feature-0': {'2019-01-01': 2, '2019-02-01': 6},
     ...                  'sub-feature-1': {'2019-01-01': 3, '2019-02-01': 7}}
     ... }
-    >>> multi_lvl_column_dataframe_from_dict(serialized)  # doctest: +NORMALIZE_WHITESPACE
+    >>> dataframe_from_dict(serialized)  # doctest: +NORMALIZE_WHITESPACE
                     feature0                    feature1
                sub-feature-0 sub-feature-1 sub-feature-0 sub-feature-1
     2019-01-01             0             1             2             3
     2019-02-01             4             5             6             7
     """

-    keys = data.keys()
-    df: pd.DataFrame = pd.concat(
-        (pd.DataFrame.from_dict(data[key]) for key in keys), axis=1, keys=keys
-    )
+    if isinstance(data, dict) and any(isinstance(val, dict) for val in data.values()):
+        try:
+            keys = data.keys()
+            df: pd.DataFrame = pd.concat(
+                (pd.DataFrame.from_dict(data[key]) for key in keys), axis=1, keys=keys
+            )
+        except (ValueError, AttributeError):
+            df = pd.DataFrame.from_dict(data)
+    else:
+        df = pd.DataFrame.from_dict(data)

     try:
         df.index = pd.to_datetime(df.index)
-    except ValueError:
-        df.index = df.index.map(dateutil.parser.isoparse)  # type: ignore
+    except (TypeError, ValueError):
+        logger.debug("Could not parse index to pandas.DatetimeIndex")
         pass  # It wasn't a datetime index after all, no worries.
     return df
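
A quick sketch of the single-level round trip these two functions now support (assuming ``gordo_components`` is importable; the column names are illustrative, not from this commit):

    import numpy as np
    import pandas as pd
    from gordo_components.server import utils as server_utils

    # A simple, single-level frame; "col1"/"col2" are made-up names.
    df = pd.DataFrame(np.random.random((5, 2)), columns=["col1", "col2"])

    # dataframe_to_dict falls back to DataFrame.to_dict(), and dataframe_from_dict
    # reconstructs it without inventing a MultiIndex.
    cloned = server_utils.dataframe_from_dict(server_utils.dataframe_to_dict(df))

    assert not isinstance(cloned.columns, pd.MultiIndex)
    assert np.allclose(df.values, cloned.values)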
@@ -134,20 +145,19 @@ def parse_iso_datetime(datetime_str: str) -> datetime:
     return parsed_date


-def _df_or_response_from_dict(
-    data: Union[dict, list], expected_columns: List[str]
+def _verify_dataframe(
+    df: pd.DataFrame, expected_columns: List[str]
 ) -> Union[Response, pd.DataFrame]:
     """
-    Convert a dict or list of dicts into a :class:`pandas.core.DataFrame`.
-    Setting the column names to ``expected_columns`` if not already labeled and
-    the length of the columns match the length of the expected columns.
+    Verify the dataframe, setting the column names to ``expected_columns``
+    if not already labeled and the length of the columns matches the length
+    of the expected columns.

     If it fails, it will return an instance of :class:`flask.wrappers.Response`

     Parameters
     ----------
-    data: Union[dict, list]
-        Data to convert into a dataframe with :meth:`pandas.core.DataFrame.from_dict`
+    df: pandas.core.DataFrame
+        DataFrame to verify.
     expected_columns: List[str]
         List of expected column names to give if the dataframe does not consist of them
         but the number of columns matches ``len(expected_columns)``
@@ -156,29 +166,32 @@ def _df_or_response_from_dict(
     Returns
     -------
     Union[flask.wrappers.Response, pandas.core.DataFrame]
     """
-    df = pd.DataFrame.from_dict(data)
-
-    if not all(col in df.columns for col in expected_columns):
-
-        # If the length doesn't mach, then we can't reliably determine what data we have hre.
-        if len(df.columns) != len(expected_columns):
-            msg = dict(
-                message=f"Unexpected features: "
-                f"was expecting {expected_columns} length of {len(expected_columns)}, "
-                f"but got {df.columns} length of {len(df.columns)}"
-            )
-            return make_response((jsonify(msg), 400))
-
-        # Otherwise we were send a list/ndarray data format which we assume the client has
-        # ordered correctly to the order of the expected_columns.
-        else:
-            df.columns = expected_columns
-
-    # All columns exist in the dataframe, select them which thus ensures order and removes extra columns
-    df = df[expected_columns]
-    return df
+    if not isinstance(df.columns, pd.MultiIndex):
+        if not all(col in df.columns for col in expected_columns):
+
+            # If the length doesn't match, then we can't reliably determine what data we have here.
+            if len(df.columns) != len(expected_columns):
+                msg = dict(
+                    message=f"Unexpected features: "
+                    f"was expecting {expected_columns} length of {len(expected_columns)}, "
+                    f"but got {df.columns} length of {len(df.columns)}"
+                )
+                return make_response((jsonify(msg), 400))
+
+            # Otherwise we were sent a list/ndarray data format which we assume the client has
+            # ordered correctly to the order of the expected_columns.
+            else:
+                df.columns = expected_columns
+
+        # All columns exist in the dataframe; select them, which ensures order and removes extra columns
+        else:
+            df = df[expected_columns]
+
+        return df
+
+    msg = {
+        "message": f"Server does not support multi-level dataframes at this time: {df.columns.tolist()}"
+    }
+    return make_response((jsonify(msg), 400))


 def extract_X_y(method):
@@ -216,17 +229,13 @@ def wrapper_method(self, *args, **kwargs):
             return make_response((jsonify(message), 400))

         # Convert X and (maybe) y into dataframes.
-        try:
-            X = multi_lvl_column_dataframe_from_dict(X)
-        except (AttributeError, ValueError):
-            X = _df_or_response_from_dict(X, [t.name for t in self.tags])
+        X = dataframe_from_dict(X)
+        X = _verify_dataframe(X, [t.name for t in self.tags])

         # Y is ok to be None for BaseView, view(s) like Anomaly might require it.
         if y is not None:
-            try:
-                y = multi_lvl_column_dataframe_from_dict(y)
-            except (AttributeError, ValueError):
-                y = _df_or_response_from_dict(y, [t.name for t in self.target_tags])
+            y = dataframe_from_dict(y)
+            y = _verify_dataframe(y, [t.name for t in self.target_tags])

         # If either X or y came back as a Response type, there was an error
         for data_or_resp in [X, y]:
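
A sketch of the new verification path (the tag names here are hypothetical): an unlabeled payload gets the expected column names assigned when the widths match, while a MultiIndex frame would instead be answered with the 400 response above.

    from gordo_components.server import utils as server_utils

    # A client may post a bare list of records; "tag-1"/"tag-2" are made-up tags.
    df = server_utils.dataframe_from_dict([[0.5, 1.5], [2.5, 3.5]])
    verified = server_utils._verify_dataframe(df, ["tag-1", "tag-2"])

    # The unlabeled columns were renamed to the expected tags, in order.
    assert verified.columns.tolist() == ["tag-1", "tag-2"]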
2 changes: 1 addition & 1 deletion gordo_components/server/views/anomaly.py
@@ -161,7 +161,7 @@ def _create_anomaly_response(self, start_time: float = None):
             return make_response(jsonify(msg), 422)  # 422 Unprocessable Entity

         context: typing.Dict[typing.Any, typing.Any] = dict()
-        context["data"] = utils.multi_lvl_column_dataframe_to_dict(anomaly_df)
+        context["data"] = utils.dataframe_to_dict(anomaly_df)
         context["time-seconds"] = f"{timeit.default_timer() - start_time:.4f}"
         return make_response(jsonify(context), context.pop("status-code", 200))
2 changes: 1 addition & 1 deletion gordo_components/server/views/base.py
@@ -206,7 +206,7 @@ def _process_request(self):
             target_tag_list=self.target_tags,
             index=X.index,
         )
-        context["data"] = server_utils.multi_lvl_column_dataframe_to_dict(data)
+        context["data"] = server_utils.dataframe_to_dict(data)
         return make_response((jsonify(context), context.pop("status-code", 200)))
4 changes: 2 additions & 2 deletions tests/gordo_components/client/test_client.py
@@ -432,10 +432,10 @@ def test_ml_server_dataframe_to_dict_and_back(tags: typing.List[str]):
     df = model_utils.make_base_dataframe(tags, original_input, model_output)

     # Server then converts this into a dict which maps top level names to lists
-    serialized = server_utils.multi_lvl_column_dataframe_to_dict(df)
+    serialized = server_utils.dataframe_to_dict(df)

     # Client reproduces this dataframe
-    df_clone = server_utils.multi_lvl_column_dataframe_from_dict(serialized)
+    df_clone = server_utils.dataframe_from_dict(serialized)

     # each subset of column under the top level names should be equal
     top_lvl_names = df.columns.get_level_values(0)
4 changes: 2 additions & 2 deletions tests/gordo_components/server/test_anomaly_view.py
@@ -43,7 +43,7 @@ def test_anomaly_prediction_endpoint(
     assert "data" in resp.json

     # Load data into dataframe
-    data = server_utils.multi_lvl_column_dataframe_from_dict(resp.json["data"])
+    data = server_utils.dataframe_from_dict(resp.json["data"])

     # The only difference between POST and GET is that POST will return None for
     # start and end dates, because the server can't know what those are
@@ -94,7 +94,7 @@ def test_overlapping_time_buckets(influxdb, gordo_ml_server_client):
         json={"start": "2016-01-01T00:11:00+00:00", "end": "2016-01-01T00:21:00+00:00"},
     )
     assert resp.status_code == 200
-    data = server_utils.multi_lvl_column_dataframe_from_dict(resp.json["data"])
+    data = server_utils.dataframe_from_dict(resp.json["data"])

     assert len(data) == 1, f"Expected one prediction, got: {resp.json}"
     assert data["start"].iloc[0].tolist() == ["2016-01-01T00:10:00+00:00"]
2 changes: 1 addition & 1 deletion tests/gordo_components/server/test_base_view.py
@@ -43,7 +43,7 @@ def test_prediction_endpoint_post_ok(sensors, gordo_ml_server_client, data_to_po

     assert resp.status_code == 200

-    data = server_utils.multi_lvl_column_dataframe_from_dict(resp.json["data"])
+    data = server_utils.dataframe_from_dict(resp.json["data"])

     # Expected column names
     assert all(key in data for key in ("model-output", "model-input"))
68 changes: 51 additions & 17 deletions tests/gordo_components/server/test_utils.py
@@ -1,28 +1,62 @@
 # -*- coding: utf-8 -*-

+import pytest
 import pandas as pd
 import numpy as np

 from gordo_components.server import utils as server_utils


-def test_multi_lvl_column_dataframe_from_to_dict():
-
-    columns = pd.MultiIndex.from_product((("feature1", "feature2"), ("col1", "col2")))
-    df = pd.DataFrame(
-        np.random.random((10, 4)),
-        columns=columns,
-        index=pd.date_range("2016-01-01", "2016-02-01", periods=10),
-    )
-
-    assert isinstance(df.index, pd.DatetimeIndex)
-
-    cloned = server_utils.multi_lvl_column_dataframe_from_dict(
-        server_utils.multi_lvl_column_dataframe_to_dict(df)
-    )
-
-    # Ensure the function hasn't mutated the index.
-    assert isinstance(df.index, pd.DatetimeIndex)
+@pytest.mark.parametrize(
+    "df",
+    [
+        # Multi-column
+        pd.DataFrame(
+            np.random.random((10, 4)),
+            columns=pd.MultiIndex.from_product(
+                (("feature1", "feature2"), ("col1", "col2"))
+            ),
+            index=pd.date_range("2016-01-01", "2016-02-01", periods=10),
+        ),
+        # Normal dataframe, no date index
+        pd.DataFrame(
+            np.random.random((10, 4)), columns=["col1", "col2", "col3", "col4"]
+        ),
+    ],
+)
+def test_dataframe_from_to_dict(df):
+    """
+    Test (de)serialization back and forth: dataframe -> dict -> dataframe
+    """
+    index_was_datetimes: bool = isinstance(df.index, pd.DatetimeIndex)
+
+    cloned = server_utils.dataframe_from_dict(server_utils.dataframe_to_dict(df))
+
+    if index_was_datetimes:
+        # Ensure the function hasn't mutated the index.
+        assert isinstance(df.index, pd.DatetimeIndex)

     assert np.allclose(df.values, cloned.values)
     assert df.columns.tolist() == cloned.columns.tolist()
     assert df.index.tolist() == cloned.index.tolist()
+
+
+@pytest.mark.parametrize(
+    "expect_multi_lvl, data",
+    [
+        (False, {"col1": [0, 1, 2, 3], "col2": [0, 1, 2, 3]}),
+        (True, {("ft1", "col1"): [0, 1, 2, 3], ("ft1", "col2"): [0, 1, 2, 3]}),
+        (True, {"ft1": {"col1": [0, 1, 2]}, "ft2": {"col1": [0, 1, 2]}}),
+        (False, [[0, 1, 2], [0, 1, 2]]),
+    ],
+)
+def test_dataframe_to_from_dict(expect_multi_lvl: bool, data: dict):
+    """
+    Creating dataframes from various raw data structures should have well-defined
+    behavior, such as not creating MultiIndex columns from a dict of simple key-to-array mappings.
+    """
+    df = server_utils.dataframe_from_dict(data)
+    if expect_multi_lvl:
+        assert isinstance(df.columns, pd.MultiIndex)
+    else:
+        assert not isinstance(df.columns, pd.MultiIndex)