diff --git a/src/Main.cpp b/src/Main.cpp index edaf07f..3985147 100644 --- a/src/Main.cpp +++ b/src/Main.cpp @@ -7,7 +7,7 @@ #include "peprocess.h" #include "seprocess.h" int main(int argc,char* argv[]){ - C_global_variable gv; //global variable, include statistic information + //C_global_variable gv; //global variable, include statistic information C_global_parameter gp; //global parameter generated by commands check_module(argc,argv); //check module global_parameter_initial(argc,argv,gp); //initial global parameter diff --git a/src/gc.cpp b/src/gc.cpp index 62c3c53..190c4cb 100644 --- a/src/gc.cpp +++ b/src/gc.cpp @@ -57,7 +57,7 @@ quartile_result cal_quar_from_map(map data){ } return return_value; } -quartile_result cal_quar_from_array(unsigned long long data[],int len){ +quartile_result cal_quar_from_array(long long data[],int len){ //cout< &elements){ if(line_info[ix]!=sep){ element+=line_info[ix]; }else{ - elements.push_back(element); + elements.emplace_back(element); element=""; } } - elements.push_back(element); + elements.emplace_back(element); } void line_split(string line_info,vector &elements){ elements.clear(); @@ -184,11 +185,11 @@ void line_split(string line_info,vector &elements){ if(!isspace(line_info[ix])){ element+=line_info[ix]; }else{ - elements.push_back(element); + elements.emplace_back(element); element=""; } } - elements.push_back(element); + elements.emplace_back(element); } void line_split(string line_info,char sep,set &elements){ elements.clear(); @@ -294,7 +295,7 @@ string join_vector(vector a,string sep){ return result; } string link_dir_file(string dir,string file){ - string::size_type index; + //string::size_type index; // remove null char in the start or end; dir.erase(0,dir.find_first_not_of(" ")); dir.erase(dir.find_last_not_of(" ")+1); @@ -433,4 +434,4 @@ vector get_se_hard_trim(string a){ exit(1); } return tmp_eles; -} \ No newline at end of file +} diff --git a/src/gc.h b/src/gc.h index eca1014..fcaf8ae 100644 --- a/src/gc.h +++ b/src/gc.h @@ -16,20 +16,20 @@ class quartile_result{ float first10_quar,last10_quar; }; -quartile_result cal_quar_from_map(map data); -quartile_result cal_quar_from_array(unsigned long long data[],int len); +//quartile_result cal_quar_from_map(map data); +quartile_result cal_quar_from_array(long long data[],int len); vector get_pe_hard_trim(string a); -vector get_se_hard_trim(string a); +//vector get_se_hard_trim(string a); void check_gz_file(string a); int check_gz_empty(string a); -void remove_space(string &a); -int file_exist_and_not_empty(string file_name); -void uniq_vector(vector &a); +//void remove_space(string &a); +//int file_exist_and_not_empty(string file_name); +//void uniq_vector(vector &a); string get_local_time(); void line_split(string line_info,char sep,vector &elements); void line_split(string line_info,char sep,set &elements); void line_split(string line_info,vector &elements); -int count_gc(string a); +//int count_gc(string a); void int2string(int &a,string &b); void double2string(double &a,string &b); void float2string(float &a,string &b); diff --git a/src/global_parameter.h b/src/global_parameter.h index 11bea94..25bed67 100644 --- a/src/global_parameter.h +++ b/src/global_parameter.h @@ -15,7 +15,7 @@ class C_global_parameter{ isAdptList_(true), isFull_(false), size_(0), cleanQualSys_(ILLUMINA_), filterAdapter_(true),seqType_(0),outType_(0),highAType_(0) */ C_global_parameter():is_streaming(false),seq_type("0"),index_remove(false),qualityPhred(33),outputQualityPhred(33),adapter_discard_or_trim("discard"),contam_discard_or_trim("discard"),adapter_method("hd"),whether_add_pe_info(false),output_file_type("fastq"),lowQual(5),lowQualityBaseRatio(0.5),meanQuality(-1),n_ratio(0.05),highA_ratio(-1),polyG_tail(-1),polyX_num(-1),overlap_length(-1),peMismatchRatio(0.1),max_read_length(-1),min_read_length(30),output_clean(0),have_output1(0),have_output2(0),adaMis(2),adaMR(0.5),ctMatchR("0.2"),adaEdge(6),adaRCtg(6),adaRAr(0.8),adaRMa(5),adaREr(0.4),adaRMm(4),threads_num(6),patchSize(0),split_line(10000000),mode(""),total_reads_num(0),f_total_reads_ratio(0),l_total_reads_num(0),total_reads_num_random(true),clean_file_reads(0){}; - C_global_parameter(int argc,char* argv[]); + //C_global_parameter(int argc,char* argv[]); string mode; string module_name; diff --git a/src/global_variable.h b/src/global_variable.h index 2f2a377..79abd10 100644 --- a/src/global_variable.h +++ b/src/global_variable.h @@ -10,7 +10,10 @@ using namespace::std; //#define REAL_MAX 50 class C_filter_stat{ public: - C_filter_stat():in_adapter_list_num(0),include_adapter_seq_num(0),include_contam_seq_num(0),n_ratio_num(0),highA_num(0),polyX_num(0),tile_num(0),fov_num(0),low_qual_base_ratio_num(0),mean_quality_num(0),short_len_num(0),long_len_num(0),over_lapped_num(0),no_3_adapter_num(0),int_insertNull_num(0),include_adapter_seq_num1(0),include_adapter_seq_num2(0),include_adapter_seq_num_overlap(0),include_contam_seq_num1(0),include_contam_seq_num2(0),include_contam_seq_num_overlap(0),n_ratio_num1(0),n_ratio_num2(0),n_ratio_num_overlap(0),highA_num1(0),highA_num2(0),highA_num_overlap(0),polyX_num1(0),polyX_num2(0),polyX_num_overlap(0),low_qual_base_ratio_num1(0),low_qual_base_ratio_num2(0),low_qual_base_ratio_num_overlap(0),mean_quality_num1(0),mean_quality_num2(0),mean_quality_num_overlap(0),short_len_num1(0),short_len_num2(0),short_len_num_overlap(0),long_len_num1(0),long_len_num2(0),long_len_num_overlap(0){}; + C_filter_stat():in_adapter_list_num(0),include_adapter_seq_num(0),include_contam_seq_num(0),n_ratio_num(0),highA_num(0),polyX_num(0),tile_num(0),fov_num(0),low_qual_base_ratio_num(0),mean_quality_num(0),short_len_num(0),long_len_num(0),over_lapped_num(0),no_3_adapter_num(0),int_insertNull_num(0),include_adapter_seq_num1(0),include_adapter_seq_num2(0),include_adapter_seq_num_overlap(0),include_contam_seq_num1(0),include_contam_seq_num2(0),include_contam_seq_num_overlap(0),n_ratio_num1(0),n_ratio_num2(0),n_ratio_num_overlap(0),highA_num1(0),highA_num2(0),highA_num_overlap(0),polyX_num1(0),polyX_num2(0),polyX_num_overlap(0),low_qual_base_ratio_num1(0),low_qual_base_ratio_num2(0),low_qual_base_ratio_num_overlap(0),mean_quality_num1(0),mean_quality_num2(0),mean_quality_num_overlap(0),short_len_num1(0),short_len_num2(0),short_len_num_overlap(0),long_len_num1(0),long_len_num2(0),long_len_num_overlap(0){ + include_global_contam_seq_num=include_global_contam_seq_num1=include_global_contam_seq_num2=include_global_contam_seq_num_overlap=0; + polyG_num=polyG_num1=polyG_num2=polyG_num_overlap=0; + }; //int output_reads_num; int in_adapter_list_num; int include_adapter_seq_num,include_adapter_seq_num1,include_adapter_seq_num2,include_adapter_seq_num_overlap; @@ -36,32 +39,32 @@ class C_general_stat{ C_general_stat(); int read_max_length; int read_length; - int reads_number; - unsigned long long base_number; - unsigned long long a_number,c_number,g_number,t_number,n_number; + long long reads_number; + long long base_number; + long long a_number,c_number,g_number,t_number,n_number; //unsigned long long a_ratio,c_ratio,g_ratio,t_ratio,n_ratio; - unsigned long long q20_num,q30_num; + long long q20_num,q30_num; //unsigned long long q20_ratio,q30_ratio; }; class C_reads_pos_base_stat{ public: C_reads_pos_base_stat(); - unsigned long long position_acgt_content[READ_MAX_LEN][5]; + long long position_acgt_content[READ_MAX_LEN][5]; //map > position_acgt_content_ratio; }; class C_reads_pos_qual_stat{ public: C_reads_pos_qual_stat(); - unsigned long long position_qual[READ_MAX_LEN][MAX_QUAL]; + long long position_qual[READ_MAX_LEN][MAX_QUAL]; //map > position_qual_ratio; }; class C_reads_trim_stat{ public: C_reads_trim_stat(); - unsigned long long hlq[READ_MAX_LEN],ht[READ_MAX_LEN]; - unsigned long long ta[READ_MAX_LEN],tlq[READ_MAX_LEN],tt[READ_MAX_LEN]; + long long hlq[READ_MAX_LEN],ht[READ_MAX_LEN]; + long long ta[READ_MAX_LEN],tlq[READ_MAX_LEN],tt[READ_MAX_LEN]; }; class C_fastq_file_stat{ public: diff --git a/src/peprocess.cpp b/src/peprocess.cpp index 6965c04..3738aee 100644 --- a/src/peprocess.cpp +++ b/src/peprocess.cpp @@ -20,31 +20,40 @@ using namespace::std; #define READBUF 500 #define random(x) (rand()%x) -mutex pe_cal_m; -mutex read_m,stat_m,write_m; -unsigned long long buffer(1024*1024*2); -string new_fq1_path,new_fq2_path; -C_filter_stat local_fs[max_thread]; -C_fastq_file_stat local_raw_stat1[max_thread],local_raw_stat2[max_thread],local_trim_stat1[max_thread],local_trim_stat2[max_thread],local_clean_stat1[max_thread],local_clean_stat2[max_thread]; -queue > thread_read_buffer1[max_thread],thread_read_buffer2[max_thread],thread_write_buffer1[max_thread],thread_write_buffer2[max_thread]; -string sticky_reads1[max_thread],sticky_reads2[max_thread]; -gzFile gz_fq1,gz_fq2; -map pe1_out,pe2_out; -int queue_buffer=50; -int exceed_output1(0); -int exceed_output2(0); -int bq_check(0),pair_check(0); + +//int exceed_output1(0); +//int exceed_output2(0); + peProcess::peProcess(C_global_parameter m_gp){ //initialize gp=m_gp; gv=C_global_variable(); used_threads_num=0; - processed_reads=0; srand((unsigned)time(NULL)); ostringstream tmpstring; tmpstring< filter_items; - filter_items.push_back("Reads limited to output number"); - filter_items.push_back("Reads with filtered tile"); - filter_items.push_back("Reads with filtered fov"); - filter_items.push_back("Reads too short"); - filter_items.push_back("Reads too long"); - filter_items.push_back("Reads with global contam sequence"); - filter_items.push_back("Reads with contam sequence"); - filter_items.push_back("Reads with n rate exceed"); - filter_items.push_back("Reads with highA"); - filter_items.push_back("Reads with polyX"); - filter_items.push_back("Reads with low quality"); - filter_items.push_back("Reads with low mean quality"); - filter_items.push_back("Reads with small insert size"); - filter_items.push_back("Reads with adapter"); + filter_items.emplace_back("Reads limited to output number"); + filter_items.emplace_back("Reads with filtered tile"); + filter_items.emplace_back("Reads with filtered fov"); + filter_items.emplace_back("Reads too short"); + filter_items.emplace_back("Reads too long"); + filter_items.emplace_back("Reads with global contam sequence"); + filter_items.emplace_back("Reads with contam sequence"); + filter_items.emplace_back("Reads with n rate exceed"); + filter_items.emplace_back("Reads with highA"); + filter_items.emplace_back("Reads with polyX"); + filter_items.emplace_back("Reads with low quality"); + filter_items.emplace_back("Reads with low mean quality"); + filter_items.emplace_back("Reads with small insert size"); + filter_items.emplace_back("Reads with adapter"); - map filter_number,filter_pe1,filter_pe2,filter_overlap; + map filter_number,filter_pe1,filter_pe2,filter_overlap; filter_number["Reads with global contam sequence"]=gv.fs.include_global_contam_seq_num; filter_number["Reads with contam sequence"]=gv.fs.include_contam_seq_num; filter_number["Reads too short"]=gv.fs.short_len_num; @@ -164,8 +173,8 @@ void peProcess::print_stat(){ //print statistic information to the file filter_overlap["Reads with filtered fov"]=gv.fs.fov_num; filter_overlap["Reads too long"]=gv.fs.long_len_num_overlap; //filter_overlap["Reads limited to output number"]=gv.fs.output_reads_num; - unsigned long long total_filter_fq1_num=0; - for(map::iterator ix=filter_number.begin();ix!=filter_number.end();ix++){ + long long total_filter_fq1_num=0; + for(map::iterator ix=filter_number.begin();ix!=filter_number.end();ix++){ total_filter_fq1_num+=ix->second; } of_filter_stat<=20){ raw1_q20_num+=gv.raw1_stat.qs.position_qual[i][j]; @@ -374,14 +383,13 @@ void peProcess::print_stat(){ //print statistic information to the file of_readPos_qual_stat1<<"Mean\tMedian\tLower quartile\tUpper quartile\t10th percentile\t90th percentile"<=20){ clean1_q20_num+=gv.clean1_stat.qs.position_qual[i][j]; @@ -788,11 +796,16 @@ void peProcess::update_stat(C_fastq_file_stat& fq1s_stat,C_fastq_file_stat& fq2s } -void* peProcess::stat_pe_fqs(PEstatOption opt){ //statistic the pair-ends fastq +void* peProcess::stat_pe_fqs(PEstatOption opt,string dataType){ //statistic the pair-ends fastq opt.stat1->gs.reads_number+=opt.fq1s->size(); opt.stat2->gs.reads_number+=opt.fq2s->size(); vector::iterator ix_end=opt.fq1s->end(); - + int qualityBase=0; + if(dataType=="clean"){ + qualityBase=gp.outputQualityPhred; + }else{ + qualityBase=gp.qualityPhred; + } for(vector::iterator ix=opt.fq1s->begin();ix!=ix_end;ix++){ if((*ix).head_hdcut>0 || (*ix).head_lqcut>0){ if((*ix).head_hdcut>=(*ix).head_lqcut){ @@ -838,7 +851,7 @@ void* peProcess::stat_pe_fqs(PEstatOption opt){ //statistic the pair-ends fastq } int qual_len=(*ix).qual_seq.size(); for(string::size_type i=0;i!=qual_len;i++){ //process quality sequence - int base_quality=((*ix).qual_seq)[i]-gp.qualityPhred; + int base_quality=((*ix).qual_seq)[i]-qualityBase; /* if(base_quality>MAX_QUAL){ //cout<gs.read_length=(*ix).sequence.size(); opt.stat1->gs.base_number+=opt.stat1->gs.read_length; } + //check base quality setting whether ok + //run one time int q1_exceed(0),q1_normal_bq(0),q1_mean_bq(0); int q2_exceed(0),q2_normal_bq(0),q2_mean_bq(0); float q1_normal_ratio(0.0),q2_normal_ratio(0.0); @@ -939,21 +954,10 @@ void* peProcess::stat_pe_fqs(PEstatOption opt){ //statistic the pair-ends fastq } } bq_check++; - /* - if(normal_ratio>1){ - cerr<<"Error:code error"<::iterator ix2_end=opt.fq2s->end(); for(vector::iterator ix=opt.fq2s->begin();ix!=ix2_end;ix++){ if((*ix).head_hdcut>0 || (*ix).head_lqcut>0){ @@ -997,9 +1001,10 @@ void* peProcess::stat_pe_fqs(PEstatOption opt){ //statistic the pair-ends fastq } } } + //stat base quality related information int qual_len=(*ix).qual_seq.size(); for(string::size_type i=0;i!=qual_len;i++){ //process quality sequence - int base_quality=((*ix).qual_seq)[i]-gp.qualityPhred; + int base_quality=((*ix).qual_seq)[i]-qualityBase; /* if(base_quality>MAX_QUAL){ cerr<<"Error:quality is too high,please check the quality system parameter or fastq file"<gs.read_length=(*ix).sequence.size(); opt.stat2->gs.base_number+=opt.stat2->gs.read_length; } - + return &bq_check; } -void peProcess::filter_pe_fqs(PEcalOption opt){ +void peProcess::filter_pe_fqs(PEcalOption* opt){ //C_reads_trim_stat_2 cut_pos; - vector::iterator i2=opt.fq2s->begin(); - vector::iterator i_end=opt.fq1s->end(); - for(vector::iterator i=opt.fq1s->begin();i!=i_end;i++){ + vector::iterator i2=opt->fq2s->begin(); + vector::iterator i_end=opt->fq1s->end(); + for(vector::iterator i=opt->fq1s->begin();i!=i_end;i++){ C_pe_fastq_filter pe_fastq_filter=C_pe_fastq_filter(*i,*i2,gp); /*int head_hdcut,head_lqcut,tail_hdcut,tail_lqcut,adacut_pos; int contam_pos; @@ -1053,18 +1058,21 @@ void peProcess::filter_pe_fqs(PEcalOption opt){ if(!gp.trim_fq1.empty()){ preOutput(1,pe_fastq_filter.fq1); preOutput(2,pe_fastq_filter.fq2); - opt.trim_result1->push_back(pe_fastq_filter.fq1); - opt.trim_result2->push_back(pe_fastq_filter.fq2); + opt->trim_result1->emplace_back(pe_fastq_filter.fq1); + opt->trim_result2->emplace_back(pe_fastq_filter.fq2); } - if(pe_fastq_filter.pe_discard(opt.local_fs,gp)!=1){ + if(pe_fastq_filter.pe_discard(opt->local_fs,gp)!=1){ if(!gp.clean_fq1.empty()){ preOutput(1,pe_fastq_filter.fq1); preOutput(2,pe_fastq_filter.fq2); - opt.clean_result1->push_back(pe_fastq_filter.fq1); - opt.clean_result2->push_back(pe_fastq_filter.fq2); + opt->clean_result1->emplace_back(pe_fastq_filter.fq1); + opt->clean_result2->emplace_back(pe_fastq_filter.fq2); } } i2++; + if(i2==opt->fq2s->end()){ + break; + } } //return cut_pos; } @@ -1104,7 +1112,7 @@ void peProcess::peWrite_split(vector& pe1,vector& pe2){ output_split_fastqs("1",pe1); output_split_fastqs("2",pe2); } -void peProcess::peWrite(vector& pe1,vector& pe2,string type,gzFile out1,gzFile out2){ //output the sequences to files +void peProcess::peWrite(vector& pe1,vector& pe2,gzFile out1,gzFile out2){ //output the sequences to files /*if(gp.threads_num==1){ if(gp.mode=="nonssd" && gp.output_clean>0 && type=="clean"){ output_split_fastqs("1",pe1); @@ -1131,7 +1139,7 @@ void peProcess::peWrite(vector& pe1,vector& pe2,string type,gz output_fastqs("1",pe1,out1); output_fastqs("2",pe2,out2); } -void peProcess::peWrite(vector& pe1,vector& pe2,string type,FILE* out1,FILE* out2){ +void peProcess::peWrite(vector& pe1,vector& pe2,FILE* out1,FILE* out2){ output_fastqs("1",pe1,out1); output_fastqs("2",pe2,out2); } @@ -1194,7 +1202,7 @@ int peProcess::read(vector& pe1,vector& pe2,ifstream& infile1, } if(file1_line_num%4==0){ fastq1.qual_seq=buf1; - //fq1s.push_back(fastq1); + //fq1s.emplace_back(fastq1); } } @@ -1209,52 +1217,8 @@ int peProcess::read(vector& pe1,vector& pe2,ifstream& infile1, } if(file2_line_num%4==0){ fastq2.qual_seq=buf2; - pe1.push_back(fastq1); - pe2.push_back(fastq2); - } - }else{ - return -1; - } - } - return 0; -} -int peProcess::read_gz(vector& pe1,vector& pe2){ //read file from gz format,used in non-ssd mode - char buf1[READBUF],buf2[READBUF]; - C_fastq fastq1,fastq2; - C_fastq_init(fastq1,fastq2); - int file1_line_num(0),file2_line_num(0); - for(int i=0;i &fq1s,vector &fq2s){ check_disk_available(); - vector trim_result1,trim_result2,clean_result1,clean_result2; - PEcalOption opt2; - opt2.local_fs=&local_fs[index]; - opt2.fq1s=&fq1s; - opt2.fq2s=&fq2s; - opt2.trim_result1=&trim_result1; - opt2.trim_result2=&trim_result2; - opt2.clean_result1=&clean_result1; - opt2.clean_result2=&clean_result2; + + vector trim_result1; + vector trim_result2; + vector clean_result1; + vector clean_result2; + PEcalOption* opt2=new PEcalOption(); + opt2->local_fs=&local_fs[index]; + opt2->fq1s=&fq1s; + opt2->fq2s=&fq2s; + opt2->trim_result1=&trim_result1; + opt2->trim_result2=&trim_result2; + opt2->clean_result1=&clean_result1; + opt2->clean_result2=&clean_result2; + if(pair_check==0){ - string readid1=fq1s[0].seq_id; - string readid2=fq2s[0].seq_id; + string readid1=opt2->fq1s->front().seq_id; + string readid2=opt2->fq2s->front().seq_id; if(readid1.size()!=readid2.size()){ cerr<<"Warning:read ID in fq1 and fq2 seems not in pair, please check the input files if you are not sure"< &fq1s,vector &fq1s,vector &fq1s,vectorfs=*(opt2->local_fs); + tmp_gv->raw1_stat=*(opt_raw.stat1); + tmp_gv->raw2_stat=*(opt_raw.stat2); + //tmp_gv->trim1_stat=local_trim_stat1[index]; + //tmp_gv->trim2_stat=local_trim_stat2[index]; + tmp_gv->clean1_stat=*(opt_clean.stat1); + tmp_gv->clean2_stat=*(opt_clean.stat2); + peStreaming_stat(*tmp_gv); + delete tmp_gv; write_m.unlock(); } - + clean_result1.clear(); clean_result2.clear(); } check_disk_available(); + return; } -void* peProcess::sub_thread_nonssd(int index){ //sub thread process in non-ssd mode - of_log< fq1s,fq2s; - vector trim_result1,trim_result2,clean_result1,clean_result2; - int done(0); - while(1){ - read_m.lock(); - if(gp.fq1_path.find(".gz")==gp.fq1_path.size()-3){ - if(read_gz(fq1s,fq2s)==-1){ //read fastqs from raw files - done=1; - } - }else{ - if(read(fq1s,fq2s,nongzfp1,nongzfp2)==-1){ - done=1; - } - } - processed_reads+=fq1s.size(); - if(processed_reads%(gp.patchSize*gp.threads_num)==0){ - of_log<0 && fq2s.size()>0) + if(!fq1s.empty() && !fq2s.empty()) thread_process_reads(index,fq1s,fq2s); if(limit_end>0){ break; @@ -1700,7 +1567,7 @@ void* peProcess::sub_thread(int index){ //sub thread in ssd mode } if(!gp.clean_fq1.empty()){ if(index!=0){ - if(gp.output_clean<=0 && !(gp.total_reads_num>0 && gp.total_reads_num_random==false)){ + if(gp.output_clean<=0 && !(gp.total_reads_num>0 && !gp.total_reads_num_random)){ if(gp.clean_fq1.rfind(".gz")==gp.clean_fq1.size()-3){ gzclose(gz_clean_out1[index]); gzclose(gz_clean_out2[index]); @@ -1713,6 +1580,7 @@ void* peProcess::sub_thread(int index){ //sub thread in ssd mode } check_disk_available(); of_log<>"<>"<1){ - thread cat_t1(bind(&peProcess::run_cmd,this,cat_cmd1.str())); - thread cat_t2(bind(&peProcess::run_cmd,this,cat_cmd2.str())); - cat_t1.join(); - cat_t2.join(); - }else{ - run_cmd(cat_cmd1.str()); - run_cmd(cat_cmd2.str()); - } - } - - } - if(!gp.clean_fq1.empty()){ - ostringstream clean_out_fq1_tmp,clean_out_fq2_tmp; - if(i==index){ - clean_out_fq1_tmp<>"<>"<1){ - thread cat_t1(bind(&peProcess::run_cmd,this,cat_cmd1.str())); - thread cat_t2(bind(&peProcess::run_cmd,this,cat_cmd2.str())); - cat_t1.join(); - cat_t2.join(); - }else{ - run_cmd(cat_cmd1.str()); - run_cmd(cat_cmd2.str()); - } - } - } - } - -} void peProcess::create_thread_read(int index){ multi_gzfq1[index]=gzopen((gp.fq1_path).c_str(),"rb"); if(!multi_gzfq1[index]){ @@ -2018,8 +1800,8 @@ void* peProcess::sub_thread_nonssd_realMultiThreads(int index){ char buf1[READBUF],buf2[READBUF]; C_fastq fastq1,fastq2; C_fastq_init(fastq1,fastq2); - unsigned long long file1_line_num(0),file2_line_num(0); - unsigned long long block_line_num1(0),block_line_num2(0); + long long file1_line_num(0),file2_line_num(0); + long long block_line_num1(0),block_line_num2(0); int thread_read_block=4*gp.patchSize; vector fq1s,fq2s; while(1){ @@ -2046,19 +1828,20 @@ void* peProcess::sub_thread_nonssd_realMultiThreads(int index){ block_line_num2++; if(block_line_num2%4==1){ fastq2.seq_id.assign(buf2); - fastq2.seq_id.erase(fastq2.seq_id.size()-1); + fastq2.seq_id.erase(fastq2.seq_id.size()-1,1); }else if(block_line_num2%4==2){ fastq2.sequence.assign(buf2); - fastq2.sequence.erase(fastq2.sequence.size()-1); + fastq2.sequence.erase(fastq2.sequence.size()-1,1); }else if(block_line_num2%4==0){ fastq2.qual_seq.assign(buf2); - fastq2.qual_seq.erase(fastq2.qual_seq.size()-1); - fq1s.push_back(fastq1); - fq2s.push_back(fastq2); + fastq2.qual_seq.erase(fastq2.qual_seq.size()-1,1); + fq1s.emplace_back(fastq1); + fq2s.emplace_back(fastq2); if(fq1s.size()==gp.patchSize){ - if(index==0) - of_log<0){ break; } @@ -2067,9 +1850,9 @@ void* peProcess::sub_thread_nonssd_realMultiThreads(int index){ } file2_line_num++; }else{ - if(fq1s.size()>0){ + if(!fq1s.empty()){ thread_process_reads(index,fq1s,fq2s); - if(limit_end){ + if(limit_end>0){ break; } } @@ -2102,51 +1885,7 @@ void* peProcess::sub_thread_nonssd_realMultiThreads(int index){ } } of_log< > thread_read_buffer[max_thread],thread_write_buffer[max_thread]; - //mutex thread_read_m[max_thread],thread_write_m[max_thread]; - vector fq1s,fq2s; - int done(0); - while(1){ - if(file_end){ - break; - }else{ - int ready(0); - for(int i=0;i!=gp.threads_num;i++){ - if(thread_read_m[i].try_lock()){ - cout<<"monitor: sub thread "< include_threads; int last_thread(0); int sticky_end(0); - unsigned long long cur_total(0); + long long cur_total(0); for(int i=0;i!=gp.threads_num;i++){ cur_total+=local_clean_stat1[i].gs.reads_number; if(cur_total>gp.l_total_reads_num){ @@ -2297,7 +2036,7 @@ void peProcess::process_some_reads(int index,int out_number){ char buf1[READBUF],buf2[READBUF]; C_fastq fastq1,fastq2; C_fastq_init(fastq1,fastq2); - unsigned long long file1_line_num(0),file2_line_num(0); + long long file1_line_num(0),file2_line_num(0); vector fq1s,fq2s; int processed_number(0),rest_number(out_number); while(1){ @@ -2335,8 +2074,8 @@ void peProcess::process_some_reads(int index,int out_number){ }else if(file2_line_num%4==3){ fastq2.qual_seq.assign(buf2); fastq2.qual_seq.erase(fastq2.qual_seq.size()-1); - fq1s.push_back(fastq1); - fq2s.push_back(fastq2); + fq1s.emplace_back(fastq1); + fq2s.emplace_back(fastq2); if(fq1s.size()==gp.patchSize || fq1s.size()==rest_number){ of_log< &fq1s,vector0 && gp.total_reads_num_random==false)){ - gzclose(gz_clean_out1[0]); - gzclose(gz_clean_out2[0]); + if(gp.output_clean<=0 && !(gp.l_total_reads_num>0 && !gp.total_reads_num_random)){ + if(gp.clean_fq1.rfind(".gz")==gp.clean_fq1.size()-3){ + gzclose(gz_clean_out1[0]); + gzclose(gz_clean_out2[0]); + }else{ + fclose(nongz_clean_out1[0]); + fclose(nongz_clean_out2[0]); + } } } if(gp.fq1_path.rfind(".gz")==gp.fq1_path.size()-3){ @@ -2606,34 +2350,6 @@ void peProcess::process(){ of_log.close(); } -void peProcess::output_fastqs2(int type,vector &fq1,ofstream& outfile){ - //m.lock(); - string out_content; - - for(int i=0;i!=fq1.size();i++){ - //for(vector::iterator i=fq1->begin();i!=fq1->end();i++){ - if(gp.output_file_type=="fasta"){ - fq1[i].seq_id=fq1[i].seq_id.replace(fq1[i].seq_id.find("@"),1,">"); - out_content+=fq1[i].seq_id+"\n"+fq1[i].sequence+"\n"; - }else if(gp.output_file_type=="fastq"){ - if(gp.outputQualityPhred!=gp.qualityPhred){ - for(string::size_type ix=0;ix!=fq1[i].qual_seq.size();ix++){ - fq1[i].qual_seq[ix]=(char)(fq1[i].qual_seq[ix]-gp.outputQualityPhred); - } - } - out_content+=fq1[i].seq_id+"\n"+fq1[i].sequence+"\n+\n"+fq1[i].qual_seq+"\n"; - }else{ - cerr<<"Error:output_file_type value error"< &fq1,gzFile outfile){ //m.lock(); string out_content,streaming_out; @@ -2861,15 +2577,15 @@ void peProcess::peStreaming_stat(C_global_variable& local_gv){ /*int read_max_length; int read_length; int reads_number; - unsigned long long base_number; - unsigned long long a_number,c_number,g_number,t_number,n_number; - //unsigned long long a_ratio,c_ratio,g_ratio,t_ratio,n_ratio; - unsigned long long q20_num,q30_num; + long long base_number; + long long a_number,c_number,g_number,t_number,n_number; + //long long a_ratio,c_ratio,g_ratio,t_ratio,n_ratio; + long long q20_num,q30_num; */ cout<<"#Fq1_statistical_information"<<"\n"; cout<* fq1s,*fq2s; C_fastq_file_stat* stat1,*stat2; + PEstatOption(){ + fq1s=fq2s=NULL; + stat1=stat2=NULL; + } }; struct PEcalOption { C_filter_stat* local_fs; vector* fq1s,*fq2s; vector *trim_result1,*trim_result2,*clean_result1,*clean_result2; -}; -struct PEthreadOpt -{ - vector files; - int index; + PEcalOption(){ + local_fs=NULL; + fq1s=NULL; + fq2s=NULL; + trim_result1=trim_result2=clean_result1=clean_result2=NULL; + } }; class peProcess{ public: - peProcess(C_global_parameter m_gp); - void q_clear(queue& a); + explicit peProcess(C_global_parameter m_gp); void process(); - void brother(); void print_stat(); void update_stat(C_fastq_file_stat& fq1s_stat,C_fastq_file_stat& fq2s_stat,C_filter_stat& fs_stat,string type); - void* stat_pe_fqs(PEstatOption opt); - void filter_pe_fqs(PEcalOption opt); + void* stat_pe_fqs(PEstatOption opt,string dataType); + void filter_pe_fqs(PEcalOption* opt); void* sub_thread(int index); int read(vector& pe1,vector& pe2,ifstream& infile1,ifstream& infile2); - void peWrite(vector& pe1,vector& pe2,string type,gzFile out1,gzFile out2); - void peWrite(vector& pe1,vector& pe2,string type,FILE* out1,FILE* out2); + void peWrite(vector& pe1,vector& pe2,gzFile out1,gzFile out2); + void peWrite(vector& pe1,vector& pe2,FILE* out1,FILE* out2); //void add_raw_trim(C_fastq_file_stat& a,C_fastq_file_stat& a2,C_reads_trim_stat& b,C_reads_trim_stat& b2); void peWrite_split(vector& pe1,vector& pe2); //void peRead(); - void* doCal(int index); void preOutput(int type,C_fastq& a); void output_fastqs(string type,vector &fq1,gzFile outfile); void output_fastqs(string type,vector &fq1,FILE* outfile); - void output_fastqs2(int type,vector &fq1,ofstream& outfile); void output_split_fastqs(string type,vector &fq1); //void peWrite(int num); void run_pigz(int type); - void get_line_number(int* line_num); void merge_stat(); - void merge_data(); void merge_trim_data(); void merge_clean_data(); void merge_clean_data(int index); void C_fastq_init(C_fastq& a,C_fastq& b); void process_nonssd(); - void* sub_thread_nonssd(int index); - void* sub_thread_nonssd_multiOut(int index); void* sub_thread_nonssd_realMultiThreads(int index); - int read_gz(vector& pe1,vector& pe2); void merge_stat_nonssd(); void peStreaming_stat(C_global_variable& local_gv); - void* gzread_(int index,gzFile a); void make_tmpDir(); void remove_tmpDir(); - void* sub_thread_nonssd_threadBuffer(int index); - void* monitor_read_thread(); - void* monitor_write_thread(); + //void* monitor_read_thread(); + //void* monitor_write_thread(); void create_thread_read(int index); - void create_thread_outputFile(int index); void create_thread_trimoutputFile(int index); void create_thread_cleanoutputFile(int index); void thread_process_reads(int index,vector &fq1s,vector &fq2s); @@ -92,7 +80,6 @@ class peProcess{ void run_extract_random(); void process_some_reads(int index,int out_number); void merge_stat(int index); - void merge_data(int index); void limit_process_reads(int index,vector &fq1s,vector &fq2s,gzFile gzfq1,gzFile gzfq2); void check_disk_available(); //void peOutput(outputOption opt); @@ -100,39 +87,38 @@ class peProcess{ C_global_parameter gp; C_global_variable gv; //queue read_queue1,read_queue2,cal_trim_queue1,cal_trim_queue2,cal_clean_queue1,cal_clean_queue2; - int read_num; - int cal_num; - int write_num; - int read_end,cal_end; - string brother_stat,write_stat; - mutex read_cal_m,cal_write_m; - long long processed_reads; //vector trim_merge1[max_thread],trim_merge2[max_thread],clean_merge1[max_thread],clean_merge2[max_thread]; - gzFile gzfp1,gzfp2; - gzFile multi_gzfq1[max_thread],multi_gzfq2[max_thread]; - ifstream nongzfp1,nongzfp2; - gzFile gz_trim_out1[max_thread],gz_trim_out2[max_thread]; - gzFile gz_clean_out1[max_thread],gz_clean_out2[max_thread]; - FILE* nongz_clean_out1[max_thread]; - FILE* nongz_clean_out2[max_thread]; - gzFile gz_trim_out1_nonssd,gz_trim_out2_nonssd,gz_clean_out1_nonssd,gz_clean_out2_nonssd; +// gzFile gzfp1,gzfp2; + gzFile *multi_gzfq1,*multi_gzfq2; + //ifstream nongzfp1,nongzfp2; + gzFile* gz_trim_out1,*gz_trim_out2; + gzFile* gz_clean_out1,*gz_clean_out2; + FILE** nongz_clean_out1; + FILE** nongz_clean_out2; ofstream of_log; - off_t t_start_pos[max_thread]; - off_t t_end_pos[max_thread]; - char* src1,*src2; + off_t* t_start_pos; + off_t* t_end_pos; +// char* src1,*src2; int fq1fd,fq2fd; string tmp_dir; int limit_end; - mutex thread_read_m[max_thread],thread_write_m[max_thread]; - bool file_end; + //mutex* thread_read_m,*thread_write_m; + //bool file_end; + + mutex pe_cal_m; + mutex read_m,write_m; + int buffer; + string new_fq1_path,new_fq2_path; + C_filter_stat *local_fs; + C_fastq_file_stat* local_raw_stat1,*local_raw_stat2,*local_trim_stat1,*local_trim_stat2,*local_clean_stat1,*local_clean_stat2; + //queue > thread_read_buffer1[max_thread],thread_read_buffer2[max_thread]; + string *sticky_reads1,*sticky_reads2; + gzFile gz_fq1,gz_fq2; + map pe1_out,pe2_out; + //int queue_buffer; + int bq_check,pair_check; private: - vector fq1s,fq2s; - vector trim_output_fq1,trim_output_fq2; - vector clean_output_fq1,clean_output_fq2; - - //C_fastq fastq1,fastq2; int used_threads_num; - string random_num; }; /* class peProcess{ diff --git a/src/process_argv.cpp b/src/process_argv.cpp index 462e494..db602c2 100644 --- a/src/process_argv.cpp +++ b/src/process_argv.cpp @@ -8,7 +8,7 @@ //#include using namespace::std; #define ADA_RATIO 0.4 -map> wrong_paras; +map > wrong_paras; void check_module(int argc,char* argv[]){ if(argc<2){ printModule(); @@ -127,7 +127,7 @@ int global_parameter_initial(int argc,char* argv[],C_global_parameter& gp){ gp.max_read_length=49; } gp.log="log"; - int error=0; +// int error=0; while (-1 != (nextOpt = getopt_long(argc, argv, shortOptions, longOptions, NULL))) { switch (nextOpt) @@ -234,14 +234,14 @@ int global_parameter_initial(int argc,char* argv[],C_global_parameter& gp){ break; } case 'w':gp.output_clean=atoi(optarg);break; - case 'M':gp.adaMis=atoi(optarg);wrong_paras["filtersRNA"].push_back("-M|--adaMis");break; - case 'A':gp.adaMR=atof(optarg);wrong_paras["filtersRNA"].push_back("-A|adaMR");break; - case '9':gp.adaEdge=atoi(optarg);wrong_paras["filtersRNA"].push_back("-9|--adaEdge");break; - case 'S':gp.adaRCtg=atoi(optarg);wrong_paras["filter"].push_back("-S|--adaRCtg");break; - case 's':gp.adaRAr=atof(optarg);wrong_paras["filter"].push_back("-s|--adaRAr");break; - case 'U':gp.adaRMa=atoi(optarg);wrong_paras["filter"].push_back("-U|--adaRMa");break; - case 'u':gp.adaREr=atof(optarg);wrong_paras["filter"].push_back("-u|--adaREr");break; - case 'b':gp.adaRMm=atoi(optarg);wrong_paras["filter"].push_back("-b|--adaRMm");break; + case 'M':gp.adaMis=atoi(optarg);wrong_paras["filtersRNA"].emplace_back("-M|--adaMis");break; + case 'A':gp.adaMR=atof(optarg);wrong_paras["filtersRNA"].emplace_back("-A|adaMR");break; + case '9':gp.adaEdge=atoi(optarg);wrong_paras["filtersRNA"].emplace_back("-9|--adaEdge");break; + case 'S':gp.adaRCtg=atoi(optarg);wrong_paras["filter"].emplace_back("-S|--adaRCtg");break; + case 's':gp.adaRAr=atof(optarg);wrong_paras["filter"].emplace_back("-s|--adaRAr");break; + case 'U':gp.adaRMa=atoi(optarg);wrong_paras["filter"].emplace_back("-U|--adaRMa");break; + case 'u':gp.adaREr=atof(optarg);wrong_paras["filter"].emplace_back("-u|--adaREr");break; + case 'b':gp.adaRMm=atoi(optarg);wrong_paras["filter"].emplace_back("-b|--adaRMm");break; // case 'd':gp.rmdup = true;break; case '0':gp.log.assign(optarg);break; case 'v':printVersion();return 1; @@ -272,6 +272,7 @@ int global_parameter_initial(int argc,char* argv[],C_global_parameter& gp){ gp.adaEdge=min_adapter_length*ADA_RATIO; } */ + return 0; } bool check_parameter(int argc,char* argv[],C_global_parameter& gp){ if(!gp.fq1_path.empty()){ @@ -484,11 +485,12 @@ bool check_parameter(int argc,char* argv[],C_global_parameter& gp){ cerr<<"Error:patchSize cannot exceed 5M considering memory usage"< ChenYuXin"< hasContams(string& ref_sequence,string& contam,C_global_parameter& g for(int i=0;i!=contams.size();i++){ float tmp_mr=atof(mrs[i].c_str()); int tmp_pos=hasContam(ref_sequence,contams[i],gp,tmp_mr); - return_poses.push_back(tmp_pos); + return_poses.emplace_back(tmp_pos); if(tmp_pos>=0 && tmp_pos hasGlobalContams(string& ref_sequence,C_global_parameter& gp){ }else{ push_pos=reverse_pos; } - return_poses.push_back(push_pos); + return_poses.emplace_back(push_pos); if(push_pos>=0 && push_pos se_out; -int exceed_output_se(0); -string se_new_fq1_path; -unsigned long long se_buffer(1024*1024*2); -int se_bq_check(0); + seProcess::seProcess(C_global_parameter m_gp){ gp=m_gp; gv=C_global_variable(); used_threads_num=0; - processed_reads=0; srand((unsigned)time(NULL)); ostringstream tmpstring; tmpstring< filter_items; - filter_items.push_back("Reads limited to output number"); - filter_items.push_back("Reads with filtered tile"); - filter_items.push_back("Reads with filtered fov"); - filter_items.push_back("Reads too short"); - filter_items.push_back("Reads too long"); - filter_items.push_back("Reads with contam sequence"); - filter_items.push_back("Reads with n rate exceed"); - filter_items.push_back("Reads with highA"); - filter_items.push_back("Reads with polyX"); - filter_items.push_back("Reads with low quality"); - filter_items.push_back("Reads with low mean quality"); - filter_items.push_back("Reads with adapter"); - filter_items.push_back("Reads with global contam sequence"); - map filter_number; + filter_items.emplace_back("Reads limited to output number"); + filter_items.emplace_back("Reads with filtered tile"); + filter_items.emplace_back("Reads with filtered fov"); + filter_items.emplace_back("Reads too short"); + filter_items.emplace_back("Reads too long"); + filter_items.emplace_back("Reads with contam sequence"); + filter_items.emplace_back("Reads with n rate exceed"); + filter_items.emplace_back("Reads with highA"); + filter_items.emplace_back("Reads with polyX"); + filter_items.emplace_back("Reads with low quality"); + filter_items.emplace_back("Reads with low mean quality"); + filter_items.emplace_back("Reads with adapter"); + filter_items.emplace_back("Reads with global contam sequence"); + map filter_number; filter_number["Reads with contam sequence"]=gv.fs.include_contam_seq_num; filter_number["Reads with global contam sequence"]=gv.fs.include_global_contam_seq_num; filter_number["Reads too short"]=gv.fs.short_len_num; @@ -105,7 +110,7 @@ void seProcess::print_stat(){ filter_number["Reads too long"]=gv.fs.long_len_num; //filter_number["Reads limited to output number"]=gv.fs.output_reads_num; unsigned long long total_filter_fq1_num=0; - for(map::iterator ix=filter_number.begin();ix!=filter_number.end();ix++){ + for(map::iterator ix=filter_number.begin();ix!=filter_number.end();ix++){ total_filter_fq1_num+=ix->second; } //int total_filter_fq1_num=gv.fs.output_reads_num+gv.fs.include_contam_seq_num+gv.fs.include_adapter_seq_num+gv.fs.n_ratio_num+gv.fs.highA_num+gv.fs.tile_num+gv.fs.low_qual_base_ratio_num+gv.fs.mean_quality_num+gv.fs.short_len_num; @@ -456,9 +461,15 @@ void seProcess::update_stat(C_fastq_file_stat& fq1s_stat,C_filter_stat& fs_stat, } -void* seProcess::stat_se_fqs(SEstatOption opt){ +void* seProcess::stat_se_fqs(SEstatOption opt,string dataType){ opt.stat1->gs.reads_number+=opt.fq1s->size(); - int normal_bq(0); + //int normal_bq(0); + int qualityBase=0; + if(dataType=="clean"){ + qualityBase=gp.outputQualityPhred; + }else{ + qualityBase=gp.qualityPhred; + } for(vector::iterator ix=opt.fq1s->begin();ix!=opt.fq1s->end();ix++){ if((*ix).head_hdcut>0 || (*ix).head_lqcut>0){ if((*ix).head_hdcut>=(*ix).head_lqcut){ @@ -501,7 +512,7 @@ void* seProcess::stat_se_fqs(SEstatOption opt){ } } for(string::size_type i=0;i!=(*ix).qual_seq.size();i++){ //process quality sequence - int base_quality=((*ix).qual_seq)[i]-gp.qualityPhred; + int base_quality=((*ix).qual_seq)[i]-qualityBase; /* if(base_quality>MAX_QUAL){ cerr<<"Error:quality is too high,please check the quality system parameter or fastq file"<push_back(se_fastq_filter.read); + opt.trim_result1->emplace_back(se_fastq_filter.read); } int whether_discard(0); if(gp.module_name=="filtersRNA"){ @@ -647,7 +659,7 @@ void seProcess::filter_se_fqs(SEcalOption opt){ if(whether_discard!=1){ if(!gp.clean_fq1.empty()){ preOutput(1,se_fastq_filter.read); - opt.clean_result1->push_back(se_fastq_filter.read); + opt.clean_result1->emplace_back(se_fastq_filter.read); } } } @@ -671,10 +683,10 @@ void seProcess::preOutput(int type,C_fastq& a){ void seProcess::seWrite_split(vector& se1){ output_split_fastqs("1",se1); } -void seProcess::seWrite(vector& se1,string type,gzFile out1){ +void seProcess::seWrite(vector& se1,gzFile out1){ output_fastqs("1",se1,out1); } -void seProcess::seWrite(vector& se1,string type,FILE* out1){ +void seProcess::seWrite(vector& se1,FILE* out1){ output_fastqs("1",se1,out1); } void seProcess::C_fastq_init(C_fastq& a){ @@ -721,59 +733,17 @@ int seProcess::read(vector& pe1,ifstream& infile1){ } if(file1_line_num%4==0){ fastq1.qual_seq=buf1; - pe1.push_back(fastq1); - //fq1s.push_back(fastq1); + pe1.emplace_back(fastq1); + //fq1s.emplace_back(fastq1); } }else{ return -1; } } + return 0; } -int seProcess::read_gz(vector& se1){ - char buf1[READBUF]; - C_fastq fastq1; - C_fastq_init(fastq1); - int file1_line_num(0); - for(int i=0;i0 && gp.total_reads_num_random==false)){ - //create_thread_outputFile(index); create_thread_trimoutputFile(index); create_thread_cleanoutputFile(index); }else if(!gp.trim_fq1.empty()){ create_thread_trimoutputFile(index); } - //create_thread_outputFile(index); - + //char *fq1_buf,*fq2_buf; unsigned long long mmap_start(start_pos); unsigned long long copysz(se_buffer); @@ -880,7 +848,7 @@ void* seProcess::sub_thread(int index){ for(int i=head_idx;i<=tail_idx;i++){ if(buf1[i]=='\n'){ if(line_num%4==3){ - fq1s.push_back(fastq1); + fq1s.emplace_back(fastq1); fastq1.seq_id.clear(); fastq1.sequence.clear(); fastq1.qual_seq.clear(); @@ -922,12 +890,17 @@ void* seProcess::sub_thread(int index){ if(!gp.clean_fq1.empty()){ if(index!=0){ if(gp.output_clean<=0 && !(gp.total_reads_num>0 && gp.total_reads_num_random==false)){ - gzclose(gz_clean_out1[index]); + if(gp.clean_fq1.rfind(".gz")==gp.clean_fq1.size()-3){ + gzclose(gz_clean_out1[index]); + }else{ + fclose(nongz_clean_out1[index]); + } } } } check_disk_available(); of_log<16?gp.threads_num:16; - int split_line_num=gp.split_line*4; - if(type==1){ - if(gp.fq1_path.rfind(".gz")==gp.fq1_path.size()-3){ - cmd1<<"pigz -c -d -p "<>"<>"<>"<>"<0 && gp.total_reads_num_random==false)){ - //create_thread_outputFile(index); create_thread_trimoutputFile(index); create_thread_cleanoutputFile(index); }else if(!gp.trim_fq1.empty()){ create_thread_trimoutputFile(index); } - //create_thread_outputFile(index); create_thread_read(index); char buf1[READBUF]; @@ -1225,7 +1076,7 @@ void* seProcess::sub_thread_nonssd_realMultiThreads(int index){ if(block_line_num1%4==0){ fastq1.qual_seq.assign(buf1); fastq1.qual_seq.erase(fastq1.qual_seq.size()-1); - fq1s.push_back(fastq1); + fq1s.emplace_back(fastq1); if(fq1s.size()==gp.patchSize){ if(index==0) of_log<0){ + if(!fq1s.empty()){ thread_process_reads(index,fq1s); if(limit_end>0){ break; @@ -1257,16 +1108,17 @@ void* seProcess::sub_thread_nonssd_realMultiThreads(int index){ gzclose(gz_trim_out1[index]); } if(!gp.clean_fq1.empty()){ - if(gp.output_clean<=0 && !(gp.total_reads_num>0 && gp.total_reads_num_random==false)) - if(gp.clean_fq1.rfind(".gz")==gp.clean_fq1.size()-3){ + if(gp.output_clean<=0 && !(gp.total_reads_num>0 && gp.total_reads_num_random==false)) { + if (gp.clean_fq1.rfind(".gz") == gp.clean_fq1.size() - 3) { gzclose(gz_clean_out1[index]); - }else{ + } else { fclose(nongz_clean_out1[index]); } - + } } check_disk_available(); of_log< &fq1s){ SEstatOption opt_raw; opt_raw.fq1s=&fq1s; opt_raw.stat1=&se_local_raw_stat1[index]; - stat_se_fqs(opt_raw); //statistic raw fastqs + stat_se_fqs(opt_raw,"raw"); //statistic raw fastqs fq1s.clear(); //add_raw_trim(se_local_raw_stat1[index],raw_cut); @@ -1360,7 +1212,7 @@ void seProcess::thread_process_reads(int index,vector &fq1s){ if(!gp.trim_fq1.empty()){ //trim means only trim but not discard. opt_trim.fq1s=&trim_result1; opt_trim.stat1=&se_local_trim_stat1[index]; - stat_se_fqs(opt_trim); //statistic trim fastqs + stat_se_fqs(opt_trim,"trim"); //statistic trim fastqs } /* if(!gp.clean_fq1.empty()){ @@ -1371,7 +1223,7 @@ void seProcess::thread_process_reads(int index,vector &fq1s){ */ // if(!gp.trim_fq1.empty()){ - seWrite(trim_result1,"trim",gz_trim_out1[index]); //output trim files + seWrite(trim_result1,gz_trim_out1[index]); //output trim files trim_result1.clear(); } if(!gp.clean_fq1.empty()){ @@ -1398,27 +1250,28 @@ void seProcess::thread_process_reads(int index,vector &fq1s){ se_write_m.unlock(); }else{ if(gp.clean_fq1.rfind(".gz")==gp.clean_fq1.size()-3){ - seWrite(clean_result1,"clean",gz_clean_out1[index]); //output clean files + seWrite(clean_result1,gz_clean_out1[index]); //output clean files }else{ - seWrite(clean_result1,"clean",nongz_clean_out1[index]); //output clean files + seWrite(clean_result1,nongz_clean_out1[index]); //output clean files } } opt_clean.fq1s=&clean_result1; - stat_se_fqs(opt_clean); //statistic clean fastqs + stat_se_fqs(opt_clean,"clean"); //statistic clean fastqs clean_result1.clear(); /*thread_write_m[index].lock(); thread pewrite_t(bind(&seProcess::peWrite,this,clean_result1,clean_result2,"clean",gz_clean_out1[index],gz_clean_out2[index])); thread_write_m[index].unlock();*/ if(gp.is_streaming){ se_write_m.lock(); - C_global_variable tmp_gv; - tmp_gv.fs=*(opt2.se_local_fs); - tmp_gv.raw1_stat=*(opt_raw.stat1); - //tmp_gv.trim1_stat=local_trim_stat1[index]; - //tmp_gv.trim2_stat=local_trim_stat2[index]; - tmp_gv.clean1_stat=*(opt_clean.stat1); - seStreaming_stat(tmp_gv); + C_global_variable* tmp_gv=new C_global_variable(); + tmp_gv->fs=*(opt2.se_local_fs); + tmp_gv->raw1_stat=*(opt_raw.stat1); + //tmp_gv->trim1_stat=local_trim_stat1[index]; + //tmp_gv->trim2_stat=local_trim_stat2[index]; + tmp_gv->clean1_stat=*(opt_clean.stat1); + seStreaming_stat(*tmp_gv); + delete tmp_gv; se_write_m.unlock(); } } @@ -1524,7 +1377,7 @@ void seProcess::process_some_reads(int index,int out_number){ if(file1_line_num%4==3){ fastq1.qual_seq.assign(buf1); fastq1.qual_seq.erase(fastq1.qual_seq.size()-1); - fq1s.push_back(fastq1); + fq1s.emplace_back(fastq1); if(fq1s.size()==gp.patchSize || fq1s.size()==rest_number){ of_log< &fq1s,gzFile gzfq1 SEstatOption opt_clean; opt_clean.fq1s=&fq1s; opt_clean.stat1=&se_local_clean_stat1[index]; - stat_se_fqs(opt_clean); //statistic raw fastqs - seWrite(fq1s,"clean",gzfq1); + stat_se_fqs(opt_clean,"raw"); //statistic raw fastqs + seWrite(fq1s,gzfq1); fq1s.clear(); } void seProcess::process(){ @@ -1627,7 +1480,7 @@ void seProcess::process(){ for(int i=0;i!=fq1_unprocess.size();i++){ if(fq1_unprocess[i]=='\n'){ if(line_num%4==3){ - fq1s.push_back(fastq1); + fq1s.emplace_back(fastq1); fastq1.seq_id.clear(); fastq1.sequence.clear(); fastq1.qual_seq.clear(); @@ -1653,8 +1506,13 @@ void seProcess::process(){ gzclose(gz_trim_out1[0]); } if(!gp.clean_fq1.empty()){ - if(gp.output_clean<=0 && !(gp.l_total_reads_num>0 && gp.total_reads_num_random==false)) - gzclose(gz_clean_out1[0]); + if(gp.output_clean<=0 && !(gp.l_total_reads_num>0 && gp.total_reads_num_random==false)) { + if (gp.clean_fq1.rfind(".gz") == gp.clean_fq1.size() - 3) { + gzclose(gz_clean_out1[0]); + } else { + fclose(nongz_clean_out1[0]); + } + } } // @@ -1742,33 +1600,6 @@ void seProcess::make_tmpDir(){ exit(1); } } -void seProcess::output_fastqs2(int type,vector &fq1,ofstream& outfile){ - //m.lock(); - string out_content; - for(int i=0;i!=fq1.size();i++){ - //for(vector::iterator i=fq1->begin();i!=fq1->end();i++){ - if(gp.output_file_type=="fasta"){ - fq1[i].seq_id=fq1[i].seq_id.replace(fq1[i].seq_id.find("@"),1,">"); - out_content+=fq1[i].seq_id+"\n"+fq1[i].sequence+"\n"; - }else if(gp.output_file_type=="fastq"){ - if(gp.outputQualityPhred!=gp.qualityPhred){ - for(string::size_type ix=0;ix!=fq1[i].qual_seq.size();ix++){ - fq1[i].qual_seq[ix]=(char)(fq1[i].qual_seq[ix]-gp.outputQualityPhred); - } - } - out_content+=fq1[i].seq_id+"\n"+fq1[i].sequence+"\n+\n"+fq1[i].qual_seq+"\n"; - }else{ - cerr<<"Error:output_file_type value error"< &fq1,gzFile outfile){ //m.lock(); string out_content,streaming_out; @@ -1919,33 +1750,11 @@ void seProcess::output_split_fastqs(string type,vector &fq1){ gzwrite(gz_fq_se,out_content.c_str(),out_content.size()); } } - //m.unlock(); } void seProcess::seStreaming_stat(C_global_variable& local_gv){ cout<<"#Total_statistical_information"<<"\n"; - /*int output_reads_num; - int in_adapter_list_num; - int include_adapter_seq_num; - int include_contam_seq_num; - int n_ratio_num; - int highA_num,polyX_num; - int tile_num,fov_num; - int low_qual_base_ratio_num; - int mean_quality_num; - int short_len_num,long_len_num; - int over_lapped_num; - int no_3_adapter_num,int_insertNull_num; - */ int total=local_gv.fs.include_adapter_seq_num+local_gv.fs.include_contam_seq_num+local_gv.fs.low_qual_base_ratio_num+local_gv.fs.mean_quality_num+local_gv.fs.n_ratio_num+local_gv.fs.over_lapped_num+local_gv.fs.highA_num+local_gv.fs.polyX_num; cout<* fq1s; vector *trim_result1,*clean_result1; }; -struct SEthreadOpt -{ - vector files; - int index; -}; class seProcess{ public: - seProcess(C_global_parameter m_gp); - void q_clear(queue& a); + explicit seProcess(C_global_parameter m_gp); void process(); - void brother(); void print_stat(); void update_stat(C_fastq_file_stat& fq1s_stat,C_filter_stat& fs_stat,string type); - void* stat_se_fqs(SEstatOption opt); + void* stat_se_fqs(SEstatOption opt,string dataType); //void add_raw_trim(C_fastq_file_stat& a,C_reads_trim_stat& b); void filter_se_fqs(SEcalOption opt); void* sub_thread(int index); int read(vector& se1,ifstream& infile1); - void seWrite(vector& se1,string type,gzFile out1); - void seWrite(vector& se1,string type,FILE* out1); + void seWrite(vector& se1,gzFile out1); + void seWrite(vector& se1,FILE* out1); void seWrite_split(vector& se1); //void peRead(); - void* doCal(int index); void preOutput(int type,C_fastq& a); void output_fastqs(string type,vector &fq1,gzFile outfile); void output_fastqs(string type,vector &fq1,FILE* outfile); - void output_fastqs2(int type,vector &fq1,ofstream& outfile); void output_split_fastqs(string type,vector &fq1); - void seWrite(int num); - void run_pigz_split(int type); - void get_line_number(int* line_num); void merge_stat(); - void merge_data(); void merge_trim_data(); void merge_clean_data(); void merge_clean_data(int index); void C_fastq_init(C_fastq& a); void process_nonssd(); - void* sub_thread_nonssd(int index); - void* sub_thread_nonssd_multiOut(int index); - int read_gz(vector& se1); void merge_stat_nonssd(); void seStreaming_stat(C_global_variable& local_gv); void remove_tmpDir(); void make_tmpDir(); void run_pigz(); void thread_process_reads(int index,vector &fq1s); - void create_thread_outputFile(int index); void create_thread_trimoutputFile(int index); void create_thread_cleanoutputFile(int index); void* sub_thread_nonssd_realMultiThreads(int index); @@ -79,36 +62,32 @@ class seProcess{ void run_extract_random(); void process_some_reads(int index,int out_number); void merge_stat(int index); - void merge_data(int index); void limit_process_reads(int index,vector &fq1s,gzFile gzfq1); void check_disk_available(); //void peOutput(outputOption opt); public: C_global_parameter gp; C_global_variable gv; - queue read_queue1,cal_trim_queue1,cal_clean_queue1; - int read_num; - int cal_num; - int write_num; - int read_end,cal_end; - string brother_stat,write_stat; - mutex read_cal_m,cal_write_m; - long long processed_reads; - vector trim_merge1[max_thread],clean_merge1[max_thread]; gzFile gzfp1; - ifstream nongzfp1; - gzFile gz_trim_out1[max_thread]; - gzFile gz_clean_out1[max_thread]; - FILE* nongz_clean_out1[max_thread]; - gzFile gz_trim_out1_nonssd,gz_clean_out1_nonssd; + gzFile *gz_trim_out1; + gzFile *gz_clean_out1; + FILE** nongz_clean_out1; ofstream of_log; string tmp_dir; - off_t t_start_pos[max_thread]; - off_t t_end_pos[max_thread]; - char* src1; + off_t* t_start_pos; + off_t* t_end_pos; int fq1fd; - gzFile multi_gzfq1[max_thread]; + gzFile* multi_gzfq1; int limit_end; + mutex se_stat_m,se_write_m; + C_filter_stat* se_local_fs; + C_fastq_file_stat* se_local_raw_stat1,*se_local_trim_stat1,*se_local_clean_stat1; + gzFile gz_fq_se; + string *se_sticky_reads1; + map se_out; + string se_new_fq1_path; + int se_buffer; + int se_bq_check; private: vector fq1s; vector trim_output_fq1; diff --git a/src/sequence.cpp b/src/sequence.cpp index 944da59..e64c9d5 100644 --- a/src/sequence.cpp +++ b/src/sequence.cpp @@ -9,21 +9,6 @@ #include "read_filter.h" using namespace::std; -mutex m2; -void C_fastq::clear(){ - seq_id=""; - sequence=""; - qual_seq=""; -} -void C_fastq::output(){ - cout<gp.total_reads_num){ - fs->output_reads_num++; - min_value=1; - return min_value; - //cout<<"total_reads_num discard"<gp.max_read_length){ fs->long_len_num++; @@ -100,22 +75,11 @@ int C_single_fastq_filter::sRNA_discard(C_filter_stat* fs,C_global_parameter& gp } int C_single_fastq_filter::se_discard(C_filter_stat* fs,C_global_parameter& gp){ int min_value=-1; - /* - if(gp.total_reads_num!=-1){ //global reads number limit - if(gp.output_reads_num>gp.total_reads_num){ - fs->output_reads_num++; - min_value=1; - return min_value; - //cout<<"total_reads_num discard"<tile_num++; min_value=1; return min_value; - //cout<<"tile discard"<in_adapter_list_num++; - min_value=1; - return min_value; - } - */ if(gp.min_read_length!=-1){ if(read.sequence.size()gp.total_reads_num){ - fs->output_reads_num++; - min_value=1; - return min_value; - //cout<<"total_reads_num discard"<tile_num++; @@ -261,13 +207,6 @@ int C_pe_fastq_filter::pe_discard(C_filter_stat* fs,C_global_parameter& gp){ return min_value; } } - /* - if(reads_result.fastq1_result.in_adapter_list){ //whether in adapter list - fs->in_adapter_list_num++; - min_value=1; - return min_value; - } - */ if(gp.min_read_length!=-1){ int v=pe_dis(fq1.sequence.size() seq_id1,vector sequence1,vector qual_seq1,vector seq_id2,vector sequence2,vector qual_seq2,C_global_parameter gp,C_global_variable& gv){ - /*vector trimmed_index; - vector filtered_index; - vector output_index; - int reads_num; - */ - v_seq_id1=seq_id1; - v_sequence1=sequence1; - v_qual_seq1=qual_seq1; - v_seq_id2=seq_id2; - v_sequence2=sequence2; - v_qual_seq2=qual_seq2; - for(int i=0;i!=seq_id1.size();i++){ - C_fastq a,b; - a.seq_id=seq_id1[i]; - a.sequence=sequence1[i]; - a.qual_seq=qual_seq1[i]; - b.seq_id=seq_id2[i]; - b.sequence=sequence2[i]; - b.qual_seq=qual_seq2[i]; - C_pe_fastq_filter pe_fastq_filter=C_pe_fastq_filter(a,b,gp); - pe_fastq_filter.pe_trim(gp); - /* - if(pe_fastq_filter.pe_discard(gv,gp)!=1){ - filtered_index.push_back(1); - }else{ - filtered_index.push_back(0); - } - */ - } -} int C_pe_fastq_filter::pe_dis(bool a,bool b){ int return_value=0; if(a) @@ -468,64 +375,4 @@ int C_pe_fastq_filter::pe_dis(bool a,bool b){ if(b) return_value+=2; return return_value; -} -void C_pe_fastqs_filter::pe_output(C_global_parameter gp,gzFile outfile1,gzFile outfile2){ - string out_content1,out_content2; - for(int i=0;i!=filtered_index.size();i++){ - out_content1+=v_seq_id1[i]+"\n"+v_sequence1[i]+"\n+\n"+v_qual_seq1[i]+"\n"; - out_content2+=v_seq_id2[i]+"\n"+v_sequence2[i]+"\n+\n"+v_qual_seq2[i]+"\n"; - - /* - if(filtered_index[i]==1){ - if(gp.whether_add_pe_info){ - v_seq_id1[i]=v_seq_id1[i]+"/1"; - v_seq_id2[i]=v_seq_id2[i]+"/2"; - } - if(!gp.base_convert.empty()){ - gp.base_convert=gp.base_convert.replace(gp.base_convert.find("TO"),2,""); - gp.base_convert=gp.base_convert.replace(gp.base_convert.find("2"),1,""); - if(gp.base_convert.size()!=2){ - cerr<<"Error:base_conver value format error"< seq_id1,vector sequence1,vector qual_seq1,vector seq_id2,vector sequence2,vector qual_seq2,C_global_parameter gp,C_global_variable& gv); - //void pe_trim(); - //void pe_discard(); - void pe_output(C_global_parameter gp,gzFile outfile1,gzFile outfile2); - vector v_seq_id1; - vector v_sequence1; - vector v_qual_seq1; - vector v_seq_id2; - vector v_sequence2; - vector v_qual_seq2; - //vector trimmed_index; - vector filtered_index; - //vector output_index; - int reads_num; -}; #endif \ No newline at end of file