Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add xarray.Dataset support #1298

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,4 @@ venv.bak/
.python-version
*.DS_Store

venv_gordo*/
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Gordo base image
FROM python:3.9.15-buster as builder
FROM python:3.9-buster as builder

# Copy source code
COPY . /code
Expand All @@ -17,7 +17,7 @@ RUN cat /code/requirements/full_requirements.txt | grep tensorflow== > /code/pre
&& cat /code/requirements/full_requirements.txt | grep scipy== >> /code/prereq.txt \
&& cat /code/requirements/full_requirements.txt | grep catboost== >> /code/prereq.txt

FROM python:3.9.15-slim-buster
FROM python:3.9-slim-buster

# Nonroot user for running CMD
RUN groupadd -g 999 gordo && \
Expand Down
4 changes: 2 additions & 2 deletions gordo/machine/model/anomaly/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class AnomalyDetectorBase(BaseEstimator, GordoBase, metaclass=abc.ABCMeta):
@abc.abstractmethod
def anomaly(
self,
X: Union[pd.DataFrame, xr.DataArray],
y: Union[pd.DataFrame, xr.DataArray],
X: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
y: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
frequency: Optional[timedelta] = None,
) -> Union[pd.DataFrame, xr.Dataset]:
"""
Expand Down
3 changes: 2 additions & 1 deletion gordo/machine/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import numpy as np
import pandas as pd
import xarray as xr


class GordoBase(abc.ABC):
Expand All @@ -21,7 +22,7 @@ def get_params(self, deep=False):
@abc.abstractmethod
def score(
self,
X: Union[np.ndarray, pd.DataFrame],
X: Union[np.ndarray, pd.DataFrame, xr.Dataset],
y: Union[np.ndarray, pd.DataFrame],
sample_weight: Optional[np.ndarray] = None,
):
Expand Down
19 changes: 15 additions & 4 deletions gordo/machine/model/factories/lstm_autoencoder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-

from typing import Tuple, Union, Dict, Any
from typing import Tuple, Union, Dict, Any, Optional

import tensorflow
from tensorflow import keras
from tensorflow.keras.optimizers import Optimizer
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.layers import Dense, LSTM, Masking
from tensorflow.keras.models import Sequential as KerasSequential

from gordo.machine.model.register import register_model_builder
Expand All @@ -26,6 +26,7 @@ def lstm_model(
optimizer: Union[str, Optimizer] = "Adam",
optimizer_kwargs: Dict[str, Any] = dict(),
compile_kwargs: Dict[str, Any] = dict(),
mask_value: Optional[float] = None,
**kwargs,
) -> tensorflow.keras.models.Sequential:
"""
Expand Down Expand Up @@ -63,6 +64,8 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are
default values will be used.
compile_kwargs: Dict[str, Any]
Parameters to pass to ``keras.Model.compile``.
mask_value: Optional[float]
Add Masking layer with this mask_value

Returns
-------
Expand All @@ -77,11 +80,19 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are

model = KerasSequential()

if mask_value:
input_shape = (lookback_window, n_features)
model.add(Masking(mask_value=mask_value, input_shape=input_shape))

# encoding layers
kwargs = {"return_sequences": True}
for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)):
input_shape = (lookback_window, n_neurons if i != 0 else n_features)
kwargs.update(dict(activation=activation, input_shape=input_shape))
input_shape = (
lookback_window,
n_neurons if mask_value or i != 0 else n_features,
)
kwargs["activation"] = activation
kwargs["input_shape"] = input_shape
model.add(LSTM(n_neurons, **kwargs))

