Skip to content

Commit

Permalink
Merge pull request #4433 from vgteam/faster-gam-gaf-gbwt
Browse files Browse the repository at this point in the history
Faster GBWT construction from GAM/GAF
  • Loading branch information
jltsiren authored Nov 6, 2024
2 parents cb82ebb + 8a12486 commit 519656f
Show file tree
Hide file tree
Showing 22 changed files with 306 additions and 439 deletions.
2 changes: 1 addition & 1 deletion deps/gcsa2
20 changes: 0 additions & 20 deletions src/gbwt_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,6 @@ gbwt::size_type gbwt_node_width(const HandleGraph& graph) {
return gbwt::bit_length(gbwt::Node::encode(graph.max_node_id(), true));
}

void finish_gbwt_constuction(gbwt::GBWTBuilder& builder,
const std::vector<std::string>& sample_names,
const std::vector<std::string>& contig_names,
size_t haplotype_count, bool print_metadata,
const std::string& header) {

builder.finish();
builder.index.metadata.setSamples(sample_names);
builder.index.metadata.setHaplotypes(haplotype_count);
builder.index.metadata.setContigs(contig_names);
if (print_metadata) {
#pragma omp critical
{
std::cerr << header << ": ";
gbwt::operator<<(std::cerr, builder.index.metadata);
std::cerr << std::endl;
}
}
}

//------------------------------------------------------------------------------

