From fcc10d73a20837d0f1ad3278ee9168473afa5ff1 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 00:15:14 +0200 Subject: [PATCH 01/42] fwrite with correct file length * gzip length and crc are manually computed in each thread and then added/combined * gzip header is minimal * remove some old debug code --- src/fwrite.c | 272 +++++++++++++++++++++++++++------------------------ 1 file changed, 145 insertions(+), 127 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index cd9ef98ee..ebc13548e 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -560,14 +560,12 @@ void writeCategString(const void *col, int64_t row, char **pch) #ifndef NOZLIB int init_stream(z_stream *stream) { - memset(stream, 0, sizeof(z_stream)); // shouldn't be needed, done as part of #4099 to be sure stream->next_in = Z_NULL; stream->zalloc = Z_NULL; stream->zfree = Z_NULL; stream->opaque = Z_NULL; - // 31 comes from : windows bits 15 | 16 gzip format - int err = deflateInit2(stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 31, 8, Z_DEFAULT_STRATEGY); + int err = deflateInit2(stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } @@ -577,16 +575,14 @@ int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* sour stream->avail_out = *destLen; stream->next_in = (Bytef *)source; // don't use z_const anywhere; #3939 stream->avail_in = sourceLen; - int err = deflate(stream, Z_FINISH); - if (err == Z_OK) { - // with Z_FINISH, deflate must return Z_STREAM_END if correct, otherwise it's an error and we shouldn't return Z_OK (0) - err = -9; // # nocov - } - *destLen = stream->total_out; - return err == Z_STREAM_END ? Z_OK : err; + int err = deflate(stream, Z_SYNC_FLUSH); + *destLen = *destLen - stream->avail_out; + // *destLen = stream->total_out; + return err != Z_STREAM_ERROR ? Z_OK : err; } #endif +// main fwrite function ---- void fwriteMain(fwriteMainArgs args) { double startTime = wallclock(); @@ -602,6 +598,9 @@ void fwriteMain(fwriteMainArgs args) int8_t quoteHeaders = args.doQuote; verbose = args.verbose; + size_t len; + unsigned int crc; + // When NA is a non-empty string, then we must quote all string fields in case they contain the na string // na is recommended to be empty, though if (na[0]!='\0' && doQuote==INT8_MIN) doQuote = true; @@ -744,10 +743,10 @@ void fwriteMain(fwriteMainArgs args) DTPRINT("%s", buff); free(buff); } else { - int ret1=0, ret2=0; + int ret0=0, ret1=0, ret2=0; if (args.is_gzip) { #ifndef NOZLIB - z_stream stream = {0}; + z_stream stream; if(init_stream(&stream)) { free(buff); // # nocov STOP(_("Can't allocate gzip stream structure")); // # nocov @@ -760,22 +759,30 @@ void fwriteMain(fwriteMainArgs args) free(buff); // # nocov STOP(_("Unable to allocate %zu MiB for zbuffer: %s"), zbuffSize / 1024 / 1024, strerror(errno)); // # nocov } + // write minimal gzip header + char* header = "\037\213\10\0\0\0\0\0\0\3"; + ret0 = WRITE(f, header, 10); + crc = crc32(0L, Z_NULL, 0); + size_t zbuffUsed = zbuffSize; - ret1 = compressbuff(&stream, zbuff, &zbuffUsed, buff, (size_t)(ch-buff)); - if (ret1==Z_OK) ret2 = WRITE(f, zbuff, (int)zbuffUsed); - deflateEnd(&stream); + len = (size_t)(ch - buff); + crc = crc32(crc, buff, len); + ret1 = compressbuff(&stream, zbuff, &zbuffUsed, buff, len); + if (ret1==Z_OK) + ret2 = WRITE(f, zbuff, (int)zbuffUsed); free(zbuff); #endif } else { ret2 = WRITE(f, buff, (int)(ch-buff)); } free(buff); - if (ret1 || ret2==-1) { + if (ret0 == -1 || ret1 || ret2==-1) { // # nocov start int errwrite = errno; // capture write errno now in case close fails with a different errno CLOSE(f); - if (ret1) STOP(_("Compress gzip error: %d"), ret1); - else STOP(_("%s: '%s'"), strerror(errwrite), args.filename); + if (ret0 == -1) STOP(_("Can't write gzip header error: %d"), ret0); + else if (ret1) STOP(_("Compress gzip error: %d"), ret1); + else STOP(_("%s: '%s'"), strerror(errwrite), args.filename); // # nocov end } } @@ -792,13 +799,20 @@ void fwriteMain(fwriteMainArgs args) // Decide buffer size and rowsPerBatch for each thread // Once rowsPerBatch is decided it can't be changed int rowsPerBatch=0; - if (maxLineLen*2>buffSize) { buffSize=2*maxLineLen; rowsPerBatch=2; } - else rowsPerBatch = buffSize / maxLineLen; - if (rowsPerBatch > args.nrow) rowsPerBatch = args.nrow; - if (rowsPerBatch < 1) rowsPerBatch = 1; - int numBatches = (args.nrow-1)/rowsPerBatch + 1; + if (2 * maxLineLen > buffSize) { + buffSize=2*maxLineLen; + rowsPerBatch=2; + } + else + rowsPerBatch = buffSize / maxLineLen; + if (rowsPerBatch > args.nrow) + rowsPerBatch = args.nrow; + if (rowsPerBatch < 1) + rowsPerBatch = 1; + int numBatches = (args.nrow-1) / rowsPerBatch + 1; int nth = args.nth; - if (numBatches < nth) nth = numBatches; + if (numBatches < nth) + nth = numBatches; if (verbose) { DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows (each buffer size %dMB, showProgress=%d, nth=%d)\n"), args.nrow, numBatches, rowsPerBatch, args.buffMB, args.showProgress, nth); @@ -812,12 +826,12 @@ void fwriteMain(fwriteMainArgs args) size_t zbuffSize = 0; if(args.is_gzip){ #ifndef NOZLIB - z_stream stream = {0}; + z_stream stream; if(init_stream(&stream)) - STOP(_("Can't allocate gzip stream structure")); // # nocov + STOP(_("Can't allocate gzip stream structure")); // # nocov zbuffSize = deflateBound(&stream, buffSize); - if (verbose) DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); - deflateEnd(&stream); + if (verbose) + DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); #endif } @@ -843,45 +857,47 @@ void fwriteMain(fwriteMainArgs args) #endif } - bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm - int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section - int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 - #ifndef NOZLIB - z_stream *thread_streams = (z_stream *)malloc(nth * sizeof(z_stream)); + z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); if (!thread_streams) - STOP(_("Failed to allocated %d bytes for '%s'."), (int)(nth * sizeof(z_stream)), "thread_streams"); + STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit // not declared inside the parallel region because solaris appears to move the struct in // memory when the #pragma omp for is entered, which causes zlib's internal self reference // pointer to mismatch, #4099 - char failed_msg[1001] = ""; // to hold zlib's msg; copied out of zlib in ordered section just in case the msg is allocated within zlib #endif - #pragma omp parallel num_threads(nth) - { - int me = omp_get_thread_num(); - int my_failed_compress = 0; - char *ch, *myBuff; - ch = myBuff = buffPool + me*buffSize; +bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm +int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section +int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 + +// loop overs rows in parallel---- +#pragma omp parallel for ordered num_threads(nth) + for(int64_t start=0; start < args.nrow; start += rowsPerBatch) { + int me = omp_get_thread_num(); + size_t mylen = 0; + int mycrc; + int my_failed_compress = 0; + char* myBuff = buffPool + me * buffSize; + char* ch = myBuff; - void *myzBuff = NULL; - size_t myzbuffUsed = 0; #ifndef NOZLIB - z_stream *mystream = &thread_streams[me]; - if (args.is_gzip) { - myzBuff = zbuffPool + me*zbuffSize; - if (init_stream(mystream)) { // this should be thread safe according to zlib documentation - failed = true; // # nocov - my_failed_compress = -998; // # nocov - } - } + z_stream *mystream = &thread_streams[me]; + void *myzBuff = NULL; + size_t myzbuffUsed = 0; + if (args.is_gzip) { + myzBuff = zbuffPool + me * zbuffSize; + if (init_stream(mystream)) { // this should be thread safe according to zlib documentation + failed = true; // # nocov + my_failed_compress = -998; // # nocov + } + } #endif + if (failed) + continue; // Not break. Because we don't use #omp cancel yet. + int64_t end = ((args.nrow - start) < rowsPerBatch) ? args.nrow : start + rowsPerBatch; - #pragma omp for ordered schedule(dynamic) - for(int64_t start=0; start=1 because 0-columns was caught earlier. write_chars(args.eol, &ch); // overwrite last sep with eol instead - } + } // end of chunk rows loop + // compress buffer if gzip #ifndef NOZLIB if (args.is_gzip && !failed) { myzbuffUsed = zbuffSize; - int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, (size_t)(ch-myBuff)); - if (ret) { failed=true; my_failed_compress=ret; } - else deflateReset(mystream); + mylen = (size_t)(ch - myBuff); + mycrc = crc32(0, myBuff, mylen); + int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, mylen); + if (ret) { + failed=true; + my_failed_compress=ret; + } } #endif - #pragma omp ordered - { - if (failed) { - // # nocov start - if (failed_compress==0 && my_failed_compress!=0) { - failed_compress = my_failed_compress; -#ifndef NOZLIB - if (mystream->msg!=NULL) strncpy(failed_msg, mystream->msg, 1000); // copy zlib's msg for safe use after deflateEnd just in case zlib allocated the message -#endif - } - // else another thread could have failed below while I was working or waiting above; their reason got here first - // # nocov end + + // ordered region ---- +#pragma omp ordered + if (failed) { + // # nocov start + if (failed_compress==0 && my_failed_compress!=0) { + failed_compress = my_failed_compress; + } + // else another thread could have failed below while I was working or waiting above; their reason got here first + // # nocov end + } else { + errno=0; + int ret = 0; + if (f==-1) { + *ch='\0'; // standard C string end marker so DTPRINT knows where to stop + DTPRINT("%s", myBuff); + } else if (args.is_gzip) { + ret = WRITE(f, myzBuff, (int)myzbuffUsed); } else { - errno=0; - if (f==-1) { - *ch='\0'; // standard C string end marker so DTPRINT knows where to stop - DTPRINT("%s", myBuff); - } else if ((args.is_gzip ? WRITE(f, myzBuff, (int)myzbuffUsed) - : WRITE(f, myBuff, (int)(ch-myBuff))) == -1) { - failed=true; // # nocov - failed_write=errno; // # nocov - } + ret = WRITE(f, myBuff, (int)(ch-myBuff)); + } + if (ret == -1) { + failed=true; // # nocov + failed_write=errno; // # nocov + } - int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB - if (used > maxBuffUsedPC) maxBuffUsedPC = used; - double now; - if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) { - // See comments above inside the f==-1 clause. - // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the - // master thread (me==0) and hopefully this will work on Windows. If not, user should set - // showProgress=FALSE until this can be fixed or removed. - // # nocov start - int ETA = (int)((args.nrow-end)*((now-startTime)/end)); - if (hasPrinted || ETA >= 2) { - if (verbose && !hasPrinted) DTPRINT("\n"); - DTPRINT("\rWritten %.1f%% of %"PRId64" rows in %d secs using %d thread%s. " - "maxBuffUsed=%d%%. ETA %d secs. ", - (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth==1?"":"s", - maxBuffUsedPC, ETA); - // TODO: use progress() as in fread - nextTime = now+1; - hasPrinted = true; - } - // # nocov end + crc = crc32_combine(crc, mycrc, mylen); + len += mylen; + + int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB + if (used > maxBuffUsedPC) maxBuffUsedPC = used; + double now; + if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) { + // See comments above inside the f==-1 clause. + // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the + // master thread (me==0) and hopefully this will work on Windows. If not, user should set + // showProgress=FALSE until this can be fixed or removed. + // # nocov start + int ETA = (int)((args.nrow-end)*((now-startTime)/end)); + if (hasPrinted || ETA >= 2) { + if (verbose && !hasPrinted) DTPRINT("\n"); + DTPRINT("\rWritten %.1f%% of %"PRId64" rows in %d secs using %d thread%s. " + "maxBuffUsed=%d%%. ETA %d secs. ", + (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth==1?"":"s", + maxBuffUsedPC, ETA); + // TODO: use progress() as in fread + nextTime = now+1; + hasPrinted = true; } - // May be possible for master thread (me==0) to call R_CheckUserInterrupt() here. - // Something like: - // if (me==0) { - // failed = TRUE; // inside ordered here; the slaves are before ordered and not looking at 'failed' - // R_CheckUserInterrupt(); - // failed = FALSE; // no user interrupt so return state - // } - // But I fear the slaves will hang waiting for the master (me==0) to complete the ordered - // section which may not happen if the master thread has been interrupted. Rather than - // seeing failed=TRUE and falling through to free() and close() as intended. - // Could register a finalizer to free() and close() perhaps : - // [r-devel] http://r.789695.n4.nabble.com/checking-user-interrupts-in-C-code-tp2717528p2717722.html - // Conclusion for now: do not provide ability to interrupt. - // write() errors and malloc() fails will be caught and cleaned up properly, however. - ch = myBuff; // back to the start of my buffer ready to fill it up again + // # nocov end } } - } - // all threads will call this free on their buffer, even if one or more threads had malloc - // or realloc fail. If the initial malloc failed, free(NULL) is ok and does nothing. + } // end of parallel for loop + +/* put a 4-byte integer into a byte array in LSB order */ +#define PUT4(a,b) ((a)[0]=(b), (a)[1]=(b)>>8, (a)[2]=(b)>>16, (a)[3]=(b)>>24) + + // write gzip tailer with crc and len if (args.is_gzip) { -#ifndef NOZLIB - deflateEnd(mystream); -#endif + // DTPRINT(_("crc=%x len=%ld\n", crc, len)); + unsigned char tail[10]; + tail[0] = 3; + tail[1] = 0; + PUT4(tail + 2, crc); + PUT4(tail + 6, len); + int ret = WRITE(f, tail, 10); + if (ret == -1) + STOP("Error: can't write gzip tailer"); } - } + free(buffPool); #ifndef NOZLIB free(thread_streams); @@ -1016,8 +1036,8 @@ void fwriteMain(fwriteMainArgs args) // # nocov start #ifndef NOZLIB if (failed_compress) - STOP(_("zlib %s (zlib.h %s) deflate() returned error %d with z_stream->msg==\"%s\" Z_FINISH=%d Z_BLOCK=%d. %s"), - zlibVersion(), ZLIB_VERSION, failed_compress, failed_msg, Z_FINISH, Z_BLOCK, + STOP(_("zlib %s (zlib.h %s) deflate() returned error %d Z_FINISH=%d Z_BLOCK=%d. %s"), + zlibVersion(), ZLIB_VERSION, failed_compress, Z_FINISH, Z_BLOCK, verbose ? _("Please include the full output above and below this message in your data.table bug report.") : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report.")); #endif @@ -1026,5 +1046,3 @@ void fwriteMain(fwriteMainArgs args) // # nocov end } } - - From 97c919beb63326c2d945291818a8120263e51877 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 08:08:36 +0200 Subject: [PATCH 02/42] Escape with NOZLIB for compilation succeed without zlib --- src/fwrite.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index ebc13548e..fc50f0461 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -955,7 +955,9 @@ int failed_write = 0; // same. could use +ve and -ve in the same code but sep *ch='\0'; // standard C string end marker so DTPRINT knows where to stop DTPRINT("%s", myBuff); } else if (args.is_gzip) { +#ifndef NOZLIB ret = WRITE(f, myzBuff, (int)myzbuffUsed); +#endif } else { ret = WRITE(f, myBuff, (int)(ch-myBuff)); } @@ -964,8 +966,12 @@ int failed_write = 0; // same. could use +ve and -ve in the same code but sep failed_write=errno; // # nocov } - crc = crc32_combine(crc, mycrc, mylen); - len += mylen; + if (args.is_gzip) { +#ifndef NOZLIB + crc = crc32_combine(crc, mycrc, mylen); + len += mylen; +#endif + } int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB if (used > maxBuffUsedPC) maxBuffUsedPC = used; From 4914d7ae2dd1a276334392c66c4cb00155529e08 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 08:13:50 +0200 Subject: [PATCH 03/42] Move zlib check at start to avoid oufile deletion --- src/fwrite.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index fc50f0461..30d1feff4 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -601,6 +601,11 @@ void fwriteMain(fwriteMainArgs args) size_t len; unsigned int crc; +#ifdef NOZLIB + if (args.is_gzip) + STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov +#endif + // When NA is a non-empty string, then we must quote all string fields in case they contain the na string // na is recommended to be empty, though if (na[0]!='\0' && doQuote==INT8_MIN) doQuote = true; @@ -694,10 +699,6 @@ void fwriteMain(fwriteMainArgs args) // # nocov end } } -#ifdef NOZLIB - if (args.is_gzip) - STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov -#endif int yamlLen = strlen(args.yaml); if (verbose) { From d9e9e928d6495d126956d062cd07e1a873da7f61 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 14:14:48 +0200 Subject: [PATCH 04/42] Indent and add comments --- src/fwrite.c | 53 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 30d1feff4..55377fa92 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -601,6 +601,7 @@ void fwriteMain(fwriteMainArgs args) size_t len; unsigned int crc; + // exit if compression is needed in param and no zlib #ifdef NOZLIB if (args.is_gzip) STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov @@ -608,19 +609,24 @@ void fwriteMain(fwriteMainArgs args) // When NA is a non-empty string, then we must quote all string fields in case they contain the na string // na is recommended to be empty, though - if (na[0]!='\0' && doQuote==INT8_MIN) doQuote = true; + if (na[0]!='\0' && doQuote==INT8_MIN) + doQuote = true; - qmethodEscape = args.qmethodEscape; - squashDateTime = args.squashDateTime; - if (args.buffMB<1 || args.buffMB>1024) STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); + // create buffers + if (args.buffMB<1 || args.buffMB>1024) + STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); size_t buffSize = (size_t)1024*1024*args.buffMB; + qmethodEscape = args.qmethodEscape; + squashDateTime = args.squashDateTime; + int eolLen=strlen(args.eol), naLen=strlen(args.na); // Aside: codacy wants strnlen but strnlen is not in C99 (neither is strlen_s). To pass `gcc -std=c99 -Wall -pedantic` // we'd need `#define _POSIX_C_SOURCE 200809L` before #include but that seems a step too far // and platform specific. We prefer to be pure C99. - if (eolLen<=0) STOP(_("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d"), eolLen); + if (eolLen<=0) + STOP(_("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d"), eolLen); if (verbose) { DTPRINT(_("Column writers: ")); @@ -635,6 +641,8 @@ void fwriteMain(fwriteMainArgs args) args.doRowNames, args.rowNames, args.rowNameFun, doQuote, args.nrow, args.ncol, eolLen); } + // Calc maxLineLen + // // Calculate upper bound for line length. Numbers use a fixed maximum (e.g. 12 for integer) while strings find the longest // string in each column. Upper bound is then the sum of the columns' max widths. // This upper bound is required to determine a reasonable rowsPerBatch. It also saves needing to grow the buffers which @@ -669,17 +677,19 @@ void fwriteMain(fwriteMainArgs args) STOP(_("Internal error: type %d has no max length method implemented"), args.whichFun[j]); // # nocov } } - if (args.whichFun[j]==WF_Float64 && args.scipen>0) width+=MIN(args.scipen,350); // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written - if (width0) + width+=MIN(args.scipen,350); // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written + if (width> column name) } if (headerLen) { @@ -718,19 +733,27 @@ void fwriteMain(fwriteMainArgs args) if (!buff) STOP(_("Unable to allocate %zu MiB for header: %s"), headerLen / 1024 / 1024, strerror(errno)); char *ch = buff; - if (args.bom) {*ch++=(char)0xEF; *ch++=(char)0xBB; *ch++=(char)0xBF; } // 3 appears above (search for "bom") + if (args.bom) { + *ch++=(char)0xEF; + *ch++=(char)0xBB; + *ch++=(char)0xBF; + } // 3 appears above (search for "bom") memcpy(ch, args.yaml, yamlLen); ch += yamlLen; if (args.colNames) { if (args.doRowNames) { // Unusual: the extra blank column name when row_names are added as the first column - if (doQuote!=0/*'auto'(NA) or true*/) { *ch++='"'; *ch++='"'; } // to match write.csv + if (doQuote !=0) { + // to match write.csv + *ch++='"'; + *ch++='"'; + } *ch = sep; ch += sepLen; } int8_t tempDoQuote = doQuote; doQuote = quoteHeaders; // temporary overwrite since headers might get different quoting behavior, #2964 - for (int j=0; j Date: Fri, 23 Aug 2024 14:52:30 +0200 Subject: [PATCH 05/42] Buffers unification --- src/fwrite.c | 223 +++++++++++++++++++++++++-------------------------- 1 file changed, 110 insertions(+), 113 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 55377fa92..4b83c326b 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -29,9 +29,6 @@ #define NUM_SF 15 #define SIZE_SF 1000000000000000ULL // 10^NUM_SF -#define MIN(a,b) (((a)<(b))?(a):(b)) -#define MAX(a,b) (((a)>(b))?(a):(b)) - // Globals for this file only. Written once to hold parameters passed from R level. static const char *na; // by default "" or if set (not recommended) then usually "NA" static char sep; // comma in .csv files @@ -56,7 +53,8 @@ inline void write_chars(const char *x, char **pch) { // similar to C's strcpy but i) doesn't include trailing \0 and ii) moves destination along char *ch = *pch; - while (*x) *ch++=*x++; + while (*x) + *ch++ = *x++; *pch = ch; } @@ -113,7 +111,7 @@ void writeInt32(const void *col, int64_t row, char **pch) if (x == INT32_MIN) { write_chars(na, &ch); } else { - if (x<0) { *ch++ = '-'; x=-x; } + if (x<0) { *ch++ = '-'; x = -x; } // Avoid log() for speed. Write backwards then reverse when we know how long. char *low = ch; do { *ch++ = '0'+x%10; x/=10; } while (x>0); @@ -129,7 +127,7 @@ void writeInt64(const void *col, int64_t row, char **pch) if (x == INT64_MIN) { write_chars(na, &ch); } else { - if (x<0) { *ch++ = '-'; x=-x; } + if (x<0) { *ch++ = '-'; x = -x; } char *low = ch; do { *ch++ = '0'+x%10; x/=10; } while (x>0); reverse(ch, low); @@ -252,7 +250,7 @@ void writeFloat64(const void *col, int64_t row, char **pch) int dr = sf-exp-1; // how many characters to print to the right of the decimal place int width=0; // field width were it written decimal format. Used to decide whether to or not. int dl0=0; // how many 0's to add to the left of the decimal place before starting l - if (dr<=0) { dl0=-dr; dr=0; width=sf+dl0; } // 1, 10, 100, 99000 + if (dr<=0) { dl0 = -dr; dr=0; width=sf+dl0; } // 1, 10, 100, 99000 else { if (sf>dr) width=sf+1; // 1.234 and 123.4 else { dl0=1; width=dr+1+dl0; } // 0.1234, 0.0001234 @@ -285,7 +283,7 @@ void writeFloat64(const void *col, int64_t row, char **pch) *ch = '0' + l; ch += sf + (sf>1); *ch++ = 'e'; // lower case e to match base::write.csv - if (exp < 0) { *ch++ = '-'; exp=-exp; } + if (exp < 0) { *ch++ = '-'; exp = -exp; } else { *ch++ = '+'; } // to match base::write.csv if (exp < 100) { *ch++ = '0' + (exp / 10); @@ -612,12 +610,6 @@ void fwriteMain(fwriteMainArgs args) if (na[0]!='\0' && doQuote==INT8_MIN) doQuote = true; - - // create buffers - if (args.buffMB<1 || args.buffMB>1024) - STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); - size_t buffSize = (size_t)1024*1024*args.buffMB; - qmethodEscape = args.qmethodEscape; squashDateTime = args.squashDateTime; @@ -678,7 +670,8 @@ void fwriteMain(fwriteMainArgs args) } } if (args.whichFun[j]==WF_Float64 && args.scipen>0) - width+=MIN(args.scipen,350); // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written + // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written + width += (args.scipen < 350 ? args.scipen : 350 ); if (width> column name) } + + // Create heap zones ---- + + int nth = args.nth; + // alloc zlib streams +#ifndef NOZLIB + z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); + if (!thread_streams) + STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); + // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit + // not declared inside the parallel region because solaris appears to move the struct in + // memory when the #pragma omp for is entered, which causes zlib's internal self reference + // pointer to mismatch, #4099 +#endif + + // Get buffSize + if (args.buffMB<1 || args.buffMB>1024) + STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); + size_t buffSize = (size_t) (1 << 20) * args.buffMB; + + // compute zbuffSize which is the same for each thread + size_t zbuffSize = 0; + if(args.is_gzip){ +#ifndef NOZLIB + z_stream *stream = thread_streams; + init_stream(stream); + zbuffSize = deflateBound(stream, buffSize); + if (verbose) + DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); +#endif + } + // + // Decide buffer size and rowsPerBatch for each thread + // Once rowsPerBatch is decided it can't be changed + int rowsPerBatch=0; + if (2 * maxLineLen > buffSize) { + buffSize = 2 * maxLineLen; + rowsPerBatch=2; + } + else + rowsPerBatch = buffSize / maxLineLen; + if (rowsPerBatch > args.nrow) + rowsPerBatch = args.nrow; + if (rowsPerBatch < 1) + rowsPerBatch = 1; + int numBatches = (args.nrow-1) / rowsPerBatch + 1; + if (numBatches < nth) + nth = numBatches; + if (verbose) { + DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows (each buffer size %dMB, showProgress=%d, nth=%d)\n"), + args.nrow, numBatches, rowsPerBatch, args.buffMB, args.showProgress, nth); + } + + // alloc nth write buffers + errno=0; + // if headerLen > nth * buffSize (long variable names and 1 thread), alloc headerLen + char *buffPool = malloc(nth * buffSize < headerLen ? headerLen : nth * buffSize ); + if (!buffPool) { + // # nocov start + STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), + (size_t)buffSize/(1 << 20), nth, errno, strerror(errno)); + // # nocov end + } + + // alloc nth zlib buffers + char *zbuffPool = NULL; + if (args.is_gzip) { +#ifndef NOZLIB + // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen + zbuffPool = malloc(nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize); + if (!zbuffPool) { + // # nocov start + free(buffPool); + STOP(_("Unable to allocate %zu MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), + (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); + // # nocov end + } +#endif + } + + // write header + + // use buffPool and zbuffPool because we ensure that allocation is minimum headerLen + if (headerLen) { - char *buff = malloc(headerLen); - if (!buff) - STOP(_("Unable to allocate %zu MiB for header: %s"), headerLen / 1024 / 1024, strerror(errno)); + char *buff = buffPool; char *ch = buff; if (args.bom) { *ch++=(char)0xEF; @@ -762,27 +838,16 @@ void fwriteMain(fwriteMainArgs args) ch -= sepLen; // backup over the last sep write_chars(args.eol, &ch); } - if (f==-1) { + if (f == -1) { *ch = '\0'; DTPRINT("%s", buff); - free(buff); } else { int ret0=0, ret1=0, ret2=0; if (args.is_gzip) { #ifndef NOZLIB - z_stream stream; - if(init_stream(&stream)) { - free(buff); // # nocov - STOP(_("Can't allocate gzip stream structure")); // # nocov - } - // by default, buffsize is the same used for writing rows (#5048 old openbsd zlib) - // takes the max with headerLen size in case of very long header - size_t zbuffSize = deflateBound(&stream, headerLen > buffSize ? headerLen : buffSize); - char *zbuff = malloc(zbuffSize); - if (!zbuff) { - free(buff); // # nocov - STOP(_("Unable to allocate %zu MiB for zbuffer: %s"), zbuffSize / 1024 / 1024, strerror(errno)); // # nocov - } + z_stream *stream = thread_streams; + init_stream(stream); + char* zbuff = zbuffPool; // write minimal gzip header char* header = "\037\213\10\0\0\0\0\0\0\3"; ret0 = WRITE(f, header, 10); @@ -791,16 +856,14 @@ void fwriteMain(fwriteMainArgs args) size_t zbuffUsed = zbuffSize; len = (size_t)(ch - buff); crc = crc32(crc, buff, len); - ret1 = compressbuff(&stream, zbuff, &zbuffUsed, buff, len); + ret1 = compressbuff(stream, zbuff, &zbuffUsed, buff, len); if (ret1==Z_OK) ret2 = WRITE(f, zbuff, (int)zbuffUsed); - free(zbuff); #endif } else { ret2 = WRITE(f, buff, (int)(ch-buff)); } - free(buff); - if (ret0 == -1 || ret1 || ret2==-1) { + if (ret0 == -1 || ret1 || ret2 == -1) { // # nocov start int errwrite = errno; // capture write errno now in case close fails with a different errno CLOSE(f); @@ -814,88 +877,22 @@ void fwriteMain(fwriteMainArgs args) if (verbose) DTPRINT(_("done in %.3fs\n"), 1.0*(wallclock()-t0)); if (args.nrow == 0) { if (verbose) DTPRINT(_("No data rows present (nrow==0)\n")); - if (f!=-1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename); + if (f != -1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename); return; } - // Writing rows + // Write rows ---- - // Decide buffer size and rowsPerBatch for each thread - // Once rowsPerBatch is decided it can't be changed - int rowsPerBatch=0; - if (2 * maxLineLen > buffSize) { - buffSize=2*maxLineLen; - rowsPerBatch=2; - } - else - rowsPerBatch = buffSize / maxLineLen; - if (rowsPerBatch > args.nrow) - rowsPerBatch = args.nrow; - if (rowsPerBatch < 1) - rowsPerBatch = 1; - int numBatches = (args.nrow-1) / rowsPerBatch + 1; - int nth = args.nth; - if (numBatches < nth) - nth = numBatches; - if (verbose) { - DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows (each buffer size %dMB, showProgress=%d, nth=%d)\n"), - args.nrow, numBatches, rowsPerBatch, args.buffMB, args.showProgress, nth); - } t0 = wallclock(); bool hasPrinted = false; int maxBuffUsedPC = 0; - // compute zbuffSize which is the same for each thread - size_t zbuffSize = 0; - if(args.is_gzip){ -#ifndef NOZLIB - z_stream stream; - if(init_stream(&stream)) - STOP(_("Can't allocate gzip stream structure")); // # nocov - zbuffSize = deflateBound(&stream, buffSize); - if (verbose) - DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); -#endif - } - - errno=0; - char *buffPool = malloc(nth*(size_t)buffSize); - if (!buffPool) { - // # nocov start - STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), - (size_t)buffSize/(1024^2), nth, errno, strerror(errno)); - // # nocov end - } - char *zbuffPool = NULL; - if (args.is_gzip) { -#ifndef NOZLIB - zbuffPool = malloc(nth*(size_t)zbuffSize); - if (!zbuffPool) { - // # nocov start - free(buffPool); - STOP(_("Unable to allocate %zu MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), - (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); - // # nocov end - } -#endif - } - -#ifndef NOZLIB - z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); - if (!thread_streams) - STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); - // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit - // not declared inside the parallel region because solaris appears to move the struct in - // memory when the #pragma omp for is entered, which causes zlib's internal self reference - // pointer to mismatch, #4099 -#endif - -bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm -int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section -int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 + bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm + int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section + int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 -// loop overs rows in parallel---- +// main parallel loop ---- #pragma omp parallel for ordered num_threads(nth) for(int64_t start=0; start < args.nrow; start += rowsPerBatch) { int me = omp_get_thread_num(); @@ -975,7 +972,7 @@ int failed_write = 0; // same. could use +ve and -ve in the same code but sep } else { errno=0; int ret = 0; - if (f==-1) { + if (f == -1) { *ch='\0'; // standard C string end marker so DTPRINT knows where to stop DTPRINT("%s", myBuff); } else if (args.is_gzip) { @@ -1056,7 +1053,7 @@ int failed_write = 0; // same. could use +ve and -ve in the same code but sep // # nocov end } - if (f!=-1 && CLOSE(f) && !failed) + if (f != -1 && CLOSE(f) && !failed) STOP("%s: '%s'", strerror(errno), args.filename); // # nocov // quoted '%s' in case of trailing spaces in the filename // If a write failed, the line above tries close() to clean up, but that might fail as well. So the From 2fc184e42cd5e51d3e89ee60c4e3369df81b6690 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 15:50:14 +0200 Subject: [PATCH 06/42] Restore schedule(dynamic) more efficient and progress --- src/fwrite.c | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 4b83c326b..522a4bbe4 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -874,10 +874,13 @@ void fwriteMain(fwriteMainArgs args) } } } - if (verbose) DTPRINT(_("done in %.3fs\n"), 1.0*(wallclock()-t0)); + if (verbose) + DTPRINT(_("done in %.3fs\n"), 1.0*(wallclock()-t0)); if (args.nrow == 0) { - if (verbose) DTPRINT(_("No data rows present (nrow==0)\n")); - if (f != -1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename); + if (verbose) + DTPRINT(_("No data rows present (nrow==0)\n")); + if (f != -1 && CLOSE(f)) + STOP(_("%s: '%s'"), strerror(errno), args.filename); return; } @@ -893,7 +896,7 @@ void fwriteMain(fwriteMainArgs args) int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 // main parallel loop ---- -#pragma omp parallel for ordered num_threads(nth) +#pragma omp parallel for ordered num_threads(nth) schedule(dynamic) for(int64_t start=0; start < args.nrow; start += rowsPerBatch) { int me = omp_get_thread_num(); size_t mylen = 0; @@ -919,7 +922,7 @@ void fwriteMain(fwriteMainArgs args) int64_t end = ((args.nrow - start) < rowsPerBatch) ? args.nrow : start + rowsPerBatch; // chunk rows - for (int64_t i=start; i maxBuffUsedPC) maxBuffUsedPC = used; + int used = 100 * ((double)(ch - myBuff)) / buffSize; // percentage of original buffMB + if (used > maxBuffUsedPC) + maxBuffUsedPC = used; double now; - if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) { + if (me == 0 && !failed && args.showProgress && (now=wallclock()) >= nextTime) { // See comments above inside the f==-1 clause. // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the // master thread (me==0) and hopefully this will work on Windows. If not, user should set // showProgress=FALSE until this can be fixed or removed. // # nocov start - int ETA = (int)((args.nrow-end)*((now-startTime)/end)); + int ETA = (int)((args.nrow - end) * (now-startTime) /end); if (hasPrinted || ETA >= 2) { - if (verbose && !hasPrinted) DTPRINT("\n"); + if (verbose && !hasPrinted) + DTPRINT("\n"); DTPRINT("\rWritten %.1f%% of %"PRId64" rows in %d secs using %d thread%s. " "maxBuffUsed=%d%%. ETA %d secs. ", - (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth==1?"":"s", - maxBuffUsedPC, ETA); - // TODO: use progress() as in fread - nextTime = now+1; + (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth ==1 ? "" : "s", + maxBuffUsedPC, ETA); + // TODO: use progress() as in fread + nextTime = now + 1; hasPrinted = true; } // # nocov end From 47c16f2b872ead776e744e65020f509a9619bac2 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 16:20:33 +0200 Subject: [PATCH 07/42] Use alloc_size to see allocation when verbose --- src/fwrite.c | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 522a4bbe4..76815c720 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -705,8 +705,8 @@ void fwriteMain(fwriteMainArgs args) int yamlLen = strlen(args.yaml); if (verbose) { - DTPRINT(_("Writing bom (%s), yaml (%d characters) and column names (%s) ... "), - args.bom?"true":"false", yamlLen, args.colNames?"true":"false"); + DTPRINT(_("Writing bom (%s), yaml (%d characters) and column names (%s)\n"), + args.bom ? "true" : "false", yamlLen, args.colNames ? "true" : "false"); if (f == -1) DTPRINT(_("\n")); } @@ -729,6 +729,9 @@ void fwriteMain(fwriteMainArgs args) // alloc zlib streams #ifndef NOZLIB z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); +if (verbose) { + DTPRINT(_("Allocate %ld bytes for thread_streams\n"), nth * sizeof(z_stream)); +} if (!thread_streams) STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit @@ -777,10 +780,16 @@ void fwriteMain(fwriteMainArgs args) // alloc nth write buffers errno=0; + size_t alloc_size; // if headerLen > nth * buffSize (long variable names and 1 thread), alloc headerLen - char *buffPool = malloc(nth * buffSize < headerLen ? headerLen : nth * buffSize ); + alloc_size = nth * buffSize < headerLen ? headerLen : nth * buffSize; + if (verbose) { + DTPRINT(_("Allocate %ld bytes for buffPool\n"), alloc_size); + } + char *buffPool = malloc(alloc_size); if (!buffPool) { // # nocov start + free(thread_streams); STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), (size_t)buffSize/(1 << 20), nth, errno, strerror(errno)); // # nocov end @@ -791,10 +800,15 @@ void fwriteMain(fwriteMainArgs args) if (args.is_gzip) { #ifndef NOZLIB // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen - zbuffPool = malloc(nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize); + alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize; + if (verbose) { + DTPRINT(_("Allocate %ld bytes for zbuffPool\n"), alloc_size); + } + zbuffPool = malloc(alloc_size); if (!zbuffPool) { // # nocov start free(buffPool); + free(thread_streams); STOP(_("Unable to allocate %zu MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); // # nocov end From 3dcb7d6af5d8a0edf7012431d0725210bb54a6b9 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 17:16:06 +0200 Subject: [PATCH 08/42] Test if stream init succeded --- src/fwrite.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 76815c720..e741d1570 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -750,7 +750,8 @@ if (verbose) { if(args.is_gzip){ #ifndef NOZLIB z_stream *stream = thread_streams; - init_stream(stream); + if (init_stream(stream) != Z_OK) + STOP(_("Can't init stream structure for deflateBound")); zbuffSize = deflateBound(stream, buffSize); if (verbose) DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); @@ -860,7 +861,8 @@ if (verbose) { if (args.is_gzip) { #ifndef NOZLIB z_stream *stream = thread_streams; - init_stream(stream); + if (init_stream(stream) != Z_OK) + STOP(_("Can't init stream structure for writing header")); char* zbuff = zbuffPool; // write minimal gzip header char* header = "\037\213\10\0\0\0\0\0\0\3"; @@ -925,7 +927,7 @@ if (verbose) { size_t myzbuffUsed = 0; if (args.is_gzip) { myzBuff = zbuffPool + me * zbuffSize; - if (init_stream(mystream)) { // this should be thread safe according to zlib documentation + if (init_stream(mystream) != Z_OK) { // this should be thread safe according to zlib documentation failed = true; // # nocov my_failed_compress = -998; // # nocov } From 6569cfeb342641b7843d51400fc5fa940723f093 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 18:20:47 +0200 Subject: [PATCH 09/42] Add cast to avoid warnings on Windows --- src/fwrite.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index e741d1570..5a61144bd 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -871,7 +871,7 @@ if (verbose) { size_t zbuffUsed = zbuffSize; len = (size_t)(ch - buff); - crc = crc32(crc, buff, len); + crc = crc32(crc, (unsigned char*)buff, len); ret1 = compressbuff(stream, zbuff, &zbuffUsed, buff, len); if (ret1==Z_OK) ret2 = WRITE(f, zbuff, (int)zbuffUsed); @@ -916,7 +916,7 @@ if (verbose) { for(int64_t start=0; start < args.nrow; start += rowsPerBatch) { int me = omp_get_thread_num(); size_t mylen = 0; - int mycrc; + int mycrc = crc32(0L, Z_NULL, 0); int my_failed_compress = 0; char* myBuff = buffPool + me * buffSize; char* ch = myBuff; @@ -970,7 +970,7 @@ if (verbose) { if (args.is_gzip && !failed) { myzbuffUsed = zbuffSize; mylen = (size_t)(ch - myBuff); - mycrc = crc32(0, myBuff, mylen); + mycrc = crc32(0, (unsigned char*)myBuff, mylen); int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, mylen); if (ret) { failed=true; From bae11e3fa095d087195045a3c8e13c3d44ddc1a0 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Fri, 23 Aug 2024 18:31:13 +0200 Subject: [PATCH 10/42] More explicit timing messages --- src/fwrite.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/fwrite.c b/src/fwrite.c index 5a61144bd..415d896f7 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -891,7 +891,7 @@ if (verbose) { } } if (verbose) - DTPRINT(_("done in %.3fs\n"), 1.0*(wallclock()-t0)); + DTPRINT(_("Initialization done in %.3fs\n"), 1.0*(wallclock()-t0)); if (args.nrow == 0) { if (verbose) DTPRINT(_("No data rows present (nrow==0)\n")); @@ -1056,6 +1056,9 @@ if (verbose) { STOP("Error: can't write gzip tailer"); } + if (verbose) + DTPRINT(_("\nAll done in %.3fs\n"), 1.0*(wallclock()-t0)); + free(buffPool); #ifndef NOZLIB free(thread_streams); From 5ce31f3564d917229849270689f10436fe124c71 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sat, 24 Aug 2024 11:46:40 +0200 Subject: [PATCH 11/42] Free stream structs --- src/fwrite.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 415d896f7..0ffdb8a07 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -790,7 +790,6 @@ if (verbose) { char *buffPool = malloc(alloc_size); if (!buffPool) { // # nocov start - free(thread_streams); STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), (size_t)buffSize/(1 << 20), nth, errno, strerror(errno)); // # nocov end @@ -809,7 +808,6 @@ if (verbose) { if (!zbuffPool) { // # nocov start free(buffPool); - free(thread_streams); STOP(_("Unable to allocate %zu MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); // # nocov end @@ -915,13 +913,13 @@ if (verbose) { #pragma omp parallel for ordered num_threads(nth) schedule(dynamic) for(int64_t start=0; start < args.nrow; start += rowsPerBatch) { int me = omp_get_thread_num(); - size_t mylen = 0; - int mycrc = crc32(0L, Z_NULL, 0); int my_failed_compress = 0; char* myBuff = buffPool + me * buffSize; char* ch = myBuff; #ifndef NOZLIB + size_t mylen = 0; + int mycrc = 0; z_stream *mystream = &thread_streams[me]; void *myzBuff = NULL; size_t myzbuffUsed = 0; @@ -1038,6 +1036,12 @@ if (verbose) { // # nocov end } } + if (args.is_gzip) { +#ifndef NOZLIB + deflateEnd(mystream); +#endif + } + } // end of parallel for loop /* put a 4-byte integer into a byte array in LSB order */ From b07306f25d13c0491188168469e6a760bbd1ea00 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger <52290390+ben-schwen@users.noreply.github.com> Date: Fri, 4 Nov 2022 00:59:40 +0100 Subject: [PATCH 12/42] Add option to control compression level for fwrite with gzip --- R/fwrite.R | 6 +++++- src/data.table.h | 2 +- src/fwrite.c | 6 ++++-- src/fwrite.h | 1 + src/fwriteR.c | 2 ++ 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/R/fwrite.R b/R/fwrite.R index ad92859f3..53c790fdd 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -10,6 +10,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", buffMB=8, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip"), + gzip_ratio = 0:9, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -20,6 +21,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", } if (missing(qmethod)) qmethod = qmethod[1L] if (missing(compress)) compress = compress[1L] + if (missing(gzip_ratio)) gzip_ratio = gzip_ratio[1L] if (missing(dateTimeAs)) { dateTimeAs = dateTimeAs[1L] } else if (length(dateTimeAs)>1L) stopf("dateTimeAs must be a single string") dateTimeAs = chmatch(dateTimeAs, c("ISO","squash","epoch","write.csv"))-1L @@ -34,6 +36,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", scipen = if (is.numeric(scipen)) as.integer(scipen) else 0L buffMB = as.integer(buffMB) nThread = as.integer(nThread) + gzip_ratio = as.integer(gzip_ratio) # write.csv default is 'double' so fwrite follows suit. write.table's default is 'escape' # validate arguments if (is.matrix(x)) { # coerce to data.table if input object is matrix @@ -55,6 +58,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), length(compress) == 1L && compress %chin% c("auto", "none", "gzip"), + length(gzip_ratio) == 1L && 0L<=gzip_ratio && gzip_ratio<=9L, isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), isTRUEorFALSE(bom), @@ -117,7 +121,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", file = enc2native(file) # CfwriteR cannot handle UTF-8 if that is not the native encoding, see #3078. .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, bom, yaml, verbose, encoding) + showProgress, is_gzip, gzip_ratio, bom, yaml, verbose, encoding) invisible() } diff --git a/src/data.table.h b/src/data.table.h index 4d4bed225..b65cc93f9 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -297,7 +297,7 @@ SEXP chmatch_R(SEXP, SEXP, SEXP); SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); SEXP address(SEXP); diff --git a/src/fwrite.c b/src/fwrite.c index 0ffdb8a07..9b7b3e517 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -40,6 +40,7 @@ static bool qmethodEscape=false; // when quoting fields, how to escape dou static int scipen; static bool squashDateTime=false; // 0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) static bool verbose=false; +static int gzip_ratio; extern const char *getString(const void *, int64_t); extern int getStringLen(const void *, int64_t); @@ -562,8 +563,8 @@ int init_stream(z_stream *stream) { stream->zalloc = Z_NULL; stream->zfree = Z_NULL; stream->opaque = Z_NULL; - - int err = deflateInit2(stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); + int err = deflateInit2(stream, gzip_ratio==0 ? Z_DEFAULT_COMPRESSION : gzip_ratio, Z_DEFLATED, + -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } @@ -595,6 +596,7 @@ void fwriteMain(fwriteMainArgs args) doQuote = args.doQuote; int8_t quoteHeaders = args.doQuote; verbose = args.verbose; + gzip_ratio = args.gzip_ratio; size_t len; unsigned int crc; diff --git a/src/fwrite.h b/src/fwrite.h index 24fabd858..008ed3c6f 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -111,6 +111,7 @@ typedef struct fwriteMainArgs int nth; bool showProgress; bool is_gzip; + int gzip_ratio; bool bom; const char *yaml; bool verbose; diff --git a/src/fwriteR.c b/src/fwriteR.c index 5a0ab2dc7..878aa12ee 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -167,6 +167,7 @@ SEXP fwriteR( SEXP nThread_Arg, SEXP showProgress_Arg, SEXP is_gzip_Arg, + SEXP gzip_ratio_Arg, SEXP bom_Arg, SEXP yaml_Arg, SEXP verbose_Arg, @@ -177,6 +178,7 @@ SEXP fwriteR( fwriteMainArgs args = {0}; // {0} to quieten valgrind's uninitialized, #4639 args.is_gzip = LOGICAL(is_gzip_Arg)[0]; + args.gzip_ratio = INTEGER(gzip_ratio_Arg)[0]; args.bom = LOGICAL(bom_Arg)[0]; args.yaml = CHAR(STRING_ELT(yaml_Arg, 0)); args.verbose = LOGICAL(verbose_Arg)[0]; From 87225190333668f3cded8396359cf67932f00ed7 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger <52290390+ben-schwen@users.noreply.github.com> Date: Tue, 15 Nov 2022 11:58:38 +0000 Subject: [PATCH 13/42] Rework namings and default value --- R/fwrite.R | 10 +++++----- src/fwrite.c | 6 +++--- src/fwrite.h | 2 +- src/fwriteR.c | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/R/fwrite.R b/R/fwrite.R index 53c790fdd..04929b760 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -10,7 +10,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", buffMB=8, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip"), - gzip_ratio = 0:9, + gzipLevel = -1:9, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -21,7 +21,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", } if (missing(qmethod)) qmethod = qmethod[1L] if (missing(compress)) compress = compress[1L] - if (missing(gzip_ratio)) gzip_ratio = gzip_ratio[1L] + if (missing(gzipLevel)) gzipLevel = gzipLevel[1L] if (missing(dateTimeAs)) { dateTimeAs = dateTimeAs[1L] } else if (length(dateTimeAs)>1L) stopf("dateTimeAs must be a single string") dateTimeAs = chmatch(dateTimeAs, c("ISO","squash","epoch","write.csv"))-1L @@ -36,7 +36,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", scipen = if (is.numeric(scipen)) as.integer(scipen) else 0L buffMB = as.integer(buffMB) nThread = as.integer(nThread) - gzip_ratio = as.integer(gzip_ratio) + gzipLevel = as.integer(gzipLevel) # write.csv default is 'double' so fwrite follows suit. write.table's default is 'escape' # validate arguments if (is.matrix(x)) { # coerce to data.table if input object is matrix @@ -58,7 +58,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), length(compress) == 1L && compress %chin% c("auto", "none", "gzip"), - length(gzip_ratio) == 1L && 0L<=gzip_ratio && gzip_ratio<=9L, + length(gzipLevel) == 1L && -1L<=gzipLevel && gzipLevel<=9L, isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), isTRUEorFALSE(bom), @@ -121,7 +121,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", file = enc2native(file) # CfwriteR cannot handle UTF-8 if that is not the native encoding, see #3078. .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, gzip_ratio, bom, yaml, verbose, encoding) + showProgress, is_gzip, gzipLevel, bom, yaml, verbose, encoding) invisible() } diff --git a/src/fwrite.c b/src/fwrite.c index 9b7b3e517..c91936c24 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -40,7 +40,7 @@ static bool qmethodEscape=false; // when quoting fields, how to escape dou static int scipen; static bool squashDateTime=false; // 0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) static bool verbose=false; -static int gzip_ratio; +static int gzip_level; extern const char *getString(const void *, int64_t); extern int getStringLen(const void *, int64_t); @@ -563,7 +563,7 @@ int init_stream(z_stream *stream) { stream->zalloc = Z_NULL; stream->zfree = Z_NULL; stream->opaque = Z_NULL; - int err = deflateInit2(stream, gzip_ratio==0 ? Z_DEFAULT_COMPRESSION : gzip_ratio, Z_DEFLATED, + int err = deflateInit2(stream, gzip_level==0 ? Z_DEFAULT_COMPRESSION : gzip_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } @@ -596,7 +596,7 @@ void fwriteMain(fwriteMainArgs args) doQuote = args.doQuote; int8_t quoteHeaders = args.doQuote; verbose = args.verbose; - gzip_ratio = args.gzip_ratio; + gzip_level = args.gzip_level; size_t len; unsigned int crc; diff --git a/src/fwrite.h b/src/fwrite.h index 008ed3c6f..b50666fab 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -111,7 +111,7 @@ typedef struct fwriteMainArgs int nth; bool showProgress; bool is_gzip; - int gzip_ratio; + int gzip_level; bool bom; const char *yaml; bool verbose; diff --git a/src/fwriteR.c b/src/fwriteR.c index 878aa12ee..9ee67f2df 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -167,7 +167,7 @@ SEXP fwriteR( SEXP nThread_Arg, SEXP showProgress_Arg, SEXP is_gzip_Arg, - SEXP gzip_ratio_Arg, + SEXP gzip_level_Arg, SEXP bom_Arg, SEXP yaml_Arg, SEXP verbose_Arg, @@ -178,7 +178,7 @@ SEXP fwriteR( fwriteMainArgs args = {0}; // {0} to quieten valgrind's uninitialized, #4639 args.is_gzip = LOGICAL(is_gzip_Arg)[0]; - args.gzip_ratio = INTEGER(gzip_ratio_Arg)[0]; + args.gzip_level = INTEGER(gzip_level_Arg)[0]; args.bom = LOGICAL(bom_Arg)[0]; args.yaml = CHAR(STRING_ELT(yaml_Arg, 0)); args.verbose = LOGICAL(verbose_Arg)[0]; From 0800f10c145b6e43a4bb2d966d6d2d158b6780b6 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sat, 24 Aug 2024 11:47:58 +0200 Subject: [PATCH 14/42] Rename gzipLevel to compressLevel --- R/fwrite.R | 15 ++++++++------- src/fwrite.c | 3 +-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/R/fwrite.R b/R/fwrite.R index 04929b760..d06da559e 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -10,7 +10,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", buffMB=8, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip"), - gzipLevel = -1:9, + compressLevel = 6L, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -21,7 +21,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", } if (missing(qmethod)) qmethod = qmethod[1L] if (missing(compress)) compress = compress[1L] - if (missing(gzipLevel)) gzipLevel = gzipLevel[1L] + if (missing(compressLevel)) compressLevel = 6L if (missing(dateTimeAs)) { dateTimeAs = dateTimeAs[1L] } else if (length(dateTimeAs)>1L) stopf("dateTimeAs must be a single string") dateTimeAs = chmatch(dateTimeAs, c("ISO","squash","epoch","write.csv"))-1L @@ -36,7 +36,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", scipen = if (is.numeric(scipen)) as.integer(scipen) else 0L buffMB = as.integer(buffMB) nThread = as.integer(nThread) - gzipLevel = as.integer(gzipLevel) + compressLevel = as.integer(compressLevel) # write.csv default is 'double' so fwrite follows suit. write.table's default is 'escape' # validate arguments if (is.matrix(x)) { # coerce to data.table if input object is matrix @@ -49,7 +49,8 @@ fwrite = function(x, file="", append=FALSE, quote="auto", x = as.data.table(x) } } - stopifnot(is.list(x), + stopifnot( + is.list(x), identical(quote,"auto") || isTRUEorFALSE(quote), is.character(sep) && length(sep)==1L && (nchar(sep) == 1L || identical(sep, "")), is.character(sep2) && length(sep2)==3L && nchar(sep2[2L])==1L, @@ -58,7 +59,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), length(compress) == 1L && compress %chin% c("auto", "none", "gzip"), - length(gzipLevel) == 1L && -1L<=gzipLevel && gzipLevel<=9L, + length(compressLevel) == 1L && 0 <= compressLevel && compressLevel <= 9, isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), isTRUEorFALSE(bom), @@ -66,7 +67,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(file) && length(file)==1L && !is.na(file), length(buffMB)==1L && !is.na(buffMB) && 1L<=buffMB && buffMB<=1024L, length(nThread)==1L && !is.na(nThread) && nThread>=1L - ) + ) is_gzip = compress == "gzip" || (compress == "auto" && endsWithAny(file, ".gz")) @@ -121,7 +122,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", file = enc2native(file) # CfwriteR cannot handle UTF-8 if that is not the native encoding, see #3078. .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, gzipLevel, bom, yaml, verbose, encoding) + showProgress, is_gzip, compressLevel, bom, yaml, verbose, encoding) invisible() } diff --git a/src/fwrite.c b/src/fwrite.c index c91936c24..2e3c0155c 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -563,8 +563,7 @@ int init_stream(z_stream *stream) { stream->zalloc = Z_NULL; stream->zfree = Z_NULL; stream->opaque = Z_NULL; - int err = deflateInit2(stream, gzip_level==0 ? Z_DEFAULT_COMPRESSION : gzip_level, Z_DEFLATED, - -15, 8, Z_DEFAULT_STRATEGY); + int err = deflateInit2(stream, gzip_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } From a0a9c1fa1a8d1487d06e863cbbb0661802a4a06e Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sat, 24 Aug 2024 12:04:16 +0200 Subject: [PATCH 15/42] compressLevel param documentation --- man/fwrite.Rd | 2 ++ 1 file changed, 2 insertions(+) diff --git a/man/fwrite.Rd b/man/fwrite.Rd index efa830fb9..549a69a83 100644 --- a/man/fwrite.Rd +++ b/man/fwrite.Rd @@ -18,6 +18,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", buffMB = 8L, nThread = getDTthreads(verbose), showProgress = getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip"), + compressLevel = 6L, yaml = FALSE, bom = FALSE, verbose = getOption("datatable.verbose", FALSE), @@ -58,6 +59,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", \item{nThread}{The number of threads to use. Experiment to see what works best for your data on your hardware.} \item{showProgress}{ Display a progress meter on the console? Ignored when \code{file==""}. } \item{compress}{If \code{compress = "auto"} and if \code{file} ends in \code{.gz} then output format is gzipped csv else csv. If \code{compress = "none"}, output format is always csv. If \code{compress = "gzip"} then format is gzipped csv. Output to the console is never gzipped even if \code{compress = "gzip"}. By default, \code{compress = "auto"}.} + \item{compressLevel}{Level of compression between 1 (fastest method but less compression) and 9 (slowest compression method). The default compression level is 6 which is a good compromise.} \item{yaml}{If \code{TRUE}, \code{fwrite} will output a CSVY file, that is, a CSV file with metadata stored as a YAML header, using \code{\link[yaml]{as.yaml}}. See \code{Details}. } \item{bom}{If \code{TRUE} a BOM (Byte Order Mark) sequence (EF BB BF) is added at the beginning of the file; format 'UTF-8 with BOM'.} \item{verbose}{Be chatty and report timings?} From a00761d5d0f545206f3225421e35ea739ad880fb Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 11:26:06 +0200 Subject: [PATCH 16/42] Put zlib initialization together --- configure | 2 +- src/fwrite.c | 53 ++++++++++++++++++++++++---------------------------- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/configure b/configure index 853c0d5fa..604ea960b 100755 --- a/configure +++ b/configure @@ -31,7 +31,7 @@ else echo "*** pkg-config is installed but 'pkg-config --exists zlib' did not return 0." msg=1 else - NOZLIB=0 + NOZLIB=1 lib=`pkg-config --libs zlib` cflag=`pkg-config --cflags zlib` expr -- "$lib" : ".*-lz$" >> config.log # -- for FreeBSD, #4652 diff --git a/src/fwrite.c b/src/fwrite.c index 2e3c0155c..5b926e008 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -727,38 +727,12 @@ void fwriteMain(fwriteMainArgs args) // Create heap zones ---- int nth = args.nth; - // alloc zlib streams -#ifndef NOZLIB - z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); -if (verbose) { - DTPRINT(_("Allocate %ld bytes for thread_streams\n"), nth * sizeof(z_stream)); -} - if (!thread_streams) - STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); - // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit - // not declared inside the parallel region because solaris appears to move the struct in - // memory when the #pragma omp for is entered, which causes zlib's internal self reference - // pointer to mismatch, #4099 -#endif // Get buffSize if (args.buffMB<1 || args.buffMB>1024) STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); size_t buffSize = (size_t) (1 << 20) * args.buffMB; - // compute zbuffSize which is the same for each thread - size_t zbuffSize = 0; - if(args.is_gzip){ -#ifndef NOZLIB - z_stream *stream = thread_streams; - if (init_stream(stream) != Z_OK) - STOP(_("Can't init stream structure for deflateBound")); - zbuffSize = deflateBound(stream, buffSize); - if (verbose) - DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); -#endif - } - // // Decide buffer size and rowsPerBatch for each thread // Once rowsPerBatch is decided it can't be changed int rowsPerBatch=0; @@ -796,10 +770,31 @@ if (verbose) { // # nocov end } - // alloc nth zlib buffers +#ifndef NOZLIB + z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); char *zbuffPool = NULL; + size_t zbuffSize = 0; if (args.is_gzip) { -#ifndef NOZLIB + // alloc zlib streams + if (verbose) { + DTPRINT(_("Allocate %ld bytes for thread_streams\n"), nth * sizeof(z_stream)); + } + if (!thread_streams) + STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); + // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit + // not declared inside the parallel region because solaris appears to move the struct in + // memory when the #pragma omp for is entered, which causes zlib's internal self reference + // pointer to mismatch, #4099 + + // compute zbuffSize which is the same for each thread + z_stream *stream = thread_streams; + if (init_stream(stream) != Z_OK) + STOP(_("Can't init stream structure for deflateBound")); + zbuffSize = deflateBound(stream, buffSize); + if (verbose) + DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); + + // alloc nth zlib buffers // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize; if (verbose) { @@ -813,8 +808,8 @@ if (verbose) { (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); // # nocov end } -#endif } +#endif // write header From aaf85ab91af9499844bdca29e0daf375221bc00d Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 12:08:30 +0200 Subject: [PATCH 17/42] Refact buffSize, numBatchs and numBatches --- configure | 2 +- src/fwrite.c | 59 ++++++++++++++++++++++++++++------------------------ 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/configure b/configure index 604ea960b..853c0d5fa 100755 --- a/configure +++ b/configure @@ -31,7 +31,7 @@ else echo "*** pkg-config is installed but 'pkg-config --exists zlib' did not return 0." msg=1 else - NOZLIB=1 + NOZLIB=0 lib=`pkg-config --libs zlib` cflag=`pkg-config --cflags zlib` expr -- "$lib" : ".*-lz$" >> config.log # -- for FreeBSD, #4652 diff --git a/src/fwrite.c b/src/fwrite.c index 5b926e008..a4e748459 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -26,6 +26,7 @@ #include "fwriteLookups.h" #include "fwrite.h" +#define MEGA (1 << 20) #define NUM_SF 15 #define SIZE_SF 1000000000000000ULL // 10^NUM_SF @@ -648,8 +649,8 @@ void fwriteMain(fwriteMainArgs args) double t0 = wallclock(); size_t maxLineLen = eolLen + args.ncol*(2*(doQuote!=0) + sepLen); if (args.doRowNames) { - maxLineLen += args.rowNames==NULL ? 1+(int)log10(args.nrow) // the width of the row number - : (args.rowNameFun==WF_String ? getMaxStringLen(args.rowNames, args.nrow)*2 // *2 in case longest row name is all quotes (!) and all get escaped + maxLineLen += args.rowNames==NULL ? 1 + (int) log10(args.nrow) // the width of the row number + : (args.rowNameFun==WF_String ? getMaxStringLen(args.rowNames, args.nrow) * 2 // *2 in case longest row name is all quotes (!) and all get escaped : 11); // specific integer names could be MAX_INT 2147483647 (10 chars) even on a 5 row table, and data.frame allows negative integer rownames hence 11 for the sign maxLineLen += 2/*possible quotes*/ + sepLen; } @@ -712,7 +713,7 @@ void fwriteMain(fwriteMainArgs args) DTPRINT(_("\n")); } - // Calc headerLen + // Calc headerLen size_t headerLen = 0; if (args.bom) @@ -728,45 +729,49 @@ void fwriteMain(fwriteMainArgs args) int nth = args.nth; - // Get buffSize - if (args.buffMB<1 || args.buffMB>1024) - STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); - size_t buffSize = (size_t) (1 << 20) * args.buffMB; + // Calc buffSize + if (args.buffMB < 1 || args.buffMB > 1024) + STOP(_("buffMB = %d is outside range 1..1024, exiting"), args.buffMB); + size_t buffSize = args.buffMB * MEGA; // Decide buffer size and rowsPerBatch for each thread // Once rowsPerBatch is decided it can't be changed - int rowsPerBatch=0; - if (2 * maxLineLen > buffSize) { - buffSize = 2 * maxLineLen; - rowsPerBatch=2; + + // if maxLineLen is greater then buffize, increase it + if (buffSize < maxLineLen) { + buffSize = maxLineLen; + } + if (nth * buffSize < headerLen) { + buffSize = headerLen / nth + 1; } - else - rowsPerBatch = buffSize / maxLineLen; - if (rowsPerBatch > args.nrow) + + int rowsPerBatch = buffSize / maxLineLen; + int numBatches = (args.nrow - 1) / rowsPerBatch + 1; + + if (args.nrow < rowsPerBatch) { rowsPerBatch = args.nrow; - if (rowsPerBatch < 1) - rowsPerBatch = 1; - int numBatches = (args.nrow-1) / rowsPerBatch + 1; + numBatches = 1; + } + if (numBatches < nth) nth = numBatches; + if (verbose) { - DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows (each buffer size %dMB, showProgress=%d, nth=%d)\n"), - args.nrow, numBatches, rowsPerBatch, args.buffMB, args.showProgress, nth); + DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows, each buffer size %ld bytes (%zu MiB), showProgress=%d, nth=%d\n"), + args.nrow, numBatches, rowsPerBatch, buffSize, buffSize / MEGA, args.showProgress, nth); } // alloc nth write buffers errno=0; - size_t alloc_size; - // if headerLen > nth * buffSize (long variable names and 1 thread), alloc headerLen - alloc_size = nth * buffSize < headerLen ? headerLen : nth * buffSize; + size_t alloc_size = nth * buffSize; if (verbose) { - DTPRINT(_("Allocate %ld bytes for buffPool\n"), alloc_size); + DTPRINT(_("Allocate %ld bytes (%zu MiB) for buffPool\n"), alloc_size, alloc_size / MEGA); } char *buffPool = malloc(alloc_size); if (!buffPool) { // # nocov start STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), - (size_t)buffSize/(1 << 20), nth, errno, strerror(errno)); + buffSize / MEGA, nth, errno, strerror(errno)); // # nocov end } @@ -798,14 +803,14 @@ void fwriteMain(fwriteMainArgs args) // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize; if (verbose) { - DTPRINT(_("Allocate %ld bytes for zbuffPool\n"), alloc_size); + DTPRINT(_("Allocate %ld bytes (%zu MiB) for zbuffPool\n"), alloc_size, alloc_size / MEGA); } zbuffPool = malloc(alloc_size); if (!zbuffPool) { // # nocov start free(buffPool); - STOP(_("Unable to allocate %zu MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), - (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); + STOP(_("Unable to allocate %zu MiB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), + zbuffSize / MEGA, nth, errno, strerror(errno)); // # nocov end } } From 478b8628b77c721eddb7bdc37305b1faf4d83ab7 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 14:48:34 +0200 Subject: [PATCH 18/42] Add missing NOZLIB --- src/fwrite.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/fwrite.c b/src/fwrite.c index a4e748459..805a60314 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -1050,6 +1050,7 @@ void fwriteMain(fwriteMainArgs args) // write gzip tailer with crc and len if (args.is_gzip) { +#ifndef NOZLIB // DTPRINT(_("crc=%x len=%ld\n", crc, len)); unsigned char tail[10]; tail[0] = 3; @@ -1059,6 +1060,7 @@ void fwriteMain(fwriteMainArgs args) int ret = WRITE(f, tail, 10); if (ret == -1) STOP("Error: can't write gzip tailer"); +#endif } if (verbose) From b09aa342a18e5d9ccd53eeeba9a7e765e0b99d8b Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 15:28:06 +0200 Subject: [PATCH 19/42] Increase outputs in last message when verbose --- src/fwrite.c | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 805a60314..4cb53c460 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -775,10 +775,12 @@ void fwriteMain(fwriteMainArgs args) // # nocov end } + // init compress variables #ifndef NOZLIB z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); char *zbuffPool = NULL; size_t zbuffSize = 0; + size_t compress_len = 0; if (args.is_gzip) { // alloc zlib streams if (verbose) { @@ -866,14 +868,17 @@ void fwriteMain(fwriteMainArgs args) // write minimal gzip header char* header = "\037\213\10\0\0\0\0\0\0\3"; ret0 = WRITE(f, header, 10); + compress_len += 10; crc = crc32(0L, Z_NULL, 0); size_t zbuffUsed = zbuffSize; len = (size_t)(ch - buff); crc = crc32(crc, (unsigned char*)buff, len); ret1 = compressbuff(stream, zbuff, &zbuffUsed, buff, len); - if (ret1==Z_OK) + if (ret1==Z_OK) { ret2 = WRITE(f, zbuff, (int)zbuffUsed); + compress_len += zbuffUsed; + } #endif } else { ret2 = WRITE(f, buff, (int)(ch-buff)); @@ -996,6 +1001,7 @@ void fwriteMain(fwriteMainArgs args) } else if (args.is_gzip) { #ifndef NOZLIB ret = WRITE(f, myzBuff, (int)myzbuffUsed); + compress_len += myzbuffUsed; #endif } else { ret = WRITE(f, myBuff, (int)(ch-myBuff)); @@ -1051,21 +1057,18 @@ void fwriteMain(fwriteMainArgs args) // write gzip tailer with crc and len if (args.is_gzip) { #ifndef NOZLIB - // DTPRINT(_("crc=%x len=%ld\n", crc, len)); unsigned char tail[10]; tail[0] = 3; tail[1] = 0; PUT4(tail + 2, crc); PUT4(tail + 6, len); int ret = WRITE(f, tail, 10); + compress_len += 10; if (ret == -1) STOP("Error: can't write gzip tailer"); #endif } - if (verbose) - DTPRINT(_("\nAll done in %.3fs\n"), 1.0*(wallclock()-t0)); - free(buffPool); #ifndef NOZLIB free(thread_streams); @@ -1077,13 +1080,25 @@ void fwriteMain(fwriteMainArgs args) // # nocov start if (!failed) { // clear the progress meter DTPRINT("\r " - " \r"); + " \r\n"); } else { // don't clear any potentially helpful output before error DTPRINT("\n"); } // # nocov end } + if (verbose) { + if (args.is_gzip) { +#ifndef NOZLIB + DTPRINT("zlib: uncompressed length=%lu (%zu MiB), compressed length=%lu (%zu MiB), ratio=%.1f%%, crc=%x\n", + len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc); +#endif + DTPRINT("Written %lu rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n", + args.nrow, 1.0*(wallclock()-t0), nth, nth ==1 ? "" : "s", maxBuffUsedPC); + } + } + + if (f != -1 && CLOSE(f) && !failed) STOP("%s: '%s'", strerror(errno), args.filename); // # nocov // quoted '%s' in case of trailing spaces in the filename From 669eb01058197916ccbeb2d6507338aaf8331b5b Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 18:54:57 +0200 Subject: [PATCH 20/42] No real init for stream_thread when is_gzip false --- src/fwrite.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/fwrite.c b/src/fwrite.c index 4cb53c460..51355a667 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -777,12 +777,13 @@ void fwriteMain(fwriteMainArgs args) // init compress variables #ifndef NOZLIB - z_stream *thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); + z_stream *thread_streams = NULL; char *zbuffPool = NULL; size_t zbuffSize = 0; size_t compress_len = 0; if (args.is_gzip) { // alloc zlib streams + thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); if (verbose) { DTPRINT(_("Allocate %ld bytes for thread_streams\n"), nth * sizeof(z_stream)); } From 56684311188d9475b7f1abd8772ec9f95ae20885 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 18:56:38 +0200 Subject: [PATCH 21/42] Minor corrections --- src/fwrite.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 51355a667..397a6f18c 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -737,16 +737,17 @@ void fwriteMain(fwriteMainArgs args) // Decide buffer size and rowsPerBatch for each thread // Once rowsPerBatch is decided it can't be changed - // if maxLineLen is greater then buffize, increase it + // if maxLineLen is greater than buffize, increase buffSize if (buffSize < maxLineLen) { buffSize = maxLineLen; } + // ensure buffer can take header line if (nth * buffSize < headerLen) { buffSize = headerLen / nth + 1; } int rowsPerBatch = buffSize / maxLineLen; - int numBatches = (args.nrow - 1) / rowsPerBatch + 1; + int numBatches = args.nrow / rowsPerBatch + 1; if (args.nrow < rowsPerBatch) { rowsPerBatch = args.nrow; @@ -757,7 +758,7 @@ void fwriteMain(fwriteMainArgs args) nth = numBatches; if (verbose) { - DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows, each buffer size %ld bytes (%zu MiB), showProgress=%d, nth=%d\n"), + DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows, each buffer size %zu bytes (%zu MiB), showProgress=%d, nth=%d\n"), args.nrow, numBatches, rowsPerBatch, buffSize, buffSize / MEGA, args.showProgress, nth); } From e9e2e83ff8c79d3771e41cbc3a8ac578fbbaa5a0 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 19:42:59 +0200 Subject: [PATCH 22/42] Uses %zu format for size_t --- src/fwrite.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 397a6f18c..34a83874e 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -766,7 +766,7 @@ void fwriteMain(fwriteMainArgs args) errno=0; size_t alloc_size = nth * buffSize; if (verbose) { - DTPRINT(_("Allocate %ld bytes (%zu MiB) for buffPool\n"), alloc_size, alloc_size / MEGA); + DTPRINT(_("Allocate %zu bytes (%zu MiB) for buffPool\n"), alloc_size, alloc_size / MEGA); } char *buffPool = malloc(alloc_size); if (!buffPool) { @@ -786,7 +786,7 @@ void fwriteMain(fwriteMainArgs args) // alloc zlib streams thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); if (verbose) { - DTPRINT(_("Allocate %ld bytes for thread_streams\n"), nth * sizeof(z_stream)); + DTPRINT(_("Allocate %zu bytes for thread_streams\n"), nth * sizeof(z_stream)); } if (!thread_streams) STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); @@ -807,7 +807,7 @@ void fwriteMain(fwriteMainArgs args) // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize; if (verbose) { - DTPRINT(_("Allocate %ld bytes (%zu MiB) for zbuffPool\n"), alloc_size, alloc_size / MEGA); + DTPRINT(_("Allocate %zu bytes (%zu MiB) for zbuffPool\n"), alloc_size, alloc_size / MEGA); } zbuffPool = malloc(alloc_size); if (!zbuffPool) { @@ -1092,10 +1092,10 @@ void fwriteMain(fwriteMainArgs args) if (verbose) { if (args.is_gzip) { #ifndef NOZLIB - DTPRINT("zlib: uncompressed length=%lu (%zu MiB), compressed length=%lu (%zu MiB), ratio=%.1f%%, crc=%x\n", + DTPRINT("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n", len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc); #endif - DTPRINT("Written %lu rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n", + DTPRINT("Written %"PRId64" rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n", args.nrow, 1.0*(wallclock()-t0), nth, nth ==1 ? "" : "s", maxBuffUsedPC); } } From 7fb8738c168f511f3a4d00f4a9af589e980e7c0b Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Sun, 25 Aug 2024 20:01:59 +0200 Subject: [PATCH 23/42] Last verbose msg was not printed when not is_gzip --- src/fwrite.c | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 34a83874e..056fcb383 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -647,7 +647,7 @@ void fwriteMain(fwriteMainArgs args) // could be console output) and writing column names to it. double t0 = wallclock(); - size_t maxLineLen = eolLen + args.ncol*(2*(doQuote!=0) + sepLen); + size_t maxLineLen = eolLen + args.ncol * (2 * (doQuote!=0) + sepLen); if (args.doRowNames) { maxLineLen += args.rowNames==NULL ? 1 + (int) log10(args.nrow) // the width of the row number : (args.rowNameFun==WF_String ? getMaxStringLen(args.rowNames, args.nrow) * 2 // *2 in case longest row name is all quotes (!) and all get escaped @@ -671,17 +671,17 @@ void fwriteMain(fwriteMainArgs args) STOP(_("Internal error: type %d has no max length method implemented"), args.whichFun[j]); // # nocov } } - if (args.whichFun[j]==WF_Float64 && args.scipen>0) + if (args.whichFun[j] == WF_Float64 && args.scipen > 0) // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written - width += (args.scipen < 350 ? args.scipen : 350 ); - if (width> column name) + headerLen += 2 * getStringLen(args.colNames, j); // * 2 in case quotes are escaped or doubled + headerLen += args.ncol * (sepLen + 2 * (doQuote != 0)) + eolLen + 3; // 3 in case doRowNames and doQuote (the first blank <<"",>> column name) } // Create heap zones ---- @@ -747,13 +747,16 @@ void fwriteMain(fwriteMainArgs args) } int rowsPerBatch = buffSize / maxLineLen; + // + 1 because of the last incomplete loop int numBatches = args.nrow / rowsPerBatch + 1; + // force 1 batch, and then 1 thread, if number of lines in table < rowsPerBatch if (args.nrow < rowsPerBatch) { rowsPerBatch = args.nrow; numBatches = 1; } + // avoid useless threads, and then too big memory allocations if (numBatches < nth) nth = numBatches; @@ -1095,9 +1098,9 @@ void fwriteMain(fwriteMainArgs args) DTPRINT("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n", len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc); #endif + } DTPRINT("Written %"PRId64" rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n", args.nrow, 1.0*(wallclock()-t0), nth, nth ==1 ? "" : "s", maxBuffUsedPC); - } } From 024a34d9fd69846aa6ad1471196df9c17a855497 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Thu, 29 Aug 2024 21:52:17 -0700 Subject: [PATCH 24/42] minor operator ws change --- src/fwrite.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 056fcb383..79c34109e 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -647,9 +647,9 @@ void fwriteMain(fwriteMainArgs args) // could be console output) and writing column names to it. double t0 = wallclock(); - size_t maxLineLen = eolLen + args.ncol * (2 * (doQuote!=0) + sepLen); + size_t maxLineLen = eolLen + args.ncol * (2*(doQuote!=0) + sepLen); if (args.doRowNames) { - maxLineLen += args.rowNames==NULL ? 1 + (int) log10(args.nrow) // the width of the row number + maxLineLen += args.rowNames==NULL ? 1 + (int)log10(args.nrow) // the width of the row number : (args.rowNameFun==WF_String ? getMaxStringLen(args.rowNames, args.nrow) * 2 // *2 in case longest row name is all quotes (!) and all get escaped : 11); // specific integer names could be MAX_INT 2147483647 (10 chars) even on a 5 row table, and data.frame allows negative integer rownames hence 11 for the sign maxLineLen += 2/*possible quotes*/ + sepLen; From 2fe3099ff4caaf37b0a622c358d5472fbb78ed60 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Mon, 2 Sep 2024 09:25:42 +0200 Subject: [PATCH 25/42] Add test for compressLevel=1 --- inst/tests/tests.Rraw | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index dffda6520..84bc0f2e4 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -10015,8 +10015,10 @@ if (!haszlib()) { test(1658.423, file.info(f1)$size < file.info(f2)$size) # 74 < 804 (file.size() isn't available in R 3.1.0) if (test_R.utils) test(1658.43, fread(f1), DT) # use fread to decompress gz (works cross-platform) fwrite(DT, file=f3<-tempfile(), compress="gzip") # compress to filename not ending .gz + fwrite(DT, file=f4<-tempfile(), compress="gzip", compressLevel=1) # test compressLevel test(1658.441, file.info(f3)$size, file.info(f1)$size) - unlink(c(f1,f2,f3)) + test(1658.442, file.info(f4)$size > file.info(f1)$size, TRUE) + unlink(c(f1,f2,f3,f4)) } DT = data.table(a=1:3, b=list(1:4, c(3.14, 100e10), c("foo", "bar", "baz"))) test(1658.45, fwrite(DT), output=c("a,b","1,1|2|3|4","2,3.14|1e+12","3,foo|bar|baz")) From e9f28614c99e797ef45c70b986f174af2641f1c9 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Mon, 2 Sep 2024 09:44:15 +0200 Subject: [PATCH 26/42] Add url link in compressLevel documentation --- man/fwrite.Rd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/man/fwrite.Rd b/man/fwrite.Rd index 549a69a83..fb052a209 100644 --- a/man/fwrite.Rd +++ b/man/fwrite.Rd @@ -59,7 +59,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", \item{nThread}{The number of threads to use. Experiment to see what works best for your data on your hardware.} \item{showProgress}{ Display a progress meter on the console? Ignored when \code{file==""}. } \item{compress}{If \code{compress = "auto"} and if \code{file} ends in \code{.gz} then output format is gzipped csv else csv. If \code{compress = "none"}, output format is always csv. If \code{compress = "gzip"} then format is gzipped csv. Output to the console is never gzipped even if \code{compress = "gzip"}. By default, \code{compress = "auto"}.} - \item{compressLevel}{Level of compression between 1 (fastest method but less compression) and 9 (slowest compression method). The default compression level is 6 which is a good compromise.} + \item{compressLevel}{Level of compression between 1 and 9, 6 by default. See \url{https://linux.die.net/man/1/gzip} for details.} \item{yaml}{If \code{TRUE}, \code{fwrite} will output a CSVY file, that is, a CSV file with metadata stored as a YAML header, using \code{\link[yaml]{as.yaml}}. See \code{Details}. } \item{bom}{If \code{TRUE} a BOM (Byte Order Mark) sequence (EF BB BF) is added at the beginning of the file; format 'UTF-8 with BOM'.} \item{verbose}{Be chatty and report timings?} From 6a75749014c830e7bf1c9a24853c5dc4b71ef404 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Mon, 2 Sep 2024 09:44:41 +0200 Subject: [PATCH 27/42] Add 2 lines in NEWS for fwrite fix and compressLevel --- NEWS.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/NEWS.md b/NEWS.md index fc8dd16ca..844c0580d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -111,6 +111,10 @@ 14. Passing functions programmatically with `env=` doesn't produce an opaque error, e.g. `DT[, f(b), env = list(f=sum)]`, [#6026](https://github.com/Rdatatable/data.table/issues/6026). Note that it's much better to pass functions like `f="sum"` instead. Thanks to @MichaelChirico for the bug report and fix. +15. `fwrite()` with compress="gzip" produces slightly incompatible gz files with multiple independent chunks [#6356](https://github.com/Rdatatable/data.table/issues/6356). This has been fixed by an internal refactoring of the fwrite function. Thanks to @olivierfoster for report and @philippechataignon for the fix. + +16. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip [#5506](https://github.com/Rdatatable/data.table/issues/5506)] [#5513](https://github.com/Rdatatable/data.table/issues/5513) + ## NOTES 1. `transform()` method for data.table sped up substantially when creating new columns on large tables. Thanks to @OfekShilon for the report and PR. The implemented solution was proposed by @ColeMiller1. @@ -123,7 +127,7 @@ 5. Input files are now kept open during `mmap()` when running under Emscripten, [emscripten-core/emscripten#20459](https://github.com/emscripten-core/emscripten/issues/20459). This avoids an error in `fread()` when running in WebAssembly, [#5969](https://github.com/Rdatatable/data.table/issues/5969). Thanks to @maek-ies for the report and @georgestagg for the PR. -6. `dcast()` improves behavior for the situation that the `fun.aggregate` value of `length()` is used but not provided by the user. +6. `dcast()` improves behavior for the situation that the `fun.aggregate` value of `length()` is used but not provided by the user. a. This now triggers a warning, not a message, since relying on this default often signals unexpected duplicates in the data, [#5386](https://github.com/Rdatatable/data.table/issues/5386). The warning is classed as `dt_missing_fun_aggregate_warning`, allowing for more targeted handling in user code. Thanks @MichaelChirico for the suggestion and @Nj221102 for the fix. @@ -838,7 +842,7 @@ 14. The options `datatable.print.class` and `datatable.print.keys` are now `TRUE` by default. They have been available since v1.9.8 (Nov 2016) and v1.11.0 (May 2018) respectively. -15. Thanks to @ssh352, Václav Tlapák, Cole Miller, András Svraka and Toby Dylan Hocking for reporting and bisecting a significant performance regression in dev. This was fixed before release thanks to a PR by Jan Gorecki, [#5463](https://github.com/Rdatatable/data.table/pull/5463). +15. Thanks to @ssh352, Václav Tlapák, Cole Miller, András Svraka and Toby Dylan Hocking for reporting and bisecting a significant performance regression in dev. This was fixed before release thanks to a PR by Jan Gorecki, [#5463](https://github.com/Rdatatable/data.table/pull/5463). 16. `key(x) <- value` is now fully deprecated (from warning to error). Use `setkey()` to set a table's key. We started warning not to use this approach in 2012, with a stronger warning starting in 2019 (1.12.2). This function will be removed in the next release. From 4936e4512ef1dca84a8cdd4a92bcac72c7f476b1 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 11:08:10 -0700 Subject: [PATCH 28/42] tidy-up, expand NEWS for compressLevel --- NEWS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 844c0580d..aa9d14754 100644 --- a/NEWS.md +++ b/NEWS.md @@ -113,7 +113,7 @@ 15. `fwrite()` with compress="gzip" produces slightly incompatible gz files with multiple independent chunks [#6356](https://github.com/Rdatatable/data.table/issues/6356). This has been fixed by an internal refactoring of the fwrite function. Thanks to @olivierfoster for report and @philippechataignon for the fix. -16. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip [#5506](https://github.com/Rdatatable/data.table/issues/5506)] [#5513](https://github.com/Rdatatable/data.table/issues/5513) +16. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default. Thanks @mgarbuzov for the request and @philippechataignon for implementing. ## NOTES From 6b76bea3ddbe6787222ead2579a012ef802a32c2 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 11:22:39 -0700 Subject: [PATCH 29/42] Use match.arg() for arg validation --- R/fwrite.R | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/R/fwrite.R b/R/fwrite.R index d06da559e..dc9e0e2fb 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -19,13 +19,10 @@ fwrite = function(x, file="", append=FALSE, quote="auto", if (length(encoding) != 1L || !encoding %chin% c("", "UTF-8", "native")) { stopf("Argument 'encoding' must be '', 'UTF-8' or 'native'.") } - if (missing(qmethod)) qmethod = qmethod[1L] - if (missing(compress)) compress = compress[1L] - if (missing(compressLevel)) compressLevel = 6L - if (missing(dateTimeAs)) { dateTimeAs = dateTimeAs[1L] } - else if (length(dateTimeAs)>1L) stopf("dateTimeAs must be a single string") - dateTimeAs = chmatch(dateTimeAs, c("ISO","squash","epoch","write.csv"))-1L - if (is.na(dateTimeAs)) stopf("dateTimeAs must be 'ISO','squash','epoch' or 'write.csv'") + qmethod = match.arg(qmethod) + compress = match.arg(compress) + dateTimeAs = match.arg(dateTimeAs) + dateTimeAs = chmatch(dateTimeAs, c("ISO", "squash", "epoch", "write.csv"))-1L if (!missing(logical01) && !missing(logicalAsInt)) stopf("logicalAsInt has been renamed logical01. Use logical01 only, not both.") if (!missing(logicalAsInt)) { From eede93fbaa3ae9ee2df704009200a1ce9854a0f1 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 11:51:11 -0700 Subject: [PATCH 30/42] add a test for the other extreme compressLevel=9 --- inst/tests/tests.Rraw | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 84bc0f2e4..698ade3b7 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -10016,9 +10016,11 @@ if (!haszlib()) { if (test_R.utils) test(1658.43, fread(f1), DT) # use fread to decompress gz (works cross-platform) fwrite(DT, file=f3<-tempfile(), compress="gzip") # compress to filename not ending .gz fwrite(DT, file=f4<-tempfile(), compress="gzip", compressLevel=1) # test compressLevel + fwrite(DT, file=f5<-tempfile(), compress="gzip", compressLevel=9) test(1658.441, file.info(f3)$size, file.info(f1)$size) - test(1658.442, file.info(f4)$size > file.info(f1)$size, TRUE) - unlink(c(f1,f2,f3,f4)) + test(1658.442, file.info(f4)$size > file.info(f1)$size) + test(1658.443, file.info(f1)$size > file.info(f5)$size) + unlink(c(f1,f2,f3,f4,f5)) } DT = data.table(a=1:3, b=list(1:4, c(3.14, 100e10), c("foo", "bar", "baz"))) test(1658.45, fwrite(DT), output=c("a,b","1,1|2|3|4","2,3.14|1e+12","3,foo|bar|baz")) From e39259c1f57bdcaa958118e155d6bd501f07e119 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 12:03:00 -0700 Subject: [PATCH 31/42] partial test fix --- inst/tests/tests.Rraw | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 698ade3b7..688ef4df1 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -198,6 +198,7 @@ base_messages = list( stopifnot = get_msg(stopifnot(FALSE), fmt="FALSE"), not_yet_used = get_msg(.NotYetUsed("abc"), "'", fmt=TRUE), # NB: need fmt= because the English message has '(yet)' --> parens in regex ambiguous_date_fmt = get_msg(as.Date('xxx')), + match_arg_length = get_msg(match.arg(c('a', 'b'), letters)) NULL ) @@ -9973,7 +9974,7 @@ test(1658.27, fwrite(DT, na="NA", verbose=TRUE), output='Writing bom .false., ya test(1658.28, fwrite(ok_dt, 1), error=base_messages$stopifnot("is.character(file) && length(file) == 1L && !is.na(file)")) test(1658.29, fwrite(ok_dt, quote=123), error="identical\\(quote.*auto.*FALSE.*TRUE") test(1658.30, fwrite(ok_dt, sep="..."), error="nchar(sep)") -test(1658.31, fwrite(ok_dt, qmethod=c("double", "double")), error="length(qmethod)") +test(1658.31, fwrite(ok_dt, qmethod=c("double", "double")), error=base_messages$match_arg_length) test(1658.32, fwrite(ok_dt, col.names="foobar"), error="isTRUEorFALSE(col.names)") # null data table (no columns) @@ -10943,7 +10944,7 @@ DT = data.table( D = as.POSIXct(dt<-paste(d,t), tz="UTC"), E = as.POSIXct(paste0(dt,c(".999",".0",".5",".111112",".123456",".023",".0",".999999",".99",".0009")), tz="UTC")) -test(1740.0, fwrite(DT,dateTimeAs="iso"), error="dateTimeAs must be 'ISO','squash','epoch' or 'write.csv'") +test(1740.0, fwrite(DT,dateTimeAs="iso"), error="ISO") test(1740.1, fwrite(DT,dateTimeAs=c("ISO","squash")), error="dateTimeAs must be a single string") test(1740.2, capture.output(fwrite(DT,dateTimeAs="ISO")), c( "A,B,C,D,E", From 182432bbba0b638de8abc9aefe1df5223a13b18a Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 12:08:54 -0700 Subject: [PATCH 32/42] fix updated test errors --- inst/tests/tests.Rraw | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 688ef4df1..3dc128898 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -158,6 +158,9 @@ TZnotUTC = !identical(tt,"") && !is_utc(tt) # (3) function factory for matching messages exactly by substituting anything between delimiters [delim, fmt=TRUE] # (4) function factory for matching messages exactly by substituting a generic string [fmt=string] get_msg = function(e, delim, fmt=FALSE) { + ufq = options(useFancyQuotes = FALSE) # otherwise we get angled quotes, hard to match robustly + on.exit(options(ufq)) + condition = tryCatch({e; NULL}, error=identity, warning=identity) if (is.null(condition)) return(condition) msg = condition$message @@ -169,17 +172,13 @@ get_msg = function(e, delim, fmt=FALSE) { sprintf("%s%s%s", delim[1L], if (fmt) "%s" else ".+", delim[2L]), msg ) - if (fmt) return(function(x) sprintf(msg, x)) + if (fmt) return(function(...) sprintf(msg, ...)) return(msg) } base_messages = list( missing_object = get_msg(`__dt_test_missing_` + 1, "'", fmt=TRUE), missing_function = get_msg(`__dt_test_missing_`(), '"', fmt=TRUE), - missing_coerce_method = get_msg(delim = '"', { - old = options(useFancyQuotes = FALSE) # otherwise we get angled quotes, hard to match robustly - on.exit(options(old)) - methods::as(TRUE, 'foo') - }), + missing_coerce_method = get_msg(methods::as(TRUE, 'foo'), delim = '"'), missing_dispatch_method = get_msg(conditionMessage(structure(1, class="foo")), '[\'"]'), invalid_arg_unary_operator = get_msg(-'a'), invalid_arg_binary_operator = get_msg(1 + 'a'), @@ -198,7 +197,8 @@ base_messages = list( stopifnot = get_msg(stopifnot(FALSE), fmt="FALSE"), not_yet_used = get_msg(.NotYetUsed("abc"), "'", fmt=TRUE), # NB: need fmt= because the English message has '(yet)' --> parens in regex ambiguous_date_fmt = get_msg(as.Date('xxx')), - match_arg_length = get_msg(match.arg(c('a', 'b'), letters)) + match_arg_length = get_msg(match.arg(c('a', 'b'), letters)), + match_arg_4_choices = get_msg(match.arg('e', letters[1:4]), delim='"', fmt=TRUE), NULL ) @@ -10944,9 +10944,9 @@ DT = data.table( D = as.POSIXct(dt<-paste(d,t), tz="UTC"), E = as.POSIXct(paste0(dt,c(".999",".0",".5",".111112",".123456",".023",".0",".999999",".99",".0009")), tz="UTC")) -test(1740.0, fwrite(DT,dateTimeAs="iso"), error="ISO") -test(1740.1, fwrite(DT,dateTimeAs=c("ISO","squash")), error="dateTimeAs must be a single string") -test(1740.2, capture.output(fwrite(DT,dateTimeAs="ISO")), c( +test(1740.1, fwrite(DT,dateTimeAs="iso"), error=base_messages$match_arg_4_choices("ISO", "epoch", "squash", "write.csv")) +test(1740.2, fwrite(DT,dateTimeAs=c("ISO","squash")), error=base_messages$match_arg_length) +test(1740.3, capture.output(fwrite(DT,dateTimeAs="ISO")), c( "A,B,C,D,E", "1907-10-21,1907-10-21,23:59:59,1907-10-21T23:59:59Z,1907-10-21T23:59:59.999Z", "1907-10-22,1907-10-22,00:00:00,1907-10-22T00:00:00Z,1907-10-22T00:00:00Z", @@ -10958,7 +10958,7 @@ test(1740.2, capture.output(fwrite(DT,dateTimeAs="ISO")), c( "1999-12-31,1999-12-31,01:23:45,1999-12-31T01:23:45Z,1999-12-31T01:23:45.999999Z", "2000-02-29,2000-02-29,23:59:59,2000-02-29T23:59:59Z,2000-02-29T23:59:59.990Z", "2016-09-12,2016-09-12,01:30:30,2016-09-12T01:30:30Z,2016-09-12T01:30:30.000900Z")) -test(1740.3, capture.output(fwrite(DT,dateTimeAs="squash")), c( +test(1740.4, capture.output(fwrite(DT,dateTimeAs="squash")), c( "A,B,C,D,E", "19071021,19071021,235959,19071021235959000,19071021235959999", "19071022,19071022,000000,19071022000000000,19071022000000000", @@ -10970,7 +10970,7 @@ test(1740.3, capture.output(fwrite(DT,dateTimeAs="squash")), c( "19991231,19991231,012345,19991231012345000,19991231012345999", "20000229,20000229,235959,20000229235959000,20000229235959990", "20160912,20160912,013030,20160912013030000,20160912013030000")) -test(1740.4, capture.output(fwrite(DT,dateTimeAs="epoch")), c( +test(1740.5, capture.output(fwrite(DT,dateTimeAs="epoch")), c( "A,B,C,D,E", "-22718,-22718,86399,-1962748801,-1962748800.001", "-22717,-22717,0,-1962748800,-1962748800", From 6999dc603f6f8dad34b84844693c33f9761afc6a Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 12:09:53 -0700 Subject: [PATCH 33/42] confirmed NEWS wording, fix typo --- NEWS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index aa9d14754..8f413a48f 100644 --- a/NEWS.md +++ b/NEWS.md @@ -111,7 +111,7 @@ 14. Passing functions programmatically with `env=` doesn't produce an opaque error, e.g. `DT[, f(b), env = list(f=sum)]`, [#6026](https://github.com/Rdatatable/data.table/issues/6026). Note that it's much better to pass functions like `f="sum"` instead. Thanks to @MichaelChirico for the bug report and fix. -15. `fwrite()` with compress="gzip" produces slightly incompatible gz files with multiple independent chunks [#6356](https://github.com/Rdatatable/data.table/issues/6356). This has been fixed by an internal refactoring of the fwrite function. Thanks to @olivierfoster for report and @philippechataignon for the fix. +15. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix. 16. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default. Thanks @mgarbuzov for the request and @philippechataignon for implementing. From 47464db047acba6c6fd8820937fc92e8c3c4cd8e Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 12:42:57 -0700 Subject: [PATCH 34/42] fix order --- inst/tests/tests.Rraw | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 3dc128898..4b9bb9410 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -10944,7 +10944,7 @@ DT = data.table( D = as.POSIXct(dt<-paste(d,t), tz="UTC"), E = as.POSIXct(paste0(dt,c(".999",".0",".5",".111112",".123456",".023",".0",".999999",".99",".0009")), tz="UTC")) -test(1740.1, fwrite(DT,dateTimeAs="iso"), error=base_messages$match_arg_4_choices("ISO", "epoch", "squash", "write.csv")) +test(1740.1, fwrite(DT,dateTimeAs="iso"), error=base_messages$match_arg_4_choices("ISO", "squash", "epoch", "write.csv")) test(1740.2, fwrite(DT,dateTimeAs=c("ISO","squash")), error=base_messages$match_arg_length) test(1740.3, capture.output(fwrite(DT,dateTimeAs="ISO")), c( "A,B,C,D,E", From e2e7022357ab3aa00eb4b826c98e954602964ada Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 12:59:08 -0700 Subject: [PATCH 35/42] weak ordering --- inst/tests/tests.Rraw | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 4b9bb9410..8f86ba5d3 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -10019,8 +10019,8 @@ if (!haszlib()) { fwrite(DT, file=f4<-tempfile(), compress="gzip", compressLevel=1) # test compressLevel fwrite(DT, file=f5<-tempfile(), compress="gzip", compressLevel=9) test(1658.441, file.info(f3)$size, file.info(f1)$size) - test(1658.442, file.info(f4)$size > file.info(f1)$size) - test(1658.443, file.info(f1)$size > file.info(f5)$size) + test(1658.442, file.info(f4)$size >= file.info(f1)$size) + test(1658.443, file.info(f1)$size >= file.info(f5)$size) unlink(c(f1,f2,f3,f4,f5)) } DT = data.table(a=1:3, b=list(1:4, c(3.14, 100e10), c("foo", "bar", "baz"))) From 6bebfc2d00b951ffced8a011fba4cccababfefed Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Sep 2024 13:47:01 -0700 Subject: [PATCH 36/42] place in 1.17.0 NEWS --- NEWS.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/NEWS.md b/NEWS.md index f15d8f3bf..6b7502787 100644 --- a/NEWS.md +++ b/NEWS.md @@ -6,6 +6,12 @@ 1. In `DT[, variable := value]`, when value is class `POSIXlt`, we automatically coerce it to class `POSIXct` instead, [#1724](https://github.com/Rdatatable/data.table/issues/1724). Thanks to @linzhp for the report, and Benjamin Schwendinger for the fix. +## NEW FEATURES + +1. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix. + +2. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default. Thanks @mgarbuzov for the request and @philippechataignon for implementing. + ## BUG FIXES 1. Using `print.data.table()` with character truncation using `datatable.prettyprint.char` no longer errors with `NA` entries, [#6441](https://github.com/Rdatatable/data.table/issues/6441). Thanks to @r2evans for the bug report, and @joshhwuu for the fix. @@ -131,10 +137,6 @@ 14. Passing functions programmatically with `env=` doesn't produce an opaque error, e.g. `DT[, f(b), env = list(f=sum)]`, [#6026](https://github.com/Rdatatable/data.table/issues/6026). Note that it's much better to pass functions like `f="sum"` instead. Thanks to @MichaelChirico for the bug report and fix. -15. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix. - -16. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default. Thanks @mgarbuzov for the request and @philippechataignon for implementing. - ## NOTES 1. `transform()` method for data.table sped up substantially when creating new columns on large tables. Thanks to @OfekShilon for the report and PR. The implemented solution was proposed by @ColeMiller1. From 5687a0ce45e7e4051a1b1a06ebde6bc708f0024b Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Tue, 3 Sep 2024 14:56:47 +0200 Subject: [PATCH 37/42] Add parenthesis to be more explicit --- src/fwrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fwrite.c b/src/fwrite.c index 9e41ea5a4..e6c783c41 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -673,7 +673,7 @@ void fwriteMain(fwriteMainArgs args) } if (args.whichFun[j] == WF_Float64 && args.scipen > 0) // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written - width += args.scipen < 350 ? args.scipen : 350; + width += (args.scipen < 350) ? args.scipen : 350; if (width < naLen) width = naLen; maxLineLen += width * 2; // *2 in case the longest string is all quotes and they all need to be escaped From 117ab45674f1e56304abca83f9f0df50ab0274be Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Tue, 3 Sep 2024 15:03:46 +0200 Subject: [PATCH 38/42] Add comment for DeflateInit2 --- src/fwrite.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/fwrite.c b/src/fwrite.c index e6c783c41..b97ef63cd 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -564,6 +564,13 @@ int init_stream(z_stream *stream) { stream->zalloc = Z_NULL; stream->zfree = Z_NULL; stream->opaque = Z_NULL; + // In deflateInit2, windowBits can be –8..–15 for raw deflate, 15 is the bigger window size. + // With -15, deflate() will generate raw deflate data with no zlib header or trailer, and will not compute a check value. + // Previously this parameter was 31 baecause windowBits can be greater than 15 for optional gzip encoding. + // Adding 16 writes a simple gzip header and trailer around the compressed data. + // Now we manage header and trailer. gzip file is slighty lower with -15 because no header/trailer are + // written for each chunk. + // For memLevel, 8 is the default value (128 KiB). memLevel=9 uses maximum memory for optimal speed. To be tested ? int err = deflateInit2(stream, gzip_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } From 4e91a218cf0c08f89ea65715280df1c450839dc9 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Tue, 3 Sep 2024 13:25:58 -0700 Subject: [PATCH 39/42] typo --- src/fwrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fwrite.c b/src/fwrite.c index b97ef63cd..44df3f93a 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -566,7 +566,7 @@ int init_stream(z_stream *stream) { stream->opaque = Z_NULL; // In deflateInit2, windowBits can be –8..–15 for raw deflate, 15 is the bigger window size. // With -15, deflate() will generate raw deflate data with no zlib header or trailer, and will not compute a check value. - // Previously this parameter was 31 baecause windowBits can be greater than 15 for optional gzip encoding. + // Previously this parameter was 31 because windowBits can be greater than 15 for optional gzip encoding. // Adding 16 writes a simple gzip header and trailer around the compressed data. // Now we manage header and trailer. gzip file is slighty lower with -15 because no header/trailer are // written for each chunk. From 255f1cea0716697bcef3b8790df6d5db3aae15a3 Mon Sep 17 00:00:00 2001 From: Philippe Chataignon Date: Wed, 4 Sep 2024 11:49:28 +0200 Subject: [PATCH 40/42] Add parenthesis to be more explicit (2) Co-authored-by: Michael Chirico --- src/fwrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fwrite.c b/src/fwrite.c index bec8f2f33..4229fab55 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -584,7 +584,7 @@ int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* sour int err = deflate(stream, Z_SYNC_FLUSH); *destLen = *destLen - stream->avail_out; // *destLen = stream->total_out; - return err != Z_STREAM_ERROR ? Z_OK : err; + return (err != Z_STREAM_ERROR) ? Z_OK : err; } #endif From 309da8f56d3bed59f0dbe5193aaf15cea0211b6b Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Dec 2024 18:18:29 -0800 Subject: [PATCH 41/42] Try to emphasize that '-' is "command flag hyphen", not "negative" --- NEWS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 90ee0106a..803a674a8 100644 --- a/NEWS.md +++ b/NEWS.md @@ -67,7 +67,7 @@ rowwiseDT( 5. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix. -6. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default. Thanks @mgarbuzov for the request and @philippechataignon for implementing. +6. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default, i.e. equivalent to passing `-6`. Thanks @mgarbuzov for the request and @philippechataignon for implementing. ## BUG FIXES From 5c57ebaa5363c08d2b975c486c4b14e65170bdab Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Mon, 2 Dec 2024 18:41:32 -0800 Subject: [PATCH 42/42] Convert Toby'd comment to atime_test() --- .ci/atime/tests.R | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/.ci/atime/tests.R b/.ci/atime/tests.R index f6d68ce68..511079477 100644 --- a/.ci/atime/tests.R +++ b/.ci/atime/tests.R @@ -231,7 +231,20 @@ test.list <- atime::atime_test_list( }, expr = data.table:::melt(DT, measure.vars = measure.vars), Slow = "fd24a3105953f7785ea7414678ed8e04524e6955", # Parent of the merge commit (https://github.com/Rdatatable/data.table/commit/ed72e398df76a0fcfd134a4ad92356690e4210ea) of the PR (https://github.com/Rdatatable/data.table/pull/5054) that fixes the issue - Fast = "ed72e398df76a0fcfd134a4ad92356690e4210ea"), # Merge commit of the PR (https://github.com/Rdatatable/data.table/pull/5054) that fixes the issue + Fast = "ed72e398df76a0fcfd134a4ad92356690e4210ea"), # Merge commit of the PR (https://github.com/Rdatatable/data.table/pull/5054) that fixes the issue # Test case created directly using the atime code below (not adapted from any other benchmark), based on the issue/fix PR https://github.com/Rdatatable/data.table/pull/5054#issue-930603663 "melt should be more efficient when there are missing input columns." + + # Test case created from @tdhock's comment https://github.com/Rdatatable/data.table/pull/6393#issuecomment-2327396833, in turn adapted from @philippechataignon's comment https://github.com/Rdatatable/data.table/pull/6393#issuecomment-2326714012 + "fwrite refactored in #6393" = atime::atime_test( + setup = { + set.seed(1) + NC = 10L + L <- data.table(i=1:N) + L[, paste0("V", 1:NC) := replicate(NC, rnorm(N), simplify=FALSE)] + out.csv <- tempfile() + }, + expr = data.table::fwrite(L, out.csv, compress="gzip"), + Before = "f339aa64c426a9cd7cf2fcb13d91fc4ed353cd31", # Parent of the first commit https://github.com/Rdatatable/data.table/commit/fcc10d73a20837d0f1ad3278ee9168473afa5ff1 in the PR https://github.com/Rdatatable/data.table/pull/6393/commits with major change to fwrite with gzip. + PR = "3630413ae493a5a61b06c50e80d166924d2ef89a"), # Close-to-last merge commit in the PR. tests=extra.test.list) # nolint end: undesirable_operator_linter.