# decoding layers
Expand Down
137 changes: 130 additions & 7 deletions gordo/machine/model/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def get_n_features_out(

@staticmethod
def get_n_features(
X: Union[np.ndarray, pd.DataFrame, xr.DataArray]
X: Union[np.ndarray, pd.DataFrame, xr.DataArray] # TODO should xr.Dataset be here?
) -> Union[int, tuple]:
shape_len = len(X.shape)
if shape_len == 1:
Expand All @@ -240,19 +240,19 @@ def get_n_features(

def fit(
self,
X: Union[np.ndarray, pd.DataFrame, xr.DataArray],
y: Union[np.ndarray, pd.DataFrame, xr.DataArray],
X: Union[np.ndarray, pd.DataFrame, xr.DataArray, xr.Dataset],
y: Union[np.ndarray, pd.DataFrame, xr.DataArray, xr.Dataset],
**kwargs,
):
"""
Fit the model to X given y.

Parameters
----------
X: Union[np.ndarray, pd.DataFrame, xr.Dataset]
numpy array or pandas dataframe
y: Union[np.ndarray, pd.DataFrame, xr.Dataset]
numpy array or pandas dataframe
X: Union[np.ndarray, pd.DataFrame, xr.DataArray, xr.Dataset]
numpy array, pandas dataframe, xarray dataArray or xarray Dataset
y: Union[np.ndarray, pd.DataFrame, xr.DataArray, xr.Dataset]
numpy array, pandas dataframe, xarray dataArray or xarray Dataset
sample_weight: np.ndarray
array like - weight to assign to samples
kwargs
Expand Down Expand Up @@ -280,6 +280,9 @@ def fit(
X = X.values
if isinstance(y, (pd.DataFrame, xr.DataArray)):
y = y.values
if isinstance(X, xr.Dataset):
# TODO: add masking and padding here
X = X.to_dataframe().values
kwargs.setdefault("verbose", 0)
history = super().fit(X, y, sample_weight=None, **kwargs)
if isinstance(history, History):
Expand Down Expand Up @@ -556,6 +559,32 @@ def _validate_and_fix_size_of_X(self, X):
)
return X

def _handle_missed_timestamps_or_NaN_in_X_y(self, X, y):

if not isinstance(X, pd.DataFrame) or not isinstance(y, pd.DataFrame):
return (X, y)

train_start_date = self.kwargs["train_start_date"]
train_end_date = self.kwargs["train_end_date"]
strategy_name = self.kwargs["time_series_data_filter_strategy"]
time_frequency = self.kwargs["time_frequency"]

module_name, class_name = strategy_name.rsplit(".", 1)
module = importlib.import_module(module_name)
if not hasattr(module, class_name):
raise ValueError(
"Unable to find class %s in module '%s' for handling NaN, skipping"
% (self.kind, class_name, module_name)
)
strategy = getattr(module, class_name)()

return strategy.handle_missed_timestamps(
start_data_timestamp = train_start_date,
end_data_timestamp = train_end_date,
X = X,
y = y,
time_frequency = time_frequency)

def fit( # type: ignore
self, X: np.ndarray, y: np.ndarray, **kwargs
) -> "KerasLSTMForecast":
Expand All @@ -578,6 +607,7 @@ def fit( # type: ignore
KerasLSTMForecast

"""
X, y = self._handle_missed_timestamps_or_NaN_in_X_y(X,y)

X = X.values if isinstance(X, pd.DataFrame) else X
y = y.values if isinstance(y, pd.DataFrame) else y
Expand Down Expand Up @@ -796,3 +826,96 @@ def create_keras_timeseriesgenerator(
raise ValueError(f"Value of `lookahead` can not be negative, is {lookahead}")

return TimeseriesGenerator(**kwargs)

class BaseDataFilterStrategy(metaclass=ABCMeta):
    """
    Base class for strategies that handle NaNs or missing datapoints in the
    input X and y sample arrays.
    """

    @abc.abstractmethod
    def handle_missed_timestamps(
        self,
        start_data_timestamp: str,
        end_data_timestamp: str,
        X: np.ndarray,
        # BUG FIX: the original annotations CALLED the typing special forms
        # (``Optional(...)`` / ``Tuple(...)``) instead of subscripting them,
        # which raises a TypeError when the class body is evaluated.
        y: Optional[np.ndarray],
        time_frequency: Optional[str],
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Process missing timestamps or NaN values in X and y.

        Parameters
        ----------
        start_data_timestamp: str
            Start timestamp for the data in the input X and y (if present) samples.
        end_data_timestamp: str
            End timestamp for the data in the input X and y (if present) samples.
        X: np.ndarray
            2d array of values, each row being one sample.
        y: Optional[np.ndarray]
            Array representing the target.
        time_frequency: Optional[str]
            The data sampling frequency.

        Returns
        -------
        Tuple[np.ndarray, Optional[np.ndarray]]
            X and y data after processing; y is ``None`` when no target was given.
        """
        ...

class RemoveRowsWithNaNDataStrategy(BaseDataFilterStrategy):

def handle_missed_timestamps(self,
start_data_timestamp: str,
end_data_timestamp: str,
X: np.ndarray,
y: Optional(np.ndarray),
time_frequency: Optional[str]
) -> Tuple(np.ndarray, Optional(np.ndarray)):
"""
Deletes all rows with NaN in X and corresponding records in y and visse-versa
"""
mask_X = np.isnan(X).any(axis=1)
if not y:
return X[~mask]

mask_y = np.isnan(y)
mask = np.logical_and(mask_X, mask_y)
return (X[~mask], y[~mask])

def handle_missed_timestamps(self,
start_data_timestamp: str,
end_data_timestamp: str,
X: pd.DataFrame,
y: Optional(pd.DataFrame),
time_frequency: Optional[str]
) -> Tuple(pd.DataFrame, Optional(pd.DataFrame)):
"""
Deletes all rows with NaN in X and corresponding records in y and visse-versa
"""
mask_X = X.isna().any(axis=1)
if not y:
return X[~mask]

mask_y = y.isna()
mask = np.logical_and(mask_X, mask_y)
return (X[~mask], y[~mask])

class FillMissingTimestampsWithNanStrategy(BaseDataFilterStrategy):

def handle_missed_timestamps(self,
start_data_timestamp: str,
end_data_timestamp: str,
X: pd.DataFrame,
y: Optional(pd.DataFrame),
time_frequency: Optional[str]
) -> Tuple(pd.DataFrame, Optional(pd.DataFrame)):
"""
Figure out missing timestamps in X and y and fill them with NaN values.
"""
start_training_data_timestamp = np.datetime64(start_data_timestamp)
end_training_data_timestamp = np.datetime64(end_data_timestamp)
expected_timestamps = pd.date_range(start=start_training_data_timestamp,
end=end_training_data_timestamp,
freq=time_frequency)
if y:
return X.reindex(expected_timestamps), y.reindex(expected_timestamps)
else:
return X.reindex(expected_timestamps)
Loading