Skip to content

Commit

Permalink
rows.between() now accepts a list of (lower_bound, upper_bound) tuples
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Dec 6, 2019
1 parent 8410d73 commit c3ca5b4
Show file tree
Hide file tree
Showing 5 changed files with 1,376 additions and 520 deletions.
3 changes: 0 additions & 3 deletions optimus/dataframe/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
from infer import Infer, is_, is_type, is_function, is_list, is_tuple, is_list_of_str, \
is_list_of_dataframes, is_list_of_tuples, is_one_element, is_num_or_str, is_numeric, is_str, is_int, is_dataframe, \
parse_spark_class_dtypes, PYSPARK_NUMERIC_TYPES, PYSPARK_NOT_ARRAY_TYPES, PYSPARK_STRING_TYPES, PYSPARK_ARRAY_TYPES
# Functions
# from optimus.optimus import Optimus
# from optimus.optimus import Optimus
from optimus.audf import abstract_udf as audf, filter_row_by_data_type as fbdt
# Helpers
from optimus.helpers.check import has_, is_column_a
Expand Down
38 changes: 23 additions & 15 deletions optimus/dataframe/rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from multipledispatch import dispatch
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

import functools
# Helpers
from optimus.dataframe.create import Create
from infer import is_list_of_str_or_int, is_list_of_dataframes, is_list_of_tuples, is_dataframe, \
Expand Down Expand Up @@ -171,20 +171,20 @@ def drop(where=None):
return df

@add_attr(rows)
def between(columns, lower_bound, upper_bound, invert=False, equal=False):
"""
Trim values at input thresholds
:param columns: Columns to be trimmed
:param lower_bound: Lower value bound
:param upper_bound: Upper value bound
:param invert:
:param equal:
:return:
"""
@dispatch((str, list), list)
def between(columns, bounds, invert=False, equal=False):
"""
Trim values at input thresholds
:param columns: Columns to be trimmed
:param bounds:
:param invert:
:param equal:
:return:
"""
# TODO: should process string or dates
columns = parse_columns(self, columns, filter_by_column_dtypes=PYSPARK_NUMERIC_TYPES)

def _clip(_col_name):
def _between(_col_name):

if invert is False and equal is False:
op1 = operator.gt
Expand All @@ -206,16 +206,24 @@ def _clip(_col_name):
op2 = operator.ge
opb = operator.__or__

query = opb(op1(F.col(_col_name), lower_bound), op2(F.col(_col_name), upper_bound))
sub_query = []
for bound in bounds:
lower_bound, upper_bound = bound
sub_query.append(opb(op1(F.col(_col_name), lower_bound), op2(F.col(_col_name), upper_bound)))
query = functools.reduce(operator.__or__, sub_query)

print(query)
return query

df = self
for col_name in columns:
df = df.where(_clip(col_name))
df = df.where(_between(col_name))
return df

@add_attr(rows)
@dispatch((str, list), (int, float), (int, float))
def between(columns, lower_bound, upper_bound, invert=False, equal=False):
    """
    Convenience overload: filter rows whose values lie between a single
    lower/upper bound pair. Delegates to the list-of-tuples implementation.
    Bounds accept int or float, since the underlying filter operates on
    numeric (PYSPARK_NUMERIC_TYPES) columns.
    :param columns: Column name or list of column names to be filtered
    :param lower_bound: Lower value bound (int or float)
    :param upper_bound: Upper value bound (int or float)
    :param invert: If True, keep rows outside the bounds instead of inside
    :param equal: If True, treat the bounds as inclusive
    :return: Filtered Spark DataFrame
    """
    return between(columns, [(lower_bound, upper_bound)], invert, equal)

@add_attr(rows)
def drop_by_dtypes(input_cols, data_type=None):
"""
Expand Down
Loading

0 comments on commit c3ca5b4

Please sign in to comment.