diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index 4d86ec39..f1e0b5d5 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -27,30 +27,6 @@ SMALLEST_TIME_DELTA = 0.001 -def _delta_min_and_max_shard( - data, - time_feature_name, - datetime_feature_formats: Optional[Dict] = None, - derived_orders: Optional[Dict] = None, - features: Optional[dict] = None, - id_feature_name: Optional[Union[str, Iterable[str]]] = None, - orders_of_derivatives: Optional[Dict] = None, - time_feature_deltas: Optional[pd.Series] = None, -) -> t.Tuple[dict, pd.Series]: - """Internal function to aid multiprocessing of feature attributes for series.""" - - return InferFeatureAttributesTimeSeries._infer_delta_min_and_max( - data=data, - time_feature_name=time_feature_name, - features=features, - datetime_feature_formats=datetime_feature_formats, - derived_orders=derived_orders, - id_feature_name=id_feature_name, - orders_of_derivatives=orders_of_derivatives, - time_feature_deltas=time_feature_deltas - ) - - class InferFeatureAttributesTimeSeries: """Infer feature attributes for time series data.""" @@ -71,7 +47,7 @@ def _infer_delta_min_and_max( # noqa: C901 id_feature_name: Optional[Union[str, Iterable[str]]] = None, orders_of_derivatives: Optional[Dict] = None, time_feature_deltas: Optional[pd.Series] = None, - ) -> t.Tuple[dict, pd.Series]: + ) -> t.Tuple[dict | None, pd.Series]: """ Infer continuous feature delta_min, delta_max for each feature. @@ -103,16 +79,12 @@ def _infer_delta_min_and_max( # noqa: C901 to 2 will synthesize the 3rd order derivative value, and then use that synthed value to derive the 2nd and 1st order. - features : SingleTableFeatureAttributes, default None - (Optional) A partially filled SingleTableFeatureAttributes object. If partially filled + features : SingleTableFeatureAttributes | dict, default None + (Optional) A partially filled SingleTableFeatureAttributes objec or dict. If partially filled attributes for a feature are passed in, those parameters will be retained as is and the delta_min and delta_max attributes will be inferred. - features : Iterable or str, default None - (Optional) An interable or string of feature name(s) to calculate delta min and maxes for. - If `time_feature` is True, then this parameter should only contain one feature name. - id_feature_name : str or list of str, default None (Optional) The name(s) of the ID feature(s). @@ -131,18 +103,13 @@ def _infer_delta_min_and_max( # noqa: C901 Returns ------- - dict - Returns dict object of {`type`: "feature type"}} with column names - as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a - passthrough of the provided `time_feature_deltas` parameter values. - - SingleTableFeatureAttributes | dict, pd.Series] + tuple[SingleTableFeatureAttributes | dict, pd.Series] Returns a SingleTableFeatureAttributes or dict object of {`type`: "feature type"}} with column names as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a passthrough of the provided `time_feature_deltas` parameter values. """ - feature_names = set(features.keys()) + feature_names = list(set(features.keys())) if time_feature_name not in feature_names: if time_feature_deltas is None: @@ -152,7 +119,7 @@ def _infer_delta_min_and_max( # noqa: C901 # else we are processing the time feature else: time_feature_deltas = None - if len(feature_names) > 1: + if len(feature_names) > 1 and features is not None: features = {key: features[key] for key in features if key == time_feature_name} warnings.warn( "The features dictionary provide includes the time feature in addition to the non time features. " @@ -287,10 +254,6 @@ def _infer_delta_min_and_max( # noqa: C901 features[f_name]['time_series']['delta_min'].append(delta_min) return features, time_feature_deltas - # return { - # "features": features, - # "time_feature_deltas": time_feature_deltas - # } def _set_rate_delta_bounds(self, btype: str, bounds: Dict, features: Dict): """Set optinally-specified rate/delta bounds in the features dict.""" @@ -748,7 +711,7 @@ def _process( # noqa: C901 # Process the time feature first so that the time_feature_deltas are cached time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max( # Double brackets forces the selected dataframe to remain a dataframe instead of a series - data=self.data[[self.time_feature_name]], + data=self.data[[f for f in [id_feature_name, self.time_feature_name] if f is not None]], time_feature_name=self.time_feature_name, features={key: features[key] for key in [self.time_feature_name]}, datetime_feature_formats=datetime_feature_formats, @@ -770,7 +733,7 @@ def _process( # noqa: C901 mp_context = mp.get_context("spawn") futures: dict[Future, str] = dict() task_func = partial( - _delta_min_and_max_shard, + InferFeatureAttributesTimeSeries._infer_delta_min_and_max, time_feature_name=self.time_feature_name, datetime_feature_formats=datetime_feature_formats, id_feature_name=id_feature_name, @@ -783,7 +746,7 @@ def _process( # noqa: C901 for feature in non_time_feature_names: future = pool.submit( task_func, - data=self.data[[feature]], + data=self.data[[f for f in [id_feature_name, feature] if f is not None]], features={key: features[key] for key in [feature]}, ) futures[future] = feature