Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
jackx111 committed Sep 25, 2024
1 parent 284da43 commit 2029393
Showing 1 changed file with 9 additions and 46 deletions.
55 changes: 9 additions & 46 deletions howso/utilities/feature_attributes/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,6 @@
SMALLEST_TIME_DELTA = 0.001


def _delta_min_and_max_shard(
data,
time_feature_name,
datetime_feature_formats: Optional[Dict] = None,
derived_orders: Optional[Dict] = None,
features: Optional[dict] = None,
id_feature_name: Optional[Union[str, Iterable[str]]] = None,
orders_of_derivatives: Optional[Dict] = None,
time_feature_deltas: Optional[pd.Series] = None,
) -> t.Tuple[dict, pd.Series]:
"""Internal function to aid multiprocessing of feature attributes for series."""

return InferFeatureAttributesTimeSeries._infer_delta_min_and_max(
data=data,
time_feature_name=time_feature_name,
features=features,
datetime_feature_formats=datetime_feature_formats,
derived_orders=derived_orders,
id_feature_name=id_feature_name,
orders_of_derivatives=orders_of_derivatives,
time_feature_deltas=time_feature_deltas
)


class InferFeatureAttributesTimeSeries:
"""Infer feature attributes for time series data."""

Expand All @@ -71,7 +47,7 @@ def _infer_delta_min_and_max( # noqa: C901
id_feature_name: Optional[Union[str, Iterable[str]]] = None,
orders_of_derivatives: Optional[Dict] = None,
time_feature_deltas: Optional[pd.Series] = None,
) -> t.Tuple[dict, pd.Series]:
) -> t.Tuple[dict | None, pd.Series]:
"""
Infer continuous feature delta_min, delta_max for each feature.
Expand Down Expand Up @@ -103,16 +79,12 @@ def _infer_delta_min_and_max( # noqa: C901
to 2 will synthesize the 3rd order derivative value, and then use
that synthed value to derive the 2nd and 1st order.
features : SingleTableFeatureAttributes, default None
(Optional) A partially filled SingleTableFeatureAttributes object. If partially filled
features : SingleTableFeatureAttributes | dict, default None
(Optional) A partially filled SingleTableFeatureAttributes objec or dict. If partially filled
attributes for a feature are passed in, those parameters will be
retained as is and the delta_min and delta_max attributes will be
inferred.
features : Iterable or str, default None
(Optional) An interable or string of feature name(s) to calculate delta min and maxes for.
If `time_feature` is True, then this parameter should only contain one feature name.
id_feature_name : str or list of str, default None
(Optional) The name(s) of the ID feature(s).
Expand All @@ -131,18 +103,13 @@ def _infer_delta_min_and_max( # noqa: C901
Returns
-------
dict
Returns dict object of {`type`: "feature type"}} with column names
as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a
passthrough of the provided `time_feature_deltas` parameter values.
SingleTableFeatureAttributes | dict, pd.Series]
tuple[SingleTableFeatureAttributes | dict, pd.Series]
Returns a SingleTableFeatureAttributes or dict object of {`type`: "feature type"}} with column names
as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a
passthrough of the provided `time_feature_deltas` parameter values.
"""

feature_names = set(features.keys())
feature_names = list(set(features.keys()))

if time_feature_name not in feature_names:
if time_feature_deltas is None:
Expand All @@ -152,7 +119,7 @@ def _infer_delta_min_and_max( # noqa: C901
# else we are processing the time feature
else:
time_feature_deltas = None
if len(feature_names) > 1:
if len(feature_names) > 1 and features is not None:
features = {key: features[key] for key in features if key == time_feature_name}
warnings.warn(
"The features dictionary provide includes the time feature in addition to the non time features. "
Expand Down Expand Up @@ -287,10 +254,6 @@ def _infer_delta_min_and_max( # noqa: C901
features[f_name]['time_series']['delta_min'].append(delta_min)

return features, time_feature_deltas
# return {
# "features": features,
# "time_feature_deltas": time_feature_deltas
# }

def _set_rate_delta_bounds(self, btype: str, bounds: Dict, features: Dict):
"""Set optinally-specified rate/delta bounds in the features dict."""
Expand Down Expand Up @@ -748,7 +711,7 @@ def _process( # noqa: C901
# Process the time feature first so that the time_feature_deltas are cached
time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max(
# Double brackets forces the selected dataframe to remain a dataframe instead of a series
data=self.data[[self.time_feature_name]],
data=self.data[[f for f in [id_feature_name, self.time_feature_name] if f is not None]],
time_feature_name=self.time_feature_name,
features={key: features[key] for key in [self.time_feature_name]},
datetime_feature_formats=datetime_feature_formats,
Expand All @@ -770,7 +733,7 @@ def _process( # noqa: C901
mp_context = mp.get_context("spawn")
futures: dict[Future, str] = dict()
task_func = partial(
_delta_min_and_max_shard,
InferFeatureAttributesTimeSeries._infer_delta_min_and_max,
time_feature_name=self.time_feature_name,
datetime_feature_formats=datetime_feature_formats,
id_feature_name=id_feature_name,
Expand All @@ -783,7 +746,7 @@ def _process( # noqa: C901
for feature in non_time_feature_names:
future = pool.submit(
task_func,
data=self.data[[feature]],
data=self.data[[f for f in [id_feature_name, feature] if f is not None]],
features={key: features[key] for key in [feature]},
)
futures[future] = feature
Expand Down

0 comments on commit 2029393

Please sign in to comment.