fixes for nrows
calmacx committed Mar 17, 2022
Merge commit 59a8813 (parents ba10b4f + b78df64)
Showing 7 changed files with 74 additions and 36 deletions.
3 changes: 3 additions & 0 deletions coconnect/analyses/example_serology.py
@@ -6,6 +6,9 @@
def create_analysis(_filter):
def ana(model):
df = model.filter(_filter,dropna=True)
if len(df) == 0:
return {'fit':None,'hist':None}

df['age'] = 2022 - df['year_of_birth']
res = scipy.stats.linregress(x=df['age'],y=df['value_as_number'])
attributes = ['intercept', 'intercept_stderr', 'pvalue', 'rvalue', 'slope', 'stderr']
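For context, a minimal sketch (not part of the commit) of why the new empty-frame guard matters: if the filter returns no rows, feeding empty arrays to scipy.stats.linregress cannot produce a fit, so the analysis now short-circuits with a sentinel result. The column names and the 2022 constant come from the hunk above; the rest is illustrative.

```python
import pandas as pd
import scipy.stats

def ana(df):
    # Guard added in this commit: an empty selection yields a sentinel result
    # instead of handing empty arrays to linregress.
    if len(df) == 0:
        return {'fit': None, 'hist': None}
    df['age'] = 2022 - df['year_of_birth']
    return {'fit': scipy.stats.linregress(x=df['age'], y=df['value_as_number'])}

empty = pd.DataFrame(columns=['year_of_birth', 'value_as_number'])
print(ana(empty))  # {'fit': None, 'hist': None}
```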
72 changes: 43 additions & 29 deletions coconnect/cdm/model.py
@@ -359,7 +359,7 @@ def add_analysis(self,func,_id=None):
if _id is None:
_id = hex(id(func))
self.__analyses[_id] = func

def get_analyses(self):
return self.__analyses
def get_analysis(self,key):
@@ -372,7 +372,7 @@ def run_analyses(self,analyses=None,max_workers=4):

def msg(x):
self.logger.info(f"finished with {x}")
self.logger.info(x.result())
self.logger.debug(x.result())

start = time()

@@ -431,38 +431,52 @@ def _filter(self,df,filters):

def filter(self,config,cols=None,dropna=False):
retval = None
for obj in config:
if isinstance(obj,str):
df = self[obj]
if isinstance(df,DestinationTable):
df = df.get_df()
if df.index.name != 'person_id':
df = df.set_index('person_id')
if retval is None:
retval = df
else:
retval = retval.merge(df,left_index=True,right_index=True)
elif isinstance(obj,dict):
for key,value in obj.items():
df = self[key]
if isinstance(df,DestinationTable):
df = df.get_df()

df = self._filter(df,value)
if df.index.name != 'person_id':
df = df.set_index('person_id')
if retval is None:
retval = df
else:
retval = retval.merge(df,left_index=True,right_index=True)
for table,spec in config.items():
df = self[table]
if isinstance(df,DestinationTable):
df = df.get_df()
for col,func in spec.items():
df = df[df[col].apply(func)]

if df.index.name != 'person_id':
df = df.reset_index().set_index('person_id')

if retval is None:
retval = df
else:
raise NotImplementedError("need to pass a json object to filter()")

retval = retval.merge(df,left_index=True,right_index=True)
if dropna:
retval = retval.dropna(axis=1)
if cols is not None:
retval = retval[cols]
index = retval.index.name
retval = retval.reset_index()
retval = retval[[col for col,keep in cols.items() if keep]]
if index in retval.columns:
retval = retval.set_index(index)

return retval


# for obj in config:
# if isinstance(obj,str):
# df = self[obj].get_df()
# if df.index.name != 'person_id':
# df = df.set_index('person_id')
# elif isinstance(obj,dict):
# for key,value in obj.items():
# print (self[key])
# df = self[key]
# df = self._filter(df,value)
# if df.index.name != 'person_id':
# df = df.set_index('person_id')
# if retval is None:
# retval = df
# else:
# retval = retval.merge(df,left_index=True,right_index=True)
# else:
# raise NotImplementedError("need to pass a json object to filter()")


def get_all_objects(self):
return [ obj for collection in self.__objects.values() for obj in collection.values()]
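The hunk above drops the old list-based config handling (the isinstance checks on str/dict entries) in favour of a single loop over a dict of {table: {column: predicate}}, and cols changes from a plain column list to a {column: keep} mapping. A standalone sketch of the new filtering logic, with invented table names and data, assuming each entry resolves to a pandas DataFrame keyed by person_id:

```python
import pandas as pd

tables = {
    'person': pd.DataFrame({'person_id': [1, 2, 3], 'year_of_birth': [1950, 1985, 1990]}),
    'measurement': pd.DataFrame({'person_id': [1, 2, 3], 'value_as_number': [0.1, 0.5, None]}),
}

# config: table name -> {column: predicate}, mirroring the new filter() loop
config = {
    'person': {'year_of_birth': lambda y: y > 1980},
    'measurement': {'value_as_number': lambda v: pd.notnull(v)},
}

retval = None
for table, spec in config.items():
    df = tables[table]
    for col, func in spec.items():
        df = df[df[col].apply(func)]          # row-wise predicate filter
    df = df.set_index('person_id')            # align every table on person_id
    retval = df if retval is None else retval.merge(df, left_index=True, right_index=True)

print(retval)  # only person_id 2 passes both filters
```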
3 changes: 2 additions & 1 deletion coconnect/io/common.py
@@ -4,10 +4,11 @@
import io

class DataCollection(Logger):
def __init__(self,chunksize=None,**kwargs):
def __init__(self,chunksize=None,nrows=None,**kwargs):
self.logger.info("DataCollection Object Created")
self.__bricks = {}
self.chunksize = chunksize
self.nrows = nrows

if self.chunksize is not None:
self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows")
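The nrows option stored here is what ultimately caps how many rows are read from each input (see the local.py change below). A quick, self-contained illustration of the pandas behaviour being exposed, with invented sample data:

```python
import io
import pandas as pd

csv = io.StringIO("person_id,year_of_birth\n1,1950\n2,1985\n3,1990\n")
df = pd.read_csv(csv, nrows=2, dtype=str)   # nrows=None (the default) would read everything
print(len(df))  # 2
```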
9 changes: 9 additions & 0 deletions coconnect/io/plugins/bclink.py
@@ -1,5 +1,6 @@
from coconnect.tools.bclink_helpers import BCLinkHelpers
from .local import LocalDataCollection
from coconnect.io.common import DataBrick
import io
import pandas as pd
import time
@@ -29,6 +30,14 @@ def finalise(self):

self.logger.info(f"done!")

def retrieve(self):
tables = self.bclink_helpers.get_table_map()
for name in tables:
df = self.bclink_helpers.get_table(name)
b = DataBrick(df,name=name)
self[name] = b


def write(self,*args,**kwargs):
f_out = super().write(*args,**kwargs)
destination_table = args[0]
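A rough sketch of what the new retrieve() does, with a stub standing in for BCLinkHelpers (the real helper shells out to BCLink and is not reproduced here); the stub's table map and data are invented:

```python
import pandas as pd

class StubHelpers:
    """Stand-in for BCLinkHelpers, which queries BCLink for the real tables."""
    def get_table_map(self):
        return {'person': 'cdm_person_001'}
    def get_table(self, name):
        return pd.DataFrame({'person_id': [1, 2], 'year_of_birth': [1950, 1985]})

helpers = StubHelpers()
collection = {}
for name in helpers.get_table_map():
    df = helpers.get_table(name)
    collection[name] = df          # the plugin wraps this in a DataBrick instead
print(list(collection))            # ['person']
```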
5 changes: 3 additions & 2 deletions coconnect/io/plugins/local.py
@@ -8,8 +8,8 @@
from time import gmtime, strftime

class LocalDataCollection(DataCollection):
def __init__(self,file_map=None,chunksize=None,output_folder=None,sep=',',write_mode='w',write_separate=False,**kwargs):
super().__init__(chunksize=chunksize)
def __init__(self,file_map=None,chunksize=None,nrows=None,output_folder=None,sep=',',write_mode='w',write_separate=False,**kwargs):
super().__init__(chunksize=chunksize,nrows=nrows)

self.__output_folder = output_folder
self.__separator = sep
@@ -163,6 +163,7 @@ def _load_input_files(self,file_map):
for name,path in file_map.items():
df = pd.read_csv(path,
chunksize=self.chunksize,
nrows=self.nrows,
dtype=str)
self[name] = DataBrick(df)

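local.py now forwards both chunksize and nrows to pd.read_csv. A small self-contained reminder of what the chunksize side of that call returns (an iterator of DataFrames rather than a single frame), again with invented data:

```python
import io
import pandas as pd

csv = io.StringIO("person_id,year_of_birth\n1,1950\n2,1985\n3,1990\n")
chunks = pd.read_csv(csv, chunksize=2, dtype=str)   # iterator, not a DataFrame
print([len(chunk) for chunk in chunks])             # [2, 1]
```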
16 changes: 13 additions & 3 deletions coconnect/tools/bclink_helpers.py
@@ -40,7 +40,10 @@ def __init__(self,user='bclink',clean=False,check=True,gui_user='data',database=
self.clean_tables()
if check:
self.print_summary()


def get_table_map(self):
return self.table_map

def check_table_exists(self,table):
query = f"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table}' )"

@@ -102,7 +105,10 @@ def get_table(self,table):
if stdout == None:
return None

return pd.read_csv(io.StringIO(stdout),sep='\t')
df = pd.read_csv(io.StringIO(stdout),sep='\t')
df.columns = [x.lower() for x in df.columns]
#df = df.drop('batch',axis=1)
return df


def get_bclink_table(self,table):
@@ -301,7 +307,11 @@ def get_table_jobs(self,table,head=1):
return info

def get_global_ids(self):
global_ids = self.table_map[get_default_global_id_name()]
name = get_default_global_id_name()
if name not in self.table_map.keys():
self.logger.warning(f"No table for getting existing person ids ({name}) has been defined")
return None
global_ids = self.table_map[name]

query=f"SELECT * FROM {global_ids} "
cmd=['bc_sqlselect',f'--user={self.user}',f'--query={query}',self.database]
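The get_table() change normalises column names to lower case after parsing the tab-separated bc_sqlselect output. A minimal illustration of that step (the sample output string is invented):

```python
import io
import pandas as pd

stdout = "PERSON_ID\tYEAR_OF_BIRTH\n1\t1950\n"
df = pd.read_csv(io.StringIO(stdout), sep='\t')
df.columns = [x.lower() for x in df.columns]
print(df.columns.tolist())  # ['person_id', 'year_of_birth']
```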
2 changes: 1 addition & 1 deletion coconnect/tools/dag.py
@@ -101,7 +101,7 @@ def make_dag(data,output_file='dag.gv',format='pdf',render=False,orientation='RL
dot.render(output_file, view=True)
return
#
#return dot.pipe().decode('utf-8')
return dot.pipe().decode('utf-8')

def make_report_dag(data,name='dag',render=False,orientation='RL'):
_format = 'svg'
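With the render=True early return unchanged, the previously commented-out return is now active, so callers that do not render to file get the piped output back as text. A minimal sketch of that in-memory path (requires the graphviz Python package and the Graphviz binaries on PATH; the graph content is invented):

```python
from graphviz import Digraph

dot = Digraph()
dot.edge('person', 'observation')
svg = dot.pipe(format='svg').decode('utf-8')   # in-memory render, no file written
print(svg.splitlines()[0])                     # '<?xml version="1.0" ...'
```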
