diff --git a/optimus/bumblebee.py b/optimus/bumblebee.py
index dfc27330..87b8ba3f 100644
--- a/optimus/bumblebee.py
+++ b/optimus/bumblebee.py
@@ -15,12 +15,16 @@
PROTOCOL = "http://"
PROTOCOL_SSL = "https://"
+# API
DOMAIN_API = "api.hi-bumblebee.com"
-FULL_DOMAIN_API = PROTOCOL_SSL + DOMAIN_API
-END_POINT = FULL_DOMAIN_API + "/dataset"
+FULL_API_URL = PROTOCOL_SSL + DOMAIN_API
+# API END POINTS
+END_POINT = "/dataset"
+
+# APP
DOMAIN_APP = "app.hi-bumblebee.com"
-FULL_DOMAIN = PROTOCOL_SSL + DOMAIN_APP
+FULL_APP_URL = PROTOCOL_SSL + DOMAIN_APP
class Comm:
@@ -28,9 +32,21 @@ class Comm:
Send encrypted message to the Bumblebee
"""
- def __init__(self, queue_name=None, key=None):
+ def __init__(self, app_url=None, api_url=None, queue_name=None, key=None):
# If queue_name was not given try lo load from file if not generate one
+
+ if app_url is None:
+ self.app_url = save_config_key("bumblebee.ini", "DEFAULT", "appUrl", FULL_APP_URL)
+ else:
+            self.app_url = app_url
+
+ # API
+ if api_url is None:
+ self.api_url = save_config_key("bumblebee.ini", "DEFAULT", "apiUrl", FULL_API_URL)
+ else:
+ self.api_url = api_url
+
if queue_name is None:
self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4()))
else:
@@ -43,21 +59,19 @@ def __init__(self, queue_name=None, key=None):
else:
self.key = key
- keys_link = " here".format(FULL_DOMAIN=FULL_DOMAIN,
- SESSION=self.queue_name, KEY=self.key)
+ keys_link = " here".format(FULL_DOMAIN=self.app_url,
+ SESSION=self.queue_name, KEY=self.key)
direct_link = "{FULL_DOMAIN}".format(
- FULL_DOMAIN=FULL_DOMAIN, SESSION=self.queue_name, KEY=self.key)
+ FULL_DOMAIN=self.app_url, SESSION=self.queue_name, KEY=self.key)
print_html(
"Open Bumblebee: " + direct_link +
"
If you really care about privacy get your keys in bumblebee.ini and put them" + keys_link + "
"
-
)
self.token = None
-
self.f = Fernet(self.key)
@staticmethod
@@ -74,30 +88,36 @@ def _encrypt(self, message):
message = str(message).encode()
return self.f.encrypt(message)
- def send(self, message):
+    def send(self, message, output="http"):
"""
Send the info to the queue
:param message:
+ :param output: "http" or "json"
:return:
"""
logger.print(message)
self.token = self._encrypt(self._compress(message)).decode()
logger.print(self.token)
- try:
- headers = {'content-type': 'application/json'}
-
- data = json.dumps({"username": self.queue_name, "data": self.token})
- response = requests.post(END_POINT, data=data, headers=headers)
-
- # If the response was successful, no Exception will be raised
- response.raise_for_status()
- except HTTPError as http_err:
- print(f'HTTP error occurred: {http_err}')
- except Exception as err:
- print(f'Other error occurred: {err}')
+ data = json.dumps({"username": self.queue_name, "data": self.token})
+
+ if output == "http":
+ try:
+ headers = {'content-type': 'application/json'}
+
+ end_point_dataset = self.api_url + END_POINT
+ response = requests.post(end_point_dataset, data=data, headers=headers)
+
+ # If the response was successful, no Exception will be raised
+ response.raise_for_status()
+ except HTTPError as http_err:
+ print(f'HTTP error occurred: {http_err}')
+ except Exception as err:
+ print(f'Other error occurred: {err}')
+ else:
+ print('Send!')
else:
- print('Send!')
+ return data
def _decrypt(self, token):
return self.f.decrypt(token)
diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py
index ba0ca663..b6e6764a 100644
--- a/optimus/dataframe/extension.py
+++ b/optimus/dataframe/extension.py
@@ -462,7 +462,7 @@ def reset(self):
@add_method(DataFrame)
-def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=True):
+def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=True, output="http"):
"""
Profile and send the data to the queue
:param self:
@@ -470,23 +470,24 @@ def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=
:param mismatch: a dict with the column name or regular expression to identify correct values.
:param name: Specified a name for the view/dataframe
:param stats: calculate stats or only vales
+ :param output:
:return:
"""
df = self
if name is not None:
df.set_name(name)
- output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
- approx_count=True,
- sample=10000,
- stats=stats,
- format="json",
- mismatch=mismatch,
- advanced_stats=advanced_stats
- )
+ message = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
+ approx_count=True,
+ sample=10000,
+ stats=stats,
+ format="json",
+ mismatch=mismatch,
+ advanced_stats=advanced_stats
+ )
if Comm.instance:
- Comm.instance.send(output)
+ return Comm.instance.send(message, output=output)
else:
raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)")
diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py
index 416e7fa6..b8d0e1cb 100644
--- a/optimus/profiler/profiler.py
+++ b/optimus/profiler/profiler.py
@@ -26,6 +26,27 @@
MAX_BUCKETS = 33
BATCH_SIZE = 20
+import collections
+import six
+
+# python 3.8+ compatibility
+try:
+ collectionsAbc = collections.abc
+except:
+ collectionsAbc = collections
+
+
+def update(d, u):
+ for k, v in six.iteritems(u):
+ dv = d.get(k, {})
+ if not isinstance(dv, collectionsAbc.Mapping):
+ d[k] = v
+ elif isinstance(v, collectionsAbc.Mapping):
+ d[k] = update(dv, v)
+ else:
+ d[k] = v
+ return d
+
class Profiler:
@@ -311,6 +332,7 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
:param stats: calculate stats, if not only data table returned
:param format: dict or json
:param mismatch:
+ :param advanced_stats:
:return: dict or json
"""
output_columns = self.output_columns
@@ -323,13 +345,18 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
rows_count = df.count()
self.rows_count = rows_count
self.cols_count = cols_count = len(df.columns)
- output_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count,
- mismatch, advanced_stats)
+ updated_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count,
+ mismatch, advanced_stats)
# Update last profiling info
# Merge old and current profiling
- if self.is_cached():
- output_columns["columns"].update(self.output_columns["columns"])
+ # if self.is_cached():
+ # for c in cols_to_profile:
+ # # output_columns["columns"].update()
+ # output_columns["columns"][c].update(updated_columns["columns"][c])
+ # else:
+ # output_columns = updated_columns
+ output_columns = update(output_columns, updated_columns)
assign(output_columns, "name", df.get_name(), dict)
assign(output_columns, "file_name", df.get_meta("file_name"), dict)
@@ -350,10 +377,11 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
assign(output_columns, "summary.missing_count", total_count_na, dict)
assign(output_columns, "summary.p_missing", round(total_count_na / self.rows_count * 100, 2))
- sample = {"columns": [{"title": cols} for cols in df.cols.names()],
- "value": df.sample_n(sample).rows.to_list(columns)}
+ # TODO: drop, rename and move operation must affect the sample
+ sample = {"columns": [{"title": cols} for cols in df.cols.names()],
+ "value": df.sample_n(sample).rows.to_list(columns)}
- assign(output_columns, "sample", sample, dict)
+ assign(output_columns, "sample", sample, dict)
actual_columns = output_columns["columns"]
# Order columns
@@ -441,17 +469,20 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL
# Calculate Frequency
logger.print("Processing Frequency ...")
- df_freq = df.cols.select("*", data_type=PYSPARK_NUMERIC_TYPES, invert=True)
+ # print("COLUMNS",columns)
+ df_freq = df.cols.select(columns, data_type=PYSPARK_NUMERIC_TYPES, invert=True)
+
freq = None
if df_freq is not None:
freq = df_freq.cols.frequency("*", buckets, True, self.rows_count)
-
+ # print("FREQUENCY1", freq)
for col_name in columns:
col_info = {}
assign(col_info, "stats", stats[col_name], dict)
if freq is not None:
if col_name in freq:
+ # print("ASSIGN")
assign(col_info, "frequency", freq[col_name])
assign(col_info, "name", col_name)
@@ -464,7 +495,8 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL
return columns_info
- def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True, advanced_stats=True):
+ def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True,
+ advanced_stats=True):
columns = parse_columns(df, columns)
n = BATCH_SIZE
list_columns = [columns[i * n:(i + 1) * n] for i in range((len(columns) + n - 1) // n)]
@@ -580,8 +612,6 @@ def extra_columns_stats(df, col_name, stats):
return result
-
-
@staticmethod
def missing_values(df, columns):
"""