20998: Adds multiprocessing to time series IFA #295

Closed · wants to merge 9 commits
Changes from all commits
190 changes: 146 additions & 44 deletions howso/utilities/feature_attributes/time_series.py
@@ -1,5 +1,13 @@
+from concurrent.futures import (
+    as_completed,
+    Future,
+    ProcessPoolExecutor,
+)
+from functools import partial
 import logging
-from math import e, isnan
+from math import e, isnan, prod
+import multiprocessing as mp
+import typing as t
 from typing import (
     Dict, Iterable, Optional, Union
 )
@@ -28,24 +36,28 @@ def __init__(self, data: pd.DataFrame, time_feature_name: str):
         # Keep track of features that contain unsupported data
         self.unsupported = []
 
+    @staticmethod
     def _infer_delta_min_and_max(  # noqa: C901
-        self,
-        features: Optional[Dict] = None,
-        datetime_feature_formats: Optional[Dict] = None,
-        id_feature_name: Optional[Union[str, Iterable[str]]] = None,
-        orders_of_derivatives: Optional[Dict] = None,
-        derived_orders: Optional[Dict] = None,
-    ) -> Dict:
+        data: pd.DataFrame,
+        time_feature_name: str,
+        datetime_feature_formats: t.Optional[dict] = None,
+        features: t.Optional[dict] = None,
+        derived_orders: t.Optional[Dict] = None,
+        id_feature_name: t.Optional[t.Union[str, t.Iterable[str]]] = None,
+        orders_of_derivatives: t.Optional[dict] = None,
+        time_feature_deltas: t.Optional[pd.Series] = None,
+    ) -> t.Tuple[dict, pd.Series]:
         """
         Infer continuous feature delta_min, delta_max for each feature.
 
+        This method is intended to be used on the time feature by itself. This is done
+        so that, when multiprocessing is used, the time feature is processed first to
+        cache its deltas; all other features may then be processed together or individually.
+
         Parameters
         ----------
-        features : dict, default None
-            (Optional) A partially filled features dict. If partially filled
-            attributes for a feature are passed in, those parameters will be
-            retained as is and the delta_min and delta_max attributes will be
-            inferred.
+        data : pd.DataFrame
+            A dataframe of the data to infer min and max deltas for.
 
         datetime_feature_formats : dict, default None
             (Optional) Dict defining a custom (non-ISO8601) datetime format and
@@ -62,6 +74,19 @@ def _infer_delta_min_and_max(  # noqa: C901
                     "end_date" : "%Y-%m-%d"
                 }
 
+        derived_orders : dict, default None
+            (Optional) Dict of features to the number of orders of derivatives
+            that should be derived instead of synthesized. For example, for a
+            feature with a 3rd order of derivative, setting its derived_orders
+            to 2 will synthesize the 3rd order derivative value, and then use
+            that synthesized value to derive the 2nd and 1st order.
+
+        features : dict, default None
+            (Optional) A partially filled features dict. If partially filled
+            attributes for a feature are passed in, those parameters will be
+            retained as is and the delta_min and delta_max attributes will be
+            inferred.
+
         id_feature_name : str or list of str, default None
            (Optional) The name(s) of the ID feature(s).
 
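A quick illustration of how the documented keyword arguments above fit together; every feature name and format string here is hypothetical, chosen only to mirror the docstring's own example.

```python
# Hypothetical argument values (illustrative only, not from this PR):
datetime_feature_formats = {"end_date": "%Y-%m-%d"}  # custom, non-ISO8601 format
orders_of_derivatives = {"value": 3}   # generate up to 3rd-order deltas for "value"
derived_orders = {"value": 2}          # synthesize the 3rd order; derive the 2nd and 1st
```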
@@ -72,32 +97,41 @@ def _infer_delta_min_and_max(  # noqa: C901
             set to 0, will not generate any delta/rate features. By default all
             continuous features have an order value of 1.
 
-        derived_orders : dict, default None
-            (Optional) Dict of features to the number of orders of derivatives
-            that should be derived instead of synthesized. For example, for a
-            feature with a 3rd order of derivative, setting its derived_orders
-            to 2 will synthesize the 3rd order derivative value, and then use
-            that synthed value to derive the 2nd and 1st order.
+        time_feature_deltas : pd.Series, default None
+            (Optional) Series of time feature deltas. If not processing the time feature,
+            then `time_feature_deltas` must be provided.
 
         Returns
         -------
-        features : dict
-            Returns dictionary of {`type`: "feature type"}} with column names
-            in passed in df as key.
+        tuple[dict, pd.Series]
+            Returns a tuple of a dictionary of {`type`: "feature type"} with column names
+            as keys, and a Series of time feature deltas. If the time feature is not being
+            processed, this Series is a passthrough of the provided `time_feature_deltas`.
         """
-        # iterate over all features, ensuring that the time feature is the first
-        # one to be processed so that its deltas are cached
-        feature_names = set(features.keys())
-        feature_names.remove(self.time_feature_name)
-        feature_names = [self.time_feature_name] + list(feature_names)
+        feature_names = list(set(features.keys()))
 
+        if time_feature_name not in feature_names:
+            if time_feature_deltas is None:
+                raise ValueError(
+                    "If not processing the time feature, then the calculated "
+                    "`time_feature_deltas` must be provided"
+                )
+        # else we are processing the time feature
+        else:
+            time_feature_deltas = None
+            if len(feature_names) > 1:
+                features = {f: features[f] for f in features if f == time_feature_name}
+                feature_names = [time_feature_name]
+                warnings.warn(
+                    "The features dictionary provided includes the time feature in addition "
+                    "to the non-time features. These feature types must be processed "
+                    "separately. Ignoring non-time features."
+                )
 
         if orders_of_derivatives is None:
             orders_of_derivatives = dict()
 
         if derived_orders is None:
             derived_orders = dict()
 
-        time_feature_deltas = None
         for f_name in feature_names:
             if features[f_name]['type'] == "continuous" and \
                     'time_series' in features[f_name]:
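To make the new calling contract concrete: the time feature must be processed on its own first, and its returned deltas are then passed into every non-time call, or the ValueError above is raised. A minimal stand-in sketch of that contract (this `infer_deltas` helper is hypothetical, not the PR's method):

```python
import pandas as pd


def infer_deltas(df, time_feature_name, features, time_feature_deltas=None):
    """Hypothetical stand-in mirroring _infer_delta_min_and_max's contract."""
    if time_feature_name not in features:
        # non-time features must reuse the cached deltas
        if time_feature_deltas is None:
            raise ValueError("time_feature_deltas must be provided")
    else:
        # processing the time feature: compute its deltas for later reuse
        time_feature_deltas = df[time_feature_name].diff(1)
    attrs = {f: {"delta_min": df[f].diff(1).min()} for f in features}
    return attrs, time_feature_deltas


df = pd.DataFrame({"t": [0, 1, 3, 6], "value": [1.0, 2.0, 4.0, 8.0]})

# Phase 1: the time feature alone; its deltas come back for caching.
t_attrs, deltas = infer_deltas(df, "t", {"t": {}})

# Phase 2: other features reuse the cached deltas instead of recomputing them.
v_attrs, _ = infer_deltas(df, "value", {"value": {}}, time_feature_deltas=deltas)
```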
@@ -127,18 +161,18 @@ def _infer_delta_min_and_max(  # noqa: C901
                 if dt_format is not None:
                     # copy just the id columns and the time feature
                     if isinstance(id_feature_name, str):
-                        df_c = self.data.loc[:, [id_feature_name, f_name]]
-                    elif isinstance(id_feature_name, list):
-                        df_c = self.data.loc[:, id_feature_name + [f_name]]
+                        df_c = data.loc[:, [id_feature_name, f_name]]
+                    elif isinstance(id_feature_name, list) and len(id_feature_name) > 0:
+                        df_c = data.loc[:, id_feature_name + [f_name]]
                     else:
-                        df_c = self.data.loc[:, [f_name]]
+                        df_c = data.loc[:, [f_name]]
 
                     # convert time feature to epoch
                     df_c[f_name] = df_c[f_name].apply(
                         lambda x: date_to_epoch(x, dt_format))
 
                     # use Pandas' diff() to pull all the deltas for this feature
-                    if isinstance(id_feature_name, list):
+                    if isinstance(id_feature_name, list) and len(id_feature_name) > 0:
                         deltas = df_c.groupby(id_feature_name)[f_name].diff(1)
                     elif isinstance(id_feature_name, str):
                         deltas = df_c.groupby([id_feature_name])[f_name].diff(1)
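The per-series delta computation relies on pandas' `groupby(...).diff(1)`, which differences consecutive rows within each ID group and emits NaN for each group's first row, so deltas never cross series boundaries. A standalone illustration with hypothetical data:

```python
import pandas as pd

df = pd.DataFrame({
    "id": ["a", "a", "a", "b", "b"],
    "t":  [0,   2,   5,   1,   4],
})
# diff(1) is applied independently within each "id" group
deltas = df.groupby(["id"])["t"].diff(1)
print(deltas.tolist())  # [nan, 2.0, 3.0, nan, 3.0] -- first row of each group is NaN
```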
@@ -147,14 +181,14 @@ def _infer_delta_min_and_max(  # noqa: C901
 
             else:
                 # Use pandas' diff() to pull all the deltas for this feature
-                if isinstance(id_feature_name, list):
-                    deltas = self.data.groupby(id_feature_name)[f_name].diff(1)
+                if isinstance(id_feature_name, list) and len(id_feature_name) > 0:
+                    deltas = data.groupby(id_feature_name)[f_name].diff(1)
                 elif isinstance(id_feature_name, str):
-                    deltas = self.data.groupby([id_feature_name])[f_name].diff(1)
+                    deltas = data.groupby([id_feature_name])[f_name].diff(1)
                 else:
-                    deltas = self.data[f_name].diff(1)
+                    deltas = data[f_name].diff(1)
 
-            if f_name == self.time_feature_name:
+            if f_name == time_feature_name:
                 time_feature_deltas = deltas
 
             # initial rates are same as deltas which will then be used as input
@@ -213,13 +247,13 @@ def _infer_delta_min_and_max(  # noqa: C901
                 delta_min = min(deltas.dropna())
                 # don't allow the time series time feature to go back in time
                 # TODO: 15550: support user-specified min/max values
-                if f_name == self.time_feature_name:
+                if f_name == time_feature_name:
                     features[f_name]['time_series']['delta_min'].append(max(0, delta_min / e))
                 else:
                     delta_min = delta_min / e if delta_min > 0 else delta_min * e
                     features[f_name]['time_series']['delta_min'].append(delta_min)
 
-        return features
+        return features, time_feature_deltas
 
     def _set_rate_delta_bounds(self, btype: str, bounds: Dict, features: Dict):
         """Set optionally-specified rate/delta bounds in the features dict."""
@@ -593,7 +627,7 @@ def _process(  # noqa: C901
         if isinstance(id_feature_name, str):
             id_feature_names = [id_feature_name]
         elif isinstance(id_feature_name, Iterable):
-            id_feature_names = id_feature_name
+            id_feature_names = list(id_feature_name)
         else:
             id_feature_names = []
 
@@ -673,14 +707,82 @@ def _process(  # noqa: C901
         else:
             features[self.time_feature_name]['bounds'] = {'allow_null': False}
 
-        features = self._infer_delta_min_and_max(
-            features=features,
+        # Process the time feature first so that the `time_feature_deltas` are cached in case multiprocessing is used
+        time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max(
+            data=self.data[((id_feature_names if id_feature_names is not None else []) + [self.time_feature_name])],
+            time_feature_name=self.time_feature_name,
+            features={f: features[f] for f in [self.time_feature_name]},
             datetime_feature_formats=datetime_feature_formats,
-            id_feature_name=id_feature_name,
+            id_feature_name=id_feature_names,
             orders_of_derivatives=orders_of_derivatives,
             derived_orders=derived_orders
         )
 
+        # Get the list of the other features to process
+        non_time_feature_names = list(set(features.keys()))
+        non_time_feature_names.remove(self.time_feature_name)
+
+        if prod(self.data.shape) < 25_000_000 and max_workers is None:
+            max_workers = 0
+        if max_workers is None or max_workers >= 1:
+            mp_context = mp.get_context("spawn")
+            futures: dict[Future, str] = dict()
+            task_func = partial(
+                InferFeatureAttributesTimeSeries._infer_delta_min_and_max,
+                time_feature_name=self.time_feature_name,
+                datetime_feature_formats=datetime_feature_formats,
+                id_feature_name=id_feature_names,
+                orders_of_derivatives=orders_of_derivatives,
+                derived_orders=derived_orders,
+                time_feature_deltas=time_feature_deltas,
+            )
+
+            with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context) as pool:
+                for feature in non_time_feature_names:
+                    future = pool.submit(
+                        task_func,
+                        data=self.data[((id_feature_names if id_feature_names is not None else []) + [feature])],
+                        features={f: features[f] for f in [feature]},
+                    )
+                    futures[future] = feature
+
+                temp_feature_attributes: dict[str, t.Any] = dict()
+                for future in as_completed(futures):
+                    try:
+                        response = future.result()
+                        temp_feature_attributes.update(response[0])
+                    except Exception as e:
+                        warnings.warn(
+                            f"Infer_feature_attributes raised an exception "
+                            f"while processing '{futures[future]}' ({str(e)})."
+                        )
+
+        else:
+            temp_feature_attributes, _ = InferFeatureAttributesTimeSeries._infer_delta_min_and_max(
+                data=self.data[non_time_feature_names],
+                time_feature_name=self.time_feature_name,
+                features={f: features[f] for f in non_time_feature_names},
+                time_feature_deltas=time_feature_deltas,
+                datetime_feature_formats=datetime_feature_formats,
+                id_feature_name=id_feature_names,
+                orders_of_derivatives=orders_of_derivatives,
+                derived_orders=derived_orders,
+            )
+
+        # Add the time feature attributes
+        temp_feature_attributes.update(time_feature_attributes)
+        # Re-order the keys like the original dataframe by following the `features` key order.
+        # `features` has not been modified since it was created and ordered by `InferFeatureAttributesDataFrame`.
+        temp_feature_attributes = {
+            k: temp_feature_attributes[k] for k in features.keys()
+        }
+
+        # Since `features` is a `SingleTableFeatureAttributes` object, in order to preserve the
+        # class attributes while reordering, the key-values are cleared and then updated with
+        # the ordered temporary feature attributes.
+        features.clear()
+        features.update(temp_feature_attributes)
+
         # Set any manually specified rate/delta boundaries
         if delta_boundaries is not None:
             self._set_rate_delta_bounds('delta', delta_boundaries, features)
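The orchestration above is a standard fan-out/fan-in pattern: a spawn-context `ProcessPoolExecutor`, a `functools.partial` that freezes the keyword arguments shared by every task, one submitted task per feature, and `as_completed` to merge results as they finish. A self-contained sketch of the same pattern under simplified assumptions (`infer_one` is a hypothetical stand-in for the per-feature work, not the PR's method):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import multiprocessing as mp

import pandas as pd


def infer_one(data: pd.DataFrame, feature: str, scale: float) -> dict:
    """Hypothetical stand-in for the per-feature delta inference."""
    deltas = data[feature].diff(1).dropna()
    return {feature: {"delta_min": deltas.min() * scale,
                      "delta_max": deltas.max() * scale}}


if __name__ == "__main__":  # required under the "spawn" start method
    df = pd.DataFrame({"x": [1.0, 4.0, 9.0], "y": [2.0, 2.5, 4.0]})
    task = partial(infer_one, scale=1.0)  # freeze shared kwargs, like the PR's task_func

    results: dict = {}
    with ProcessPoolExecutor(max_workers=2, mp_context=mp.get_context("spawn")) as pool:
        # fan out: one task per feature column
        futures = {pool.submit(task, df[[f]], f): f for f in df.columns}
        # fan in: merge per-feature dicts as each worker finishes
        for future in as_completed(futures):
            results.update(future.result())
    print(results)
```

Spawning rather than forking gives each worker a clean interpreter state on every platform, which is presumably why the PR pins `mp.get_context("spawn")` instead of relying on the OS default.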