performance optimizations in RowDiff transform (#252)
karasikov authored Feb 10, 2021
1 parent bde5113 commit 84c3c42
Showing 1 changed file with 92 additions and 43 deletions.
135 changes: 92 additions & 43 deletions metagraph/src/annotation/row_diff_builder.cpp
@@ -166,10 +166,56 @@ using CallOnes = std::function<void(const bit_vector &source_col,
const uint64_t *pred_begin,
const uint64_t *pred_end)>;

void read_next_block(sdsl::int_vector_buffer<>::iterator *succ_it_p,
sdsl::int_vector_buffer<1>::iterator *pred_boundary_it_p,
sdsl::int_vector_buffer<>::iterator *pred_it_p,
uint64_t block_size,
std::array<std::vector<uint64_t>, 3> *out) {
auto &succ_it = *succ_it_p;
auto &pred_boundary_it = *pred_boundary_it_p;
auto &pred_it = *pred_it_p;

std::vector<uint64_t> &succ_chunk = out->at(0);
std::vector<uint64_t> &pred_chunk_idx = out->at(1);
std::vector<uint64_t> &pred_chunk = out->at(2);

#pragma omp parallel sections num_threads(2)
{
#pragma omp section
{
succ_chunk.resize(block_size);
for (uint64_t i = 0; i < block_size; ++i, ++succ_it) {
succ_chunk[i] = *succ_it;
}
}

#pragma omp section
{
// read predecessor offsets
pred_chunk_idx.resize(block_size + 1);
pred_chunk_idx[0] = 0;
for (uint64_t i = 1; i <= block_size; ++i) {
// find where the last predecessor for the node ends
pred_chunk_idx[i] = pred_chunk_idx[i - 1];
while (*pred_boundary_it == 0) {
++pred_chunk_idx[i];
++pred_boundary_it;
}
++pred_boundary_it;
}

// read all predecessors for the block
pred_chunk.resize(pred_chunk_idx.back());
for (uint64_t i = 0; i < pred_chunk.size(); ++i, ++pred_it) {
pred_chunk[i] = *pred_it;
}
}
}
}
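
The pred_boundary stream decoded above is a unary code: for each node it stores one 0 bit per predecessor followed by a terminating 1 bit, so the loop accumulates a prefix-sum array of predecessor counts. A minimal standalone sketch of that decoding, using a plain std::vector<bool> and made-up input in place of the sdsl buffers:

#include <cassert>
#include <cstdint>
#include <vector>

// Decode a unary-coded boundary stream: for each node, a run of 0s (one per
// predecessor) terminated by a single 1. offsets[i + 1] - offsets[i] is the
// number of predecessors of node i.
std::vector<uint64_t> decode_boundaries(const std::vector<bool> &boundary,
                                        uint64_t num_nodes) {
    std::vector<uint64_t> offsets(num_nodes + 1, 0);
    size_t pos = 0;
    for (uint64_t i = 1; i <= num_nodes; ++i) {
        offsets[i] = offsets[i - 1];
        while (!boundary[pos]) {  // count this node's predecessors
            ++offsets[i];
            ++pos;
        }
        ++pos;  // skip the terminating 1
    }
    return offsets;
}

int main() {
    // three nodes with 2, 0 and 1 predecessors: 0 0 1 | 1 | 0 1
    std::vector<bool> boundary = {0, 0, 1, 1, 0, 1};
    assert((decode_boundaries(boundary, 3) == std::vector<uint64_t>{0, 2, 2, 3}));
}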

/**
* Traverses a group of column compressed annotations (loaded in memory) in chunks of
* BLOCK_SIZE rows at a time and invokes #call_ones for each set bit.
* @param log_header label to be displayed in the progress bar
* @param num_rows number of rows in the annotation
 * @param pred_succ_fprefix prefix for the pred/succ files containing the predecessors and
* the successor for each node
@@ -179,7 +225,6 @@ using CallOnes = std::function<void(const bit_vector &source_col,
* @param after_chunk callback to invoke after a chunk is traversed
*/
void traverse_anno_chunked(
const std::string &log_header,
uint64_t num_rows,
const std::string &pred_succ_fprefix,
const std::vector<annot::ColumnCompressed<>> &col_annotations,
@@ -191,56 +236,51 @@ void traverse_anno_chunked(

const uint32_t num_threads = get_num_threads();

sdsl::int_vector_buffer<> succ(pred_succ_fprefix + ".succ", std::ios::in, 1024 * 1024);
sdsl::int_vector_buffer<> pred(pred_succ_fprefix + ".pred", std::ios::in, 1024 * 1024);
sdsl::int_vector_buffer<> succ(pred_succ_fprefix + ".succ", std::ios::in, BLOCK_SIZE);
sdsl::int_vector_buffer<> pred(pred_succ_fprefix + ".pred", std::ios::in, BLOCK_SIZE);
sdsl::int_vector_buffer<1> pred_boundary(pred_succ_fprefix + ".pred_boundary",
std::ios::in, 1024 * 1024);
std::ios::in, BLOCK_SIZE);

assert(succ.size() == num_rows);
assert(static_cast<uint64_t>(std::count(pred_boundary.begin(), pred_boundary.end(), 0))
== pred.size());

std::vector<uint64_t> succ_chunk;
std::vector<uint64_t> pred_chunk;
std::vector<uint64_t> pred_chunk_idx;

auto succ_it = succ.begin();
auto pred_boundary_it = pred_boundary.begin();
auto pred_it = pred.begin();

ProgressBar progress_bar(num_rows, log_header, std::cerr, !common::get_verbose());
ThreadPool async_reader(1, 1);
// start reading the first block
uint64_t next_block_size = std::min(BLOCK_SIZE, num_rows);
std::array<std::vector<uint64_t>, 3> context;
std::array<std::vector<uint64_t>, 3> context_other;
async_reader.enqueue(read_next_block,
&succ_it, &pred_boundary_it, &pred_it, next_block_size,
&context_other);

ProgressBar progress_bar(num_rows, "Compute diffs", std::cerr, !common::get_verbose());

for (uint64_t chunk = 0; chunk < num_rows; chunk += BLOCK_SIZE) {
uint64_t block_size = std::min(BLOCK_SIZE, num_rows - chunk);
uint64_t block_size = next_block_size;
next_block_size = std::min(BLOCK_SIZE, num_rows - (chunk + block_size));

before_chunk(block_size);

succ_chunk.resize(block_size);
for (uint64_t i = 0; i < block_size; ++i) {
succ_chunk[i] = succ[chunk + i];
}
// finish reading this block
async_reader.join();
context.swap(context_other);
std::vector<uint64_t> &succ_chunk = context[0];
std::vector<uint64_t> &pred_chunk_idx = context[1];
std::vector<uint64_t> &pred_chunk = context[2];

// read predecessor offsets
pred_chunk_idx.resize(block_size + 1);
pred_chunk_idx[0] = 0;
for (uint64_t i = 1; i <= block_size; ++i) {
// find where the last predecessor for the node ends
pred_chunk_idx[i] = pred_chunk_idx[i - 1];
while (*pred_boundary_it == 0) {
++pred_chunk_idx[i];
++pred_boundary_it;
}
++pred_boundary_it;
}

// read all predecessors for the block
pred_chunk.resize(pred_chunk_idx.back());
for (uint64_t i = 0; i < pred_chunk.size(); ++i) {
pred_chunk[i] = *pred_it;
++pred_it;
}
// start reading next block
async_reader.enqueue(read_next_block,
&succ_it, &pred_boundary_it, &pred_it, next_block_size,
&context_other);

assert(succ_chunk.size() == block_size);
assert(pred_chunk.size() == pred_chunk_idx.back());

// process the current block
#pragma omp parallel for num_threads(num_threads) schedule(dynamic)
for (size_t l_idx = 0; l_idx < col_annotations.size(); ++l_idx) {
for (size_t j = 0; j < col_annotations[l_idx].num_labels(); ++j) {
@@ -256,11 +296,17 @@
);
}
}

after_chunk(chunk);

progress_bar += succ_chunk.size();
}
assert(pred_boundary_it == pred_boundary.end());

if (succ_it != succ.end() || pred_it != pred.end()
|| pred_boundary_it != pred_boundary.end()) {
logger->error("Buffers were not read to the end, they might be corrupted");
exit(1);
}
}
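
The restructured loop above is a double-buffering scheme: a single-worker ThreadPool reads block k + 1 from disk while the main thread processes block k, and the two context buffers are swapped between iterations. A minimal sketch of the same prefetch pattern, using std::async and hypothetical load/process stand-ins rather than the project's ThreadPool:

#include <cstdint>
#include <cstdio>
#include <future>
#include <utility>
#include <vector>

int main() {
    const uint64_t num_blocks = 4;
    // stand-in for reading one block from disk (hypothetical)
    auto load = [](uint64_t b, std::vector<uint64_t> *out) { out->assign(3, b); };

    std::vector<uint64_t> current, next;
    // start reading the first block in the background
    auto reader = std::async(std::launch::async, load, uint64_t(0), &next);
    for (uint64_t b = 0; b < num_blocks; ++b) {
        reader.get();                  // finish reading block b
        std::swap(current, next);
        if (b + 1 < num_blocks)        // immediately start prefetching block b + 1
            reader = std::async(std::launch::async, load, b + 1, &next);
        for (uint64_t x : current)     // process block b, overlapped with the prefetch
            std::printf("block %llu: %llu\n",
                        (unsigned long long)b, (unsigned long long)x);
    }
}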


@@ -363,9 +409,11 @@ void convert_batch_to_row_diff(const std::string &pred_succ_fprefix,

// total number of set bits in the original rows
std::vector<uint32_t> row_nbits_block;
// buffer for writing previous block while populating next row_nbits_block
std::vector<uint32_t> row_nbits_block_other;

traverse_anno_chunked(
"Compute diffs", anchor.size(), pred_succ_fprefix, sources,
anchor.size(), pred_succ_fprefix, sources,
[&](uint64_t chunk_size) {
row_nbits_block.assign(chunk_size, 0);
},
@@ -408,15 +456,16 @@ void convert_batch_to_row_diff(const std::string &pred_succ_fprefix,
return;

__atomic_thread_fence(__ATOMIC_ACQUIRE);
std::vector<uint32_t> nbits_block;
nbits_block.swap(row_nbits_block);

async_writer.enqueue([&,block_begin,to_write{std::move(nbits_block)}]() {
for (size_t i = 0; i < to_write.size(); ++i) {
async_writer.join();
row_nbits_block_other.swap(row_nbits_block);

async_writer.enqueue([&,block_begin]() {
for (size_t i = 0; i < row_nbits_block_other.size(); ++i) {
if (new_reduction_vector) {
row_reduction.push_back(to_write[i]);
row_reduction.push_back(row_nbits_block_other[i]);
} else {
row_reduction[block_begin + i] += to_write[i];
row_reduction[block_begin + i] += row_nbits_block_other[i];
}
}
});
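
The write-out path above applies the same idea: rather than moving a freshly allocated vector into every enqueued lambda, the main thread joins the background writer, swaps the filled counts into the reusable spare buffer row_nbits_block_other, and enqueues a lambda that reads from it, so only two buffers are ever alive and there is no per-block allocation. A small sketch of that join/swap/enqueue pattern, again with std::async and made-up produce/write stand-ins:

#include <cstdint>
#include <cstdio>
#include <future>
#include <vector>

int main() {
    std::vector<uint32_t> block, spare;  // two reusable buffers
    std::future<void> writer;
    for (uint32_t b = 0; b < 4; ++b) {
        block.assign(3, b);              // produce the current block
        if (writer.valid())
            writer.get();                // previous write must finish before reusing `spare`
        spare.swap(block);
        writer = std::async(std::launch::async, [&spare, b] {
            for (uint32_t x : spare)     // stand-in for accumulating into row_reduction
                std::printf("write block %u: %u\n", b, x);
        });
    }
    if (writer.valid())
        writer.get();                    // flush the last block
}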
