Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic mecat2pw grid integration #18

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified mecat2canu/src/canu_version_update.pl
100644 → 100755
Empty file.
95 changes: 82 additions & 13 deletions src/mecat2pw/pw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <sstream>
#include <string>
#include <list>

using namespace std;

Expand All @@ -32,19 +33,60 @@ create_volume_results_name_finished(int vid, const char* wrk_dir, string& name)
}

void
merge_results(const char* output, const char* wrk_dir, const int num_volumes)
merge_results(const char* output, const char* wrk_dir, const std::list<std::string>& files)
{
string vrn;
for (int i = 0; i < num_volumes; ++i)
std::list<std::string>::const_iterator a = files.begin();
const std::list<std::string>::const_iterator end_a = files.end();
for (; a != end_a; ++a)
{
create_volume_results_name_finished(i, wrk_dir, vrn);
ostringstream cmd;
if (i == 0) cmd << "cat " << vrn << " >" << output;
else cmd << "cat " << vrn << " >> " << output;
if (a == files.begin()) cmd << "cat " << *a << " >" << output;
else cmd << "cat " << *a << " >> " << output;
assert(system(cmd.str().c_str()) == 0);
}
}

void
grid_start(std::string prog, const options_t &options, const int i, const int num_vols, const std::string &volume_results_name_working, const std::string &volume_results_name_finished)
{
// create grid command
options_t new_options = options;
new_options.job_index = i;
new_options.grid_options = NULL;
new_options.num_vols = num_vols;
new_options.reads = volume_results_name_working.c_str();
new_options.output = volume_results_name_finished.c_str();
std::ostringstream cmd;
cmd << "qsub -N m2pw." << i << " " << options.grid_options;
cmd << " \"" << prog << make_options(&new_options) << "\"";
assert(system(cmd.str().c_str()) == 0);
}

// pass by value so we can modify list to easily avoid checking previously
// found results files
void
wait_for_files(std::list<std::string> results)
{
const std::list<std::string>::const_iterator end_a = results.end();
while (!results.empty())
{
sleep(60);
std::list<std::string>::iterator a = results.begin();
while (a != end_a)
{
if (access(a->c_str(), F_OK) == 0)
{
a = results.erase(a);
}
else
{
++a;
}
}
}
}

int main(int argc, char* argv[])
{
options_t options;
Expand All @@ -55,31 +97,58 @@ int main(int argc, char* argv[])
return 1;
}

int num_vols = split_raw_dataset(options.reads, options.wrk_dir);
int num_vols = options.num_vols != -1 ? options.num_vols : split_raw_dataset(options.reads, options.wrk_dir);

char vol_idx_file_name[1024];
generate_idx_file_name(options.wrk_dir, vol_idx_file_name);
cout << vol_idx_file_name << "\n";
volume_names_t* vn = load_volume_names(vol_idx_file_name, 0);
r_assert(num_vols == vn->num_vols);
if (options.job_index != -1)
{
int i = options.job_index;
// repurposing these two for the wrapper
std::string volume_results_name_working = options.reads;
std::string volume_results_name_finished = options.output;
std::ofstream out;
open_fstream(out, volume_results_name_working.c_str(), std::ios::out);
process_one_volume(&options, i, vn->num_vols, vn, &out);
close_fstream(out);
assert(rename(volume_results_name_working.c_str(), volume_results_name_finished.c_str()) == 0);
return 1;
}
cout << vol_idx_file_name << "\n";
std::list<std::string> results;
for (int i = 0; i < vn->num_vols; ++i)
{
string volume_results_name_finished;
create_volume_results_name_finished(i, options.wrk_dir, volume_results_name_finished);
results.push_back(volume_results_name_finished);
if (access(volume_results_name_finished.c_str(), F_OK) == 0)
{
LOG(stderr, "volume %d has been finished\n", i);
continue;
}
string volume_results_name_working;
create_volume_results_name_working(i, options.wrk_dir, volume_results_name_working);
ofstream out;
open_fstream(out, volume_results_name_working.c_str(), ios::out);
process_one_volume(&options, i, vn->num_vols, vn, &out);
close_fstream(out);
assert(rename(volume_results_name_working.c_str(), volume_results_name_finished.c_str()) == 0);
if (options.grid_options == NULL)
{
ofstream out;
open_fstream(out, volume_results_name_working.c_str(), ios::out);
process_one_volume(&options, i, vn->num_vols, vn, &out);
close_fstream(out);
assert(rename(volume_results_name_working.c_str(), volume_results_name_finished.c_str()) == 0);
}
else
{
grid_start(argv[0], options, i, vn->num_vols, volume_results_name_working, volume_results_name_finished);
}
}
vn = delete_volume_names_t(vn);

if (options.grid_options != NULL)
{
wait_for_files(results);
}

merge_results(options.output, options.wrk_dir, num_vols);
merge_results(options.output, options.wrk_dir, results);
}
70 changes: 69 additions & 1 deletion src/mecat2pw/pw_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <dirent.h>
#include <sys/stat.h>
#include <cstdio>
#include <sstream>
#include <string>

static const int kDefaultNumThreads = 1;
static const int kDefaultNumCandidates = 100;
Expand All @@ -27,6 +29,53 @@ print_options(options_t* options)
LOG(stderr, "tech\t%d", options->tech);
}

