diff --git a/.ci/atime/tests.R b/.ci/atime/tests.R index f6d68ce682..5110794770 100644 --- a/.ci/atime/tests.R +++ b/.ci/atime/tests.R @@ -231,7 +231,20 @@ test.list <- atime::atime_test_list( }, expr = data.table:::melt(DT, measure.vars = measure.vars), Slow = "fd24a3105953f7785ea7414678ed8e04524e6955", # Parent of the merge commit (https://github.com/Rdatatable/data.table/commit/ed72e398df76a0fcfd134a4ad92356690e4210ea) of the PR (https://github.com/Rdatatable/data.table/pull/5054) that fixes the issue - Fast = "ed72e398df76a0fcfd134a4ad92356690e4210ea"), # Merge commit of the PR (https://github.com/Rdatatable/data.table/pull/5054) that fixes the issue + Fast = "ed72e398df76a0fcfd134a4ad92356690e4210ea"), # Merge commit of the PR (https://github.com/Rdatatable/data.table/pull/5054) that fixes the issue # Test case created directly using the atime code below (not adapted from any other benchmark), based on the issue/fix PR https://github.com/Rdatatable/data.table/pull/5054#issue-930603663 "melt should be more efficient when there are missing input columns." + + # Test case created from @tdhock's comment https://github.com/Rdatatable/data.table/pull/6393#issuecomment-2327396833, in turn adapted from @philippechataignon's comment https://github.com/Rdatatable/data.table/pull/6393#issuecomment-2326714012 + "fwrite refactored in #6393" = atime::atime_test( + setup = { + set.seed(1) + NC = 10L + L <- data.table(i=1:N) + L[, paste0("V", 1:NC) := replicate(NC, rnorm(N), simplify=FALSE)] + out.csv <- tempfile() + }, + expr = data.table::fwrite(L, out.csv, compress="gzip"), + Before = "f339aa64c426a9cd7cf2fcb13d91fc4ed353cd31", # Parent of the first commit https://github.com/Rdatatable/data.table/commit/fcc10d73a20837d0f1ad3278ee9168473afa5ff1 in the PR https://github.com/Rdatatable/data.table/pull/6393/commits with major change to fwrite with gzip. + PR = "3630413ae493a5a61b06c50e80d166924d2ef89a"), # Close-to-last merge commit in the PR. tests=extra.test.list) # nolint end: undesirable_operator_linter. diff --git a/NEWS.md b/NEWS.md index 92b4f9ee3b..bca7d63fa9 100644 --- a/NEWS.md +++ b/NEWS.md @@ -65,6 +65,10 @@ rowwiseDT( 4. `patterns()` in `melt()` combines correctly with user-defined `cols=`, which can be useful to specify a subset of columns to reshape without having to use a regex, for example `patterns("2", cols=c("y1", "y2"))` will only give `y2` even if there are other columns in the input matching `2`, [#6498](https://github.com/Rdatatable/data.table/issues/6498). Thanks to @hongyuanjia for the report, and to @tdhock for the PR. +5. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix. + +6. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default, i.e. equivalent to passing `-6`. Thanks @mgarbuzov for the request and @philippechataignon for implementing. + ## BUG FIXES 1. `fwrite()` respects `dec=','` for timestamp columns (`POSIXct` or `nanotime`) with sub-second accuracy, [#6446](https://github.com/Rdatatable/data.table/issues/6446). Thanks @kav2k for pointing out the inconsistency and @MichaelChirico for the PR. @@ -268,7 +272,7 @@ rowwiseDT( 5. Input files are now kept open during `mmap()` when running under Emscripten, [emscripten-core/emscripten#20459](https://github.com/emscripten-core/emscripten/issues/20459). This avoids an error in `fread()` when running in WebAssembly, [#5969](https://github.com/Rdatatable/data.table/issues/5969). Thanks to @maek-ies for the report and @georgestagg for the PR. -6. `dcast()` improves behavior for the situation that the `fun.aggregate` value of `length()` is used but not provided by the user. +6. `dcast()` improves behavior for the situation that the `fun.aggregate` value of `length()` is used but not provided by the user. a. This now triggers a warning, not a message, since relying on this default often signals unexpected duplicates in the data, [#5386](https://github.com/Rdatatable/data.table/issues/5386). The warning is classed as `dt_missing_fun_aggregate_warning`, allowing for more targeted handling in user code. Thanks @MichaelChirico for the suggestion and @Nj221102 for the fix. @@ -983,7 +987,7 @@ rowwiseDT( 14. The options `datatable.print.class` and `datatable.print.keys` are now `TRUE` by default. They have been available since v1.9.8 (Nov 2016) and v1.11.0 (May 2018) respectively. -15. Thanks to @ssh352, Václav Tlapák, Cole Miller, András Svraka and Toby Dylan Hocking for reporting and bisecting a significant performance regression in dev. This was fixed before release thanks to a PR by Jan Gorecki, [#5463](https://github.com/Rdatatable/data.table/pull/5463). +15. Thanks to @ssh352, Václav Tlapák, Cole Miller, András Svraka and Toby Dylan Hocking for reporting and bisecting a significant performance regression in dev. This was fixed before release thanks to a PR by Jan Gorecki, [#5463](https://github.com/Rdatatable/data.table/pull/5463). 16. `key(x) <- value` is now fully deprecated (from warning to error). Use `setkey()` to set a table's key. We started warning not to use this approach in 2012, with a stronger warning starting in 2019 (1.12.2). This function will be removed in the next release. diff --git a/R/fwrite.R b/R/fwrite.R index d67c886d13..ef87fbb4e8 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -10,6 +10,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", buffMB=8, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip"), + compressLevel = 6L, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -18,12 +19,10 @@ fwrite = function(x, file="", append=FALSE, quote="auto", if (length(encoding) != 1L || !encoding %chin% c("", "UTF-8", "native")) { stopf("Argument 'encoding' must be '', 'UTF-8' or 'native'.") } - if (missing(qmethod)) qmethod = qmethod[1L] - if (missing(compress)) compress = compress[1L] - if (missing(dateTimeAs)) { dateTimeAs = dateTimeAs[1L] } - else if (length(dateTimeAs)>1L) stopf("dateTimeAs must be a single string") - dateTimeAs = chmatch(dateTimeAs, c("ISO","squash","epoch","write.csv"))-1L - if (is.na(dateTimeAs)) stopf("dateTimeAs must be 'ISO','squash','epoch' or 'write.csv'") + qmethod = match.arg(qmethod) + compress = match.arg(compress) + dateTimeAs = match.arg(dateTimeAs) + dateTimeAs = chmatch(dateTimeAs, c("ISO", "squash", "epoch", "write.csv"))-1L if (!missing(logical01) && !missing(logicalAsInt)) stopf("logicalAsInt has been renamed logical01. Use logical01 only, not both.") if (!missing(logicalAsInt)) { @@ -34,6 +33,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", scipen = if (is.numeric(scipen)) as.integer(scipen) else 0L buffMB = as.integer(buffMB) nThread = as.integer(nThread) + compressLevel = as.integer(compressLevel) # write.csv default is 'double' so fwrite follows suit. write.table's default is 'escape' # validate arguments if (is.matrix(x)) { # coerce to data.table if input object is matrix @@ -46,7 +46,8 @@ fwrite = function(x, file="", append=FALSE, quote="auto", x = as.data.table(x) } } - stopifnot(is.list(x), + stopifnot( + is.list(x), identical(quote,"auto") || isTRUEorFALSE(quote), is.character(sep) && length(sep)==1L && (nchar(sep) == 1L || identical(sep, "")), is.character(sep2) && length(sep2)==3L && nchar(sep2[2L])==1L, @@ -55,6 +56,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), length(compress) == 1L && compress %chin% c("auto", "none", "gzip"), + length(compressLevel) == 1L && 0 <= compressLevel && compressLevel <= 9, isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), isTRUEorFALSE(bom), @@ -62,7 +64,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(file) && length(file)==1L && !is.na(file), length(buffMB)==1L && !is.na(buffMB) && 1L<=buffMB && buffMB<=1024L, length(nThread)==1L && !is.na(nThread) && nThread>=1L - ) + ) is_gzip = compress == "gzip" || (compress == "auto" && endsWithAny(file, ".gz")) @@ -119,7 +121,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", file = enc2native(file) # CfwriteR cannot handle UTF-8 if that is not the native encoding, see #3078. .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, bom, yaml, verbose, encoding) + showProgress, is_gzip, compressLevel, bom, yaml, verbose, encoding) invisible() } diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 92db5262c4..fc3a62a6fd 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -159,6 +159,9 @@ TZnotUTC = !identical(tt,"") && !is_utc(tt) # (3) function factory for matching messages exactly by substituting anything between delimiters [delim, fmt=TRUE] # (4) function factory for matching messages exactly by substituting a generic string [fmt=string] get_msg = function(e, delim, fmt=FALSE) { + ufq = options(useFancyQuotes = FALSE) # otherwise we get angled quotes, hard to match robustly + on.exit(options(ufq)) + condition = tryCatch({e; NULL}, error=identity, warning=identity) if (is.null(condition)) return(condition) msg = condition$message @@ -170,17 +173,13 @@ get_msg = function(e, delim, fmt=FALSE) { sprintf("%s%s%s", delim[1L], if (fmt) "%s" else ".+", delim[2L]), msg ) - if (fmt) return(function(x) sprintf(msg, x)) + if (fmt) return(function(...) sprintf(msg, ...)) return(msg) } base_messages = list( missing_object = get_msg(`__dt_test_missing_` + 1, "'", fmt=TRUE), missing_function = get_msg(`__dt_test_missing_`(), '"', fmt=TRUE), - missing_coerce_method = get_msg(delim = '"', { - old = options(useFancyQuotes = FALSE) # otherwise we get angled quotes, hard to match robustly - on.exit(options(old)) - methods::as(TRUE, 'foo') - }), + missing_coerce_method = get_msg(methods::as(TRUE, 'foo'), delim = '"'), missing_dispatch_method = get_msg(conditionMessage(structure(1, class="foo")), '[\'"]'), invalid_arg_unary_operator = get_msg(-'a'), invalid_arg_binary_operator = get_msg(1 + 'a'), @@ -199,6 +198,8 @@ base_messages = list( stopifnot = get_msg(stopifnot(FALSE), fmt="FALSE"), not_yet_used = get_msg(.NotYetUsed("abc"), "'", fmt=TRUE), # NB: need fmt= because the English message has '(yet)' --> parens in regex ambiguous_date_fmt = get_msg(as.Date('xxx')), + match_arg_length = get_msg(match.arg(c('a', 'b'), letters)), + match_arg_4_choices = get_msg(match.arg('e', letters[1:4]), delim='"', fmt=TRUE), NULL ) @@ -9981,7 +9982,7 @@ test(1658.27, fwrite(DT, na="NA", verbose=TRUE), output='Writing bom .false., ya test(1658.28, fwrite(ok_dt, 1), error=base_messages$stopifnot("is.character(file) && length(file) == 1L && !is.na(file)")) test(1658.29, fwrite(ok_dt, quote=123), error="identical\\(quote.*auto.*FALSE.*TRUE") test(1658.30, fwrite(ok_dt, sep="..."), error="nchar(sep)") -test(1658.31, fwrite(ok_dt, qmethod=c("double", "double")), error="length(qmethod)") +test(1658.31, fwrite(ok_dt, qmethod=c("double", "double")), error=base_messages$match_arg_length) test(1658.32, fwrite(ok_dt, col.names="foobar"), error="isTRUEorFALSE(col.names)") # null data table (no columns) @@ -10023,8 +10024,12 @@ if (!haszlib()) { test(1658.423, file.info(f1)$size < file.info(f2)$size) # 74 < 804 (file.size() isn't available in R 3.1.0) if (test_R.utils) test(1658.43, fread(f1), DT) # use fread to decompress gz (works cross-platform) fwrite(DT, file=f3<-tempfile(), compress="gzip") # compress to filename not ending .gz + fwrite(DT, file=f4<-tempfile(), compress="gzip", compressLevel=1) # test compressLevel + fwrite(DT, file=f5<-tempfile(), compress="gzip", compressLevel=9) test(1658.441, file.info(f3)$size, file.info(f1)$size) - unlink(c(f1,f2,f3)) + test(1658.442, file.info(f4)$size >= file.info(f1)$size) + test(1658.443, file.info(f1)$size >= file.info(f5)$size) + unlink(c(f1,f2,f3,f4,f5)) } DT = data.table(a=1:3, b=list(1:4, c(3.14, 100e10), c("foo", "bar", "baz"))) test(1658.45, fwrite(DT), output=c("a,b","1,1|2|3|4","2,3.14|1e+12","3,foo|bar|baz")) @@ -10947,9 +10952,9 @@ DT = data.table( D = as.POSIXct(dt<-paste(d,t), tz="UTC"), E = as.POSIXct(paste0(dt,c(".999",".0",".5",".111112",".123456",".023",".0",".999999",".99",".0009")), tz="UTC")) -test(1740.0, fwrite(DT,dateTimeAs="iso"), error="dateTimeAs must be 'ISO','squash','epoch' or 'write.csv'") -test(1740.1, fwrite(DT,dateTimeAs=c("ISO","squash")), error="dateTimeAs must be a single string") -test(1740.2, capture.output(fwrite(DT,dateTimeAs="ISO")), c( +test(1740.1, fwrite(DT,dateTimeAs="iso"), error=base_messages$match_arg_4_choices("ISO", "squash", "epoch", "write.csv")) +test(1740.2, fwrite(DT,dateTimeAs=c("ISO","squash")), error=base_messages$match_arg_length) +test(1740.3, capture.output(fwrite(DT,dateTimeAs="ISO")), c( "A,B,C,D,E", "1907-10-21,1907-10-21,23:59:59,1907-10-21T23:59:59Z,1907-10-21T23:59:59.999Z", "1907-10-22,1907-10-22,00:00:00,1907-10-22T00:00:00Z,1907-10-22T00:00:00Z", @@ -10961,7 +10966,7 @@ test(1740.2, capture.output(fwrite(DT,dateTimeAs="ISO")), c( "1999-12-31,1999-12-31,01:23:45,1999-12-31T01:23:45Z,1999-12-31T01:23:45.999999Z", "2000-02-29,2000-02-29,23:59:59,2000-02-29T23:59:59Z,2000-02-29T23:59:59.990Z", "2016-09-12,2016-09-12,01:30:30,2016-09-12T01:30:30Z,2016-09-12T01:30:30.000900Z")) -test(1740.3, capture.output(fwrite(DT,dateTimeAs="squash")), c( +test(1740.4, capture.output(fwrite(DT,dateTimeAs="squash")), c( "A,B,C,D,E", "19071021,19071021,235959,19071021235959000,19071021235959999", "19071022,19071022,000000,19071022000000000,19071022000000000", @@ -10973,7 +10978,7 @@ test(1740.3, capture.output(fwrite(DT,dateTimeAs="squash")), c( "19991231,19991231,012345,19991231012345000,19991231012345999", "20000229,20000229,235959,20000229235959000,20000229235959990", "20160912,20160912,013030,20160912013030000,20160912013030000")) -test(1740.4, capture.output(fwrite(DT,dateTimeAs="epoch")), c( +test(1740.5, capture.output(fwrite(DT,dateTimeAs="epoch")), c( "A,B,C,D,E", "-22718,-22718,86399,-1962748801,-1962748800.001", "-22717,-22717,0,-1962748800,-1962748800", diff --git a/man/fwrite.Rd b/man/fwrite.Rd index efa830fb9d..fb052a2093 100644 --- a/man/fwrite.Rd +++ b/man/fwrite.Rd @@ -18,6 +18,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", buffMB = 8L, nThread = getDTthreads(verbose), showProgress = getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip"), + compressLevel = 6L, yaml = FALSE, bom = FALSE, verbose = getOption("datatable.verbose", FALSE), @@ -58,6 +59,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", \item{nThread}{The number of threads to use. Experiment to see what works best for your data on your hardware.} \item{showProgress}{ Display a progress meter on the console? Ignored when \code{file==""}. } \item{compress}{If \code{compress = "auto"} and if \code{file} ends in \code{.gz} then output format is gzipped csv else csv. If \code{compress = "none"}, output format is always csv. If \code{compress = "gzip"} then format is gzipped csv. Output to the console is never gzipped even if \code{compress = "gzip"}. By default, \code{compress = "auto"}.} + \item{compressLevel}{Level of compression between 1 and 9, 6 by default. See \url{https://linux.die.net/man/1/gzip} for details.} \item{yaml}{If \code{TRUE}, \code{fwrite} will output a CSVY file, that is, a CSV file with metadata stored as a YAML header, using \code{\link[yaml]{as.yaml}}. See \code{Details}. } \item{bom}{If \code{TRUE} a BOM (Byte Order Mark) sequence (EF BB BF) is added at the beginning of the file; format 'UTF-8 with BOM'.} \item{verbose}{Be chatty and report timings?} diff --git a/src/data.table.h b/src/data.table.h index e597fb0d45..3b3860cbe5 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -294,7 +294,7 @@ SEXP chmatch_R(SEXP, SEXP, SEXP); SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); SEXP address(SEXP); diff --git a/src/fwrite.c b/src/fwrite.c index e6d149750e..41f398f1a0 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -26,12 +26,10 @@ #include "fwriteLookups.h" #include "fwrite.h" +#define MEGA (1 << 20) #define NUM_SF 15 #define SIZE_SF 1000000000000000ULL // 10^NUM_SF -#define MIN(a,b) (((a)<(b))?(a):(b)) -#define MAX(a,b) (((a)>(b))?(a):(b)) - // Globals for this file only. Written once to hold parameters passed from R level. static const char *na; // by default "" or if set (not recommended) then usually "NA" static char sep; // comma in .csv files @@ -43,6 +41,7 @@ static bool qmethodEscape=false; // when quoting fields, how to escape dou static int scipen; static bool squashDateTime=false; // 0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) static bool verbose=false; +static int gzip_level; extern const char *getString(const void *, int64_t); extern int getStringLen(const void *, int64_t); @@ -56,7 +55,8 @@ inline void write_chars(const char *x, char **pch) { // similar to C's strcpy but i) doesn't include trailing \0 and ii) moves destination along char *ch = *pch; - while (*x) *ch++=*x++; + while (*x) + *ch++ = *x++; *pch = ch; } @@ -113,7 +113,7 @@ void writeInt32(const void *col, int64_t row, char **pch) if (x == INT32_MIN) { write_chars(na, &ch); } else { - if (x<0) { *ch++ = '-'; x=-x; } + if (x<0) { *ch++ = '-'; x = -x; } // Avoid log() for speed. Write backwards then reverse when we know how long. char *low = ch; do { *ch++ = '0'+x%10; x/=10; } while (x>0); @@ -129,7 +129,7 @@ void writeInt64(const void *col, int64_t row, char **pch) if (x == INT64_MIN) { write_chars(na, &ch); } else { - if (x<0) { *ch++ = '-'; x=-x; } + if (x<0) { *ch++ = '-'; x = -x; } char *low = ch; do { *ch++ = '0'+x%10; x/=10; } while (x>0); reverse(ch, low); @@ -252,7 +252,7 @@ void writeFloat64(const void *col, int64_t row, char **pch) int dr = sf-exp-1; // how many characters to print to the right of the decimal place int width=0; // field width were it written decimal format. Used to decide whether to or not. int dl0=0; // how many 0's to add to the left of the decimal place before starting l - if (dr<=0) { dl0=-dr; dr=0; width=sf+dl0; } // 1, 10, 100, 99000 + if (dr<=0) { dl0 = -dr; dr=0; width=sf+dl0; } // 1, 10, 100, 99000 else { if (sf>dr) width=sf+1; // 1.234 and 123.4 else { dl0=1; width=dr+1+dl0; } // 0.1234, 0.0001234 @@ -285,7 +285,7 @@ void writeFloat64(const void *col, int64_t row, char **pch) *ch = '0' + l; ch += sf + (sf>1); *ch++ = 'e'; // lower case e to match base::write.csv - if (exp < 0) { *ch++ = '-'; exp=-exp; } + if (exp < 0) { *ch++ = '-'; exp = -exp; } else { *ch++ = '+'; } // to match base::write.csv if (exp < 100) { *ch++ = '0' + (exp / 10); @@ -562,14 +562,18 @@ void writeCategString(const void *col, int64_t row, char **pch) #ifndef NOZLIB int init_stream(z_stream *stream) { - memset(stream, 0, sizeof(z_stream)); // shouldn't be needed, done as part of #4099 to be sure stream->next_in = Z_NULL; stream->zalloc = Z_NULL; stream->zfree = Z_NULL; stream->opaque = Z_NULL; - - // 31 comes from : windows bits 15 | 16 gzip format - int err = deflateInit2(stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 31, 8, Z_DEFAULT_STRATEGY); + // In deflateInit2, windowBits can be –8..–15 for raw deflate, 15 is the bigger window size. + // With -15, deflate() will generate raw deflate data with no zlib header or trailer, and will not compute a check value. + // Previously this parameter was 31 because windowBits can be greater than 15 for optional gzip encoding. + // Adding 16 writes a simple gzip header and trailer around the compressed data. + // Now we manage header and trailer. gzip file is slighty lower with -15 because no header/trailer are + // written for each chunk. + // For memLevel, 8 is the default value (128 KiB). memLevel=9 uses maximum memory for optimal speed. To be tested ? + int err = deflateInit2(stream, gzip_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } @@ -579,18 +583,17 @@ int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* sour stream->avail_out = *destLen; stream->next_in = (Bytef *)source; // don't use z_const anywhere; #3939 stream->avail_in = sourceLen; - int err = deflate(stream, Z_FINISH); - if (err == Z_OK) { - // with Z_FINISH, deflate must return Z_STREAM_END if correct, otherwise it's an error and we shouldn't return Z_OK (0) - err = -9; // # nocov - } - *destLen = stream->total_out; - return err == Z_STREAM_END ? Z_OK : err; + int err = deflate(stream, Z_SYNC_FLUSH); + *destLen = *destLen - stream->avail_out; + // *destLen = stream->total_out; + return (err != Z_STREAM_ERROR) ? Z_OK : err; } #endif /* - OpenMP is used here primarily to parallelize the process of writing rows + main fwrite function ---- + +OpenMP is used here primarily to parallelize the process of writing rows to the output file, but error handling and compression (if enabled) are also managed within the parallel region. Special attention is paid to thread safety and synchronization, especially in the ordered sections @@ -611,22 +614,31 @@ void fwriteMain(fwriteMainArgs args) doQuote = args.doQuote; int8_t quoteHeaders = args.doQuote; verbose = args.verbose; + gzip_level = args.gzip_level; + + size_t len; + unsigned int crc; + + // exit if compression is needed in param and no zlib +#ifdef NOZLIB + if (args.is_gzip) + STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov +#endif // When NA is a non-empty string, then we must quote all string fields in case they contain the na string // na is recommended to be empty, though - if (na[0]!='\0' && doQuote==INT8_MIN) doQuote = true; + if (na[0]!='\0' && doQuote==INT8_MIN) + doQuote = true; qmethodEscape = args.qmethodEscape; squashDateTime = args.squashDateTime; - if (args.buffMB<1 || args.buffMB>1024) STOP(_("buffMB=%d outside [1,1024]"), args.buffMB); - size_t buffSize = (size_t)1024*1024*args.buffMB; - int eolLen=strlen(args.eol), naLen=strlen(args.na); // Aside: codacy wants strnlen but strnlen is not in C99 (neither is strlen_s). To pass `gcc -std=c99 -Wall -pedantic` // we'd need `#define _POSIX_C_SOURCE 200809L` before #include but that seems a step too far // and platform specific. We prefer to be pure C99. - if (eolLen<=0) STOP(_("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d"), eolLen); + if (eolLen<=0) + STOP(_("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d"), eolLen); if (verbose) { DTPRINT(_("Column writers: ")); @@ -642,6 +654,8 @@ void fwriteMain(fwriteMainArgs args) // # notranslate end } + // Calc maxLineLen + // // Calculate upper bound for line length. Numbers use a fixed maximum (e.g. 12 for integer) while strings find the longest // string in each column. Upper bound is then the sum of the columns' max widths. // This upper bound is required to determine a reasonable rowsPerBatch. It also saves needing to grow the buffers which @@ -652,10 +666,10 @@ void fwriteMain(fwriteMainArgs args) // could be console output) and writing column names to it. double t0 = wallclock(); - size_t maxLineLen = eolLen + args.ncol*(2*(doQuote!=0) + sepLen); + size_t maxLineLen = eolLen + args.ncol * (2*(doQuote!=0) + sepLen); if (args.doRowNames) { - maxLineLen += args.rowNames==NULL ? 1+(int)log10(args.nrow) // the width of the row number - : (args.rowNameFun==WF_String ? getMaxStringLen(args.rowNames, args.nrow)*2 // *2 in case longest row name is all quotes (!) and all get escaped + maxLineLen += args.rowNames==NULL ? 1 + (int)log10(args.nrow) // the width of the row number + : (args.rowNameFun==WF_String ? getMaxStringLen(args.rowNames, args.nrow) * 2 // *2 in case longest row name is all quotes (!) and all get escaped : 11); // specific integer names could be MAX_INT 2147483647 (10 chars) even on a 5 row table, and data.frame allows negative integer rownames hence 11 for the sign maxLineLen += 2/*possible quotes*/ + sepLen; } @@ -676,17 +690,20 @@ void fwriteMain(fwriteMainArgs args) INTERNAL_STOP("type %d has no max length method implemented", args.whichFun[j]); // # nocov } } - if (args.whichFun[j]==WF_Float64 && args.scipen>0) width+=MIN(args.scipen,350); // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written - if (width 0) + // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written + width += (args.scipen < 350) ? args.scipen : 350; + if (width < naLen) + width = naLen; + maxLineLen += width * 2; // *2 in case the longest string is all quotes and they all need to be escaped } - if (verbose) DTPRINT(_("maxLineLen=%"PRIu64". Found in %.3fs\n"), (uint64_t)maxLineLen, 1.0*(wallclock()-t0)); + if (verbose) + DTPRINT(_("maxLineLen=%"PRIu64". Found in %.3fs\n"), (uint64_t)maxLineLen, 1.0*(wallclock()-t0)); - int f=0; + int f = 0; if (*args.filename=='\0') { - f=-1; // file="" means write to standard output + f = -1; // file="" means write to standard output args.is_gzip = false; // gzip is only for file - // eol = "\n"; // We'll use DTPRINT which converts \n to \r\n inside it on Windows } else { #ifdef WIN32 f = _open(args.filename, _O_WRONLY | _O_BINARY | _O_CREAT | (args.append ? _O_APPEND : _O_TRUNC), _S_IWRITE); @@ -706,42 +723,153 @@ void fwriteMain(fwriteMainArgs args) // # nocov end } } -#ifdef NOZLIB - if (args.is_gzip) - STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov -#endif int yamlLen = strlen(args.yaml); if (verbose) { - DTPRINT(_("Writing bom (%s), yaml (%d characters) and column names (%s) ... "), - args.bom?"true":"false", yamlLen, args.colNames?"true":"false"); - if (f==-1) DTPRINT(_("\n")); + DTPRINT(_("Writing bom (%s), yaml (%d characters) and column names (%s)\n"), + args.bom ? "true" : "false", yamlLen, args.colNames ? "true" : "false"); + if (f == -1) + DTPRINT(_("\n")); } + + // Calc headerLen + size_t headerLen = 0; - if (args.bom) headerLen += 3; + if (args.bom) + headerLen += 3; headerLen += yamlLen; if (args.colNames) { - for (int j=0; j> column name) + for (int j=0; j> column name) } + + // Create heap zones ---- + + int nth = args.nth; + + // Calc buffSize + if (args.buffMB < 1 || args.buffMB > 1024) + STOP(_("buffMB = %d is outside range 1..1024, exiting"), args.buffMB); + size_t buffSize = args.buffMB * MEGA; + + // Decide buffer size and rowsPerBatch for each thread + // Once rowsPerBatch is decided it can't be changed + + // if maxLineLen is greater than buffize, increase buffSize + if (buffSize < maxLineLen) { + buffSize = maxLineLen; + } + // ensure buffer can take header line + if (nth * buffSize < headerLen) { + buffSize = headerLen / nth + 1; + } + + int rowsPerBatch = buffSize / maxLineLen; + // + 1 because of the last incomplete loop + int numBatches = args.nrow / rowsPerBatch + 1; + + // force 1 batch, and then 1 thread, if number of lines in table < rowsPerBatch + if (args.nrow < rowsPerBatch) { + rowsPerBatch = args.nrow; + numBatches = 1; + } + + // avoid useless threads, and then too big memory allocations + if (numBatches < nth) + nth = numBatches; + + if (verbose) { + DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows, each buffer size %zu bytes (%zu MiB), showProgress=%d, nth=%d\n"), + args.nrow, numBatches, rowsPerBatch, buffSize, buffSize / MEGA, args.showProgress, nth); + } + + // alloc nth write buffers + errno=0; + size_t alloc_size = nth * buffSize; + if (verbose) { + DTPRINT(_("Allocate %zu bytes (%zu MiB) for buffPool\n"), alloc_size, alloc_size / MEGA); + } + char *buffPool = malloc(alloc_size); + if (!buffPool) { + // # nocov start + STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), + buffSize / MEGA, nth, errno, strerror(errno)); + // # nocov end + } + + // init compress variables +#ifndef NOZLIB + z_stream *thread_streams = NULL; + char *zbuffPool = NULL; + size_t zbuffSize = 0; + size_t compress_len = 0; + if (args.is_gzip) { + // alloc zlib streams + thread_streams = (z_stream*) malloc(nth * sizeof(z_stream)); + if (verbose) { + DTPRINT(_("Allocate %zu bytes for thread_streams\n"), nth * sizeof(z_stream)); + } + if (!thread_streams) + STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); + // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit + // not declared inside the parallel region because solaris appears to move the struct in + // memory when the #pragma omp for is entered, which causes zlib's internal self reference + // pointer to mismatch, #4099 + + // compute zbuffSize which is the same for each thread + z_stream *stream = thread_streams; + if (init_stream(stream) != Z_OK) + STOP(_("Can't init stream structure for deflateBound")); + zbuffSize = deflateBound(stream, buffSize); + if (verbose) + DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); + + // alloc nth zlib buffers + // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen + alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize; + if (verbose) { + DTPRINT(_("Allocate %zu bytes (%zu MiB) for zbuffPool\n"), alloc_size, alloc_size / MEGA); + } + zbuffPool = malloc(alloc_size); + if (!zbuffPool) { + // # nocov start + free(buffPool); + STOP(_("Unable to allocate %zu MiB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), + zbuffSize / MEGA, nth, errno, strerror(errno)); + // # nocov end + } + } +#endif + + // write header + + // use buffPool and zbuffPool because we ensure that allocation is minimum headerLen + if (headerLen) { - char *buff = malloc(headerLen); - if (!buff) - STOP(_("Unable to allocate %zu MiB for header: %s"), headerLen / 1024 / 1024, strerror(errno)); // # nocov + char *buff = buffPool; char *ch = buff; - if (args.bom) {*ch++=(char)0xEF; *ch++=(char)0xBB; *ch++=(char)0xBF; } // 3 appears above (search for "bom") + if (args.bom) { + *ch++=(char)0xEF; + *ch++=(char)0xBB; + *ch++=(char)0xBF; + } // 3 appears above (search for "bom") memcpy(ch, args.yaml, yamlLen); ch += yamlLen; if (args.colNames) { if (args.doRowNames) { // Unusual: the extra blank column name when row_names are added as the first column - if (doQuote!=0/*'auto'(NA) or true*/) { *ch++='"'; *ch++='"'; } // to match write.csv + if (doQuote !=0) { + // to match write.csv + *ch++='"'; + *ch++='"'; + } *ch = sep; ch += sepLen; } int8_t tempDoQuote = doQuote; doQuote = quoteHeaders; // temporary overwrite since headers might get different quoting behavior, #2964 - for (int j=0; j buffSize ? headerLen : buffSize); - char *zbuff = malloc(zbuffSize); - if (!zbuff) { - free(buff); // # nocov - STOP(_("Unable to allocate %zu MiB for zbuffer: %s"), zbuffSize / 1024 / 1024, strerror(errno)); // # nocov - } + z_stream *stream = thread_streams; + if (init_stream(stream) != Z_OK) + STOP(_("Can't init stream structure for writing header")); + char* zbuff = zbuffPool; + // write minimal gzip header + char* header = "\037\213\10\0\0\0\0\0\0\3"; + ret0 = WRITE(f, header, 10); + compress_len += 10; + crc = crc32(0L, Z_NULL, 0); + size_t zbuffUsed = zbuffSize; - ret1 = compressbuff(&stream, zbuff, &zbuffUsed, buff, (size_t)(ch-buff)); - if (ret1==Z_OK) ret2 = WRITE(f, zbuff, (int)zbuffUsed); - deflateEnd(&stream); - free(zbuff); + len = (size_t)(ch - buff); + crc = crc32(crc, (unsigned char*)buff, len); + ret1 = compressbuff(stream, zbuff, &zbuffUsed, buff, len); + if (ret1==Z_OK) { + ret2 = WRITE(f, zbuff, (int)zbuffUsed); + compress_len += zbuffUsed; + } #endif } else { ret2 = WRITE(f, buff, (int)(ch-buff)); } - free(buff); - if (ret1 || ret2==-1) { + if (ret0 == -1 || ret1 || ret2 == -1) { // # nocov start int errwrite = errno; // capture write errno now in case close fails with a different errno CLOSE(f); - if (ret1) STOP(_("Compress gzip error: %d"), ret1); - else STOP(_("%s: '%s'"), strerror(errwrite), args.filename); + if (ret0 == -1) STOP(_("Can't write gzip header error: %d"), ret0); + else if (ret1) STOP(_("Compress gzip error: %d"), ret1); + else STOP(_("%s: '%s'"), strerror(errwrite), args.filename); // # nocov end } } } - if (verbose) DTPRINT(_("done in %.3fs\n"), 1.0*(wallclock()-t0)); + if (verbose) + DTPRINT(_("Initialization done in %.3fs\n"), 1.0*(wallclock()-t0)); if (args.nrow == 0) { - if (verbose) DTPRINT(_("No data rows present (nrow==0)\n")); - if (f!=-1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename); + if (verbose) + DTPRINT(_("No data rows present (nrow==0)\n")); + if (f != -1 && CLOSE(f)) + STOP(_("%s: '%s'"), strerror(errno), args.filename); return; } - // Writing rows + // Write rows ---- - // Decide buffer size and rowsPerBatch for each thread - // Once rowsPerBatch is decided it can't be changed - int rowsPerBatch=0; - if (maxLineLen*2>buffSize) { buffSize=2*maxLineLen; rowsPerBatch=2; } - else rowsPerBatch = buffSize / maxLineLen; - if (rowsPerBatch > args.nrow) rowsPerBatch = args.nrow; - if (rowsPerBatch < 1) rowsPerBatch = 1; - int numBatches = (args.nrow-1)/rowsPerBatch + 1; - int nth = args.nth; - if (numBatches < nth) nth = numBatches; - if (verbose) { - DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows (each buffer size %dMB, showProgress=%d, nth=%d)\n"), - args.nrow, numBatches, rowsPerBatch, args.buffMB, args.showProgress, nth); - } t0 = wallclock(); bool hasPrinted = false; int maxBuffUsedPC = 0; - // compute zbuffSize which is the same for each thread - size_t zbuffSize = 0; - if(args.is_gzip){ -#ifndef NOZLIB - z_stream stream = {0}; - if(init_stream(&stream)) - STOP(_("Can't allocate gzip stream structure")); // # nocov - zbuffSize = deflateBound(&stream, buffSize); - if (verbose) DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); - deflateEnd(&stream); -#endif - } - - errno=0; - char *buffPool = malloc(nth*(size_t)buffSize); - if (!buffPool) { - // # nocov start - STOP(_("Unable to allocate %zu MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), - (size_t)buffSize/(1024^2), nth, errno, strerror(errno)); - // # nocov end - } - char *zbuffPool = NULL; - if (args.is_gzip) { -#ifndef NOZLIB - zbuffPool = malloc(nth*(size_t)zbuffSize); - if (!zbuffPool) { - // # nocov start - free(buffPool); - STOP(_("Unable to allocate %zu MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), - (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); - // # nocov end - } -#endif - } - bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 -#ifndef NOZLIB - z_stream *thread_streams = (z_stream *)malloc(nth * sizeof(z_stream)); - if (!thread_streams) - STOP(_("Failed to allocated %d bytes for '%s'."), (int)(nth * sizeof(z_stream)), "thread_streams"); // # nocov - // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit - // not declared inside the parallel region because solaris appears to move the struct in - // memory when the #pragma omp for is entered, which causes zlib's internal self reference - // pointer to mismatch, #4099 - char failed_msg[1001] = ""; // to hold zlib's msg; copied out of zlib in ordered section just in case the msg is allocated within zlib -#endif +// main parallel loop ---- +#pragma omp parallel for ordered num_threads(nth) schedule(dynamic) + for(int64_t start=0; start < args.nrow; start += rowsPerBatch) { + int me = omp_get_thread_num(); + int my_failed_compress = 0; + char* myBuff = buffPool + me * buffSize; + char* ch = myBuff; - #pragma omp parallel num_threads(nth) - { - int me = omp_get_thread_num(); - int my_failed_compress = 0; - char *ch, *myBuff; - ch = myBuff = buffPool + me*buffSize; - - void *myzBuff = NULL; - size_t myzbuffUsed = 0; #ifndef NOZLIB - z_stream *mystream = &thread_streams[me]; - if (args.is_gzip) { - myzBuff = zbuffPool + me*zbuffSize; - if (init_stream(mystream)) { // this should be thread safe according to zlib documentation - failed = true; // # nocov - my_failed_compress = -998; // # nocov - } - } + size_t mylen = 0; + int mycrc = 0; + z_stream *mystream = &thread_streams[me]; + void *myzBuff = NULL; + size_t myzbuffUsed = 0; + if (args.is_gzip) { + myzBuff = zbuffPool + me * zbuffSize; + if (init_stream(mystream) != Z_OK) { // this should be thread safe according to zlib documentation + failed = true; // # nocov + my_failed_compress = -998; // # nocov + } + } #endif + if (failed) + continue; // Not break. Because we don't use #omp cancel yet. + int64_t end = ((args.nrow - start) < rowsPerBatch) ? args.nrow : start + rowsPerBatch; - #pragma omp for ordered schedule(dynamic) - for(int64_t start=0; start=1 because 0-columns was caught earlier. write_chars(args.eol, &ch); // overwrite last sep with eol instead - } + } // end of chunk rows loop + // compress buffer if gzip #ifndef NOZLIB if (args.is_gzip && !failed) { myzbuffUsed = zbuffSize; - int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, (size_t)(ch-myBuff)); - if (ret) { failed=true; my_failed_compress=ret; } - else deflateReset(mystream); + mylen = (size_t)(ch - myBuff); + mycrc = crc32(0, (unsigned char*)myBuff, mylen); + int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, mylen); + if (ret) { + failed=true; + my_failed_compress=ret; + } } #endif - #pragma omp ordered - { - if (failed) { - // # nocov start - if (failed_compress==0 && my_failed_compress!=0) { - failed_compress = my_failed_compress; + + // ordered region ---- +#pragma omp ordered + if (failed) { + // # nocov start + if (failed_compress==0 && my_failed_compress!=0) { + failed_compress = my_failed_compress; + } + // else another thread could have failed below while I was working or waiting above; their reason got here first + // # nocov end + } else { + errno=0; + int ret = 0; + if (f == -1) { + *ch='\0'; // standard C string end marker so DTPRINT knows where to stop + DTPRINT("%s", myBuff); + } else if (args.is_gzip) { #ifndef NOZLIB - if (mystream->msg!=NULL) strncpy(failed_msg, mystream->msg, 1000); // copy zlib's msg for safe use after deflateEnd just in case zlib allocated the message + ret = WRITE(f, myzBuff, (int)myzbuffUsed); + compress_len += myzbuffUsed; #endif - } - // else another thread could have failed below while I was working or waiting above; their reason got here first - // # nocov end } else { - errno=0; - if (f==-1) { - *ch='\0'; // standard C string end marker so DTPRINT knows where to stop - DTPRINT("%s", myBuff); - } else if ((args.is_gzip ? WRITE(f, myzBuff, (int)myzbuffUsed) - : WRITE(f, myBuff, (int)(ch-myBuff))) == -1) { - failed=true; // # nocov - failed_write=errno; // # nocov - } + ret = WRITE(f, myBuff, (int)(ch-myBuff)); + } + if (ret == -1) { + failed=true; // # nocov + failed_write=errno; // # nocov + } - int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB - if (used > maxBuffUsedPC) maxBuffUsedPC = used; - double now; - if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) { - // See comments above inside the f==-1 clause. - // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the - // master thread (me==0) and hopefully this will work on Windows. If not, user should set - // showProgress=FALSE until this can be fixed or removed. - // # nocov start - int ETA = (int)((args.nrow-end)*((now-startTime)/end)); - if (hasPrinted || ETA >= 2) { - if (verbose && !hasPrinted) DTPRINT("\n"); - DTPRINT(Pl_(nth, - "\rWritten %.1f%% of %"PRId64" rows in %d secs using %d thread. maxBuffUsed=%d%%. ETA %d secs. ", - "\rWritten %.1f%% of %"PRId64" rows in %d secs using %d threads. maxBuffUsed=%d%%. ETA %d secs. "), - (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, maxBuffUsedPC, ETA); - // TODO: use progress() as in fread - nextTime = now+1; - hasPrinted = true; - } - // # nocov end + if (args.is_gzip) { +#ifndef NOZLIB + crc = crc32_combine(crc, mycrc, mylen); + len += mylen; +#endif + } + + int used = 100 * ((double)(ch - myBuff)) / buffSize; // percentage of original buffMB + if (used > maxBuffUsedPC) + maxBuffUsedPC = used; + double now; + if (me == 0 && !failed && args.showProgress && (now=wallclock()) >= nextTime) { + // See comments above inside the f==-1 clause. + // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the + // master thread (me==0) and hopefully this will work on Windows. If not, user should set + // showProgress=FALSE until this can be fixed or removed. + // # nocov start + int ETA = (int)((args.nrow - end) * (now-startTime) /end); + if (hasPrinted || ETA >= 2) { + if (verbose && !hasPrinted) + DTPRINT("\n"); + DTPRINT(Pl_(nth, + "\rWritten %.1f%% of %"PRId64" rows in %d secs using %d thread. maxBuffUsed=%d%%. ETA %d secs. ", + "\rWritten %.1f%% of %"PRId64" rows in %d secs using %d threads. maxBuffUsed=%d%%. ETA %d secs. "), + (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, maxBuffUsedPC, ETA); + // TODO: use progress() as in fread + nextTime = now + 1; + hasPrinted = true; } - // May be possible for master thread (me==0) to call R_CheckUserInterrupt() here. - // Something like: - // if (me==0) { - // failed = TRUE; // inside ordered here; the slaves are before ordered and not looking at 'failed' - // R_CheckUserInterrupt(); - // failed = FALSE; // no user interrupt so return state - // } - // But I fear the slaves will hang waiting for the master (me==0) to complete the ordered - // section which may not happen if the master thread has been interrupted. Rather than - // seeing failed=TRUE and falling through to free() and close() as intended. - // Could register a finalizer to free() and close() perhaps : - // [r-devel] http://r.789695.n4.nabble.com/checking-user-interrupts-in-C-code-tp2717528p2717722.html - // Conclusion for now: do not provide ability to interrupt. - // write() errors and malloc() fails will be caught and cleaned up properly, however. - ch = myBuff; // back to the start of my buffer ready to fill it up again + // # nocov end } } - } - // all threads will call this free on their buffer, even if one or more threads had malloc - // or realloc fail. If the initial malloc failed, free(NULL) is ok and does nothing. + if (args.is_gzip) { +#ifndef NOZLIB + deflateEnd(mystream); +#endif + } + + } // end of parallel for loop + +/* put a 4-byte integer into a byte array in LSB order */ +#define PUT4(a,b) ((a)[0]=(b), (a)[1]=(b)>>8, (a)[2]=(b)>>16, (a)[3]=(b)>>24) + + // write gzip tailer with crc and len if (args.is_gzip) { #ifndef NOZLIB - deflateEnd(mystream); + unsigned char tail[10]; + tail[0] = 3; + tail[1] = 0; + PUT4(tail + 2, crc); + PUT4(tail + 6, len); + int ret = WRITE(f, tail, 10); + compress_len += 10; + if (ret == -1) + STOP("Error: can't write gzip tailer"); #endif } - } + free(buffPool); #ifndef NOZLIB free(thread_streams); @@ -1010,14 +1104,26 @@ void fwriteMain(fwriteMainArgs args) // # nocov start if (!failed) { // clear the progress meter DTPRINT("\r " - " \r"); + " \r\n"); } else { // don't clear any potentially helpful output before error DTPRINT("\n"); } // # nocov end } - if (f!=-1 && CLOSE(f) && !failed) + if (verbose) { + if (args.is_gzip) { +#ifndef NOZLIB + DTPRINT("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n", + len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc); +#endif + } + DTPRINT("Written %"PRId64" rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n", + args.nrow, 1.0*(wallclock()-t0), nth, nth ==1 ? "" : "s", maxBuffUsedPC); + } + + + if (f != -1 && CLOSE(f) && !failed) STOP("%s: '%s'", strerror(errno), args.filename); // # nocov // quoted '%s' in case of trailing spaces in the filename // If a write failed, the line above tries close() to clean up, but that might fail as well. So the @@ -1027,8 +1133,8 @@ void fwriteMain(fwriteMainArgs args) // # nocov start #ifndef NOZLIB if (failed_compress) - STOP(_("zlib %s (zlib.h %s) deflate() returned error %d with z_stream->msg==\"%s\" Z_FINISH=%d Z_BLOCK=%d. %s"), - zlibVersion(), ZLIB_VERSION, failed_compress, failed_msg, Z_FINISH, Z_BLOCK, + STOP(_("zlib %s (zlib.h %s) deflate() returned error %d Z_FINISH=%d Z_BLOCK=%d. %s"), + zlibVersion(), ZLIB_VERSION, failed_compress, Z_FINISH, Z_BLOCK, verbose ? _("Please include the full output above and below this message in your data.table bug report.") : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report.")); #endif @@ -1037,5 +1143,3 @@ void fwriteMain(fwriteMainArgs args) // # nocov end } } - - diff --git a/src/fwrite.h b/src/fwrite.h index 867081a5fb..b133ec0944 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -113,6 +113,7 @@ typedef struct fwriteMainArgs int nth; bool showProgress; bool is_gzip; + int gzip_level; bool bom; const char *yaml; bool verbose; diff --git a/src/fwriteR.c b/src/fwriteR.c index 0d5c0969ef..b4353cd4f1 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -167,6 +167,7 @@ SEXP fwriteR( SEXP nThread_Arg, SEXP showProgress_Arg, SEXP is_gzip_Arg, + SEXP gzip_level_Arg, SEXP bom_Arg, SEXP yaml_Arg, SEXP verbose_Arg, @@ -177,6 +178,7 @@ SEXP fwriteR( fwriteMainArgs args = {0}; // {0} to quieten valgrind's uninitialized, #4639 args.is_gzip = LOGICAL(is_gzip_Arg)[0]; + args.gzip_level = INTEGER(gzip_level_Arg)[0]; args.bom = LOGICAL(bom_Arg)[0]; args.yaml = CHAR(STRING_ELT(yaml_Arg, 0)); args.verbose = LOGICAL(verbose_Arg)[0];