From 875405fb83e903fbaa687bdd55b2dde423b8445b Mon Sep 17 00:00:00 2001 From: Dave Flowers Date: Thu, 17 Aug 2017 14:08:27 -0500 Subject: [PATCH] adding grid to mecat2pw --- mecat2canu/src/canu_version_update.pl | 0 src/mecat2pw/pw.cpp | 95 +++++++++++++++++++++++---- src/mecat2pw/pw_options.cpp | 70 +++++++++++++++++++- src/mecat2pw/pw_options.h | 8 +++ 4 files changed, 159 insertions(+), 14 deletions(-) mode change 100644 => 100755 mecat2canu/src/canu_version_update.pl diff --git a/mecat2canu/src/canu_version_update.pl b/mecat2canu/src/canu_version_update.pl old mode 100644 new mode 100755 diff --git a/src/mecat2pw/pw.cpp b/src/mecat2pw/pw.cpp index f3b0763..e94b26d 100644 --- a/src/mecat2pw/pw.cpp +++ b/src/mecat2pw/pw.cpp @@ -8,6 +8,7 @@ #include #include +#include using namespace std; @@ -32,19 +33,60 @@ create_volume_results_name_finished(int vid, const char* wrk_dir, string& name) } void -merge_results(const char* output, const char* wrk_dir, const int num_volumes) +merge_results(const char* output, const char* wrk_dir, const std::list& files) { string vrn; - for (int i = 0; i < num_volumes; ++i) + std::list::const_iterator a = files.begin(); + const std::list::const_iterator end_a = files.end(); + for (; a != end_a; ++a) { - create_volume_results_name_finished(i, wrk_dir, vrn); ostringstream cmd; - if (i == 0) cmd << "cat " << vrn << " >" << output; - else cmd << "cat " << vrn << " >> " << output; + if (a == files.begin()) cmd << "cat " << *a << " >" << output; + else cmd << "cat " << *a << " >> " << output; assert(system(cmd.str().c_str()) == 0); } } +void +grid_start(std::string prog, const options_t &options, const int i, const int num_vols, const std::string &volume_results_name_working, const std::string &volume_results_name_finished) +{ + // create grid command + options_t new_options = options; + new_options.job_index = i; + new_options.grid_options = NULL; + new_options.num_vols = num_vols; + new_options.reads = volume_results_name_working.c_str(); + new_options.output = volume_results_name_finished.c_str(); + std::ostringstream cmd; + cmd << "qsub -N m2pw." << i << " " << options.grid_options; + cmd << " \"" << prog << make_options(&new_options) << "\""; + assert(system(cmd.str().c_str()) == 0); +} + +// pass by value so we can modify list to easily avoid checking previously +// found results files +void +wait_for_files(std::list results) +{ + const std::list::const_iterator end_a = results.end(); + while (!results.empty()) + { + sleep(60); + std::list::iterator a = results.begin(); + while (a != end_a) + { + if (access(a->c_str(), F_OK) == 0) + { + a = results.erase(a); + } + else + { + ++a; + } + } + } +} + int main(int argc, char* argv[]) { options_t options; @@ -55,17 +97,32 @@ int main(int argc, char* argv[]) return 1; } - int num_vols = split_raw_dataset(options.reads, options.wrk_dir); + int num_vols = options.num_vols != -1 ? options.num_vols : split_raw_dataset(options.reads, options.wrk_dir); char vol_idx_file_name[1024]; generate_idx_file_name(options.wrk_dir, vol_idx_file_name); - cout << vol_idx_file_name << "\n"; volume_names_t* vn = load_volume_names(vol_idx_file_name, 0); r_assert(num_vols == vn->num_vols); + if (options.job_index != -1) + { + int i = options.job_index; + // repurposing these two for the wrapper + std::string volume_results_name_working = options.reads; + std::string volume_results_name_finished = options.output; + std::ofstream out; + open_fstream(out, volume_results_name_working.c_str(), std::ios::out); + process_one_volume(&options, i, vn->num_vols, vn, &out); + close_fstream(out); + assert(rename(volume_results_name_working.c_str(), volume_results_name_finished.c_str()) == 0); + return 1; + } + cout << vol_idx_file_name << "\n"; + std::list results; for (int i = 0; i < vn->num_vols; ++i) { string volume_results_name_finished; create_volume_results_name_finished(i, options.wrk_dir, volume_results_name_finished); + results.push_back(volume_results_name_finished); if (access(volume_results_name_finished.c_str(), F_OK) == 0) { LOG(stderr, "volume %d has been finished\n", i); @@ -73,13 +130,25 @@ int main(int argc, char* argv[]) } string volume_results_name_working; create_volume_results_name_working(i, options.wrk_dir, volume_results_name_working); - ofstream out; - open_fstream(out, volume_results_name_working.c_str(), ios::out); - process_one_volume(&options, i, vn->num_vols, vn, &out); - close_fstream(out); - assert(rename(volume_results_name_working.c_str(), volume_results_name_finished.c_str()) == 0); + if (options.grid_options == NULL) + { + ofstream out; + open_fstream(out, volume_results_name_working.c_str(), ios::out); + process_one_volume(&options, i, vn->num_vols, vn, &out); + close_fstream(out); + assert(rename(volume_results_name_working.c_str(), volume_results_name_finished.c_str()) == 0); + } + else + { + grid_start(argv[0], options, i, vn->num_vols, volume_results_name_working, volume_results_name_finished); + } } vn = delete_volume_names_t(vn); + + if (options.grid_options != NULL) + { + wait_for_files(results); + } - merge_results(options.output, options.wrk_dir, num_vols); + merge_results(options.output, options.wrk_dir, results); } diff --git a/src/mecat2pw/pw_options.cpp b/src/mecat2pw/pw_options.cpp index 5a41947..5bef5e5 100644 --- a/src/mecat2pw/pw_options.cpp +++ b/src/mecat2pw/pw_options.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include static const int kDefaultNumThreads = 1; static const int kDefaultNumCandidates = 100; @@ -27,6 +29,53 @@ print_options(options_t* options) LOG(stderr, "tech\t%d", options->tech); } +// given options, recreate arguments from the command line +std::string +make_options(options_t* options) +{ + std::stringstream cmd; + if (options->task == TASK_ALN || options->task == TASK_SEED) { + cmd << " -j " << options->task; + } + if (options->reads != NULL) { + cmd << " -d " << options->reads; + } + if (options->output != NULL) { + cmd << " -o " << options->output; + } + if (options->wrk_dir != NULL) { + cmd << " -w " << options->wrk_dir; + } + if (options->grid_options != NULL) { + cmd << " -G " << options->grid_options; + } + if (options->num_threads > 0) { + cmd << " -t " << options->num_threads; + } + if (options->num_candidates > 0) { + cmd << " -n " << options->num_candidates; + } + if (options->min_align_size > 0) { + cmd << " -a " << options->min_align_size; + } + if (options->min_kmer_match > 0) { + cmd << " -k " << options->min_kmer_match; + } + if (options->output_gapped_start_point > 0) { + cmd << " -g " << options->output_gapped_start_point; + } + if (options->tech == TECH_PACBIO || options->tech == TECH_NANOPORE) { + cmd << " -x " << (options->tech == TECH_PACBIO ? 0 : 1); + } + if (options->num_vols > 0) { + cmd << " -N " << options->num_vols; + } + if (options->job_index != -1) { + cmd << " -i " << options->job_index; + } + return cmd.str(); +} + void init_options(options_t* options, int tech) { @@ -35,10 +84,13 @@ init_options(options_t* options, int tech) options->reads = NULL; options->output = NULL; options->wrk_dir = NULL; + options->grid_options = NULL; options->num_threads = 1; options->num_candidates = 100; options->output_gapped_start_point = 0; options->tech = tech; + options->job_index = -1; + options->num_vols = -1; if (tech == TECH_PACBIO) { options->min_align_size = kDefaultAlignSizePacbio; @@ -68,6 +120,7 @@ void print_usage(const char* prog) fprintf(stderr, "Default: %d if x = %d, %d if x = %d\n", kDefaultKmerMatchPacbio, TECH_PACBIO, kDefaultKmerMatchNanopore, TECH_NANOPORE); fprintf(stderr, "-g <0/1>\twhether print gapped extension start point, 0 = no, 1 = yes\n\t\tDefault: 0\n"); fprintf(stderr, "-x <0/x>\tsequencing technology: 0 = pacbio, 1 = nanopore\n\t\tDefault: 0\n"); + fprintf(stderr, "-G \tgrid options\n"); } int @@ -82,14 +135,17 @@ parse_arguments(int argc, char* argv[], options_t* options) const char* reads = NULL; const char* output = NULL; const char* wrk_dir = NULL; + const char* grid_options = NULL; int num_threads = -1; int num_candidates = -1; int min_align_size = -1; int min_kmer_match = -1; int output_gapped_start_point = -1; int tech = TECH_PACBIO; + int job_index = -1; + int num_vols = -1; - while((opt_char = getopt(argc, argv, "j:d:o:w:t:n:g:x:a:k:")) != -1) + while((opt_char = getopt(argc, argv, "j:d:o:w:t:n:g:x:a:k:G:i:N:")) != -1) { switch(opt_char) { @@ -105,9 +161,18 @@ parse_arguments(int argc, char* argv[], options_t* options) case 'w': wrk_dir = optarg; break; + case 'G': + grid_options = optarg; + break; case 't': num_threads = atoi(optarg); break; + case 'i': + job_index = atoi(optarg); + break; + case 'N': + num_vols = atoi(optarg); + break; case 'n': num_candidates = atoi(optarg); break; @@ -155,11 +220,14 @@ parse_arguments(int argc, char* argv[], options_t* options) options->reads = reads; options->output = output; options->wrk_dir = wrk_dir; + if (grid_options != NULL) options->grid_options = grid_options; if (num_threads != -1) options->num_threads = num_threads; if (num_candidates != -1) options->num_candidates = num_candidates; if (min_align_size != -1) options->min_align_size = min_align_size; if (min_kmer_match != -1) options->min_kmer_match = min_kmer_match; if (output_gapped_start_point != -1) options->output_gapped_start_point = output_gapped_start_point; + if (job_index != -1) options->job_index = job_index; + if (num_vols != -1) options->num_vols = num_vols; if (options->task != TASK_SEED && options->task != TASK_ALN) { diff --git a/src/mecat2pw/pw_options.h b/src/mecat2pw/pw_options.h index e15cd57..4611514 100644 --- a/src/mecat2pw/pw_options.h +++ b/src/mecat2pw/pw_options.h @@ -1,6 +1,8 @@ #ifndef MP_OPTIONS_H #define MP_OPTIONS_H +#include + #include "../common/defs.h" #define TASK_SEED 0 @@ -12,14 +14,20 @@ typedef struct const char* reads; const char* output; const char* wrk_dir; + const char* grid_options; int num_threads; int num_candidates; int min_align_size; int min_kmer_match; int output_gapped_start_point; int tech; + int num_vols; + int job_index; } options_t; +std::string +make_options(options_t* options); + void print_options(options_t* options);