From ba97ee4e155cd91b37165377da9a63672277eaea Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Thu, 16 Sep 2021 19:09:02 +0530 Subject: [PATCH 1/2] Improve performance with efficient use of FieldMappingCache --- eland/field_mappings.py | 18 ++--- eland/query_compiler.py | 71 +++++++++---------- .../test_field_name_pd_dtype_pytest.py | 6 +- 3 files changed, 43 insertions(+), 52 deletions(-) diff --git a/eland/field_mappings.py b/eland/field_mappings.py index 92ac7f72..825202f6 100644 --- a/eland/field_mappings.py +++ b/eland/field_mappings.py @@ -653,7 +653,7 @@ def date_field_format(self, es_field_name: str) -> str: self._mappings_capabilities.es_field_name == es_field_name ].es_date_format.squeeze() - def field_name_pd_dtype(self, es_field_name: str) -> str: + def field_name_pd_dtype(self, es_field_name: str) -> Tuple[bool, Optional[str]]: """ Parameters ---------- @@ -661,21 +661,17 @@ def field_name_pd_dtype(self, es_field_name: str) -> str: Returns ------- - pd_dtype: str - The pandas data type we map to + Tuple[bool, Optional[str]] + If es_field_name is source field and the pandas data type we map to - Raises - ------ - KeyError - If es_field_name does not exist in mapping """ if es_field_name not in self._mappings_capabilities.es_field_name: - raise KeyError(f"es_field_name {es_field_name} does not exist") + return False, "object" - pd_dtype = self._mappings_capabilities.loc[ + df: pd.DataFrame = self._mappings_capabilities.loc[ self._mappings_capabilities.es_field_name == es_field_name - ].pd_dtype.squeeze() - return pd_dtype + ] + return df.is_source.squeeze(), df.pd_dtype.squeeze() def add_scripted_field( self, scripted_field_name: str, display_name: str, pd_dtype: str diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 6dc032d4..5da4f1aa 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -81,17 +81,20 @@ def __init__( Union[str, List[str], Tuple[str, ...], "Elasticsearch"] ] = None, index_pattern: Optional[str] = None, - display_names=None, - index_field=None, - to_copy=None, + display_names: Optional[List[str]] = None, + index_field: Optional[str] = None, + to_copy: Optional["QueryCompiler"] = None, ) -> None: # Implement copy as we don't deep copy the client if to_copy is not None: - self._client = to_copy._client - self._index_pattern = to_copy._index_pattern + self._client: "Elasticsearch" = to_copy._client + self._index_pattern: Optional[str] = to_copy._index_pattern self._index: "Index" = Index(self, to_copy._index.es_index_field) self._operations: "Operations" = copy.deepcopy(to_copy._operations) self._mappings: FieldMappings = copy.deepcopy(to_copy._mappings) + self._field_mapping_cache: Optional["FieldMappingCache"] = copy.deepcopy( + to_copy._field_mapping_cache + ) else: self._client = ensure_es_client(client) self._index_pattern = index_pattern @@ -104,6 +107,8 @@ def __init__( ) self._index = Index(self, index_field) self._operations = Operations() + # This should only be initialized when ETL is done + self._field_mapping_cache = None @property def index(self) -> Index: @@ -239,7 +244,8 @@ def _es_results_to_pandas( # This is one of the most performance critical areas of eland, and it repeatedly calls # self._mappings.field_name_pd_dtype and self._mappings.date_field_format # therefore create a simple cache for this data - field_mapping_cache = FieldMappingCache(self._mappings) + if self._field_mapping_cache is None: + self._field_mapping_cache = FieldMappingCache(self._mappings) rows = [] index = [] @@ -266,7 +272,7 @@ def _es_results_to_pandas( index.append(index_field) # flatten row to map correctly to 2D DataFrame - rows.append(self._flatten_dict(row, field_mapping_cache)) + rows.append(self._flatten_dict(row)) # Create pandas DataFrame df = pd.DataFrame(data=rows, index=index) @@ -279,7 +285,7 @@ def _es_results_to_pandas( ) for missing in missing_field_names: - pd_dtype = self._mappings.field_name_pd_dtype(missing) + _, pd_dtype = self._field_mapping_cache.field_name_pd_dtype(missing) df[missing] = pd.Series(dtype=pd_dtype) # Rename columns @@ -291,7 +297,7 @@ def _es_results_to_pandas( return df - def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"): + def _flatten_dict(self, y): out = {} def flatten(x, name=""): @@ -301,12 +307,10 @@ def flatten(x, name=""): is_source_field = False pd_dtype = "object" else: - try: - pd_dtype = field_mapping_cache.field_name_pd_dtype(name[:-1]) - is_source_field = True - except KeyError: - is_source_field = False - pd_dtype = "object" + ( + is_source_field, + pd_dtype, + ) = self._field_mapping_cache.field_name_pd_dtype(name[:-1]) if not is_source_field and isinstance(x, dict): for a in x: @@ -321,7 +325,7 @@ def flatten(x, name=""): # Coerce types - for now just datetime if pd_dtype == "datetime64[ns]": x = elasticsearch_date_to_pandas_date( - x, field_mapping_cache.date_field_format(field_name) + x, self._field_mapping_cache.date_field_format(field_name) ) # Elasticsearch can have multiple values for a field. These are represented as lists, so @@ -791,28 +795,21 @@ class FieldMappingCache: def __init__(self, mappings: "FieldMappings") -> None: self._mappings = mappings + # This returns all the es_field_names + self._es_field_names: List[str] = mappings.get_field_names() + # Cache these to re-use later + self._field_name_pd_dtype: Dict[str, Tuple[bool, Optional[str]]] = { + i: mappings.field_name_pd_dtype(i) for i in self._es_field_names + } + self._date_field_format: Dict[str, str] = { + i: mappings.date_field_format(i) for i in self._es_field_names + } - self._field_name_pd_dtype: Dict[str, str] = dict() - self._date_field_format: Dict[str, str] = dict() - - def field_name_pd_dtype(self, es_field_name: str) -> str: - if es_field_name in self._field_name_pd_dtype: + def field_name_pd_dtype(self, es_field_name: str) -> Tuple[bool, Optional[str]]: + if es_field_name not in self._field_name_pd_dtype: + return False, "object" + else: return self._field_name_pd_dtype[es_field_name] - pd_dtype = self._mappings.field_name_pd_dtype(es_field_name) - - # cache this - self._field_name_pd_dtype[es_field_name] = pd_dtype - - return pd_dtype - def date_field_format(self, es_field_name: str) -> str: - if es_field_name in self._date_field_format: - return self._date_field_format[es_field_name] - - es_date_field_format = self._mappings.date_field_format(es_field_name) - - # cache this - self._date_field_format[es_field_name] = es_date_field_format - - return es_date_field_format + return self._date_field_format[es_field_name] diff --git a/tests/field_mappings/test_field_name_pd_dtype_pytest.py b/tests/field_mappings/test_field_name_pd_dtype_pytest.py index 4c2f8c5b..90b609c6 100644 --- a/tests/field_mappings/test_field_name_pd_dtype_pytest.py +++ b/tests/field_mappings/test_field_name_pd_dtype_pytest.py @@ -16,7 +16,6 @@ # under the License. # File called _pytest for PyCharm compatability -import pytest from pandas.testing import assert_series_equal from eland.field_mappings import FieldMappings @@ -35,7 +34,7 @@ def test_all_formats(self): assert_series_equal(pd_flights.dtypes, ed_field_mappings.dtypes()) for es_field_name in FLIGHTS_MAPPING["mappings"]["properties"].keys(): - pd_dtype = ed_field_mappings.field_name_pd_dtype(es_field_name) + _, pd_dtype = ed_field_mappings.field_name_pd_dtype(es_field_name) assert pd_flights[es_field_name].dtype == pd_dtype @@ -44,5 +43,4 @@ def test_non_existant(self): client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) - with pytest.raises(KeyError): - ed_field_mappings.field_name_pd_dtype("unknown") + assert (False, "object") == ed_field_mappings.field_name_pd_dtype("unknown") From 9c9a8fc0e94fb5f95756e9b4390b57b4eaf63d9f Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Thu, 31 Mar 2022 21:20:00 +0530 Subject: [PATCH 2/2] Fix linting --- eland/operations.py | 10 +++++----- eland/query.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/eland/operations.py b/eland/operations.py index 2ff75f7e..8dbb0c3b 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -416,7 +416,7 @@ def _metric_aggs( fields=fields, es_aggs=es_aggs, pd_aggs=pd_aggs, - response=response, + response=response, # type: ignore numeric_only=numeric_only, is_dataframe_agg=is_dataframe_agg, percentiles=percentiles, @@ -453,7 +453,7 @@ def _terms_aggs( body.terms_aggs(field, func, field, es_size=es_size) response = query_compiler._client.search( - index=query_compiler._index_pattern, size=0, body=body.to_search_body() + index=query_compiler._index_pattern, size=0, **body.to_search_body() ) results = {} @@ -499,7 +499,7 @@ def _hist_aggs( body.hist_aggs(field, field, min_aggs[field], max_aggs[field], num_bins) response = query_compiler._client.search( - index=query_compiler._index_pattern, size=0, body=body.to_search_body() + index=query_compiler._index_pattern, size=0, **body.to_search_body() ) # results are like # "aggregations" : { @@ -1040,7 +1040,7 @@ def bucket_generator( res = query_compiler._client.search( index=query_compiler._index_pattern, size=0, - body=body.to_search_body(), + **body.to_search_body(), ) # Pagination Logic @@ -1539,7 +1539,7 @@ def _search_yield_hits( try: pit_id = client.open_point_in_time( - index=query_compiler._index_pattern, keep_alive=DEFAULT_PIT_KEEP_ALIVE + index=query_compiler._index_pattern, keep_alive=DEFAULT_PIT_KEEP_ALIVE # type: ignore )["id"] # Modify the search with the new point in time ID and keep-alive time. diff --git a/eland/query.py b/eland/query.py index be20d1f2..16c803a9 100644 --- a/eland/query.py +++ b/eland/query.py @@ -322,11 +322,11 @@ def to_search_body(self) -> Dict[str, Any]: body["query"] = self._query.build() return body - def to_count_body(self) -> Optional[Dict[str, Any]]: + def to_count_body(self) -> Dict[str, Any]: if len(self._aggs) > 0: warnings.warn(f"Requesting count for agg query {self}") if self._query.empty(): - return None + return {} else: return {"query": self._query.build()}