Skip to content

Commit

Permalink
Add insufficient data error catching in dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanjdillon committed Dec 12, 2019
1 parent 004f5ed commit 98b67d9
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 23 deletions.
2 changes: 2 additions & 0 deletions gordo_components/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from gordo_components.data_provider.providers import NoSuitableDataProviderError
from gordo_components.dataset.sensor_tag import SensorTagNormalizationError
from gordo_components.dataset.datasets import InsufficientDataError
from gordo_components.builder.mlflow_utils import MlflowLoggingError
from gunicorn.glogging import Logger

Expand All @@ -33,6 +34,7 @@
FileNotFoundError: 30,
SensorTagNormalizationError: 60,
NoSuitableDataProviderError: 70,
InsufficientDataError: 71,
MlflowLoggingError: 80,
}

Expand Down
33 changes: 26 additions & 7 deletions gordo_components/dataset/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
logger = logging.getLogger(__name__)


class InsufficientDataError(ValueError):
pass


def compat(init):
"""
__init__ decorator for compatibility where the Gordo config file's ``dataset`` keys have
Expand Down Expand Up @@ -61,6 +65,7 @@ def __init__(
row_filter_buffer_size: int = 0,
asset: Optional[str] = None,
default_asset: Optional[str] = None,
n_samples_threshold: int = 0,
**_kwargs,
):
"""
Expand Down Expand Up @@ -111,6 +116,8 @@ def __init__(
default_asset: Optional[str]
Asset which will be used if `asset` is not provided and the tag is not
resolvable to a specific asset.
n_samples_threshold: int = 0
The threshold at which the generated DataFrame is considered to have too few rows of data.
_kwargs
"""
self.from_ts = self._validate_dt(from_ts)
Expand All @@ -131,6 +138,7 @@ def __init__(
self.aggregation_methods = aggregation_methods
self.row_filter_buffer_size = row_filter_buffer_size
self.asset = asset
self.n_samples_threshold = n_samples_threshold

if not self.from_ts.tzinfo or not self.to_ts.tzinfo:
raise ValueError(
Expand All @@ -157,16 +165,27 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:

# Resample if we have a resolution set, otherwise simply join the series.
if self.resolution:
data = self.join_timeseries(
series_iter,
self.from_ts,
self.to_ts,
self.resolution,
aggregation_methods=self.aggregation_methods,
)
try:
data = self.join_timeseries(
series_iter,
self.from_ts,
self.to_ts,
self.resolution,
aggregation_methods=self.aggregation_methods,
)
except IndexError:
raise InsufficientDataError(
f"One or more series in the tag list is missing data for the specified period."
)
else:
data = pd.concat(series_iter, axis=1, join="inner")

if len(data) <= self.n_samples_threshold:
raise InsufficientDataError(
f"The length of the generated DataFrame ({len(data)}) does not exceed the "
f"specified required threshold for number of rows ({self.n_samples_threshold})."
)

if self.row_filter:
data = pandas_filter_rows(
data, self.row_filter, buffer_size=self.row_filter_buffer_size
Expand Down
74 changes: 58 additions & 16 deletions tests/gordo_components/dataset/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from datetime import datetime

from gordo_components.data_provider.base import GordoBaseDataProvider
from gordo_components.dataset.datasets import RandomDataset, TimeSeriesDataset
from gordo_components.dataset.datasets import (
RandomDataset,
TimeSeriesDataset,
InsufficientDataError,
)
from gordo_components.dataset.base import GordoBaseDataset
from gordo_components.dataset.sensor_tag import SensorTag

Expand Down Expand Up @@ -101,6 +105,41 @@ def test_join_timeseries():
assert all_in_frame.index[-1] <= pd.Timestamp(resampling_end)


@pytest.mark.parametrize(
"value,n_rows,resolution,row_threshold,error",
[
# Frequency passed as zero, resulting in an ZeroDivisionError during aggregation
(None, None, "0T", 0, ZeroDivisionError),
# Empty series results in an InsufficientDataError
(None, 0, "12T", 0, InsufficientDataError),
# When all rows are NaNs and dropped result in InsufficientDataError
(np.NaN, None, "12T", 0, InsufficientDataError),
# Rows less then or equal to `row_threshold` result in InsufficientDataError
(None, 6, "12T", 6, InsufficientDataError),
],
)
def test_join_timeseries_empty_series(value, n_rows, resolution, row_threshold, error):
"""
Test that empty data scenarios raise appropriate errors
"""

from_ts = dateutil.parser.isoparse("2018-01-01 00:00:00+00:00")
to_ts = dateutil.parser.isoparse("2018-01-05 00:00:00+00:00")
tag_list = [SensorTag(name=n, asset=None) for n in ["Tag 1", "Tag 2", "Tag 3"]]

kwargs = {
"from_ts": from_ts,
"to_ts": to_ts,
"tag_list": tag_list,
"resolution": resolution,
"row_threshold": row_threshold,
"data_provider": MockDataProvider(value=np.NaN, n_rows=n_rows),
}

with pytest.raises(error):
TimeSeriesDataset(**kwargs).get_data()


def test_join_timeseries_nonutcstart():
timeseries_list, latest_start, earliest_end = create_timeseries_list()
frequency = "7T"
Expand Down Expand Up @@ -143,7 +182,7 @@ def test_join_timeseries_with_gaps():
def test_row_filter():
"""Tests that row_filter filters away rows"""
kwargs = dict(
data_provider=MockDataSource(),
data_provider=MockDataProvider(),
tag_list=[
SensorTag("Tag 1", None),
SensorTag("Tag 2", None),
Expand All @@ -168,7 +207,7 @@ def test_aggregation_methods():
"""Tests that it works to set aggregation method(s)"""

kwargs = dict(
data_provider=MockDataSource(),
data_provider=MockDataProvider(),
tag_list=[
SensorTag("Tag 1", None),
SensorTag("Tag 2", None),
Expand Down Expand Up @@ -202,7 +241,7 @@ def test_aggregation_methods():

def test_time_series_no_resolution():
kwargs = dict(
data_provider=MockDataSource(),
data_provider=MockDataProvider(),
tag_list=[
SensorTag("Tag 1", None),
SensorTag("Tag 2", None),
Expand All @@ -211,6 +250,7 @@ def test_time_series_no_resolution():
from_ts=dateutil.parser.isoparse("2017-12-25 06:00:00Z"),
to_ts=dateutil.parser.isoparse("2017-12-29 06:00:00Z"),
)

no_resolution, _ = TimeSeriesDataset(resolution=None, **kwargs).get_data()
wi_resolution, _ = TimeSeriesDataset(resolution="10T", **kwargs).get_data()
assert len(no_resolution) > len(wi_resolution)
Expand Down Expand Up @@ -240,7 +280,7 @@ def test_timeseries_target_tags(tag_list, target_tag_list):
end,
tag_list=tag_list,
target_tag_list=target_tag_list,
data_provider=MockDataSource(),
data_provider=MockDataProvider(),
)
X, y = tsd.get_data()

Expand All @@ -261,9 +301,11 @@ def test_timeseries_target_tags(tag_list, target_tag_list):
assert [tag.name for tag in tag_list] == X.columns.tolist()


class MockDataSource(GordoBaseDataProvider):
def __init__(self, **kwargs):
pass
class MockDataProvider(GordoBaseDataProvider):
def __init__(self, value=None, n_rows=None, **kwargs):
"""With value argument for generating different types of data series (e.g. NaN)"""
self.value = value
self.n_rows = n_rows

def can_handle_tag(self, tag):
return True
Expand All @@ -275,13 +317,13 @@ def load_series(
tag_list: List[SensorTag],
dry_run: Optional[bool] = False,
) -> Iterable[pd.Series]:
days = pd.date_range(from_ts, to_ts, freq="s")
tag_list_strings = sorted([tag.name for tag in tag_list])
for i, name in enumerate(tag_list_strings):
series = pd.Series(
index=days, data=list(range(i, len(days) + i)), name=name
)
yield series

index = pd.date_range(from_ts, to_ts, freq="s")
for i, name in enumerate(sorted([tag.name for tag in tag_list])):
# If value not passed, data for each tag are staggered integer ranges
data = [self.value if self.value else i for i in range(i, len(index) + i)]
series = pd.Series(index=index, data=data, name=name)
yield series[: self.n_rows] if self.n_rows else series


def test_timeseries_dataset_compat():
Expand All @@ -291,7 +333,7 @@ def test_timeseries_dataset_compat():
:func:`gordo_components.dataset.datasets.compat` should adjust for these differences.
"""
dataset = TimeSeriesDataset(
data_provider=MockDataSource(),
data_provider=MockDataProvider(),
train_start_date="2017-12-25 06:00:00Z",
train_end_date="2017-12-29 06:00:00Z",
tags=[SensorTag("Tag 1", None)],
Expand Down

0 comments on commit 98b67d9

Please sign in to comment.