Skip to content

Commit

Permalink
Merge branch 'feature/profiler_improvements' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Nov 13, 2019
2 parents 23e7f35 + 2f255e5 commit 4934f0b
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 45 deletions.
66 changes: 43 additions & 23 deletions optimus/bumblebee.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,38 @@
PROTOCOL = "http://"
PROTOCOL_SSL = "https://"

# API
DOMAIN_API = "api.hi-bumblebee.com"
FULL_DOMAIN_API = PROTOCOL_SSL + DOMAIN_API
END_POINT = FULL_DOMAIN_API + "/dataset"
FULL_API_URL = PROTOCOL_SSL + DOMAIN_API

# API END POINTS
END_POINT = "/dataset"

# APP
DOMAIN_APP = "app.hi-bumblebee.com"
FULL_DOMAIN = PROTOCOL_SSL + DOMAIN_APP
FULL_APP_URL = PROTOCOL_SSL + DOMAIN_APP


class Comm:
"""
Send encrypted message to the Bumblebee
"""

def __init__(self, queue_name=None, key=None):
def __init__(self, app_url=None, api_url=None, queue_name=None, key=None):

# If queue_name was not given try lo load from file if not generate one

if app_url is None:
self.app_url = save_config_key("bumblebee.ini", "DEFAULT", "appUrl", FULL_APP_URL)
else:
self.app_url = api_url

# API
if api_url is None:
self.api_url = save_config_key("bumblebee.ini", "DEFAULT", "apiUrl", FULL_API_URL)
else:
self.api_url = api_url

if queue_name is None:
self.queue_name = save_config_key("bumblebee.ini", "DEFAULT", "QueueName", str(uuid.uuid4()))
else:
Expand All @@ -43,21 +59,19 @@ def __init__(self, queue_name=None, key=None):
else:
self.key = key

keys_link = "<a href ='{FULL_DOMAIN}'> here</a>".format(FULL_DOMAIN=FULL_DOMAIN,
SESSION=self.queue_name, KEY=self.key)
keys_link = "<a href ='{FULL_DOMAIN}'> here</a>".format(FULL_DOMAIN=self.app_url,
SESSION=self.queue_name, KEY=self.key)

direct_link = "<a target='_blank' href ='{FULL_DOMAIN}/?session={SESSION}&key={KEY}&view=0'>{FULL_DOMAIN}</a>".format(
FULL_DOMAIN=FULL_DOMAIN, SESSION=self.queue_name, KEY=self.key)
FULL_DOMAIN=self.app_url, SESSION=self.queue_name, KEY=self.key)

print_html(
"Open Bumblebee: " + direct_link +
"<div>If you really care about privacy get your keys in bumblebee.ini and put them" + keys_link + "</div>"


)

self.token = None

self.f = Fernet(self.key)

@staticmethod
Expand All @@ -74,30 +88,36 @@ def _encrypt(self, message):
message = str(message).encode()
return self.f.encrypt(message)

def send(self, message):
def send(self, message, output):
"""
Send the info to the queue
:param message:
:param output: "http" or "json"
:return:
"""
logger.print(message)
self.token = self._encrypt(self._compress(message)).decode()

logger.print(self.token)
try:
headers = {'content-type': 'application/json'}

data = json.dumps({"username": self.queue_name, "data": self.token})
response = requests.post(END_POINT, data=data, headers=headers)

# If the response was successful, no Exception will be raised
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}')
except Exception as err:
print(f'Other error occurred: {err}')
data = json.dumps({"username": self.queue_name, "data": self.token})

if output == "http":
try:
headers = {'content-type': 'application/json'}

end_point_dataset = self.api_url + END_POINT
response = requests.post(end_point_dataset, data=data, headers=headers)

# If the response was successful, no Exception will be raised
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}')
except Exception as err:
print(f'Other error occurred: {err}')
else:
print('Send!')
else:
print('Send!')
return data

    def _decrypt(self, token):
        # Inverse of _encrypt: recover the original bytes from a Fernet token.
        return self.f.decrypt(token)
Expand Down
21 changes: 11 additions & 10 deletions optimus/dataframe/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,31 +462,32 @@ def reset(self):


@add_method(DataFrame)
def send(self, name=None, infer=True, mismatch=None, stats=True, advanced_stats=True, output="http"):
    """
    Profile the dataframe and push the result to the Bumblebee queue.
    :param self:
    :param name: Specified a name for the view/dataframe
    :param infer: infer datatypes
    :param mismatch: a dict with the column name or regular expression to identify correct values.
    :param stats: calculate stats or only values
    :param advanced_stats: include the heavier advanced statistics in the profile
    :param output: "http" POSTs the profile to the API; otherwise the JSON is only returned
    :return: the JSON payload produced by Comm.send
    """
    df = self
    if name is not None:
        df.set_name(name)

    # Build the profile payload; Comm.send decides whether it also goes over HTTP.
    message = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
                                        approx_count=True,
                                        sample=10000,
                                        stats=stats,
                                        format="json",
                                        mismatch=mismatch,
                                        advanced_stats=advanced_stats
                                        )

    if Comm.instance:
        return Comm.instance.send(message, output=output)
    else:
        raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)")

