Skip to content

Commit

Permalink
multithreaded loading (test)
Browse files Browse the repository at this point in the history
  • Loading branch information
fernandomeyer committed Jan 6, 2021
1 parent 06c3098 commit f1f7cc6
Showing 1 changed file with 27 additions and 75 deletions.
102 changes: 27 additions & 75 deletions src/utils/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,32 +207,6 @@ def load_queries_mthreaded(gold_standard_file, bin_files, labels, options, optio
return load_queries(gs, queries, labels, options, options_gs)


def load_sample(query_tuple):
sample_id_to_gs_df, sample_id_to_gs_df_filtered, sample_id_to_query_df, sample_id, label, options = query_tuple
if sample_id not in sample_id_to_gs_df:
logging.getLogger('amber').critical("Sample ID {} in {} not found in the gold standard.".format(sample_id, label))
exit(1)

gs_df = sample_id_to_gs_df[sample_id]
query_df = sample_id_to_query_df[sample_id]
condition = query_df['SEQUENCEID'].isin(gs_df['SEQUENCEID'])
if ~condition.all():
logging.getLogger('amber').warning("{} sequences in {} not found in the gold standard.".format(query_df[~condition]['SEQUENCEID'].nunique(), label))
query_df = query_df[condition]

if options.min_length:
gs_df = sample_id_to_gs_df_filtered[sample_id]
condition = query_df['SEQUENCEID'].isin(gs_df['SEQUENCEID'])
if ~condition.all():
query_df = query_df[condition]

if 'BINID' in query_df.columns:
return binning_classes.GenomeQuery(query_df.drop_duplicates(['SEQUENCEID', 'BINID']), label, sample_id, options)
if 'TAXID' in query_df.columns and not binning_classes.TaxonomicQuery.taxonomy_df.empty:
return binning_classes.TaxonomicQuery(get_rank_to_df(query_df), label, sample_id, options)



def load_queries(gs, queries, labels, options, options_gs):
logging.getLogger('amber').info('Loading {}'.format(utils_labels.GS))
sample_id_to_gs_df = gs
Expand Down Expand Up @@ -264,61 +238,39 @@ def load_queries(gs, queries, labels, options, options_gs):
sample_id_to_queries_list[sample_id].append(t_query_gs)
sample_id_to_t_gs[sample_id] = t_query_gs


for query, label in zip(queries, labels):
logging.getLogger('amber').info('Loading {}'.format(label))
sample_id_to_query_df = query


samples_list = []
for sample_id in sample_id_to_query_df:
samples_list.append((sample_id_to_gs_df, sample_id_to_gs_df_filtered if options.min_length else sample_id_to_gs_df, query, sample_id, label, options))
pool = ThreadPool(multiprocessing.cpu_count())
results = pool.map(load_sample, samples_list)
pool.close()
for query in results:
if isinstance(query, binning_classes.GenomeQuery):
query.gold_standard = sample_id_to_g_gs[sample_id]
query.gold_standard_df = gs_df
sample_id_to_queries_list[sample_id].append(query)
if sample_id not in sample_id_to_gs_df:
logging.getLogger('amber').critical("Sample ID {} in {} not found in the gold standard.".format(sample_id, label))
exit(1)

gs_df = sample_id_to_gs_df[sample_id]
query_df = sample_id_to_query_df[sample_id]
condition = query_df['SEQUENCEID'].isin(gs_df['SEQUENCEID'])
if ~condition.all():
logging.getLogger('amber').warning("{} sequences in {} not found in the gold standard.".format(query_df[~condition]['SEQUENCEID'].nunique(), label))
query_df = query_df[condition]

if options.min_length:
gs_df = sample_id_to_gs_df_filtered[sample_id]
condition = query_df['SEQUENCEID'].isin(gs_df['SEQUENCEID'])
if ~condition.all():
query_df = query_df[condition]

if 'BINID' in query_df.columns:
g_query = binning_classes.GenomeQuery(query_df.drop_duplicates(['SEQUENCEID', 'BINID']), label, sample_id, options)
g_query.gold_standard = sample_id_to_g_gs[sample_id]
g_query.gold_standard_df = gs_df
sample_id_to_queries_list[sample_id].append(g_query)
options.only_taxonomic_queries = options_gs.only_taxonomic_queries = False
if isinstance(query, binning_classes.TaxonomicQuery):
query.gold_standard = sample_id_to_t_gs[sample_id]
query.gold_standard_df = sample_id_to_gs_rank_to_df[sample_id]
sample_id_to_queries_list[sample_id].append(query)
if 'TAXID' in query_df.columns and not binning_classes.TaxonomicQuery.taxonomy_df.empty:
t_query = binning_classes.TaxonomicQuery(get_rank_to_df(query_df), label, sample_id, options)
t_query.gold_standard = sample_id_to_t_gs[sample_id]
t_query.gold_standard_df = sample_id_to_gs_rank_to_df[sample_id]
sample_id_to_queries_list[sample_id].append(t_query)
options.only_taxonomic_queries = options_gs.only_genome_queries = False


# for sample_id in sample_id_to_query_df:
# if sample_id not in sample_id_to_gs_df:
# logging.getLogger('amber').critical("Sample ID {} in {} not found in the gold standard.".format(sample_id, label))
# exit(1)
#
# gs_df = sample_id_to_gs_df[sample_id]
# query_df = sample_id_to_query_df[sample_id]
# condition = query_df['SEQUENCEID'].isin(gs_df['SEQUENCEID'])
# if ~condition.all():
# logging.getLogger('amber').warning("{} sequences in {} not found in the gold standard.".format(query_df[~condition]['SEQUENCEID'].nunique(), label))
# query_df = query_df[condition]
#
# if options.min_length:
# gs_df = sample_id_to_gs_df_filtered[sample_id]
# condition = query_df['SEQUENCEID'].isin(gs_df['SEQUENCEID'])
# if ~condition.all():
# query_df = query_df[condition]
#
# if 'BINID' in query_df.columns:
# g_query = binning_classes.GenomeQuery(query_df.drop_duplicates(['SEQUENCEID', 'BINID']), label, sample_id, options)
# g_query.gold_standard = sample_id_to_g_gs[sample_id]
# g_query.gold_standard_df = gs_df
# sample_id_to_queries_list[sample_id].append(g_query)
# options.only_taxonomic_queries = options_gs.only_taxonomic_queries = False
# if 'TAXID' in query_df.columns and not binning_classes.TaxonomicQuery.taxonomy_df.empty:
# t_query = binning_classes.TaxonomicQuery(get_rank_to_df(query_df), label, sample_id, options)
# t_query.gold_standard = sample_id_to_t_gs[sample_id]
# t_query.gold_standard_df = sample_id_to_gs_rank_to_df[sample_id]
# sample_id_to_queries_list[sample_id].append(t_query)
# options.only_taxonomic_queries = options_gs.only_genome_queries = False

return sample_id_to_queries_list, list(sample_id_to_gs_df.keys())

0 comments on commit f1f7cc6

Please sign in to comment.