void load_gbwt(gbwt::GBWT& index, const std::string& filename, bool show_progress) {
Expand Down
7 changes: 0 additions & 7 deletions src/gbwt_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,6 @@ gbwt::vector_type path_predecessors(const PathHandleGraph& graph, const std::str
/// Determine the node width in bits for the GBWT nodes based on the given graph.
gbwt::size_type gbwt_node_width(const HandleGraph& graph);

/// Finish GBWT construction and optionally print the metadata.
void finish_gbwt_constuction(gbwt::GBWTBuilder& builder,
const std::vector<std::string>& sample_names,
const std::vector<std::string>& contig_names,
size_t haplotype_count, bool print_metadata,
const std::string& header = "GBWT");

//------------------------------------------------------------------------------

/*
Expand Down
156 changes: 120 additions & 36 deletions src/haplotype_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@
* \file haplotype_indexer.cpp: implementations of haplotype indexing with the GBWT
*/

#include "haplotype_indexer.hpp"

#include <iostream>
#include <map>
#include <mutex>
#include <vector>
#include <string>

#include <vg/io/stream.hpp>
#include <gbwtgraph/algorithms.h>
#include <gbwtgraph/path_cover.h>

#include "gbwt_helper.hpp"

#include "haplotype_indexer.hpp"

#include "path.hpp"
#include "alignment.hpp"
#include "hash_map.hpp"
#include "path.hpp"

using namespace std;

Expand Down Expand Up @@ -394,7 +396,7 @@ std::unique_ptr<gbwt::DynamicGBWT> HaplotypeIndexer::build_gbwt(const std::vecto
// And add every path that passes the filter (including haplotype paths) from the source graph.
gbwtgraph::store_paths(builder, *graph, {PathSense::GENERIC, PathSense::REFERENCE, PathSense::HAPLOTYPE}, &path_filter);

// Finish the construction for this set of threads and put the index back.
// Finish the construction for this set of paths and put the index back.
builder.finish();
builder.swapIndex(*index);
}
Expand All @@ -413,62 +415,144 @@ std::unique_ptr<gbwt::DynamicGBWT> HaplotypeIndexer::build_gbwt(const PathHandle
return build_gbwt({}, "GBWT", &graph);
}

std::unique_ptr<gbwt::DynamicGBWT> HaplotypeIndexer::build_gbwt(const PathHandleGraph& graph,
const std::vector<std::string>& aln_filenames, const std::string& aln_format) const {
std::unique_ptr<gbwt::GBWT> HaplotypeIndexer::build_gbwt(const HandleGraph& graph,
const std::vector<std::string>& aln_filenames, const std::string& aln_format, size_t parallel_jobs) const {

// Handle multithreading and parallel jobs.
parallel_jobs = std::max<size_t>(1, parallel_jobs);
int old_threads = omp_get_max_threads();
omp_set_num_threads(parallel_jobs);

// GBWT metadata.
std::vector<std::string> sample_names, contig_names;
std::map<std::string, std::pair<size_t, size_t>> sample_info; // name -> (id, count)
contig_names.push_back("0"); // An artificial contig.
size_t haplotype_count = 0;
// Partition the graph into construction jobs.
if (this->show_progress) {
#pragma omp critical
{
std::cerr << "Partitioning the graph into GBWT construction jobs" << std::endl;
}
}
size_t target_size = graph.get_node_count() / parallel_jobs;
gbwtgraph::ConstructionJobs jobs = gbwtgraph::gbwt_construction_jobs(graph, target_size);
if (this->show_progress) {
#pragma omp critical
{
std::cerr << "Created " << jobs.size() << " parallel construction jobs" << std::endl;
}
}

// GBWT construction.
gbwt::GBWTBuilder builder(gbwt_node_width(graph), this->gbwt_buffer_size * gbwt::MILLION, this->id_interval);
builder.index.addMetadata();
std::vector<std::mutex> builder_mutexes(jobs.size());
std::vector<std::unique_ptr<gbwt::GBWTBuilder>> builders(jobs.size());
// This is a bit inefficient, as read names are often longer than the SSO threshold for GCC (but not for Clang).
// TODO: Maybe use concatenated 0-terminated names?
std::vector<std::vector<std::string>> read_names(jobs.size());
for (size_t i = 0; i < jobs.size(); i++) {
builders[i].reset(new gbwt::GBWTBuilder(gbwt_node_width(graph), this->gbwt_buffer_size * gbwt::MILLION, this->id_interval));
}

// Actual work.
if (this->show_progress) {
#pragma omp critical
{
std::cerr << "Converting " << aln_format << " to threads" << std::endl;
std::cerr << "Converting " << aln_format << " to GBWT paths" << std::endl;
}
}
std::function<void(Alignment&)> lambda = [&](Alignment& aln) {
gbwt::vector_type buffer;
for (auto& m : aln.path().mapping()) {
buffer.push_back(mapping_to_gbwt(m));
}
builder.insert(buffer, true); // Insert in both orientations.
size_t sample_id = 0, sample_count = 0;
auto iter = sample_info.find(aln.name());
if (iter == sample_info.end()) {
sample_id = sample_names.size();
sample_names.push_back(aln.name());
sample_info[aln.name()] = std::pair<size_t, size_t>(sample_id, sample_count);
haplotype_count++;
} else {
sample_id = iter->second.first;
sample_count = iter->second.second;
iter->second.second++;
size_t job_id = 0;
if (buffer.size() > 0) {
job_id = jobs.job(gbwt::Node::id(buffer.front()));
if (job_id >= jobs.size()) {
job_id = 0;
}
}
{
// Insert the path into the appropriate builder and record the read name.
std::lock_guard<std::mutex> lock(builder_mutexes[job_id]);
builders[job_id]->insert(buffer, true);
read_names[job_id].push_back(aln.name());
}
builder.index.metadata.addPath(sample_id, 0, 0, sample_count);
};
for (auto& file_name : aln_filenames) {
if (aln_format == "GAM") {
get_input_file(file_name, [&](istream& in) {
vg::io::for_each(in, lambda);
vg::io::for_each_parallel(in, lambda);
});
} else {
assert(aln_format == "GAF");
vg::io::gaf_unpaired_for_each(graph, file_name, lambda);
vg::io::gaf_unpaired_for_each_parallel(graph, file_name, lambda);
}
}

// Finish the construction and extract the index.
finish_gbwt_constuction(builder, sample_names, contig_names, haplotype_count, this->show_progress);
std::unique_ptr<gbwt::DynamicGBWT> built(new gbwt::DynamicGBWT());
builder.swapIndex(*built);
return built;

// Finish the construction and convert to compressed GBWT.
std::vector<gbwt::GBWT> partial_indexes(jobs.size());
#pragma omp parallel for schedule(dynamic, 1)
for (size_t i = 0; i < jobs.size(); i++) {
builders[i]->finish();
partial_indexes[i] = gbwt::GBWT(builders[i]->index);
builders[i].reset();
}

// Merge the partial indexes.
if (this->show_progress) {
#pragma omp critical
{
std::cerr << "Merging the partial GBWTs" << std::endl;
}
}
std::unique_ptr<gbwt::GBWT> result(new gbwt::GBWT(partial_indexes));
partial_indexes.clear();

// Create the metadata.
if (this->show_progress) {
#pragma omp critical
{
std::cerr << "Creating metadata" << std::endl;
}
}
result->addMetadata();
result->metadata.setContigs({ "unknown" });
{
// We can use 32-bit values, as GBWT metadata uses them as well.
string_hash_map<std::string, std::pair<std::uint32_t, std::uint32_t>> read_info; // name -> (sample id, fragment count)
for (auto& names : read_names) {
for (const std::string& name : names) {
std::uint32_t sample_id = 0, fragment_count = 0;
auto iter = read_info.find(name);
if (iter == read_info.end()) {
sample_id = read_info.size();
read_info[name] = std::make_pair(sample_id, fragment_count);
} else {
sample_id = iter->second.first;
fragment_count = iter->second.second;
iter->second.second++;
}
result->metadata.addPath(sample_id, 0, 0, fragment_count);
}
names = std::vector<std::string>();
}
std::vector<std::string> sample_names(read_info.size());
for (auto& p : read_info) {
sample_names[p.second.first] = p.first;
}
read_info = string_hash_map<std::string, std::pair<std::uint32_t, std::uint32_t>>();
result->metadata.setSamples(sample_names);
result->metadata.setHaplotypes(sample_names.size());
}
if (this->show_progress) {
#pragma omp critical
{
std::cerr << "GBWT: ";
gbwt::operator<<(std::cerr, result->metadata);
std::cerr << std::endl;
}
}

// Restore the number of threads.
omp_set_num_threads(old_threads);
return result;
}

}
11 changes: 8 additions & 3 deletions src/haplotype_indexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,15 @@ class HaplotypeIndexer : public Progressive {
* the same name, the corresponding GBWT path names will have the same
* sample identifier but different values in the count field.
*
* aln_format can be "GAM" or "GAF"
* aln_format can be "GAM" or "GAF".
*
* Runs approximately the given number of jobs in parallel. The exact
* number depends on the sizes of weakly connected components in the graph.
* Each job uses at most 2 threads.
*/
std::unique_ptr<gbwt::DynamicGBWT> build_gbwt(const PathHandleGraph& graph,
const std::vector<std::string>& aln_filenames, const std::string& aln_format) const;
std::unique_ptr<gbwt::GBWT> build_gbwt(const HandleGraph& graph,
const std::vector<std::string>& aln_filenames, const std::string& aln_format,
size_t parallel_jobs = 1) const;
};

}
Expand Down
Loading

1 comment on commit 519656f

@adamnovak
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vg CI tests complete for merge to master. View the full report here.

16 tests passed, 0 tests failed and 0 tests skipped in 17482 seconds

Please sign in to comment.