Skip to content

Commit

Permalink
Add resampling_endpoint up to which data will be forward filled
Browse files (browse the repository at this point in the history)
  • Loading branch information
Natalie committed May 27, 2019
1 parent b917a4b commit 11dcf20
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 13 deletions.
3 changes: 1 addition & 2 deletions gordo_components/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,10 @@ async def _raw_data(
pandas.core.DataFrame
Dataframe of required tags and index reflecting the datetime point
"""
freq = pd.tseries.frequencies.to_offset(endpoint.resolution)

dataset = TimeSeriesDataset( # type: ignore
data_provider=self.data_provider,
from_ts=start - freq.delta,
from_ts=start,
to_ts=end,
resolution=endpoint.resolution,
tag_list=endpoint.tag_list,
Expand Down
30 changes: 27 additions & 3 deletions gordo_components/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def get_metadata(self):
def join_timeseries(
series_iterable: Iterable[pd.Series],
resampling_startpoint: datetime,
resampling_endpoint: datetime,
resolution: str,
) -> pd.DataFrame:
"""
Expand All @@ -45,6 +46,11 @@ def join_timeseries(
that this resampling_startpoint datetime must be before or equal to the first
(earliest) datetime in the data to be resampled.
resampling_endpoint
The end point for resampling. This datetime must be equal to or after the last datetime in the
data to be resampled.
resolution
The bucket size for grouping all incoming time data (e.g. "10T")
Expand All @@ -61,6 +67,8 @@ def join_timeseries(
startpoint_sametz = resampling_startpoint.astimezone(
tz=series.index[0].tzinfo
)
endpoint_sametz = resampling_endpoint.astimezone(tz=series.index[0].tzinfo)

if series.index[0] > startpoint_sametz:
# Insert a NaN at the startpoint, to make sure that all resampled
# indexes are the same. This approach will "pad" most frames with
Expand All @@ -75,19 +83,35 @@ def join_timeseries(

elif series.index[0] < resampling_startpoint:
msg = (
f"Error - for {series.columns[0]}, first timestamp "
f"{series.index[0]} is before resampling start point "
f"Error - for {series.name}, first timestamp "
f"{series.index[0]} is before the resampling start point "
f"{startpoint_sametz}"
)
logging.error(msg)
raise RuntimeError(msg)

if series.index[-1] < endpoint_sametz:
endpoint = pd.Series(
[np.NaN], index=[endpoint_sametz], name=series.name
)
series = series.append(endpoint)
logging.debug(
f"Appending NaN to {series.name} " f"at time {endpoint_sametz}"
)
elif series.index[-1] > endpoint_sametz:
msg = (
f"Error - for {series.name}, last timestamp "
f"{series.index[-1]} is later than the resampling end point "
f"{endpoint_sametz}"
)
logging.error(msg)
raise RuntimeError(msg)

logging.debug("Head (3) and tail(3) of dataframe to be resampled:")
logging.debug(series.head(3))
logging.debug(series.tail(3))

resampled = series.resample(resolution).mean()

filled = resampled.fillna(method="ffill")
resampled_series.append(filled)

Expand Down
3 changes: 2 additions & 1 deletion gordo_components/dataset/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ def get_data(self) -> Tuple[pd.DataFrame, None]:
dataframes = self.data_provider.load_series(
from_ts=self.from_ts, to_ts=self.to_ts, tag_list=self.tag_list
)
X = self.join_timeseries(dataframes, self.from_ts, self.resolution)
X = self.join_timeseries(dataframes, self.from_ts, self.to_ts, self.resolution)
y = None
if self.row_filter:
X = pandas_filter_rows(X, self.row_filter)
logger.info(f"First five rows of the filtered dataset are {X.head()}")
return X, y

def get_metadata(self):
Expand Down
20 changes: 13 additions & 7 deletions tests/gordo_components/dataset/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,27 @@ def test_join_timeseries(self):
frequency = "7T"
timedelta = pd.Timedelta("7 minutes")
resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00Z")
resampling_end = dateutil.parser.isoparse("2018-01-15 08:00:00Z")
all_in_frame = GordoBaseDataset.join_timeseries(
timeseries_list, resampling_start, frequency
timeseries_list, resampling_start, resampling_end, frequency
)

# Check that first resulting resampled, joined row is within "frequency" from
# the real first data point
self.assertGreaterEqual(
all_in_frame.index[0], pd.Timestamp(latest_start) - timedelta
)
self.assertLessEqual(all_in_frame.index[-1], pd.Timestamp(earliest_end))
self.assertLessEqual(all_in_frame.index[-1], pd.Timestamp(resampling_end))

def test_join_timeseries_nonutcstart(self):
timeseries_list, latest_start, earliest_end = self.create_timeseries_list()
frequency = "7T"
resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00+07:00")
resampling_end = dateutil.parser.isoparse("2018-01-12 13:07:00+07:00")
all_in_frame = GordoBaseDataset.join_timeseries(
timeseries_list, resampling_start, frequency
timeseries_list, resampling_start, resampling_end, frequency
)
self.assertEqual(len(all_in_frame), 413)
self.assertEqual(len(all_in_frame), 1854)

def test_join_timeseries_with_gaps(self):
timeseries_list, latest_start, earliest_end = self.create_timeseries_list()
Expand All @@ -88,14 +90,18 @@ def test_join_timeseries_with_gaps(self):

frequency = "10T"
resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00Z")
resampling_end = dateutil.parser.isoparse("2018-01-12 07:00:00Z")

all_in_frame = GordoBaseDataset.join_timeseries(
timeseries_with_holes, resampling_start, frequency
timeseries_with_holes, resampling_start, resampling_end, frequency
)
self.assertEqual(all_in_frame.index[0], pd.Timestamp(latest_start))
self.assertEqual(all_in_frame.index[-1], pd.Timestamp(earliest_end))
self.assertEqual(all_in_frame.index[-1], pd.Timestamp(resampling_end))

expected_index = pd.date_range(
start=latest_start, end=earliest_end, freq=frequency
start=dateutil.parser.isoparse(latest_start),
end=resampling_end,
freq=frequency,
)
self.assertListEqual(list(all_in_frame.index), list(expected_index))

Expand Down

0 comments on commit 11dcf20

Please sign in to comment.