WIP - GordoTimeseriesGenerator #978

Open · wants to merge 62 commits into master

Commits (62)
ed8a9b7
GordoTimeseriesGenerator first version
koropets Apr 2, 2020
af53b76
First approach for GordoTimeseriesGenerator
koropets Apr 2, 2020
1daba7e
Always pass pd.DataFrame for create_keras_timeseriesgenerator()
koropets Apr 2, 2020
5de4d4f
Fix targets in GordoTimeseriesGenerator
koropets Apr 2, 2020
309b286
Fix github checks
koropets Apr 2, 2020
3a0cead
Do GordoTimeseriesGenerator in different way
koropets Apr 7, 2020
2cf6c33
Fix annotation checks on Github
koropets Apr 8, 2020
1d7ad98
logger.debug() in GordoTimeseriesGenerator
koropets Apr 9, 2020
0fcbe59
raise ValueError if the time series is in the wrong shape
koropets Apr 9, 2020
aab9f41
test_find_consecutive_chunks()
koropets Apr 9, 2020
46d73d5
test_create_generator_containers()
koropets Apr 9, 2020
d4d40ac
test_timeseries_generator()
koropets Apr 9, 2020
0375a56
black
koropets Apr 9, 2020
1a47bc5
Remove unused import
koropets Apr 9, 2020
323eda4
Consecutive index for RandomDataset
koropets Apr 10, 2020
a76152d
test_random_data_provider_consecutive()
koropets Apr 10, 2020
01c8a1a
black
koropets Apr 10, 2020
1f4fd14
timeseries_generators()
koropets Apr 10, 2020
e693521
Use names of pd.Timedelta instead of number of minutes in config
koropets Apr 10, 2020
409070c
Fix issues with GordoTimeseriesGenerator and local_build()
koropets Apr 10, 2020
c0ec854
sklearn-pandas~=1.8.0
koropets Apr 11, 2020
d208613
Play around with DataFrameMapper
koropets Apr 11, 2020
813d407
Fix kwargs argument for DataFrameMapper
koropets Apr 11, 2020
46413a0
black
koropets Apr 11, 2020
7e3adb3
Deal with DataFrameMapper.__set_state__() and DataFrameMapper.__get_s…
koropets Apr 11, 2020
38db5bd
Some small fixes for DataFrameMapper
koropets Apr 11, 2020
b8e9007
black
koropets Apr 11, 2020
3820b00
Fix tests. Add lookahead for TimeseriesGenerator
koropets Apr 11, 2020
19a7fae
typo
koropets Apr 11, 2020
7ccb8e0
black
koropets Apr 11, 2020
57a7724
Still not working properly
Apr 15, 2020
9856e9c
Works for simple cases
Apr 15, 2020
0da2f89
GordoTimeseriesGenerator first version
koropets Apr 2, 2020
2d0ba04
First approach for GordoTimeseriesGenerator
koropets Apr 2, 2020
f12a999
Always pass pd.DataFrame for create_keras_timeseriesgenerator()
koropets Apr 2, 2020
2ea84c5
Fix targets in GordoTimeseriesGenerator
koropets Apr 2, 2020
beeff76
Fix github checks
koropets Apr 2, 2020
235c0bf
Do GordoTimeseriesGenerator in different way
koropets Apr 7, 2020
2344f38
Fix annotation checks on Github
koropets Apr 8, 2020
663df74
logger.debug() in GordoTimeseriesGenerator
koropets Apr 9, 2020
7dc56a6
raise ValueError if the time series is in the wrong shape
koropets Apr 9, 2020
a0dd2e9
test_find_consecutive_chunks()
koropets Apr 9, 2020
a043bb3
test_create_generator_containers()
koropets Apr 9, 2020
36a429b
test_timeseries_generator()
koropets Apr 9, 2020
a08a44a
black
koropets Apr 9, 2020
7e68135
Remove unused import
koropets Apr 9, 2020
f3add87
timeseries_generators()
koropets Apr 10, 2020
8da71bc
Use names of pd.Timedelta instead of number of minutes in config
koropets Apr 10, 2020
7a24de2
Fix issues with GordoTimeseriesGenerator and local_build()
koropets Apr 10, 2020
3631f72
sklearn-pandas~=1.8.0
koropets Apr 11, 2020
0260043
Play around with DataFrameMapper
koropets Apr 11, 2020
1638ad6
Fix kwargs argument for DataFrameMapper
koropets Apr 11, 2020
7f399e1
black
koropets Apr 11, 2020
663e53e
Deal with DataFrameMapper.__set_state__() and DataFrameMapper.__get_s…
koropets Apr 11, 2020
44d1d4b
Some small fixes for DataFrameMapper
koropets Apr 11, 2020
91f8377
black
koropets Apr 11, 2020
1598b60
Fix tests. Add lookahead for TimeseriesGenerator
koropets Apr 11, 2020
49467c9
typo
koropets Apr 11, 2020
0c6279e
black
koropets Apr 11, 2020
05a91bf
Still not working properly
Apr 15, 2020
b782353
Works for simple cases
Apr 15, 2020
26cb99c
Merge branch 'gordo_timeseries_generator' of github.com:equinor/gordo…
koropets Feb 18, 2021
49 changes: 49 additions & 0 deletions gordo/data_frame_mapper/__init__.py
@@ -0,0 +1,49 @@
import logging
import sklearn_pandas

from copy import copy
from sklearn.base import BaseEstimator
from typing import List, Union

logger = logging.getLogger(__name__)


class DataFrameMapper(sklearn_pandas.DataFrameMapper):
_default_kwargs = {"df_out": True}

def __init__(
self,
columns: List[Union[str, List[str]]],
transformers: List[BaseEstimator] = None,
**kwargs
):
self.columns = columns
self.transformers = transformers
features = self._build_features(columns, transformers)
base_kwargs = copy(self._default_kwargs)
base_kwargs.update(kwargs)
super().__init__(features=features, **base_kwargs)

@staticmethod
def _build_features(
columns: List[Union[str, List[str]]], transformers: List[BaseEstimator],
):
features = []
for column in columns:
features.append((column, transformers))
return features

def __getstate__(self):
state = super().__getstate__()
state["columns"] = self.columns
state["transformers"] = self.transformers
del state["features"]
return state

def __setstate__(self, state):
features = self._build_features(state.get("columns"), state.get("transformers"))
state["features"] = features
super().__setstate__(state)


__all__ = ['DataFrameMapper']
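
The new `DataFrameMapper` wrapper builds a `(column, transformers)` feature list from a plain column list and defaults to `df_out=True`, so transformed output stays a pandas DataFrame. Below is a minimal usage sketch, assuming the module path `gordo.data_frame_mapper` from this diff; the data and column names are made up.

```python
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

from gordo.data_frame_mapper import DataFrameMapper  # module added in this PR

df = pd.DataFrame({"tag-1": [0.0, 5.0, 10.0], "tag-2": [1.0, 2.0, 3.0]})

# List-of-list column selectors give each transformer a 2-D input, as sklearn scalers expect.
# Note: as written in this diff, the same transformer instance is reused for every listed column.
mapper = DataFrameMapper(columns=[["tag-1"], ["tag-2"]], transformers=[MinMaxScaler()])
scaled = mapper.fit_transform(df)
print(type(scaled))  # pandas.DataFrame, because df_out defaults to True here
```
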
260 changes: 232 additions & 28 deletions gordo/machine/model/models.py
@@ -9,12 +9,15 @@
from abc import ABCMeta
from copy import copy, deepcopy
from importlib.util import find_spec
from dataclasses import dataclass
from copy import copy

import h5py
import tensorflow.keras.models
from tensorflow.keras.models import load_model, save_model
from tensorflow.keras.preprocessing.sequence import pad_sequences, TimeseriesGenerator
from tensorflow.keras.wrappers.scikit_learn import KerasRegressor as BaseWrapper
from tensorflow.python.keras.utils import data_utils
import numpy as np
import pandas as pd
import xarray as xr
@@ -472,6 +475,7 @@ def __init__(
kind: Union[Callable, str],
lookback_window: int = 1,
batch_size: int = 32,
timeseries_generator: Optional[Dict[str, Any]] = None,
**kwargs,
) -> None:
"""
@@ -503,6 +507,7 @@ def __init__(
kwargs["lookback_window"] = lookback_window
kwargs["kind"] = kind
kwargs["batch_size"] = batch_size
kwargs["timeseries_generator"] = timeseries_generator

# fit_generator_params is a set of strings with the keyword arguments of
# Keras fit_generator method (excluding "shuffle" as this will be hardcoded).
@@ -533,6 +538,10 @@ def lookahead(self) -> int:
"""Steps ahead in y the model should target"""
...

@property
def timeseries_generator(self):
return self.kwargs.get("timeseries_generator", None)

def get_metadata(self):
"""
Add number of forecast steps to metadata
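
The hunk above threads the new `timeseries_generator` dict through the wrapper's kwargs and exposes it as a property. A hedged sketch of how the option might be passed when constructing a model; the import path and the `kind` value are assumptions about this repo's layout, not taken from the diff itself.

```python
from gordo.machine.model.models import KerasLSTMForecast  # path assumed from this repo layout

# The dict is stored in kwargs and read back via the `timeseries_generator` property;
# its keys mirror the config later dispatched by TimeseriesGeneratorTypes.
model = KerasLSTMForecast(
    kind="lstm_symmetric",  # assumption: one of gordo's registered LSTM model factories
    lookback_window=12,
    batch_size=64,
    timeseries_generator={"type": "GordoTimeseriesGenerator", "step": "10min"},
)
print(model.timeseries_generator)  # expected to echo the dict passed above
```
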
@@ -580,10 +589,10 @@ def fit(self, X: np.ndarray, y: np.ndarray, **kwargs) -> "KerasLSTMForecast":

"""

X = X.values if isinstance(X, pd.DataFrame) else X
y = y.values if isinstance(y, pd.DataFrame) else y

X = self._validate_and_fix_size_of_X(X)
if not isinstance(X, pd.DataFrame):
X = self._validate_and_fix_size_of_X(X)
else:
pass # TODO

# We call super.fit on a single sample (notice the batch_size=1) to initiate the
# model using the scikit-learn wrapper.
@@ -595,6 +604,7 @@ def fit(self, X: np.ndarray, y: np.ndarray, **kwargs) -> "KerasLSTMForecast":
batch_size=1,
lookback_window=self.lookback_window,
lookahead=self.lookahead,
config=self.timeseries_generator,
)

primer_x, primer_y = tsg[0]
@@ -607,6 +617,7 @@ def fit(self, X: np.ndarray, y: np.ndarray, **kwargs) -> "KerasLSTMForecast":
batch_size=self.batch_size,
lookback_window=self.lookback_window,
lookahead=self.lookahead,
config=self.timeseries_generator,
)

gen_kwargs = {
@@ -655,15 +666,18 @@ def predict(self, X: np.ndarray, **kwargs) -> np.ndarray:
>>> model_transform.shape
(2, 2)
"""
X = X.values if isinstance(X, pd.DataFrame) else X
X = self._validate_and_fix_size_of_X(X)
if not isinstance(X, pd.DataFrame):
    X = self._validate_and_fix_size_of_X(X)
else:
    pass  # TODO

tsg = create_keras_timeseriesgenerator(
X=X,
y=X,
batch_size=10000,
lookback_window=self.lookback_window,
lookahead=self.lookahead,
config=self.timeseries_generator,
)
return self.model.predict_generator(tsg)

@@ -715,13 +729,35 @@ def lookahead(self) -> int:
return 0


def pad_x_and_y(
X: np.ndarray, y: np.ndarray, lookahead: int
) -> Tuple[np.ndarray, np.ndarray]:
new_length = len(X) + 1 - lookahead
if lookahead == 1:
return X, y
elif lookahead >= 0:
pad_kw = dict(maxlen=new_length, dtype=X.dtype)

if lookahead == 0:
X = pad_sequences([X], padding="post", **pad_kw)[0]
y = pad_sequences([y], padding="pre", **pad_kw)[0]

elif lookahead > 1:
X = pad_sequences([X], padding="post", truncating="post", **pad_kw)[0]
y = pad_sequences([y], padding="pre", truncating="pre", **pad_kw)[0]
return X, y
else:
raise ValueError(f"Value of `lookahead` can not be negative, is {lookahead}")
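
A small illustration of what the helper above does to the array lengths, assuming `pad_x_and_y` is importable from `gordo.machine.model.models` on this branch; the arrays are made up.

```python
import numpy as np

from gordo.machine.model.models import pad_x_and_y  # helper added in this PR

X = np.arange(10, dtype=float).reshape(5, 2)       # 5 timesteps, 2 features
y = np.arange(10, 20, dtype=float).reshape(5, 2)

# lookahead=1 is a no-op; lookahead=2 trims both arrays to len(X) + 1 - lookahead = 4 rows:
# X keeps its first 4 rows ("post" truncation), y keeps its last 4 rows ("pre" truncation),
# so each input window ends up paired with a target further in the future.
X2, y2 = pad_x_and_y(X, y, lookahead=2)
print(X2.shape, y2.shape)  # (4, 2) (4, 2)
```
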


def create_keras_timeseriesgenerator(
X: np.ndarray,
y: Optional[np.ndarray],
X: Union[pd.DataFrame, np.ndarray],
y: Optional[Union[pd.DataFrame, np.ndarray]],
batch_size: int,
lookback_window: int,
lookahead: int,
) -> tensorflow.keras.preprocessing.sequence.TimeseriesGenerator:
config: Optional[Dict[str, Any]] = None,
) -> TimeseriesGenerator:
"""
Provides a `keras.preprocessing.sequence.TimeseriesGenerator` for use with
LSTM's, but with the added ability to specify the lookahead of the target in y.
@@ -773,27 +809,195 @@ def create_keras_timeseriesgenerator(
>>> len(gen[0][0][0][0]) # n_features = 2
2
"""
    new_length = len(X) + 1 - lookahead
    kwargs: Dict[str, Any] = dict(length=lookback_window, batch_size=batch_size)
    if lookahead == 1:
        kwargs.update(dict(data=X, targets=y))

    elif lookahead >= 0:

        pad_kw = dict(maxlen=new_length, dtype=X.dtype)

        if lookahead == 0:
            kwargs["data"] = pad_sequences([X], padding="post", **pad_kw)[0]
            kwargs["targets"] = pad_sequences([y], padding="pre", **pad_kw)[0]

        elif lookahead > 1:
            kwargs["data"] = pad_sequences(
                [X], padding="post", truncating="post", **pad_kw
            )[0]
            kwargs["targets"] = pad_sequences(
                [y], padding="pre", truncating="pre", **pad_kw
            )[0]
    else:
        raise ValueError(f"Value of `lookahead` can not be negative, is {lookahead}")

    return TimeseriesGenerator(**kwargs)
    return timeseries_generators.create_from_config(
        config,
        data=X,
        targets=y,
        length=lookback_window,
        batch_size=batch_size,
        lookahead=lookahead,
    )


class TimeseriesGeneratorTypes:
    def __init__(self, default_type):
        self.default_type = default_type
        self._types = {}

    def create_from_config(self, config, **kwargs):
        if config is None:
            return self.default_type(**kwargs)
        else:
            if "type" not in config:
                raise ValueError(
                    'Unspecified "type" attribute for "timeseries_generator"'
                )
            type_name = config["type"]
            if type_name not in self._types:
                raise ValueError(
                    f'Unknown type "{type_name}" for "timeseries_generator"'
                )
            all_kwargs = copy(config)
            all_kwargs.pop("type")
            all_kwargs.update(kwargs)
            return self._types[type_name](**all_kwargs)

    def __call__(self, type_name):
        def wrap(cls):
            if type_name in self._types:
                raise ValueError(
                    f'TimeseriesGenerator type with name "{type_name}" already exists'
                )
            self._types[type_name] = cls
            return cls

        return wrap


class DefaultTimeseriesGenerator(TimeseriesGenerator):
    def __init__(
        self,
        data: Union[pd.DataFrame, np.ndarray],
        targets: Union[pd.DataFrame, np.ndarray],
        lookahead: int = 1,
        **kwargs,
    ):
        if isinstance(data, pd.DataFrame):
            data = data.values
        if isinstance(targets, pd.DataFrame):
            targets = targets.values
        data, targets = pad_x_and_y(data, targets, lookahead)
        super().__init__(data=data, targets=targets, **kwargs)


timeseries_generators = TimeseriesGeneratorTypes(
    default_type=DefaultTimeseriesGenerator
)
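
To show how the registry above dispatches on the config, here is a hedged sketch that picks the Gordo generator by name. It assumes `timeseries_generators` is importable from `gordo.machine.model.models` on this branch and uses made-up data with the 10-minute spacing the generator expects.

```python
import numpy as np
import pandas as pd

from gordo.machine.model.models import timeseries_generators  # registry added in this PR

index = pd.date_range("2020-01-01", periods=300, freq="10min")
df_X = pd.DataFrame(np.random.rand(300, 3), index=index)
df_y = pd.DataFrame(np.random.rand(300, 3), index=index)

# config=None falls back to DefaultTimeseriesGenerator; a "type" key selects a registered class.
tsg = timeseries_generators.create_from_config(
    {"type": "GordoTimeseriesGenerator", "step": "10min"},
    data=df_X,
    targets=df_y,
    length=12,
    batch_size=64,
    lookahead=1,  # accepted but still unused by GordoTimeseriesGenerator (see the TODO below)
)
batch_x, batch_y = tsg[0]  # batch_x has shape (batch, lookback, features)
```
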


@dataclass
class TimeseriesChunk:
start_ts: pd.Timestamp
end_ts: pd.Timestamp
size: int


@dataclass
class TimeseriesGeneratorContainer:
generator: TimeseriesGenerator
chunk: TimeseriesChunk
length: int


@timeseries_generators("GordoTimeseriesGenerator")
class GordoTimeseriesGenerator(data_utils.Sequence):
def __init__(
self,
data: Union[pd.DataFrame, np.ndarray],
targets: Union[pd.DataFrame, np.ndarray],
length: int,
batch_size: int = 128,
shuffle: bool = False,
step: Union[pd.Timedelta, str] = "10min",
lookahead: int = 1,
):
if not isinstance(data, pd.DataFrame):
raise ValueError("Data have to be instance of pandas.DataFrame")
if not isinstance(targets, pd.DataFrame):
raise ValueError("Targets have to be instance of pandas.DataFrame")
if len(data) != len(targets):
raise ValueError(
"Data and targets have to be of same length. "
f"Data length is {len(data)}"
f" while target length is {len(targets)}"
)

if isinstance(step, str):
step = pd.to_timedelta(step)
self.step = step
self.consecutive_chunks = self.find_consecutive_chunks(data)
logger.debug(
"GordoTimeseriesGenerator with consecutive_chunks=%s",
self.consecutive_chunks,
)
self.failed_chunks: List[TimeseriesChunk] = []
self.generators_containers = self.create_generator_containers(
data, targets, length=length, batch_size=batch_size, shuffle=shuffle
)
logger.debug(
"GordoTimeseriesGenerator with generators_containers=%s",
self.generators_containers,
)
if not self.generators_containers:
raise ValueError(
"Seems like the time series are too small or in random order."
"Failed chunks: %s" % self.consecutive_chunks
)
# TODO use lookahead
self.lookahead = lookahead

def filter_chunks(self, indexes=None):
if indexes is not None:
self.generators_containers = [
self.generators_containers[i] for i in indexes
]

def __len__(self):
return sum(container.length for container in self.generators_containers)

def find_consecutive_chunks(self, df: pd.DataFrame) -> List[TimeseriesChunk]:
chunks = []
prev_ts, start_ts, start_i = None, None, 0
for i, dt in enumerate(df.index):
if prev_ts is None:
prev_ts = dt
start_ts = dt
else:
if dt - prev_ts == self.step:
prev_ts = dt
else:
chunks.append(TimeseriesChunk(start_ts, prev_ts, i - start_i))
prev_ts, start_ts = None, None
start_i = i
if start_ts is not None:
chunks.append(TimeseriesChunk(start_ts, prev_ts, len(df.index) - start_i))
return chunks

def create_generator_containers(
self,
data: pd.DataFrame,
targets: pd.DataFrame,
length: int,
batch_size: int,
shuffle: bool,
) -> List[TimeseriesGeneratorContainer]:
generator_containers = []
for chunk in self.consecutive_chunks:
gen_data = data[chunk.start_ts : chunk.end_ts].values
gen_target = targets[chunk.start_ts : chunk.end_ts].values
try:
generator = TimeseriesGenerator(
gen_data,
gen_target,
length=length,
batch_size=batch_size,
shuffle=shuffle,
)
except ValueError:
self.failed_chunks.append(chunk)
else:
gen_length = len(generator)  # renamed to avoid shadowing the `length` window argument
generator_containers.append(
TimeseriesGeneratorContainer(generator, chunk, gen_length)
)
return generator_containers

def __getitem__(self, index):
i = -1
for container in self.generators_containers:
new_i = i + container.length
if index <= new_i:
gen_i = index - i - 1
return container.generator[gen_i]
i = new_i
raise IndexError(index)
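
Finally, a hedged sketch of the chunking behaviour on a gapped index, again assuming the class is importable from `gordo.machine.model.models` on this branch; the data is synthetic.

```python
import numpy as np
import pandas as pd

from gordo.machine.model.models import GordoTimeseriesGenerator  # class added in this PR

# Two runs of 10-minute samples separated by a large gap -> two consecutive chunks.
index = pd.date_range("2020-01-01 00:00", periods=60, freq="10min").append(
    pd.date_range("2020-01-02 00:00", periods=60, freq="10min")
)
df = pd.DataFrame(np.random.rand(120, 2), index=index)

gen = GordoTimeseriesGenerator(df, df, length=6, batch_size=16, step="10min")
print(len(gen.consecutive_chunks))  # 2: the day-long gap splits the index
print(len(gen))                     # total number of batches across both chunks
```
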