// given options, recreate arguments from the command line
std::string
make_options(options_t* options)
{
std::stringstream cmd;
if (options->task == TASK_ALN || options->task == TASK_SEED) {
cmd << " -j " << options->task;
}
if (options->reads != NULL) {
cmd << " -d " << options->reads;
}
if (options->output != NULL) {
cmd << " -o " << options->output;
}
if (options->wrk_dir != NULL) {
cmd << " -w " << options->wrk_dir;
}
if (options->grid_options != NULL) {
cmd << " -G " << options->grid_options;
}
if (options->num_threads > 0) {
cmd << " -t " << options->num_threads;
}
if (options->num_candidates > 0) {
cmd << " -n " << options->num_candidates;
}
if (options->min_align_size > 0) {
cmd << " -a " << options->min_align_size;
}
if (options->min_kmer_match > 0) {
cmd << " -k " << options->min_kmer_match;
}
if (options->output_gapped_start_point > 0) {
cmd << " -g " << options->output_gapped_start_point;
}
if (options->tech == TECH_PACBIO || options->tech == TECH_NANOPORE) {
cmd << " -x " << (options->tech == TECH_PACBIO ? 0 : 1);
}
if (options->num_vols > 0) {
cmd << " -N " << options->num_vols;
}
if (options->job_index != -1) {
cmd << " -i " << options->job_index;
}
return cmd.str();
}

void
init_options(options_t* options, int tech)
{
Expand All @@ -35,10 +84,13 @@ init_options(options_t* options, int tech)
options->reads = NULL;
options->output = NULL;
options->wrk_dir = NULL;
options->grid_options = NULL;
options->num_threads = 1;
options->num_candidates = 100;
options->output_gapped_start_point = 0;
options->tech = tech;
options->job_index = -1;
options->num_vols = -1;

if (tech == TECH_PACBIO) {
options->min_align_size = kDefaultAlignSizePacbio;
Expand Down Expand Up @@ -68,6 +120,7 @@ void print_usage(const char* prog)
fprintf(stderr, "Default: %d if x = %d, %d if x = %d\n", kDefaultKmerMatchPacbio, TECH_PACBIO, kDefaultKmerMatchNanopore, TECH_NANOPORE);
fprintf(stderr, "-g <0/1>\twhether print gapped extension start point, 0 = no, 1 = yes\n\t\tDefault: 0\n");
fprintf(stderr, "-x <0/x>\tsequencing technology: 0 = pacbio, 1 = nanopore\n\t\tDefault: 0\n");
fprintf(stderr, "-G <string>\tgrid options\n");
}

int
Expand All @@ -82,14 +135,17 @@ parse_arguments(int argc, char* argv[], options_t* options)
const char* reads = NULL;
const char* output = NULL;
const char* wrk_dir = NULL;
const char* grid_options = NULL;
int num_threads = -1;
int num_candidates = -1;
int min_align_size = -1;
int min_kmer_match = -1;
int output_gapped_start_point = -1;
int tech = TECH_PACBIO;
int job_index = -1;
int num_vols = -1;

while((opt_char = getopt(argc, argv, "j:d:o:w:t:n:g:x:a:k:")) != -1)
while((opt_char = getopt(argc, argv, "j:d:o:w:t:n:g:x:a:k:G:i:N:")) != -1)
{
switch(opt_char)
{
Expand All @@ -105,9 +161,18 @@ parse_arguments(int argc, char* argv[], options_t* options)
case 'w':
wrk_dir = optarg;
break;
case 'G':
grid_options = optarg;
break;
case 't':
num_threads = atoi(optarg);
break;
case 'i':
job_index = atoi(optarg);
break;
case 'N':
num_vols = atoi(optarg);
break;
case 'n':
num_candidates = atoi(optarg);
break;
Expand Down Expand Up @@ -155,11 +220,14 @@ parse_arguments(int argc, char* argv[], options_t* options)
options->reads = reads;
options->output = output;
options->wrk_dir = wrk_dir;
if (grid_options != NULL) options->grid_options = grid_options;
if (num_threads != -1) options->num_threads = num_threads;
if (num_candidates != -1) options->num_candidates = num_candidates;
if (min_align_size != -1) options->min_align_size = min_align_size;
if (min_kmer_match != -1) options->min_kmer_match = min_kmer_match;
if (output_gapped_start_point != -1) options->output_gapped_start_point = output_gapped_start_point;
if (job_index != -1) options->job_index = job_index;
if (num_vols != -1) options->num_vols = num_vols;

if (options->task != TASK_SEED && options->task != TASK_ALN)
{
Expand Down
8 changes: 8 additions & 0 deletions src/mecat2pw/pw_options.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef MP_OPTIONS_H
#define MP_OPTIONS_H

#include <string>

#include "../common/defs.h"

#define TASK_SEED 0
Expand All @@ -12,14 +14,20 @@ typedef struct
const char* reads;
const char* output;
const char* wrk_dir;
const char* grid_options;
int num_threads;
int num_candidates;
int min_align_size;
int min_kmer_match;
int output_gapped_start_point;
int tech;
int num_vols;
int job_index;
} options_t;

std::string
make_options(options_t* options);

void
print_options(options_t* options);

Expand Down