Skip to content

Commit

Permalink
Manually allocate thread-local vectors to protect against false sharing.
Browse files Browse the repository at this point in the history
This is more memory-efficient than tatami_stats::LocalOutputBuffer as we don't
create any allocations in the t = 0 case, and we avoid storing unnecessary NULL
pointers in the t > 0 case. More efficiency is important as there might be an
arbitrarily large number of groups and we don't want memory usage to be
dominated by the parallelization overhead.

Incidentally, this allows us to remove the tatami_stats dependency.
  • Loading branch information
LTLA committed Jan 12, 2025
1 parent 45727bd commit 5bc659f
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 38 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ if(SCRAN_AGGREGATE_FETCH_EXTERN)
add_subdirectory(extern)
else()
find_package(tatami_tatami 3.0.0 CONFIG REQUIRED)
find_package(tatami_tatami_stats 1.1.0 CONFIG REQUIRED)
endif()

target_link_libraries(scran_aggregate INTERFACE tatami::tatami tatami::tatami_stats)
target_link_libraries(scran_aggregate INTERFACE tatami::tatami)

# Tests
if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
Expand Down
1 change: 0 additions & 1 deletion cmake/Config.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@

include(CMakeFindDependencyMacro)
find_dependency(tatami_tatami 3.0.0 CONFIG REQUIRED)
find_dependency(tatami_tatami_stats 1.1.0 CONFIG REQUIRED)

include("${CMAKE_CURRENT_LIST_DIR}/libscran_scran_aggregateTargets.cmake")
7 changes: 0 additions & 7 deletions extern/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,4 @@ FetchContent_Declare(
GIT_TAG master # ^3.0.0
)

FetchContent_Declare(
tatami_stats
GIT_REPOSITORY https://github.com/tatami-inc/tatami_stats
GIT_TAG master # ^1.1.0
)

FetchContent_MakeAvailable(tatami)
FetchContent_MakeAvailable(tatami_stats)
73 changes: 45 additions & 28 deletions include/scran_aggregate/aggregate_across_cells.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <vector>

#include "tatami/tatami.hpp"
#include "tatami_stats/tatami_stats.hpp"

/**
* @file aggregate_across_cells.hpp
Expand Down Expand Up @@ -175,23 +174,37 @@ void compute_aggregate_by_column(
tatami::Options opt;
opt.sparse_ordered_index = false;

tatami::parallelize([&](size_t t, Index_ s, Index_ l) {
tatami::parallelize([&](size_t t, Index_ start, Index_ length) {
auto NC = p.ncol();
auto ext = tatami::consecutive_extractor<sparse_>(&p, false, static_cast<Index_>(0), NC, s, l, opt);
std::vector<Data_> vbuffer(l);
typename std::conditional<sparse_, std::vector<Index_>, Index_>::type ibuffer(l);

auto ext = tatami::consecutive_extractor<sparse_>(&p, false, static_cast<Index_>(0), NC, start, length, opt);
std::vector<Data_> vbuffer(length);
typename std::conditional<sparse_, std::vector<Index_>, Index_>::type ibuffer(length);

// Creating local buffers to protect against false sharing in all but
// the first thread. The first thread has the honor of writing directly
// to the output buffers, to avoid extra allocations in the serial case
// where no false sharing can occur.
std::vector<std::vector<Sum_> > local_sums;
std::vector<std::vector<Detected_> > local_detected;
size_t num_sums = buffers.sums.size();
std::vector<tatami_stats::LocalOutputBuffer<Sum_> > local_sums;
local_sums.reserve(num_sums);
for (auto ptr : buffers.sums) {
local_sums.emplace_back(t, s, l, ptr);
}
size_t num_detected = buffers.detected.size();
std::vector<tatami_stats::LocalOutputBuffer<Detected_> > local_detected;
local_detected.reserve(num_detected);
for (auto ptr : buffers.detected) {
local_detected.emplace_back(t, s, l, ptr);
if (t != 0) {
local_sums.reserve(num_sums);
for (size_t s = 0; s < num_sums; ++s) {
local_sums.emplace_back(length);
}
local_detected.reserve(num_detected);
for (size_t d = 0; d < num_detected; ++d) {
local_detected.emplace_back(length);
}
} else {
// Need to zero it in the first thread for consistency with the other threads.
for (size_t s = 0; s < num_sums; ++s) {
std::fill_n(buffers.sums[s] + start, length, static_cast<Sum_>(0));
}
            for (size_t d = 0; d < num_detected; ++d) {
std::fill_n(buffers.detected[d] + start, length, static_cast<Detected_>(0));
}
}

for (Index_ x = 0; x < NC; ++x) {
Expand All @@ -200,42 +213,46 @@ void compute_aggregate_by_column(
if constexpr(sparse_) {
auto col = ext->fetch(vbuffer.data(), ibuffer.data());
if (num_sums) {
auto cursum = local_sums[current].data();
auto cursum = (t != 0 ? local_sums[current].data() : buffers.sums[current] + start);
for (Index_ i = 0; i < col.number; ++i) {
cursum[col.index[i] - s] += col.value[i];
cursum[col.index[i] - start] += col.value[i];
}
}

if (num_detected) {
auto curdetected = local_detected[current].data();
auto curdetected = (t != 0 ? local_detected[current].data() : buffers.detected[current] + start);
for (Index_ i = 0; i < col.number; ++i) {
curdetected[col.index[i] - s] += (col.value[i] > 0);
curdetected[col.index[i] - start] += (col.value[i] > 0);
}
}

} else {
auto col = ext->fetch(vbuffer.data());
if (num_sums) {
auto cursum = local_sums[current].data();
for (Index_ i = 0; i < l; ++i) {
auto cursum = (t != 0 ? local_sums[current].data() : buffers.sums[current] + start);
for (Index_ i = 0; i < length; ++i) {
cursum[i] += col[i];
}
}

if (num_detected) {
auto curdetected = local_detected[current].data();
for (Index_ i = 0; i < l; ++i) {
auto curdetected = (t != 0 ? local_detected[current].data() : buffers.detected[current] + start);
for (Index_ i = 0; i < length; ++i) {
curdetected[i] += (col[i] > 0);
}
}
}
}

for (auto& lsums : local_sums) {
lsums.transfer();
}
for (auto& ldetected : local_detected) {
ldetected.transfer();
if (t != 0) {
for (size_t s = 0; s < num_sums; ++s) {
const auto& current = local_sums[s];
std::copy(current.begin(), current.end(), buffers.sums[s] + start);
}
for (size_t d = 0; d < num_detected; ++d) {
const auto& current = local_detected[d];
std::copy(current.begin(), current.end(), buffers.detected[d] + start);
}
}
}, p.nrow(), options.num_threads);
}
Expand Down

0 comments on commit 5bc659f

Please sign in to comment.