Skip to content

Commit

Permalink
Multithread branch sequence reconstruction
Browse files Browse the repository at this point in the history
  • Loading branch information
nickjcroucher committed Apr 22, 2024
1 parent 5d97efb commit abb28ac
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 93 deletions.
138 changes: 70 additions & 68 deletions src/Newickform.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,38 @@

#define STR_OUT "out"

// Function to be executed by each thread
void* print_nodes(void* arg) {
ThreadArgs* args = (ThreadArgs*)arg;

int start = (args->thread_id) * (args->num_nodes / args->num_threads);
int end = (args->thread_id + 1) * (args->num_nodes / args->num_threads);

// Adjust the end index for the last thread to cover all remaining nodes
if (args->thread_id == args->num_threads - 1) {
end = args->num_nodes;
}

// Print the contents of the jobNodeArray for this thread's portion
for (int i = start; i < end; ++i) {
newick_node* node = args->jobNodeArray[i];
printf("Thread %d: Node %d: %s\n", args->thread_id, i, node->taxon);
// Define thread function
void* threadFunction(void* arg) {

// Extract thread data
struct ThreadData* data = (struct ThreadData*)arg;

if (data->num_nodes_to_process > -1)
{
for (int node_index = data->start_node; node_index < data->start_node+data->num_nodes_to_process; ++node_index)
{
// Generate branch sequences and identify recombinations
generate_branch_sequences(data->nodes[node_index],
data->vcf_file_pointer,
data->snp_locations,
data->number_of_snps,
data->column_names,
data->number_of_columns,
data->length_of_original_genome,
data->block_file_pointer,
data->gff_file_pointer,
data->min_snps,
data->branch_snps_file_pointer,
data->window_min,
data->window_max,
data->uncorrected_p_value,
data->trimming_ratio,
data->extensive_search_flag,
node_index);
}
}

// Exit thread
pthread_exit(NULL);
}

Expand Down Expand Up @@ -237,22 +251,11 @@ newick_node* build_newick_tree(char * filename, FILE *vcf_file_pointer,int * snp
// Initiate multithreading
int num_threads = 4; // Adjust as needed
pthread_t threads[num_threads];
ThreadArgs thread_args[num_threads];
struct ThreadData threadData[num_threads];

// First carry gaps up tree
carry_unambiguous_gaps_up_tree(root);

// Allocate memory to store all sequences
// N.B. this can be reduced once memory management improves
int num_stored_nodes = num_nodes;
char ** node_sequences = calloc((number_of_snps + 1) * num_stored_nodes, sizeof(char*));
char ** node_names = calloc(MAX_SAMPLE_NAME_SIZE*num_stored_nodes,sizeof(char));

for (int seq_store_index = 0; seq_store_index < num_stored_nodes; ++seq_store_index)
{
node_names[seq_store_index] = " ";
}

// iterate through depths and identify batches of analyses to be run
for (int depth = 0; depth <= max_depth; ++depth)
{
Expand All @@ -262,55 +265,54 @@ newick_node* build_newick_tree(char * filename, FILE *vcf_file_pointer,int * snp
newick_node** jobNodeArray = malloc(num_jobs * sizeof(newick_node*));
get_job_nodes(jobNodeArray,nodeArray,node_depths,depth,num_nodes);

// Divide jobNodeArray among threads
int numJobsPerThread = num_jobs / num_threads;
int remainder = num_jobs % num_threads;

// Create and execute threads
for (int i = 0; i < num_threads; ++i) {
thread_args[i].thread_id = i;
thread_args[i].num_threads = num_threads;
thread_args[i].num_nodes = num_jobs;
thread_args[i].jobNodeArray = jobNodeArray;

int rc = pthread_create(&threads[i], NULL, print_nodes, (void*)&thread_args[i]);
if (rc) {
printf("Error: Unable to create thread %d\n", i);
exit(-1);
}
}

// Join threads
for (int i = 0; i < num_threads; ++i) {
pthread_join(threads[i], NULL);
}

for (int node_index = 0; node_index < num_jobs; ++node_index)
{
// Generate branch sequences and identify recombinations
generate_branch_sequences(jobNodeArray[node_index],
node_sequences,
node_names,
vcf_file_pointer,
snp_locations,
number_of_snps,
column_names,
number_of_columns,
length_of_original_genome,
num_stored_nodes,
block_file_pointer,
gff_file_pointer,
min_snps,
branch_snps_file_pointer,
window_min,
window_max,
uncorrected_p_value,
trimming_ratio,
extensive_search_flag);
// Calculate start and end indices for current thread
int startIndex = i * numJobsPerThread + (i < remainder ? i : remainder);
int endIndex = startIndex + numJobsPerThread + (i < remainder ? 1 : 0) - 1;

// Set thread data
threadData[i].nodes = jobNodeArray;
threadData[i].start_node = startIndex;
threadData[i].num_nodes_to_process = endIndex - startIndex + 1; // Number of nodes for this thread
threadData[i].vcf_file_pointer = vcf_file_pointer;
threadData[i].snp_locations = snp_locations;
threadData[i].number_of_snps = number_of_snps;
threadData[i].column_names = column_names;
threadData[i].number_of_columns = number_of_columns;
threadData[i].length_of_original_genome = length_of_original_genome;
threadData[i].block_file_pointer = block_file_pointer;
threadData[i].gff_file_pointer = gff_file_pointer;
threadData[i].min_snps = min_snps;
threadData[i].branch_snps_file_pointer = branch_snps_file_pointer;
threadData[i].window_min = window_min;
threadData[i].window_max = window_max;
threadData[i].uncorrected_p_value = uncorrected_p_value;
threadData[i].trimming_ratio = trimming_ratio;
threadData[i].extensive_search_flag = extensive_search_flag;
threadData[i].thread_index = i;

// Create thread
if (pthread_create(&threads[i], NULL, threadFunction, (void*)&threadData[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}

// Join threads
for (int i = 0; i < num_threads; ++i) {
pthread_join(threads[i], NULL);
}
}

}

free(nodeArray);
free(node_depths);
free(node_sequences);
free(node_names);

int * parent_recombinations = NULL;
fill_in_recombinations_with_gaps(root, parent_recombinations, 0, 0,0,root->block_coordinates,length_of_original_genome,snp_locations,number_of_snps);
Expand Down
32 changes: 26 additions & 6 deletions src/Newickform.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,31 @@ typedef struct newick_node
} newick_node;

// Define the structure to hold thread arguments
typedef struct {
int thread_id;
int num_threads;
int num_nodes;
newick_node** jobNodeArray;
} ThreadArgs;
struct ThreadData {
newick_node** nodes; // Nodes to be processed by all threads
int start_node; // Index of starting node for this thread
int num_nodes_to_process; // Number of nodes to process by this thread
char** node_sequences; // Pointer to the array of node sequences
char** node_names; // Pointer to the array of node names
FILE* vcf_file_pointer; // Pointer to the VCF file
int* snp_locations; // Pointer to the array of SNP locations
int number_of_snps; // Number of SNPs
char** column_names; // Pointer to the array of column names
int number_of_columns; // Number of columns
int length_of_original_genome; // Length of the original genome
int num_stored_nodes; // Number of stored nodes
FILE* block_file_pointer; // Pointer to the block file
FILE* gff_file_pointer; // Pointer to the GFF file
int min_snps; // Minimum number of SNPs
FILE* branch_snps_file_pointer; // Pointer to the branch SNPs file
int window_min; // Minimum window size
int window_max; // Maximum window size
float uncorrected_p_value; // Uncorrected p-value
float trimming_ratio; // Trimming ratio
int extensive_search_flag; // Extensive search flag
int thread_index; // Thread index
};


#define MAX_FILENAME_SIZE 1024

Expand All @@ -64,6 +83,7 @@ char* strip_quotes(char *taxon);
#else
extern newick_node* parseTree(char *str);
extern newick_node* build_newick_tree(char * filename, FILE *vcf_file_pointer,int * snp_locations, int number_of_snps, char** column_names, int number_of_columns, int length_of_original_genome,int min_snps, int window_min, int window_max, float uncorrected_p_value, float trimming_ratio, int extensive_search_flag);
void* threadFunction(void* arg);
extern void print_tree(newick_node *root, FILE * outputfile);
void fill_nodeArray(newick_node *root, newick_node** nodeArray);
int count_tree_nodes(newick_node* root);
Expand Down
28 changes: 10 additions & 18 deletions src/branch_sequences.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ void carry_unambiguous_gaps_up_tree(newick_node *root)
}
}

void generate_branch_sequences(newick_node *node, char ** node_sequences, char ** node_names, FILE *vcf_file_pointer,int * snp_locations, int number_of_snps, char** column_names, int number_of_columns, int length_of_original_genome, int num_stored_nodes, FILE * block_file_pointer, FILE * gff_file_pointer,int min_snps,FILE * branch_snps_file_pointer, int window_min, int window_max, float uncorrected_p_value, float trimming_ratio, int extensive_search_flag)
void generate_branch_sequences(newick_node *node, FILE *vcf_file_pointer,int * snp_locations, int number_of_snps, char** column_names, int number_of_columns, int length_of_original_genome, FILE * block_file_pointer, FILE * gff_file_pointer,int min_snps,FILE * branch_snps_file_pointer, int window_min, int window_max, float uncorrected_p_value, float trimming_ratio, int extensive_search_flag, int thread_index)
{
newick_child *child;
int child_counter = 0;
Expand All @@ -315,7 +315,7 @@ void generate_branch_sequences(newick_node *node, char ** node_sequences, char *
if (node->childNum == 0)
{

get_sequence_for_sample_name(node_sequence, node->taxon);
get_sequence_for_sample_name(node_sequence, node->taxon); // Get rid?

node->taxon_names = (char *) calloc(MAX_SAMPLE_NAME_SIZE,sizeof(char));
memcpy(node->taxon_names, node->taxon, size_of_string(node->taxon)+1);
Expand All @@ -328,7 +328,12 @@ void generate_branch_sequences(newick_node *node, char ** node_sequences, char *
else
{
child = node->child;
char * child_sequences[node->childNum];
char ** child_sequences = calloc((number_of_snps + 1) * node->childNum, sizeof(char*));
// Allocate memory for each string in child_sequences
for (int i = 0; i < node->childNum; i++) {
child_sequences[i] = malloc((number_of_snps + 1) * sizeof(char)); // Assuming MAX_STRING_LENGTH is the maximum length of the string
}

newick_node * child_nodes[node->childNum];
node->taxon_names = (char *) calloc(MAX_SAMPLE_NAME_SIZE*number_of_columns,sizeof(char));

Expand All @@ -337,13 +342,7 @@ void generate_branch_sequences(newick_node *node, char ** node_sequences, char *
while (child != NULL)
{
// Retrieve child sequences from store
for (int seq_store_index = 0; seq_store_index < num_stored_nodes; ++seq_store_index)
{
if (node_names[seq_store_index] == child->node->taxon) {
child_sequences[child_counter] = node_sequences[seq_store_index];
break;
}
}
get_sequence_for_sample_name(child_sequences[child_counter], child->node->taxon);
child_nodes[child_counter] = child->node;

// Remove from store as cannot be children of any other nodes
Expand Down Expand Up @@ -408,14 +407,7 @@ void generate_branch_sequences(newick_node *node, char ** node_sequences, char *
}

// Store node sequence
for (int seq_store_index = 0; seq_store_index < num_stored_nodes; ++seq_store_index)
{
if (strcmp(node_names[seq_store_index]," ") == 0) {
node_names[seq_store_index] = node->taxon;
node_sequences[seq_store_index] = node_sequence;
break;
}
}
free(node_sequence);

}

Expand Down
2 changes: 1 addition & 1 deletion src/branch_sequences.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#define _BRANCH_SEQUENCES_H_
#include "seqUtil.h"
#include "Newickform.h"
void generate_branch_sequences(newick_node *root, char ** node_sequences, char ** node_names, FILE *vcf_file_pointer,int * snp_locations, int number_of_snps, char** column_names, int number_of_columns, int length_of_original_genome, int num_stored_nodes, FILE * block_file_pointer, FILE * gff_file_pointer,int min_snps, FILE * branch_snps_file_pointer, int window_min, int window_max, float uncorrected_p_value, float trimming_ratio, int extensive_search_flag);
void generate_branch_sequences(newick_node *root, FILE *vcf_file_pointer,int * snp_locations, int number_of_snps, char** column_names, int number_of_columns, int length_of_original_genome, FILE * block_file_pointer, FILE * gff_file_pointer,int min_snps, FILE * branch_snps_file_pointer, int window_min, int window_max, float uncorrected_p_value, float trimming_ratio, int extensive_search_flag, int thread_index);
void identify_recombinations(int number_of_branch_snps, int * branches_snp_sites,int length_of_original_genome);
double calculate_snp_density(int * branches_snp_sites, int number_of_branch_snps, int index);
void get_likelihood_for_windows(char * child_sequence, int current_num_snps, int * snp_site_coords, int branch_genome_size, int number_of_branch_snps, int * snp_locations, newick_node * current_node, FILE * block_file_pointer, newick_node *root, char * branch_snp_sequence, FILE * gff_file_pointer,int min_snps, int length_of_original_genome, char * original_sequence,int window_min, int window_max, float uncorrected_p_value, float trimming_ratio, int extensive_search_flag);
Expand Down

0 comments on commit abb28ac

Please sign in to comment.