From 11dcf206a7298679bb170d3bb5affbbc4c683455 Mon Sep 17 00:00:00 2001 From: Natalie Date: Tue, 21 May 2019 08:15:11 +0200 Subject: [PATCH] Add resampling_endpoint up to which data will be forward filled --- gordo_components/client/client.py | 3 +- gordo_components/dataset/base.py | 30 +++++++++++++++++-- gordo_components/dataset/datasets.py | 3 +- .../gordo_components/dataset/test_dataset.py | 20 ++++++++----- 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/gordo_components/client/client.py b/gordo_components/client/client.py index 3116d5787..09130738d 100644 --- a/gordo_components/client/client.py +++ b/gordo_components/client/client.py @@ -607,11 +607,10 @@ async def _raw_data( pandas.core.DataFrame Dataframe of required tags and index reflecting the datetime point """ - freq = pd.tseries.frequencies.to_offset(endpoint.resolution) dataset = TimeSeriesDataset( # type: ignore data_provider=self.data_provider, - from_ts=start - freq.delta, + from_ts=start, to_ts=end, resolution=endpoint.resolution, tag_list=endpoint.tag_list, diff --git a/gordo_components/dataset/base.py b/gordo_components/dataset/base.py index 25997a43a..a0972a801 100644 --- a/gordo_components/dataset/base.py +++ b/gordo_components/dataset/base.py @@ -29,6 +29,7 @@ def get_metadata(self): def join_timeseries( series_iterable: Iterable[pd.Series], resampling_startpoint: datetime, + resampling_endpoint: datetime, resolution: str, ) -> pd.DataFrame: """ @@ -45,6 +46,11 @@ def join_timeseries( that this resampling_startpoint datetime must be before or equal to the first (earliest) datetime in the data to be resampled. + resampling_endpoint + The end point for resampling. This datetime must be equal to or after the last datetime in the + data to be resampled. + + resolution The bucket size for grouping all incoming time data (e.g. "10T") @@ -61,6 +67,8 @@ def join_timeseries( startpoint_sametz = resampling_startpoint.astimezone( tz=series.index[0].tzinfo ) + endpoint_sametz = resampling_endpoint.astimezone(tz=series.index[0].tzinfo) + if series.index[0] > startpoint_sametz: # Insert a NaN at the startpoint, to make sure that all resampled # indexes are the same. This approach will "pad" most frames with @@ -75,19 +83,35 @@ def join_timeseries( elif series.index[0] < resampling_startpoint: msg = ( - f"Error - for {series.columns[0]}, first timestamp " - f"{series.index[0]} is before resampling start point " + f"Error - for {series.name}, first timestamp " + f"{series.index[0]} is before the resampling start point " f"{startpoint_sametz}" ) logging.error(msg) raise RuntimeError(msg) + if series.index[-1] < endpoint_sametz: + endpoint = pd.Series( + [np.NaN], index=[endpoint_sametz], name=series.name + ) + series = series.append(endpoint) + logging.debug( + f"Appending NaN to {series.name} " f"at time {endpoint_sametz}" + ) + elif series.index[-1] > endpoint_sametz: + msg = ( + f"Error - for {series.name}, last timestamp " + f"{series.index[-1]} is later than the resampling end point " + f"{endpoint_sametz}" + ) + logging.error(msg) + raise RuntimeError(msg) + logging.debug("Head (3) and tail(3) of dataframe to be resampled:") logging.debug(series.head(3)) logging.debug(series.tail(3)) resampled = series.resample(resolution).mean() - filled = resampled.fillna(method="ffill") resampled_series.append(filled) diff --git a/gordo_components/dataset/datasets.py b/gordo_components/dataset/datasets.py index f5eccb9c9..f578ca285 100644 --- a/gordo_components/dataset/datasets.py +++ b/gordo_components/dataset/datasets.py @@ -68,10 +68,11 @@ def get_data(self) -> Tuple[pd.DataFrame, None]: dataframes = self.data_provider.load_series( from_ts=self.from_ts, to_ts=self.to_ts, tag_list=self.tag_list ) - X = self.join_timeseries(dataframes, self.from_ts, self.resolution) + X = self.join_timeseries(dataframes, self.from_ts, self.to_ts, self.resolution) y = None if self.row_filter: X = pandas_filter_rows(X, self.row_filter) + logger.info(f"First five rows of the filtered dataset are {X.head()}") return X, y def get_metadata(self): diff --git a/tests/gordo_components/dataset/test_dataset.py b/tests/gordo_components/dataset/test_dataset.py index a513912b1..578991b31 100644 --- a/tests/gordo_components/dataset/test_dataset.py +++ b/tests/gordo_components/dataset/test_dataset.py @@ -53,8 +53,9 @@ def test_join_timeseries(self): frequency = "7T" timedelta = pd.Timedelta("7 minutes") resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00Z") + resampling_end = dateutil.parser.isoparse("2018-01-15 08:00:00Z") all_in_frame = GordoBaseDataset.join_timeseries( - timeseries_list, resampling_start, frequency + timeseries_list, resampling_start, resampling_end, frequency ) # Check that first resulting resampled, joined row is within "frequency" from @@ -62,16 +63,17 @@ def test_join_timeseries(self): self.assertGreaterEqual( all_in_frame.index[0], pd.Timestamp(latest_start) - timedelta ) - self.assertLessEqual(all_in_frame.index[-1], pd.Timestamp(earliest_end)) + self.assertLessEqual(all_in_frame.index[-1], pd.Timestamp(resampling_end)) def test_join_timeseries_nonutcstart(self): timeseries_list, latest_start, earliest_end = self.create_timeseries_list() frequency = "7T" resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00+07:00") + resampling_end = dateutil.parser.isoparse("2018-01-12 13:07:00+07:00") all_in_frame = GordoBaseDataset.join_timeseries( - timeseries_list, resampling_start, frequency + timeseries_list, resampling_start, resampling_end, frequency ) - self.assertEqual(len(all_in_frame), 413) + self.assertEqual(len(all_in_frame), 1854) def test_join_timeseries_with_gaps(self): timeseries_list, latest_start, earliest_end = self.create_timeseries_list() @@ -88,14 +90,18 @@ def test_join_timeseries_with_gaps(self): frequency = "10T" resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00Z") + resampling_end = dateutil.parser.isoparse("2018-01-12 07:00:00Z") + all_in_frame = GordoBaseDataset.join_timeseries( - timeseries_with_holes, resampling_start, frequency + timeseries_with_holes, resampling_start, resampling_end, frequency ) self.assertEqual(all_in_frame.index[0], pd.Timestamp(latest_start)) - self.assertEqual(all_in_frame.index[-1], pd.Timestamp(earliest_end)) + self.assertEqual(all_in_frame.index[-1], pd.Timestamp(resampling_end)) expected_index = pd.date_range( - start=latest_start, end=earliest_end, freq=frequency + start=dateutil.parser.isoparse(latest_start), + end=resampling_end, + freq=frequency, ) self.assertListEqual(list(all_in_frame.index), list(expected_index))