From 5884254d68201153a04c970a4befd57a1de6d784 Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 09:40:49 -0400 Subject: [PATCH 1/8] save --- .../feature_attributes/time_series.py | 211 +++++++++++++++--- 1 file changed, 175 insertions(+), 36 deletions(-) diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index bb25f051..122571db 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -1,5 +1,9 @@ +from concurrent.futures import as_completed, Future, ProcessPoolExecutor, ThreadPoolExecutor +from functools import partial import logging -from math import e, isnan +from math import e, isnan, prod +import multiprocessing as mp +import typing as t from typing import ( Dict, Iterable, Optional, Union ) @@ -18,6 +22,29 @@ SMALLEST_TIME_DELTA = 0.001 +def standalone_infer_delta_min_and_max( + data, + time_feature_name, + datetime_feature_formats: Optional[Dict] = None, + derived_orders: Optional[Dict] = None, + features: Optional[dict | SingleTableFeatureAttributes] = None, + id_feature_name: Optional[Union[str, Iterable[str]]] = None, + orders_of_derivatives: Optional[Dict] = None, + time_feature_deltas: Optional[pd.Series] = None, +) -> t.Tuple[dict | SingleTableFeatureAttributes, pd.Series]: + + return InferFeatureAttributesTimeSeries._infer_delta_min_and_max( + data=data, + time_feature_name=time_feature_name, + features=features, + datetime_feature_formats=datetime_feature_formats, + derived_orders=derived_orders, + id_feature_name=id_feature_name, + orders_of_derivatives=orders_of_derivatives, + time_feature_deltas=time_feature_deltas + ) + + class InferFeatureAttributesTimeSeries: """Infer feature attributes for time series data.""" @@ -28,25 +55,27 @@ def __init__(self, data: pd.DataFrame, time_feature_name: str): # Keep track of features that contain unsupported data self.unsupported = [] + @staticmethod def _infer_delta_min_and_max( # noqa: C901 - self, - features: Optional[Dict] = None, + data, + features: dict, + time_feature_name, datetime_feature_formats: Optional[Dict] = None, + derived_orders: Optional[Dict] = None, + # feature_names: Optional[Iterable | str] = None, id_feature_name: Optional[Union[str, Iterable[str]]] = None, orders_of_derivatives: Optional[Dict] = None, - derived_orders: Optional[Dict] = None, - ) -> Dict: + time_feature_deltas: Optional[pd.Series] = None, + ) -> t.Tuple[dict | SingleTableFeatureAttributes, pd.Series]: """ Infer continuous feature delta_min, delta_max for each feature. + Can either process all of the features or a subset of features. If processing the time feature, this method + is intended to be used only used on the time feature. All of the other features may be then processed together + or individually. + Parameters ---------- - features : dict, default None - (Optional) A partially filled features dict. If partially filled - attributes for a feature are passed in, those parameters will be - retained as is and the delta_min and delta_max attributes will be - inferred. - datetime_feature_formats : dict, default None (Optional) Dict defining a custom (non-ISO8601) datetime format and an optional locale for features with datetimes. 
By default @@ -62,6 +91,23 @@ def _infer_delta_min_and_max( # noqa: C901 "end_date" : "%Y-%m-%d" } + derived_orders : dict, default None + (Optional) Dict of features to the number of orders of derivatives + that should be derived instead of synthesized. For example, for a + feature with a 3rd order of derivative, setting its derived_orders + to 2 will synthesize the 3rd order derivative value, and then use + that synthed value to derive the 2nd and 1st order. + + features : SingleTableFeatureAttributes, default None + (Optional) A partially filled SingleTableFeatureAttributes object. If partially filled + attributes for a feature are passed in, those parameters will be + retained as is and the delta_min and delta_max attributes will be + inferred. + + features : Iterable or str, default None + (Optional) An interable or string of feature name(s) to calculate delta min and maxes for. + If `time_feature` is True, then this parameter should only contain one feature name. + id_feature_name : str or list of str, default None (Optional) The name(s) of the ID feature(s). @@ -72,24 +118,41 @@ def _infer_delta_min_and_max( # noqa: C901 set to 0, will not generate any delta/rate features. By default all continuous features have an order value of 1. - derived_orders : dict, default None - (Optional) Dict of features to the number of orders of derivatives - that should be derived instead of synthesized. For example, for a - feature with a 3rd order of derivative, setting its derived_orders - to 2 will synthesize the 3rd order derivative value, and then use - that synthed value to derive the 2nd and 1st order. + time_feature_deltas : pd.Series, default None + (Optional) Series of time feature deltas. If `time_feature` is not True, then + `time_feature_deltas` must be provided. time_feature_deltas` is a pandas series that is + created when `_infer_delta_min_and_max` is called on a time feature and setting + `time_feature` to True. Returns ------- - features : dict - Returns dictionary of {`type`: "feature type"}} with column names - in passed in df as key. + dict + Returns dict object of {`type`: "feature type"}} with column names + as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a + passthrough of the provided `time_feature_deltas` parameter values. + + SingleTableFeatureAttributes | dict, pd.Series] + Returns a SingleTableFeatureAttributes or dict object of {`type`: "feature type"}} with column names + as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a + passthrough of the provided `time_feature_deltas` parameter values. """ - # iterate over all features, ensuring that the time feature is the first - # one to be processed so that its deltas are cached + feature_names = set(features.keys()) - feature_names.remove(self.time_feature_name) - feature_names = [self.time_feature_name] + list(feature_names) + + if time_feature_name not in feature_names: + if time_feature_deltas is None: + raise ValueError( + "If not a time feature, then calculated time_feature_deltas must be provided" + ) + # else we are processing the time feature + else: + time_feature_deltas = None + if len(feature_names) > 1: + features = {key: features[key] for key in features if key == time_feature_name} + warnings.warn( + "The features dictionary provide includes the time feature in addition to the non time features. " + "These feature types must be processed separately, ignoring non time features." 
+ ) if orders_of_derivatives is None: orders_of_derivatives = dict() @@ -97,7 +160,6 @@ def _infer_delta_min_and_max( # noqa: C901 if derived_orders is None: derived_orders = dict() - time_feature_deltas = None for f_name in feature_names: if features[f_name]['type'] == "continuous" and \ 'time_series' in features[f_name]: @@ -127,11 +189,11 @@ def _infer_delta_min_and_max( # noqa: C901 if dt_format is not None: # copy just the id columns and the time feature if isinstance(id_feature_name, str): - df_c = self.data.loc[:, [id_feature_name, f_name]] + df_c = data.loc[:, [id_feature_name, f_name]] elif isinstance(id_feature_name, list): - df_c = self.data.loc[:, id_feature_name + [f_name]] + df_c = data.loc[:, id_feature_name + [f_name]] else: - df_c = self.data.loc[:, [f_name]] + df_c = data.loc[:, [f_name]] # convert time feature to epoch df_c[f_name] = df_c[f_name].apply( @@ -148,13 +210,13 @@ def _infer_delta_min_and_max( # noqa: C901 else: # Use pandas' diff() to pull all the deltas for this feature if isinstance(id_feature_name, list): - deltas = self.data.groupby(id_feature_name)[f_name].diff(1) + deltas = data.groupby(id_feature_name)[f_name].diff(1) elif isinstance(id_feature_name, str): - deltas = self.data.groupby([id_feature_name])[f_name].diff(1) + deltas = data.groupby([id_feature_name])[f_name].diff(1) else: - deltas = self.data[f_name].diff(1) + deltas = data[f_name].diff(1) - if f_name == self.time_feature_name: + if time_feature_deltas is None: time_feature_deltas = deltas # initial rates are same as deltas which will then be used as input @@ -213,13 +275,17 @@ def _infer_delta_min_and_max( # noqa: C901 delta_min = min(deltas.dropna()) # don't allow the time series time feature to go back in time # TODO: 15550: support user-specified min/max values - if f_name == self.time_feature_name: + if f_name == time_feature_name: features[f_name]['time_series']['delta_min'].append(max(0, delta_min / e)) else: delta_min = delta_min / e if delta_min > 0 else delta_min * e features[f_name]['time_series']['delta_min'].append(delta_min) - return features + return features, time_feature_deltas + # return { + # "features": features, + # "time_feature_deltas": time_feature_deltas + # } def _set_rate_delta_bounds(self, btype: str, bounds: Dict, features: Dict): """Set optinally-specified rate/delta bounds in the features dict.""" @@ -271,6 +337,7 @@ def _process( # noqa: C901 rate_boundaries: Optional[Dict] = None, delta_boundaries: Optional[Dict] = None, max_workers: Optional[int] = None, + **kwargs ) -> Dict: """ Infer time series attributes. 
@@ -673,14 +740,86 @@ def _process( # noqa: C901 else: features[self.time_feature_name]['bounds'] = {'allow_null': False} - features = self._infer_delta_min_and_max( - features=features, + # Process the time feature first so that the time_feature_deltas are cached + time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max( + # Double brackets forces the selected dataframe to remain a dataframe instead of a series + data=self.data[[self.time_feature_name]], + time_feature_name=self.time_feature_name, + features={key: features[key] for key in [self.time_feature_name]}, datetime_feature_formats=datetime_feature_formats, id_feature_name=id_feature_name, orders_of_derivatives=orders_of_derivatives, derived_orders=derived_orders ) + # Get the list of the other features to process + non_time_feature_names = list(set(features.keys())) + non_time_feature_names.remove(self.time_feature_name) + + max_workers = kwargs.pop("series_max_workers", None) + # The default with be to not use multiprocessing if the product of rows + # and columns is less than 25M. + if prod(self.data.shape) < 25_000_000 and max_workers is None: + max_workers = 0 + if max_workers is None or max_workers >= 1: + mp_context = mp.get_context("spawn") + futures: dict[Future, str] = dict() + task_func = partial( + standalone_infer_delta_min_and_max, + time_feature_name=self.time_feature_name, + datetime_feature_formats=datetime_feature_formats, + id_feature_name=id_feature_name, + orders_of_derivatives=orders_of_derivatives, + derived_orders=derived_orders, + time_feature_deltas=time_feature_deltas, + ) + + with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context) as pool: + for feature in non_time_feature_names: + future = pool.submit( + task_func, + data=self.data[[feature]], + features={key: features[key] for key in [feature]}, + ) + futures[future] = feature + + temp_feature_attributes: dict[str, t.Any] = dict() + for future in as_completed(futures): + try: + response = future.result() + temp_feature_attributes.update(response[0]) + except Exception as e: + warnings.warn( + f"Infer_feature_attributes raised an exception " + f"while processing '{futures[future]}' ({str(e)})." + ) + + else: + temp_feature_attributes, _ = standalone_infer_delta_min_and_max( + data=self.data[non_time_feature_names], + time_feature_name=self.time_feature_name, + features={key: features[key] for key in non_time_feature_names}, + time_feature_deltas=time_feature_deltas, + datetime_feature_formats=datetime_feature_formats, + id_feature_name=id_feature_name, + orders_of_derivatives=orders_of_derivatives, + derived_orders=derived_orders, + ) + + # Add the time feature attributes + temp_feature_attributes.update(time_feature_attributes) + # Re-order the keys like the original dataframe by following the `features`` key order. + # features has not been modified since it was created and ordered by `InferFeatureAttributesDataFrame`. + temp_feature_attributes = { + k: temp_feature_attributes[k] for k in set(features.keys()) + } + + # Since features is a `SingleTableFeatureAttributes` object, in order preserve the class + # attributes while reordering, the key-values are cleared and then updated with the ordered temporary feature + # attributes. 
+ features.clear() + features.update(temp_feature_attributes) + # Set any manually specified rate/delta boundaries if delta_boundaries is not None: self._set_rate_delta_bounds('delta', delta_boundaries, features) From 23c25764afec4d400346129b002b050b5d856816 Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 09:42:06 -0400 Subject: [PATCH 2/8] save --- howso/utilities/feature_attributes/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/howso/utilities/feature_attributes/base.py b/howso/utilities/feature_attributes/base.py index 5203f7d9..293f01b6 100644 --- a/howso/utilities/feature_attributes/base.py +++ b/howso/utilities/feature_attributes/base.py @@ -62,6 +62,10 @@ def __copy__(self) -> "FeatureAttributesBase": obj_copy.params = self.params return obj_copy + def copy(self): + """Ensure the copy method explicity calls the __copy__ method.""" + return self.__copy__() + def get_parameters(self) -> dict: """ Get the keyword arguments used with the initial call to infer_feature_attributes. @@ -628,7 +632,6 @@ def _process(self, # noqa: C901 # User passed only the format string feature_attributes[feature_name] = { 'type': 'continuous', - 'data_type': 'formatted_date_time', 'date_time_format': user_dt_format, } elif ( @@ -639,7 +642,6 @@ def _process(self, # noqa: C901 dt_format, dt_locale = user_dt_format feature_attributes[feature_name] = { 'type': 'continuous', - 'data_type': 'formatted_date_time', 'date_time_format': dt_format, 'locale': dt_locale, } From 02d2294245e8f1cba25119bfbb7d9687aa3cc3a6 Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 11:40:01 -0400 Subject: [PATCH 3/8] save --- howso/utilities/feature_attributes/pandas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/howso/utilities/feature_attributes/pandas.py b/howso/utilities/feature_attributes/pandas.py index de5112ef..80427221 100644 --- a/howso/utilities/feature_attributes/pandas.py +++ b/howso/utilities/feature_attributes/pandas.py @@ -44,7 +44,7 @@ def _shard(data: pd.DataFrame, *, kwargs: dict[str, t.Any]): ifr_inst = InferFeatureAttributesDataFrame(data) # Filter out features that are not related to this shard. 
_kwargs = kwargs.copy() - if "features" in _kwargs: + if _kwargs.get("features") is not None: _kwargs['features'] = { k: v for k, v in _kwargs["features"].items() if k in data.columns From 284da432aef4fda049317eb64278f9bf46d375d0 Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 11:48:20 -0400 Subject: [PATCH 4/8] save --- .../feature_attributes/time_series.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index 122571db..4d86ec39 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -1,4 +1,9 @@ -from concurrent.futures import as_completed, Future, ProcessPoolExecutor, ThreadPoolExecutor +from concurrent.futures import ( + as_completed, + Future, + ProcessPoolExecutor, + ThreadPoolExecutor +) from functools import partial import logging from math import e, isnan, prod @@ -22,16 +27,17 @@ SMALLEST_TIME_DELTA = 0.001 -def standalone_infer_delta_min_and_max( +def _delta_min_and_max_shard( data, time_feature_name, datetime_feature_formats: Optional[Dict] = None, derived_orders: Optional[Dict] = None, - features: Optional[dict | SingleTableFeatureAttributes] = None, + features: Optional[dict] = None, id_feature_name: Optional[Union[str, Iterable[str]]] = None, orders_of_derivatives: Optional[Dict] = None, time_feature_deltas: Optional[pd.Series] = None, -) -> t.Tuple[dict | SingleTableFeatureAttributes, pd.Series]: +) -> t.Tuple[dict, pd.Series]: + """Internal function to aid multiprocessing of feature attributes for series.""" return InferFeatureAttributesTimeSeries._infer_delta_min_and_max( data=data, @@ -58,15 +64,14 @@ def __init__(self, data: pd.DataFrame, time_feature_name: str): @staticmethod def _infer_delta_min_and_max( # noqa: C901 data, - features: dict, - time_feature_name, + time_feature_name: str, datetime_feature_formats: Optional[Dict] = None, + features: Optional[dict] = None, derived_orders: Optional[Dict] = None, - # feature_names: Optional[Iterable | str] = None, id_feature_name: Optional[Union[str, Iterable[str]]] = None, orders_of_derivatives: Optional[Dict] = None, time_feature_deltas: Optional[pd.Series] = None, - ) -> t.Tuple[dict | SingleTableFeatureAttributes, pd.Series]: + ) -> t.Tuple[dict, pd.Series]: """ Infer continuous feature delta_min, delta_max for each feature. 
@@ -765,7 +770,7 @@ def _process( # noqa: C901 mp_context = mp.get_context("spawn") futures: dict[Future, str] = dict() task_func = partial( - standalone_infer_delta_min_and_max, + _delta_min_and_max_shard, time_feature_name=self.time_feature_name, datetime_feature_formats=datetime_feature_formats, id_feature_name=id_feature_name, @@ -795,7 +800,7 @@ def _process( # noqa: C901 ) else: - temp_feature_attributes, _ = standalone_infer_delta_min_and_max( + temp_feature_attributes, _ = _delta_min_and_max_shard( data=self.data[non_time_feature_names], time_feature_name=self.time_feature_name, features={key: features[key] for key in non_time_feature_names}, From 2029393b56d87086eab8ed1b3f14021a138093da Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 12:13:04 -0400 Subject: [PATCH 5/8] save --- .../feature_attributes/time_series.py | 55 +++---------------- 1 file changed, 9 insertions(+), 46 deletions(-) diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index 4d86ec39..f1e0b5d5 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -27,30 +27,6 @@ SMALLEST_TIME_DELTA = 0.001 -def _delta_min_and_max_shard( - data, - time_feature_name, - datetime_feature_formats: Optional[Dict] = None, - derived_orders: Optional[Dict] = None, - features: Optional[dict] = None, - id_feature_name: Optional[Union[str, Iterable[str]]] = None, - orders_of_derivatives: Optional[Dict] = None, - time_feature_deltas: Optional[pd.Series] = None, -) -> t.Tuple[dict, pd.Series]: - """Internal function to aid multiprocessing of feature attributes for series.""" - - return InferFeatureAttributesTimeSeries._infer_delta_min_and_max( - data=data, - time_feature_name=time_feature_name, - features=features, - datetime_feature_formats=datetime_feature_formats, - derived_orders=derived_orders, - id_feature_name=id_feature_name, - orders_of_derivatives=orders_of_derivatives, - time_feature_deltas=time_feature_deltas - ) - - class InferFeatureAttributesTimeSeries: """Infer feature attributes for time series data.""" @@ -71,7 +47,7 @@ def _infer_delta_min_and_max( # noqa: C901 id_feature_name: Optional[Union[str, Iterable[str]]] = None, orders_of_derivatives: Optional[Dict] = None, time_feature_deltas: Optional[pd.Series] = None, - ) -> t.Tuple[dict, pd.Series]: + ) -> t.Tuple[dict | None, pd.Series]: """ Infer continuous feature delta_min, delta_max for each feature. @@ -103,16 +79,12 @@ def _infer_delta_min_and_max( # noqa: C901 to 2 will synthesize the 3rd order derivative value, and then use that synthed value to derive the 2nd and 1st order. - features : SingleTableFeatureAttributes, default None - (Optional) A partially filled SingleTableFeatureAttributes object. If partially filled + features : SingleTableFeatureAttributes | dict, default None + (Optional) A partially filled SingleTableFeatureAttributes objec or dict. If partially filled attributes for a feature are passed in, those parameters will be retained as is and the delta_min and delta_max attributes will be inferred. - features : Iterable or str, default None - (Optional) An interable or string of feature name(s) to calculate delta min and maxes for. - If `time_feature` is True, then this parameter should only contain one feature name. - id_feature_name : str or list of str, default None (Optional) The name(s) of the ID feature(s). 
@@ -131,18 +103,13 @@ def _infer_delta_min_and_max( # noqa: C901 Returns ------- - dict - Returns dict object of {`type`: "feature type"}} with column names - as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a - passthrough of the provided `time_feature_deltas` parameter values. - - SingleTableFeatureAttributes | dict, pd.Series] + tuple[SingleTableFeatureAttributes | dict, pd.Series] Returns a SingleTableFeatureAttributes or dict object of {`type`: "feature type"}} with column names as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a passthrough of the provided `time_feature_deltas` parameter values. """ - feature_names = set(features.keys()) + feature_names = list(set(features.keys())) if time_feature_name not in feature_names: if time_feature_deltas is None: @@ -152,7 +119,7 @@ def _infer_delta_min_and_max( # noqa: C901 # else we are processing the time feature else: time_feature_deltas = None - if len(feature_names) > 1: + if len(feature_names) > 1 and features is not None: features = {key: features[key] for key in features if key == time_feature_name} warnings.warn( "The features dictionary provide includes the time feature in addition to the non time features. " @@ -287,10 +254,6 @@ def _infer_delta_min_and_max( # noqa: C901 features[f_name]['time_series']['delta_min'].append(delta_min) return features, time_feature_deltas - # return { - # "features": features, - # "time_feature_deltas": time_feature_deltas - # } def _set_rate_delta_bounds(self, btype: str, bounds: Dict, features: Dict): """Set optinally-specified rate/delta bounds in the features dict.""" @@ -748,7 +711,7 @@ def _process( # noqa: C901 # Process the time feature first so that the time_feature_deltas are cached time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max( # Double brackets forces the selected dataframe to remain a dataframe instead of a series - data=self.data[[self.time_feature_name]], + data=self.data[[f for f in [id_feature_name, self.time_feature_name] if f is not None]], time_feature_name=self.time_feature_name, features={key: features[key] for key in [self.time_feature_name]}, datetime_feature_formats=datetime_feature_formats, @@ -770,7 +733,7 @@ def _process( # noqa: C901 mp_context = mp.get_context("spawn") futures: dict[Future, str] = dict() task_func = partial( - _delta_min_and_max_shard, + InferFeatureAttributesTimeSeries._infer_delta_min_and_max, time_feature_name=self.time_feature_name, datetime_feature_formats=datetime_feature_formats, id_feature_name=id_feature_name, @@ -783,7 +746,7 @@ def _process( # noqa: C901 for feature in non_time_feature_names: future = pool.submit( task_func, - data=self.data[[feature]], + data=self.data[[f for f in [id_feature_name, feature] if f is not None]], features={key: features[key] for key in [feature]}, ) futures[future] = feature From 34924efd1f6df564d19b5ae3d62141c372fa90c4 Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 12:53:14 -0400 Subject: [PATCH 6/8] save --- howso/utilities/feature_attributes/base.py | 4 -- .../feature_attributes/time_series.py | 62 +++++++++---------- 2 files changed, 28 insertions(+), 38 deletions(-) diff --git a/howso/utilities/feature_attributes/base.py b/howso/utilities/feature_attributes/base.py index 293f01b6..cb6d2325 100644 --- a/howso/utilities/feature_attributes/base.py +++ b/howso/utilities/feature_attributes/base.py @@ -62,10 
+62,6 @@ def __copy__(self) -> "FeatureAttributesBase": obj_copy.params = self.params return obj_copy - def copy(self): - """Ensure the copy method explicity calls the __copy__ method.""" - return self.__copy__() - def get_parameters(self) -> dict: """ Get the keyword arguments used with the initial call to infer_feature_attributes. diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index f1e0b5d5..9e8bedc4 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -2,7 +2,6 @@ as_completed, Future, ProcessPoolExecutor, - ThreadPoolExecutor ) from functools import partial import logging @@ -39,24 +38,27 @@ def __init__(self, data: pd.DataFrame, time_feature_name: str): @staticmethod def _infer_delta_min_and_max( # noqa: C901 - data, + data: pd.DataFrame, time_feature_name: str, - datetime_feature_formats: Optional[Dict] = None, - features: Optional[dict] = None, - derived_orders: Optional[Dict] = None, - id_feature_name: Optional[Union[str, Iterable[str]]] = None, - orders_of_derivatives: Optional[Dict] = None, - time_feature_deltas: Optional[pd.Series] = None, - ) -> t.Tuple[dict | None, pd.Series]: + datetime_feature_formats: t.Optional[dict] = None, + features: t.Optional[dict] = None, + derived_orders: t.Optional[Dict] = None, + id_feature_name: t.Optional[t.Union[str, t.Iterable[str]]] = None, + orders_of_derivatives: t.Optional[dict] = None, + time_feature_deltas: t.Optional[pd.Series] = None, + ) -> t.Tuple[dict, pd.Series]: """ Infer continuous feature delta_min, delta_max for each feature. - Can either process all of the features or a subset of features. If processing the time feature, this method - is intended to be used only used on the time feature. All of the other features may be then processed together - or individually. + If processing the time feature, this method is intended to be used only used on the time feature. + This is done so when multiprocessing is used, the time feature is processed first to cache deltas. + All of the other features may be then processed together or individually. Parameters ---------- + data : pd.Dataframe + A dataframe of the data to infer min and max for. + datetime_feature_formats : dict, default None (Optional) Dict defining a custom (non-ISO8601) datetime format and an optional locale for features with datetimes. By default @@ -79,8 +81,8 @@ def _infer_delta_min_and_max( # noqa: C901 to 2 will synthesize the 3rd order derivative value, and then use that synthed value to derive the 2nd and 1st order. - features : SingleTableFeatureAttributes | dict, default None - (Optional) A partially filled SingleTableFeatureAttributes objec or dict. If partially filled + features : dict, default None + (Optional) A partially filled features dict. If partially filled attributes for a feature are passed in, those parameters will be retained as is and the delta_min and delta_max attributes will be inferred. @@ -96,34 +98,31 @@ def _infer_delta_min_and_max( # noqa: C901 continuous features have an order value of 1. time_feature_deltas : pd.Series, default None - (Optional) Series of time feature deltas. If `time_feature` is not True, then - `time_feature_deltas` must be provided. time_feature_deltas` is a pandas series that is - created when `_infer_delta_min_and_max` is called on a time feature and setting - `time_feature` to True. + (Optional) Series of time feature deltas. 
If not processing the time feature, then + `time_feature_deltas` must be provided. Returns ------- - tuple[SingleTableFeatureAttributes | dict, pd.Series] - Returns a SingleTableFeatureAttributes or dict object of {`type`: "feature type"}} with column names - as keys as well as a Series of time feature deltas. If `time_feature` is not True, then this Series is a + tuple[dict, pd.Series] + Returns a tuple of a dictionary of {`type`: "feature type"}} with column names + as keys and a Series of time feature deltas. If `time_feature` is not True, then this Series is a passthrough of the provided `time_feature_deltas` parameter values. """ - feature_names = list(set(features.keys())) if time_feature_name not in feature_names: if time_feature_deltas is None: raise ValueError( - "If not a time feature, then calculated time_feature_deltas must be provided" + "If not a time feature, then calculated `time_feature_deltas` must be provided" ) # else we are processing the time feature else: time_feature_deltas = None - if len(feature_names) > 1 and features is not None: - features = {key: features[key] for key in features if key == time_feature_name} + if len(feature_names) > 1: + features = {f: features[f] for f in features if f == time_feature_name} warnings.warn( - "The features dictionary provide includes the time feature in addition to the non time features. " - "These feature types must be processed separately, ignoring non time features." + "The features dictionary provided includes the time feature in addition to the non time features. " + "These feature types must be processed separately. Ignoring non time features." ) if orders_of_derivatives is None: @@ -305,7 +304,6 @@ def _process( # noqa: C901 rate_boundaries: Optional[Dict] = None, delta_boundaries: Optional[Dict] = None, max_workers: Optional[int] = None, - **kwargs ) -> Dict: """ Infer time series attributes. @@ -708,9 +706,8 @@ def _process( # noqa: C901 else: features[self.time_feature_name]['bounds'] = {'allow_null': False} - # Process the time feature first so that the time_feature_deltas are cached + # Process the time feature first so that the time_feature_deltas are cached in case multiprocessing is used time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max( - # Double brackets forces the selected dataframe to remain a dataframe instead of a series data=self.data[[f for f in [id_feature_name, self.time_feature_name] if f is not None]], time_feature_name=self.time_feature_name, features={key: features[key] for key in [self.time_feature_name]}, @@ -724,9 +721,6 @@ def _process( # noqa: C901 non_time_feature_names = list(set(features.keys())) non_time_feature_names.remove(self.time_feature_name) - max_workers = kwargs.pop("series_max_workers", None) - # The default with be to not use multiprocessing if the product of rows - # and columns is less than 25M. 
if prod(self.data.shape) < 25_000_000 and max_workers is None: max_workers = 0 if max_workers is None or max_workers >= 1: @@ -763,7 +757,7 @@ def _process( # noqa: C901 ) else: - temp_feature_attributes, _ = _delta_min_and_max_shard( + temp_feature_attributes, _ = InferFeatureAttributesTimeSeries._infer_delta_min_and_max( data=self.data[non_time_feature_names], time_feature_name=self.time_feature_name, features={key: features[key] for key in non_time_feature_names}, From 59327aa474cace9eaa5a09af933bcfa875b324aa Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 13:42:48 -0400 Subject: [PATCH 7/8] save --- .../feature_attributes/time_series.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index 9e8bedc4..6f8fcf3f 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -161,7 +161,7 @@ def _infer_delta_min_and_max( # noqa: C901 # copy just the id columns and the time feature if isinstance(id_feature_name, str): df_c = data.loc[:, [id_feature_name, f_name]] - elif isinstance(id_feature_name, list): + elif isinstance(id_feature_name, list) and len(id_feature_name) > 0: df_c = data.loc[:, id_feature_name + [f_name]] else: df_c = data.loc[:, [f_name]] @@ -171,7 +171,7 @@ def _infer_delta_min_and_max( # noqa: C901 lambda x: date_to_epoch(x, dt_format)) # use Pandas' diff() to pull all the deltas for this feature - if isinstance(id_feature_name, list): + if isinstance(id_feature_name, list) and len(id_feature_name) > 0: deltas = df_c.groupby(id_feature_name)[f_name].diff(1) elif isinstance(id_feature_name, str): deltas = df_c.groupby([id_feature_name])[f_name].diff(1) @@ -180,7 +180,7 @@ def _infer_delta_min_and_max( # noqa: C901 else: # Use pandas' diff() to pull all the deltas for this feature - if isinstance(id_feature_name, list): + if isinstance(id_feature_name, list) and len(id_feature_name) > 0: deltas = data.groupby(id_feature_name)[f_name].diff(1) elif isinstance(id_feature_name, str): deltas = data.groupby([id_feature_name])[f_name].diff(1) @@ -626,7 +626,7 @@ def _process( # noqa: C901 if isinstance(id_feature_name, str): id_feature_names = [id_feature_name] elif isinstance(id_feature_name, Iterable): - id_feature_names = id_feature_name + id_feature_names = list(id_feature_name) else: id_feature_names = [] @@ -708,11 +708,11 @@ def _process( # noqa: C901 # Process the time feature first so that the time_feature_deltas are cached in case multiprocessing is used time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max( - data=self.data[[f for f in [id_feature_name, self.time_feature_name] if f is not None]], + data=self.data[((id_feature_names if id_feature_names is not None else []) + [self.time_feature_name])], time_feature_name=self.time_feature_name, features={key: features[key] for key in [self.time_feature_name]}, datetime_feature_formats=datetime_feature_formats, - id_feature_name=id_feature_name, + id_feature_name=id_feature_names, orders_of_derivatives=orders_of_derivatives, derived_orders=derived_orders ) @@ -730,7 +730,7 @@ def _process( # noqa: C901 InferFeatureAttributesTimeSeries._infer_delta_min_and_max, time_feature_name=self.time_feature_name, datetime_feature_formats=datetime_feature_formats, - id_feature_name=id_feature_name, + id_feature_name=id_feature_names, 
orders_of_derivatives=orders_of_derivatives, derived_orders=derived_orders, time_feature_deltas=time_feature_deltas, @@ -740,7 +740,7 @@ def _process( # noqa: C901 for feature in non_time_feature_names: future = pool.submit( task_func, - data=self.data[[f for f in [id_feature_name, feature] if f is not None]], + data=self.data[((id_feature_names if id_feature_names is not None else []) + [feature])], features={key: features[key] for key in [feature]}, ) futures[future] = feature @@ -763,7 +763,7 @@ def _process( # noqa: C901 features={key: features[key] for key in non_time_feature_names}, time_feature_deltas=time_feature_deltas, datetime_feature_formats=datetime_feature_formats, - id_feature_name=id_feature_name, + id_feature_name=id_feature_names, orders_of_derivatives=orders_of_derivatives, derived_orders=derived_orders, ) From 75ee62996c322f3d7e5c2c6f7991969887cd46c3 Mon Sep 17 00:00:00 2001 From: jack-xia-dp <144161208+jackx111@users.noreply.github.com> Date: Wed, 25 Sep 2024 14:56:03 -0400 Subject: [PATCH 8/8] cleanup --- howso/utilities/feature_attributes/base.py | 2 ++ howso/utilities/feature_attributes/time_series.py | 13 +++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/howso/utilities/feature_attributes/base.py b/howso/utilities/feature_attributes/base.py index cb6d2325..5203f7d9 100644 --- a/howso/utilities/feature_attributes/base.py +++ b/howso/utilities/feature_attributes/base.py @@ -628,6 +628,7 @@ def _process(self, # noqa: C901 # User passed only the format string feature_attributes[feature_name] = { 'type': 'continuous', + 'data_type': 'formatted_date_time', 'date_time_format': user_dt_format, } elif ( @@ -638,6 +639,7 @@ def _process(self, # noqa: C901 dt_format, dt_locale = user_dt_format feature_attributes[feature_name] = { 'type': 'continuous', + 'data_type': 'formatted_date_time', 'date_time_format': dt_format, 'locale': dt_locale, } diff --git a/howso/utilities/feature_attributes/time_series.py b/howso/utilities/feature_attributes/time_series.py index 6f8fcf3f..40691702 100644 --- a/howso/utilities/feature_attributes/time_series.py +++ b/howso/utilities/feature_attributes/time_series.py @@ -57,7 +57,7 @@ def _infer_delta_min_and_max( # noqa: C901 Parameters ---------- data : pd.Dataframe - A dataframe of the data to infer min and max for. + A dataframe of the data to infer min and max deltas for. datetime_feature_formats : dict, default None (Optional) Dict defining a custom (non-ISO8601) datetime format and @@ -120,6 +120,7 @@ def _infer_delta_min_and_max( # noqa: C901 time_feature_deltas = None if len(feature_names) > 1: features = {f: features[f] for f in features if f == time_feature_name} + feature_names = [time_feature_name] warnings.warn( "The features dictionary provided includes the time feature in addition to the non time features. " "These feature types must be processed separately. Ignoring non time features." 
@@ -187,7 +188,7 @@ def _infer_delta_min_and_max( # noqa: C901 else: deltas = data[f_name].diff(1) - if time_feature_deltas is None: + if f_name == time_feature_name: time_feature_deltas = deltas # initial rates are same as deltas which will then be used as input @@ -706,11 +707,11 @@ def _process( # noqa: C901 else: features[self.time_feature_name]['bounds'] = {'allow_null': False} - # Process the time feature first so that the time_feature_deltas are cached in case multiprocessing is used + # Process the time feature first so that the `time_feature_deltas` are cached in case multiprocessing is used time_feature_attributes, time_feature_deltas = self._infer_delta_min_and_max( data=self.data[((id_feature_names if id_feature_names is not None else []) + [self.time_feature_name])], time_feature_name=self.time_feature_name, - features={key: features[key] for key in [self.time_feature_name]}, + features={f: features[f] for f in [self.time_feature_name]}, datetime_feature_formats=datetime_feature_formats, id_feature_name=id_feature_names, orders_of_derivatives=orders_of_derivatives, @@ -741,7 +742,7 @@ def _process( # noqa: C901 future = pool.submit( task_func, data=self.data[((id_feature_names if id_feature_names is not None else []) + [feature])], - features={key: features[key] for key in [feature]}, + features={f: features[f] for f in [feature]}, ) futures[future] = feature @@ -760,7 +761,7 @@ def _process( # noqa: C901 temp_feature_attributes, _ = InferFeatureAttributesTimeSeries._infer_delta_min_and_max( data=self.data[non_time_feature_names], time_feature_name=self.time_feature_name, - features={key: features[key] for key in non_time_feature_names}, + features={f: features[f] for f in non_time_feature_names}, time_feature_deltas=time_feature_deltas, datetime_feature_formats=datetime_feature_formats, id_feature_name=id_feature_names,
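
A minimal usage sketch of the parallelized time-series attribute inference this patch series introduces. It assumes the public `howso.utilities.infer_feature_attributes` helper forwards `time_feature_name`, `id_feature_name`, `datetime_feature_formats`, and `max_workers` through to `InferFeatureAttributesTimeSeries._process`; the DataFrame contents and column names are invented for illustration and are not taken from the patches.

```python
# Illustrative sketch only -- not part of the patch series. Data and column
# names are hypothetical; the keyword arguments are assumed to be forwarded
# to InferFeatureAttributesTimeSeries._process.
import pandas as pd

from howso.utilities import infer_feature_attributes

df = pd.DataFrame({
    "series_id": ["a"] * 4 + ["b"] * 4,
    "date": list(pd.date_range("2024-01-01", periods=4).strftime("%Y-%m-%d")) * 2,
    "value": [1.0, 2.5, 3.0, 4.5, 2.0, 2.2, 2.9, 3.8],
})

features = infer_feature_attributes(
    df,
    time_feature_name="date",
    id_feature_name="series_id",
    datetime_feature_formats={"date": "%Y-%m-%d"},
    # Per the patched _process: max_workers=0 keeps everything in-process,
    # while None (on sufficiently large frames) or values >= 1 routes each
    # non-time feature through the ProcessPoolExecutor path, with the time
    # feature always processed first so its deltas can be passed to workers.
    max_workers=0,
)

# _infer_delta_min_and_max appends delta_min/delta_max under each continuous
# feature's time_series attributes.
print(features["value"].get("time_series"))
```

Making `_infer_delta_min_and_max` a static method and using a `spawn` multiprocessing context is what keeps each per-feature task picklable for the `ProcessPoolExecutor`, with the precomputed `time_feature_deltas` passed to every worker via `functools.partial`.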