From 59495258e5c7df5e1e198a6257d8b4974774b391 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 11 Nov 2019 12:00:53 -0600 Subject: [PATCH 1/8] Fix on sample error --- optimus/profiler/profiler.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 416e7fa6..4a5d6858 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -311,6 +311,7 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT :param stats: calculate stats, if not only data table returned :param format: dict or json :param mismatch: + :param advanced_stats: :return: dict or json """ output_columns = self.output_columns @@ -350,10 +351,11 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT assign(output_columns, "summary.missing_count", total_count_na, dict) assign(output_columns, "summary.p_missing", round(total_count_na / self.rows_count * 100, 2)) - sample = {"columns": [{"title": cols} for cols in df.cols.names()], - "value": df.sample_n(sample).rows.to_list(columns)} + # TODO: drop, rename and move operation must affect the sample + sample = {"columns": [{"title": cols} for cols in df.cols.names()], + "value": df.sample_n(sample).rows.to_list(columns)} - assign(output_columns, "sample", sample, dict) + assign(output_columns, "sample", sample, dict) actual_columns = output_columns["columns"] # Order columns @@ -464,6 +466,8 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL return columns_info + + def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True, advanced_stats=True): columns = parse_columns(df, columns) n = BATCH_SIZE From f0e6edb15b465299ac73655b0b510372df10a17c Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 11 Nov 2019 12:56:19 -0600 Subject: [PATCH 2/8] Now comm accepts url as param an can be set in bumblebee.ini --- optimus/bumblebee.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/optimus/bumblebee.py b/optimus/bumblebee.py index dfc27330..8459baec 100644 --- a/optimus/bumblebee.py +++ b/optimus/bumblebee.py @@ -20,7 +20,6 @@ END_POINT = FULL_DOMAIN_API + "/dataset" DOMAIN_APP = "app.hi-bumblebee.com" -FULL_DOMAIN = PROTOCOL_SSL + DOMAIN_APP class Comm: @@ -28,9 +27,15 @@ class Comm: Send encrypted message to the Bumblebee """ - def __init__(self, queue_name=None, key=None): + def __init__(self, url=None, queue_name=None, key=None): # If queue_name was not given try lo load from file if not generate one + if url is None: + FULL_DOMAIN = save_config_key("bumblebee.ini", "DEFAULT", "url", PROTOCOL_SSL + DOMAIN_APP) + print(FULL_DOMAIN) + else: + FULL_DOMAIN = url + if queue_name is None: self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4())) else: @@ -44,7 +49,7 @@ def __init__(self, queue_name=None, key=None): self.key = key keys_link = " here".format(FULL_DOMAIN=FULL_DOMAIN, - SESSION=self.queue_name, KEY=self.key) + SESSION=self.queue_name, KEY=self.key) direct_link = "{FULL_DOMAIN}".format( FULL_DOMAIN=FULL_DOMAIN, SESSION=self.queue_name, KEY=self.key) @@ -53,7 +58,6 @@ def __init__(self, queue_name=None, key=None): "Open Bumblebee: " + direct_link + "
If you really care about privacy get your keys in bumblebee.ini and put them" + keys_link + "
" - ) self.token = None From a1b1303b9a0969a78dddaa5dad8345fb9ed6242b Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 11 Nov 2019 13:17:56 -0600 Subject: [PATCH 3/8] Remove print --- optimus/profiler/profiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index 4a5d6858..be660fa1 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -444,10 +444,10 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL # Calculate Frequency logger.print("Processing Frequency ...") df_freq = df.cols.select("*", data_type=PYSPARK_NUMERIC_TYPES, invert=True) + freq = None if df_freq is not None: freq = df_freq.cols.frequency("*", buckets, True, self.rows_count) - for col_name in columns: col_info = {} assign(col_info, "stats", stats[col_name], dict) From 78b801630f4455319e793059d2db1da19a7cc479 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 11 Nov 2019 14:06:28 -0600 Subject: [PATCH 4/8] Better handling internal state --- optimus/bumblebee.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/optimus/bumblebee.py b/optimus/bumblebee.py index 8459baec..45295bff 100644 --- a/optimus/bumblebee.py +++ b/optimus/bumblebee.py @@ -17,9 +17,7 @@ DOMAIN_API = "api.hi-bumblebee.com" FULL_DOMAIN_API = PROTOCOL_SSL + DOMAIN_API -END_POINT = FULL_DOMAIN_API + "/dataset" - -DOMAIN_APP = "app.hi-bumblebee.com" +END_POINT = "/dataset" class Comm: @@ -30,11 +28,11 @@ class Comm: def __init__(self, url=None, queue_name=None, key=None): # If queue_name was not given try lo load from file if not generate one + if url is None: - FULL_DOMAIN = save_config_key("bumblebee.ini", "DEFAULT", "url", PROTOCOL_SSL + DOMAIN_APP) - print(FULL_DOMAIN) + self.url = save_config_key("bumblebee.ini", "DEFAULT", "url", FULL_DOMAIN_API) else: - FULL_DOMAIN = url + self.url = url if queue_name is None: self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4())) @@ -47,12 +45,11 @@ def __init__(self, url=None, queue_name=None, key=None): self.key = save_config_key("bumblebee.ini", "DEFAULT", "Key", key.decode()) else: self.key = key - - keys_link = " here".format(FULL_DOMAIN=FULL_DOMAIN, + keys_link = " here".format(FULL_DOMAIN=self.url, SESSION=self.queue_name, KEY=self.key) direct_link = "{FULL_DOMAIN}".format( - FULL_DOMAIN=FULL_DOMAIN, SESSION=self.queue_name, KEY=self.key) + FULL_DOMAIN=self.url, SESSION=self.queue_name, KEY=self.key) print_html( "Open Bumblebee: " + direct_link + @@ -92,7 +89,8 @@ def send(self, message): headers = {'content-type': 'application/json'} data = json.dumps({"username": self.queue_name, "data": self.token}) - response = requests.post(END_POINT, data=data, headers=headers) + end_point_dataset = self.url + END_POINT + response = requests.post(end_point_dataset, data=data, headers=headers) # If the response was successful, no Exception will be raised response.raise_for_status() From 763fefc603f4ad848ef0ab4d886bea97ae2ba7d8 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 12 Nov 2019 11:46:04 -0600 Subject: [PATCH 5/8] Bumblebee accepts api url and app url params --- optimus/bumblebee.py | 64 ++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/optimus/bumblebee.py b/optimus/bumblebee.py index 45295bff..87b8ba3f 100644 --- a/optimus/bumblebee.py +++ b/optimus/bumblebee.py @@ -15,24 +15,37 @@ PROTOCOL = "http://" PROTOCOL_SSL = "https://" +# API DOMAIN_API = "api.hi-bumblebee.com" -FULL_DOMAIN_API = PROTOCOL_SSL + DOMAIN_API +FULL_API_URL = PROTOCOL_SSL + DOMAIN_API + +# API END POINTS END_POINT = "/dataset" +# APP +DOMAIN_APP = "app.hi-bumblebee.com" +FULL_APP_URL = PROTOCOL_SSL + DOMAIN_APP + class Comm: """ Send encrypted message to the Bumblebee """ - def __init__(self, url=None, queue_name=None, key=None): + def __init__(self, app_url=None, api_url=None, queue_name=None, key=None): # If queue_name was not given try lo load from file if not generate one - if url is None: - self.url = save_config_key("bumblebee.ini", "DEFAULT", "url", FULL_DOMAIN_API) + if app_url is None: + self.app_url = save_config_key("bumblebee.ini", "DEFAULT", "appUrl", FULL_APP_URL) + else: + self.app_url = api_url + + # API + if api_url is None: + self.api_url = save_config_key("bumblebee.ini", "DEFAULT", "apiUrl", FULL_API_URL) else: - self.url = url + self.api_url = api_url if queue_name is None: self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4())) @@ -45,11 +58,12 @@ def __init__(self, url=None, queue_name=None, key=None): self.key = save_config_key("bumblebee.ini", "DEFAULT", "Key", key.decode()) else: self.key = key - keys_link = " here".format(FULL_DOMAIN=self.url, + + keys_link = " here".format(FULL_DOMAIN=self.app_url, SESSION=self.queue_name, KEY=self.key) direct_link = "{FULL_DOMAIN}".format( - FULL_DOMAIN=self.url, SESSION=self.queue_name, KEY=self.key) + FULL_DOMAIN=self.app_url, SESSION=self.queue_name, KEY=self.key) print_html( "Open Bumblebee: " + direct_link + @@ -58,7 +72,6 @@ def __init__(self, url=None, queue_name=None, key=None): ) self.token = None - self.f = Fernet(self.key) @staticmethod @@ -75,31 +88,36 @@ def _encrypt(self, message): message = str(message).encode() return self.f.encrypt(message) - def send(self, message): + def send(self, message, output): """ Send the info to the queue :param message: + :param output: "http" or "json" :return: """ logger.print(message) self.token = self._encrypt(self._compress(message)).decode() logger.print(self.token) - try: - headers = {'content-type': 'application/json'} - - data = json.dumps({"username": self.queue_name, "data": self.token}) - end_point_dataset = self.url + END_POINT - response = requests.post(end_point_dataset, data=data, headers=headers) - - # If the response was successful, no Exception will be raised - response.raise_for_status() - except HTTPError as http_err: - print(f'HTTP error occurred: {http_err}') - except Exception as err: - print(f'Other error occurred: {err}') + data = json.dumps({"username": self.queue_name, "data": self.token}) + + if output == "http": + try: + headers = {'content-type': 'application/json'} + + end_point_dataset = self.api_url + END_POINT + response = requests.post(end_point_dataset, data=data, headers=headers) + + # If the response was successful, no Exception will be raised + response.raise_for_status() + except HTTPError as http_err: + print(f'HTTP error occurred: {http_err}') + except Exception as err: + print(f'Other error occurred: {err}') + else: + print('Send!') else: - print('Send!') + return data def _decrypt(self, token): return self.f.decrypt(token) From 08f92be3cd46c71e405a0f7218f96fb0db5f8ff3 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 12 Nov 2019 11:46:30 -0600 Subject: [PATCH 6/8] Now send can return a json --- optimus/dataframe/extension.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index ba0ca663..b6e6764a 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -462,7 +462,7 @@ def reset(self): @add_method(DataFrame) -def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=True): +def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=True, output="http"): """ Profile and send the data to the queue :param self: @@ -470,23 +470,24 @@ def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats= :param mismatch: a dict with the column name or regular expression to identify correct values. :param name: Specified a name for the view/dataframe :param stats: calculate stats or only vales + :param output: :return: """ df = self if name is not None: df.set_name(name) - output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, - approx_count=True, - sample=10000, - stats=stats, - format="json", - mismatch=mismatch, - advanced_stats=advanced_stats - ) + message = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR, + approx_count=True, + sample=10000, + stats=stats, + format="json", + mismatch=mismatch, + advanced_stats=advanced_stats + ) if Comm.instance: - Comm.instance.send(output) + return Comm.instance.send(message, output=output) else: raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)") From 19065b5d535480ae6ae250bdc74eb3b4c7858973 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 12 Nov 2019 12:55:07 -0600 Subject: [PATCH 7/8] Fixed bug in frequency profiling update --- optimus/profiler/profiler.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index be660fa1..c7441005 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -26,7 +26,6 @@ MAX_BUCKETS = 33 BATCH_SIZE = 20 - class Profiler: def __init__(self, output_path=None): @@ -324,13 +323,19 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT rows_count = df.count() self.rows_count = rows_count self.cols_count = cols_count = len(df.columns) - output_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count, - mismatch, advanced_stats) + updated_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count, + mismatch, advanced_stats) # Update last profiling info # Merge old and current profiling if self.is_cached(): - output_columns["columns"].update(self.output_columns["columns"]) + for c in cols_to_profile: + # output_columns["columns"].update() + output_columns["columns"][c].update(updated_columns["columns"][c]) + else: + output_columns = updated_columns + + # output_columns = copy.deepcopy(self.output_columns) assign(output_columns, "name", df.get_name(), dict) assign(output_columns, "file_name", df.get_meta("file_name"), dict) @@ -443,17 +448,20 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL # Calculate Frequency logger.print("Processing Frequency ...") - df_freq = df.cols.select("*", data_type=PYSPARK_NUMERIC_TYPES, invert=True) + # print("COLUMNS",columns) + df_freq = df.cols.select(columns, data_type=PYSPARK_NUMERIC_TYPES, invert=True) freq = None if df_freq is not None: freq = df_freq.cols.frequency("*", buckets, True, self.rows_count) + # print("FREQUENCY1", freq) for col_name in columns: col_info = {} assign(col_info, "stats", stats[col_name], dict) if freq is not None: if col_name in freq: + # print("ASSIGN") assign(col_info, "frequency", freq[col_name]) assign(col_info, "name", col_name) @@ -466,9 +474,8 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL return columns_info - - - def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True, advanced_stats=True): + def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True, + advanced_stats=True): columns = parse_columns(df, columns) n = BATCH_SIZE list_columns = [columns[i * n:(i + 1) * n] for i in range((len(columns) + n - 1) // n)] @@ -584,8 +591,6 @@ def extra_columns_stats(df, col_name, stats): return result - - @staticmethod def missing_values(df, columns): """ From 2f255e58eb53338eb27bf1db832b29107b0022ef Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Tue, 12 Nov 2019 12:59:59 -0600 Subject: [PATCH 8/8] Better handling update profiler --- optimus/profiler/profiler.py | 37 ++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index c7441005..b8d0e1cb 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -26,6 +26,28 @@ MAX_BUCKETS = 33 BATCH_SIZE = 20 +import collections +import six + +# python 3.8+ compatibility +try: + collectionsAbc = collections.abc +except: + collectionsAbc = collections + + +def update(d, u): + for k, v in six.iteritems(u): + dv = d.get(k, {}) + if not isinstance(dv, collectionsAbc.Mapping): + d[k] = v + elif isinstance(v, collectionsAbc.Mapping): + d[k] = update(dv, v) + else: + d[k] = v + return d + + class Profiler: def __init__(self, output_path=None): @@ -328,14 +350,13 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT # Update last profiling info # Merge old and current profiling - if self.is_cached(): - for c in cols_to_profile: - # output_columns["columns"].update() - output_columns["columns"][c].update(updated_columns["columns"][c]) - else: - output_columns = updated_columns - - # output_columns = copy.deepcopy(self.output_columns) + # if self.is_cached(): + # for c in cols_to_profile: + # # output_columns["columns"].update() + # output_columns["columns"][c].update(updated_columns["columns"][c]) + # else: + # output_columns = updated_columns + output_columns = update(output_columns, updated_columns) assign(output_columns, "name", df.get_name(), dict) assign(output_columns, "file_name", df.get_meta("file_name"), dict)