Expand Down
54 changes: 42 additions & 12 deletions optimus/profiler/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@
MAX_BUCKETS = 33
BATCH_SIZE = 20

import collections
import six

# python 3.8+ compatibility
try:
collectionsAbc = collections.abc
except:
collectionsAbc = collections


def update(d, u):
    """
    Recursively merge mapping *u* into mapping *d*, in place.

    Values from *u* win; when both sides hold a mapping under the same key
    the two are merged recursively instead of being replaced wholesale.
    Replaces the six.iteritems shim: this codebase targets Python 3
    (see the "python 3.8+ compatibility" note above), where dict.items()
    is the idiomatic equivalent.

    :param d: destination mapping (mutated)
    :param u: source mapping (read only)
    :return: *d*, to support the recursive call
    """
    for k, v in u.items():
        dv = d.get(k, {})
        if not isinstance(dv, collections.abc.Mapping):
            # Existing value is a scalar — overwrite it outright.
            d[k] = v
        elif isinstance(v, collections.abc.Mapping):
            # Both sides are mappings — merge one level deeper.
            d[k] = update(dv, v)
        else:
            d[k] = v
    return d


class Profiler:

Expand Down Expand Up @@ -311,6 +332,7 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
:param stats: calculate stats, if not only data table returned
:param format: dict or json
:param mismatch:
:param advanced_stats:
:return: dict or json
"""
output_columns = self.output_columns
Expand All @@ -323,13 +345,18 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
rows_count = df.count()
self.rows_count = rows_count
self.cols_count = cols_count = len(df.columns)
output_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count,
mismatch, advanced_stats)
updated_columns = self.columns_stats(df, cols_to_profile, buckets, infer, relative_error, approx_count,
mismatch, advanced_stats)

# Update last profiling info
# Merge old and current profiling
if self.is_cached():
output_columns["columns"].update(self.output_columns["columns"])
# if self.is_cached():
# for c in cols_to_profile:
# # output_columns["columns"].update()
# output_columns["columns"][c].update(updated_columns["columns"][c])
# else:
# output_columns = updated_columns
output_columns = update(output_columns, updated_columns)

assign(output_columns, "name", df.get_name(), dict)
assign(output_columns, "file_name", df.get_meta("file_name"), dict)
Expand All @@ -350,10 +377,11 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
assign(output_columns, "summary.missing_count", total_count_na, dict)
assign(output_columns, "summary.p_missing", round(total_count_na / self.rows_count * 100, 2))

sample = {"columns": [{"title": cols} for cols in df.cols.names()],
"value": df.sample_n(sample).rows.to_list(columns)}
# TODO: drop, rename and move operation must affect the sample
sample = {"columns": [{"title": cols} for cols in df.cols.names()],
"value": df.sample_n(sample).rows.to_list(columns)}

assign(output_columns, "sample", sample, dict)
assign(output_columns, "sample", sample, dict)

actual_columns = output_columns["columns"]
# Order columns
Expand Down Expand Up @@ -441,17 +469,20 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL

# Calculate Frequency
logger.print("Processing Frequency ...")
df_freq = df.cols.select("*", data_type=PYSPARK_NUMERIC_TYPES, invert=True)
# print("COLUMNS",columns)
df_freq = df.cols.select(columns, data_type=PYSPARK_NUMERIC_TYPES, invert=True)

freq = None
if df_freq is not None:
freq = df_freq.cols.frequency("*", buckets, True, self.rows_count)

# print("FREQUENCY1", freq)
for col_name in columns:
col_info = {}
assign(col_info, "stats", stats[col_name], dict)

if freq is not None:
if col_name in freq:
# print("ASSIGN")
assign(col_info, "frequency", freq[col_name])

assign(col_info, "name", col_name)
Expand All @@ -464,7 +495,8 @@ def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=REL

return columns_info

def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True, advanced_stats=True):
def columns_agg(self, df, columns, buckets=10, relative_error=RELATIVE_ERROR, approx_count=True,
advanced_stats=True):
columns = parse_columns(df, columns)
n = BATCH_SIZE
list_columns = [columns[i * n:(i + 1) * n] for i in range((len(columns) + n - 1) // n)]
Expand Down Expand Up @@ -580,8 +612,6 @@ def extra_columns_stats(df, col_name, stats):

return result



@staticmethod
def missing_values(df, columns):
"""
Expand Down

0 comments on commit 4934f0b

Please sign in to comment.