Skip to content

Commit

Permalink
Change between from dispatch decorator to bounds param
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Dec 13, 2019
1 parent 1806261 commit 08fb003
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions optimus/dataframe/rows.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import functools
import operator
from functools import reduce

from multipledispatch import dispatch
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
import functools

from optimus.audf import filter_row_by_data_type as fbdt
# Helpers
from optimus.dataframe.create import Create
from optimus.infer import is_list_of_str_or_int, is_list_of_dataframes, is_list_of_tuples, is_dataframe, \
PYSPARK_NUMERIC_TYPES
from optimus.audf import filter_row_by_data_type as fbdt
from optimus.helpers.columns import parse_columns, validate_columns_names
from optimus.helpers.constants import Actions
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.decorators import add_attr
from optimus.helpers.functions import append as append_df
from optimus.helpers.functions import val_to_list
from optimus.helpers.raiseit import RaiseIt
from optimus.infer import is_list_of_str_or_int, is_list_of_dataframes, is_list_of_tuples, is_dataframe, \
PYSPARK_NUMERIC_TYPES


def rows(self):
Expand Down Expand Up @@ -171,10 +172,11 @@ def drop(where=None):
return df

@add_attr(rows)
@dispatch((str, list), list, bool, bool)
def between(columns, bounds, invert=False, equal=False):
def between(columns, lower_bound=None, upper_bound=None, invert=False, equal=False, bounds=None):
"""
Trim values at input thresholds
:param upper_bound:
:param lower_bound:
:param columns: Columns to be trimmed
:param bounds:
:param invert:
Expand All @@ -183,6 +185,8 @@ def between(columns, bounds, invert=False, equal=False):
"""
# TODO: should process string or dates
columns = parse_columns(self, columns, filter_by_column_dtypes=PYSPARK_NUMERIC_TYPES)
if bounds is None:
bounds = (lower_bound, upper_bound)

def _between(_col_name):

Expand All @@ -208,8 +212,8 @@ def _between(_col_name):

sub_query = []
for bound in bounds:
lower_bound, upper_bound = bound
sub_query.append(opb(op1(F.col(_col_name), lower_bound), op2(F.col(_col_name), upper_bound)))
_lower_bound, _upper_bound = bound
sub_query.append(opb(op1(F.col(_col_name), _lower_bound), op2(F.col(_col_name), _upper_bound)))
query = functools.reduce(operator.__or__, sub_query)

return query
Expand All @@ -220,11 +224,6 @@ def _between(_col_name):
df = df.preserve_meta(self, Actions.DROP_ROW.value, df.cols.names())
return df

@add_attr(rows)
@dispatch((str, list), int, int)
def between(columns, lower_bound, upper_bound, invert=False, equal=False):
return between(columns, [(lower_bound, upper_bound)], invert, equal)

@add_attr(rows)
def drop_by_dtypes(input_cols, data_type=None):
"""
Expand Down

0 comments on commit 08fb003

Please sign in to comment.