Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forder segfault #6111

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ rowwiseDT(

16. Joins of `integer64` and `double` columns succeed when the `double` column has lossless `integer64` representation, [#4167](https://github.com/Rdatatable/data.table/issues/4167) and [#6625](https://github.com/Rdatatable/data.table/issues/6625). Previously, this only worked when the double column had lossless _32-bit_ integer representation. Thanks @MichaelChirico for the reports and fix.

17. `forderv` could segfault if the number of recursion levels needed for the radix sort was too high, [#4300](https://github.com/Rdatatable/data.table/issues/4300). This was a major problem since sorting is used extensively in `data.table`. Thanks @quantitative-technologies for the report and @ben-schwen for the fix.

## NOTES

1. There is a new vignette on joins! See `vignette("datatable-joins")`. Thanks to Angel Feliz for authoring it! Feedback welcome. This vignette has been highly requested since 2017: [#2181](https://github.com/Rdatatable/data.table/issues/2181).
Expand Down
73 changes: 55 additions & 18 deletions src/forder.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ static uint8_t **key = NULL;
static int *anso = NULL;
static bool notFirst=false;

// Work item for the iterative (queue-based) radix sort that replaces the
// former radix_r() recursion (#4300): sort the slice [from, to] of the
// ordering by key byte 'radix'.
typedef struct {
int from;
int to;
int radix;
} State;

// Work queue for radix_i(); kept sorted ascending by 'from' (see pushState)
// so groups are resolved left-to-right. 'front'/'rear' are the head/tail
// indices into 'queue'; 'queuesize' is the allocated capacity in elements.
static State *queue= NULL;
static int front, rear, queuesize;

static char msg[1001];
// use STOP in this file (not error()) to ensure cleanup() is called first
// snprintf to msg first in case nrow (just as an example) is provided in the message because cleanup() sets nrow to 0
Expand Down Expand Up @@ -431,7 +440,7 @@ uint64_t dtwiddle(double x) //const void *p, int i)
STOP(_("Unknown non-finite value; not NA, NaN, -Inf or +Inf")); // # nocov
}

void radix_r(const int from, const int to, const int radix);
void radix_i(int from, int to, int radix);

/*
OpenMP is used here to parallelize multiple operations that come together to
Expand Down Expand Up @@ -572,7 +581,6 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
if (sortType!=1 && sortType!=-1)
STOP(_("Item %d of order (ascending/descending) is %d. Must be +1 or -1."), col+1, sortType);
}
//Rprintf(_("sortType = %d\n"), sortType);
switch(TYPEOF(x)) {
case INTSXP : case LGLSXP : // TODO skip LGL and assume range [0,1]
range_i32(INTEGER(x), nrow, &min, &max, &na_count);
Expand Down Expand Up @@ -802,7 +810,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
}
}
if (nradix) {
radix_r(0, nrow-1, 0); // top level recursive call: (from, to, radix)
radix_i(0, nrow-1, 0);
} else {
push(&nrow, 1);
}
Expand Down Expand Up @@ -894,17 +902,46 @@ static bool sort_ugrp(uint8_t *x, const int n)
return skip;
}

void radix_r(const int from, const int to, const int radix) {
// Enqueue a pending (from, to, radix) work item, growing the queue if full.
// The queue is kept sorted ascending by 'from' (insertion-sort step below) so
// that work items are popped, and group sizes pushed, in left-to-right order.
void pushState(State s) {
  if (rear == queuesize) {
    // Grow geometrically. Use a temporary pointer so that if realloc fails the
    // original buffer is not lost (assigning straight to 'queue' would leak it
    // and leave 'queue' NULL); only commit the new size once realloc succeeds.
    int newsize = queuesize * 2;
    State *tmp = (State*)realloc(queue, newsize * sizeof(State));
    if (!tmp) STOP(_("Unable to reallocate queue of size=%d for iterative radix sort. Please report to data.table issue tracker."), newsize);
    queue = tmp;
    queuesize = newsize;
  }
  // Insertion-sort step: shift items whose 'from' exceeds s.from one slot
  // right, then place s; loop falling through with pos==-1 inserts at slot 0.
  int pos;
  for (pos = rear-1; pos>=0; pos--) {
    if (s.from < queue[pos].from) queue[pos+1] = queue[pos];
    else break;
  }
  queue[pos+1] = s;
  rear++;
}

// Dequeue and return the work item at the head of the queue.
// Caller must ensure the queue is non-empty (rear > front).
State popState() {
  State head = queue[front];
  front++;
  return head;
}

void radix_i(int from, int to, int radix) {
TBEG();
front = rear = 0;
queuesize = nradix > nrow ? nradix : nrow;
queue = (State*)malloc(queuesize * sizeof(State));
if (!queue) STOP(_("Unable to allocate queue of size=%d for iterative radix sort. Please report to data.table issue tracker."), queuesize);

pushState((State){from, to, 0}); // recursive call in forder
while (rear > front) {
State current = popState();
from = current.from;
to = current.to;
radix = current.radix;
const int my_n = to-from+1;
if (my_n==1) { // minor TODO: batch up the 1's instead in caller (and that's only needed when retgrp anyway)
push(&my_n, 1);
TEND(5);
return;
continue;
}
else if (my_n<=256) {
// if nth==1
// Rprintf(_("insert clause: radix=%d, my_n=%d, from=%d, to=%d\n"), radix, my_n, from, to);
// insert sort with some twists:
// i) detects if grouped; if sortType==0 can then skip
// ii) keeps group appearance order at byte level to minimize movement
Expand All @@ -916,7 +953,7 @@ void radix_r(const int from, const int to, const int radix) {
uint8_t *restrict my_key = key[radix]+from; // safe to write as we don't use this radix again
uint8_t *o = (uint8_t *)malloc(my_n * sizeof(uint8_t));
if (!o)
STOP(_("Failed to allocate %d bytes for '%s'."), (int)(my_n * sizeof(uint8_t)), "o"); // # nocov
STOP(_("Failed to allocate %zu bytes for '%s'."), my_n * sizeof(uint8_t), "o"); // # nocov
// if last key (i.e. radix+1==nradix) there are no more keys to reorder so we could reorder osub by reference directly and save allocating and populating o just
// to use it once. However, o's type is uint8_t so many moves within this max-256 vector should be faster than many moves in osub (4 byte or 8 byte ints) [1 byte
// type is always aligned]
Expand Down Expand Up @@ -1003,7 +1040,7 @@ void radix_r(const int from, const int to, const int radix) {
// my_key is now grouped (and sorted by group too if sort!=0)
// all we have left to do is find the group sizes and either recurse or push
if (radix+1==nradix && !retgrp) {
return;
continue;
}
int ngrp=0; //minor TODO: could know number of groups with certainty up above
int *my_gs = malloc(my_n * sizeof(int));
Expand All @@ -1020,15 +1057,14 @@ void radix_r(const int from, const int to, const int radix) {
push(my_gs, ngrp);
} else {
for (int i=0, f=from; i<ngrp; i++) {
radix_r(f, f+my_gs[i]-1, radix+1);
pushState((State){f, f+my_gs[i]-1, radix+1});
f+=my_gs[i];
}
}
free(my_gs);
return;
continue;
}
else if (my_n<=UINT16_MAX) { // UINT16_MAX==65535 (important not 65536)
// if (nth==1) Rprintf(_("counting clause: radix=%d, my_n=%d\n"), radix, my_n);
uint16_t my_counts[256] = {0}; // Needs to be all-0 on entry. This ={0} initialization should be fast as it's on stack. Otherwise, we have to manage
// a stack of counts anyway since this is called recursively and these counts are needed to make the recursive calls.
// This thread-private stack alloc has no chance of false sharing and gives omp and compiler best chance.
Expand Down Expand Up @@ -1106,7 +1142,7 @@ void radix_r(const int from, const int to, const int radix) {
}

if (!retgrp && radix+1==nradix) {
return; // we're done. avoid allocating and populating very last group sizes for last key
continue; // we're done. avoid allocating and populating very last group sizes for last key
}
int *my_gs = malloc((ngrp==0 ? 256 : ngrp) * sizeof(int)); // ngrp==0 when sort and skip==true; we didn't count the non-zeros in my_counts yet in that case
if (!my_gs)
Expand All @@ -1124,12 +1160,12 @@ void radix_r(const int from, const int to, const int radix) {
} else {
// this single thread will now descend and resolve all groups, now that the groups are close in cache
for (int i=0, my_from=from; i<ngrp; i++) {
radix_r(my_from, my_from+my_gs[i]-1, radix+1);
pushState((State){my_from, my_from+my_gs[i]-1, radix+1});
my_from+=my_gs[i];
}
}
free(my_gs);
return;
continue;
}
// else parallel batches. This is called recursively but only once or maybe twice before resolving to UINT16_MAX branch above

Expand Down Expand Up @@ -1324,7 +1360,7 @@ void radix_r(const int from, const int to, const int radix) {
// each in parallel here and they're all dealt with in parallel. There is no nestedness here.
for (int i=0; i<ngrp; i++) {
int start = from + starts[ugrp[i]];
radix_r(start, start+my_gs[i]-1, radix+1);
pushState((State){start, start+my_gs[i]-1, radix+1});
flush();
}
TEND(24)
Expand All @@ -1336,7 +1372,7 @@ void radix_r(const int from, const int to, const int radix) {
#pragma omp parallel for ordered schedule(dynamic) num_threads(MIN(nth, ngrp)) // #5077
for (int i=0; i<ngrp; i++) {
int start = from + starts[ugrp[i]];
radix_r(start, start+my_gs[i]-1, radix+1);
pushState((State){start, start+my_gs[i]-1, radix+1});
#pragma omp ordered
flush();
}
Expand All @@ -1345,7 +1381,7 @@ void radix_r(const int from, const int to, const int radix) {
#pragma omp parallel for schedule(dynamic) num_threads(MIN(nth, ngrp)) // #5077
for (int i=0; i<ngrp; i++) {
int start = from + starts[ugrp[i]];
radix_r(start, start+my_gs[i]-1, radix+1);
pushState((State){start, start+my_gs[i]-1, radix+1});
}
}
TEND(25)
Expand All @@ -1356,10 +1392,11 @@ void radix_r(const int from, const int to, const int radix) {
free(starts);
free(ugrps);
free(ngrps);
}
free(queue);
TEND(26)
}


SEXP issorted(SEXP x, SEXP by)
{
// Just checks if ordered and returns FALSE early if not. Does not return ordering if so, unlike forder.
Expand Down
Loading