-
Notifications
You must be signed in to change notification settings - Fork 0
/
varhunter.py
194 lines (155 loc) · 8.51 KB
/
varhunter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import argparse
import os
import platform
import shutil
import sys
import tarfile
import zipfile
import requests
from loguru import logger
from scripts import files_preparation as fp
from scripts import variation_detection as vd
logger.remove()
logger.add("VarHunter.log", format="{time:YYYY-MM-DD HH:mm:ss} {message}", level="INFO")
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} {message}", level="INFO")
def check_os_type() -> str:
"""
Check the type of the operating system.
Returns:
str: Name of the operating system ('linux', 'mac', 'windows').
"""
os_type = platform.system().lower()
if 'linux' in os_type:
return 'linux'
elif 'darwin' in os_type:
return 'mac'
elif 'windows' in os_type:
return 'windows'
else:
raise EnvironmentError("Unsupported operating system")
def download_and_extract_prank(os_type: str, scripts_dir: str) -> str:
"""
Download and extract the PRANK software based on the operating system.
Args:
os_type (str): Operating system type.
scripts_dir (str): Path to the scripts directory.
Returns:
str: Path to the PRANK executable.
"""
if os_type == 'linux':
url = "https://github.com/ariloytynoja/prank-msa/raw/master/binaries/prank.linux64.170427.tgz"
extract_path = os.path.join(scripts_dir, 'prank')
prank_executable = os.path.join(extract_path, 'prank', 'bin', 'prank')
elif os_type == 'mac':
url = "https://github.com/ariloytynoja/prank-msa/raw/master/binaries/prank.osx64.170427.zip"
extract_path = os.path.join(scripts_dir)
prank_executable = os.path.join(extract_path, 'prank-msa', 'prank')
elif os_type == 'windows':
url = "https://github.com/ariloytynoja/prank-msa/raw/master/binaries/prank.cygwin.170427.zip"
extract_path = os.path.join(scripts_dir, 'prank.cygwin', 'prank.cygwin', 'prank')
prank_executable = os.path.join(extract_path, 'bin', 'prank.exe')
else:
raise EnvironmentError("Unsupported operating system. PRANK can't be downloaded")
if os.path.isfile(prank_executable):
logger.info(f"PRANK executable already exists at {prank_executable}. Skipping download.")
return prank_executable
os.makedirs(extract_path, exist_ok=True)
file_name = url.split('/')[-1]
file_path = os.path.join(scripts_dir, file_name)
logger.info(f"Downloading PRANK from {url}...")
response = requests.get(url)
with open(file_path, 'wb') as file:
file.write(response.content)
logger.info("Download completed.")
logger.info(f"Extracting {file_name}...")
if file_name.endswith('.tgz'):
with tarfile.open(file_path, 'r:gz') as tar:
tar.extractall(path=extract_path)
elif file_name.endswith('.zip'):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
else:
raise ValueError("Unknown file format for PRANK")
os.remove(file_path)
logger.info("Extraction completed.")
if not os.path.isfile(prank_executable):
raise FileNotFoundError(f"PRANK executable not found at {prank_executable}")
if os_type == 'mac':
os.chmod(prank_executable, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
return prank_executable
def detect_phase_variations(genomes_csv_path: str,
lst_info_file_path: str,
fasta_file_path: str,
conditions: list,
division_factor: int,
processes: int) -> None:
"""
Detect phase variations in gene sequences based on specified conditions.
Args:
genomes_csv_path (str): Path to the CSV file containing genome information.
lst_info_file_path (str): Path to the LSTINFO file containing additional information.
fasta_file_path (str): Path to the FASTA file containing gene sequences.
conditions (list): List of selection conditions for filtering gene pairs.
division_factor (int): Factor to divide alignment length for segment calculation.
processes (int): Number of processes to use for parallel execution.
"""
if not os.path.isfile(genomes_csv_path):
logger.error(f"File not found: {genomes_csv_path}")
raise FileNotFoundError(f"File not found: {genomes_csv_path}")
if not os.path.isfile(lst_info_file_path):
logger.error(f"File not found: {lst_info_file_path}")
raise FileNotFoundError(f"File not found: {lst_info_file_path}")
if not os.path.isfile(fasta_file_path):
logger.error(f"File not found: {fasta_file_path}")
raise FileNotFoundError(f"File not found: {fasta_file_path}")
os_type = check_os_type()
prank_executable = download_and_extract_prank(os_type, 'scripts')
genome_info_merged = fp.process_info_files(genomes_csv_path, lst_info_file_path)
genes_df = fp.convert_fasta_to_dataframe(fasta_file_path)
input_table = fp.process_dataframes(genes_df, genome_info_merged)
quadruplets = []
for cond in conditions:
condition_rows = input_table[input_table['species'].str.contains(cond, na=False)]
fp.create_gene_pairs_parallel(condition_rows, cond, processes=processes)
input_folder = f"{cond.replace(' ', '_')}_pairs"
if not os.listdir(input_folder):
logger.info(f"Folder {input_folder} is empty. Skipping further processing.")
continue
output_folder = f"{cond.replace(' ', '_')}_pairs_aligned"
fp.align_sequences_parallel(input_folder, output_folder, processes, prank_executable)
mismatches_df = fp.calculate_mismatches_parallel(output_folder, division_factor, processes).fillna(0)
mismatches_df.to_csv(f'{cond.replace(" ", "_")}_mismatches.csv', index=True)
plots_folder = f"{cond.replace(' ', '_')}_plots"
fp.mismatch_imaging_parallel(mismatches_df, cond, division_factor, plots_folder, processes)
pre_output_df = vd.save_patterns(mismatches_df, 0, mismatches_df.shape[0], output_folder, division_factor)
output_df = vd.check_df_phva(pre_output_df, output_folder, mismatches_df, division_factor)
output_df.to_csv(f'{cond.replace(" ", "_")}_phase_variation.csv')
quadruplets = vd.extract_potential_pairs_with_phva(output_df, mismatches_df)
if quadruplets:
output_dir = f"{cond.replace(' ', '_')}_output"
os.makedirs(output_dir, exist_ok=True)
shutil.move(f'{cond.replace(" ", "_")}_phase_variation.csv', os.path.join(output_dir, f'{cond.replace(" ", "_")}_phase_variation.csv'))
aligned_dir = os.path.join(output_dir, f"{cond.replace(' ', '_')}_output_aligned")
os.makedirs(aligned_dir, exist_ok=True)
plots_dir = os.path.join(output_dir, f"{cond.replace(' ', '_')}_output_plots")
os.makedirs(plots_dir, exist_ok=True)
for files_list in quadruplets:
for file_name in files_list:
shutil.copy(os.path.join(output_folder, file_name), aligned_dir)
shutil.copy(os.path.join(plots_folder, f"{file_name}.png"), plots_dir)
logger.info("Finished phase variation detection.")
def argument_parser() -> None:
"""
Parse command-line arguments and execute the main function.
"""
parser = argparse.ArgumentParser(description='Detect phase variations in gene sequences based on specified conditions.')
parser.add_argument('--genomes_csv', '-g', type=str, help='Path to the CSV file containing genome information')
parser.add_argument('--lst_info_file', '-l', type=str, help='Path to the LSTINFO file containing additional information')
parser.add_argument('--fasta_file', '-f', type=str, help='Path to the FASTA file containing gene sequences')
parser.add_argument('--selection_condition', '-s', nargs='+', type=str, help='Selection condition(s) for filtering gene pairs')
parser.add_argument('--division_factor', '-d', type=int, default=20, help='Factor to divide alignment length for segment calculation (default: 20)')
parser.add_argument('--processes', '-p', type=int, default=1, help='Number of processes to use for parallel execution (default: 1)')
args = parser.parse_args()
detect_phase_variations(args.genomes_csv, args.lst_info_file, args.fasta_file, args.selection_condition, args.division_factor, args.processes)
if __name__ == "__main__":
argument_parser()