diff --git a/NAMESPACE b/NAMESPACE index be238d75b5..d20f87450a 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -59,6 +59,7 @@ export(setnafill) export(.Last.updated) export(fcoalesce) export(cbindlist) +export(mergelist) export(substitute2) #export(DT) # mtcars |> DT(i,j,by) #4872 #5472 diff --git a/NEWS.md b/NEWS.md index 8b15449e82..d5d4332265 100644 --- a/NEWS.md +++ b/NEWS.md @@ -6,6 +6,10 @@ 1. In `DT[, variable := value]`, when value is class `POSIXlt`, we automatically coerce it to class `POSIXct` instead, [#1724](https://github.com/Rdatatable/data.table/issues/1724). Thanks to @linzhp for the report, and Benjamin Schwendinger for the fix. +## NEW FEATURES + +1. (add example here?) New functions `cbindlist` and `mergelist` have been implemented and exported. Works like `cbind`/`merge` but takes `list` of data.tables on input. `merge` happens in `Reduce` fashion. Supports `how` (_left_, _inner_, _full_, _right_, _semi_, _anti_, _cross_) joins and `mult` argument, closes [#599](https://github.com/Rdatatable/data.table/issues/599) and [#2576](https://github.com/Rdatatable/data.table/issues/2576). + ## NOTES 1. Tests run again when some Suggests packages are missing, [#6411](https://github.com/Rdatatable/data.table/issues/6411). Thanks @aadler for the note and @MichaelChirico for the fix. diff --git a/R/mergelist.R b/R/mergelist.R index 47725d64a5..2a5c00e54f 100644 --- a/R/mergelist.R +++ b/R/mergelist.R @@ -235,5 +235,101 @@ mergepair = function(lhs, rhs, on, how, mult, lhs.cols=names(lhs), rhs.cols=name setDT(out) } +mergelist = function(l, on, cols, how=c("left","inner","full","right","semi","anti","cross"), mult, copy=TRUE, join.many=getOption("datatable.join.many")) { + verbose = getOption("datatable.verbose") + if (verbose) + p = proc.time()[[3L]] + { + if (!is.list(l) || is.data.frame(l)) + stopf("'l' must be a list") + if (!all(vapply_1b(l, is.data.table))) + stopf("Every element of 'l' list must be data.table objects") + if (!all(lengths(l))) + stopf("Tables in 'l' argument must be non-zero columns tables") + if (any(vapply_1i(l, function(x) anyDuplicated(names(x))))) + stopf("Some of the tables in 'l' have duplicated column names") + } ## l + if (!isTRUEorFALSE(copy)) + stopf("'%s' must be TRUE or FALSE", "copy") + n = length(l) + if (n<2L) { + out = if (!n) as.data.table(l) else l[[1L]] + if (copy) out = copy(out) + if (verbose) + catf("mergelist: merging %d table(s), took %.3fs\n", n, proc.time()[[3L]]-p) + return(out) + } + { + if (!is.list(join.many)) + join.many = rep(list(join.many), n-1L) + if (length(join.many)!=n-1L || !all(vapply_1b(join.many, isTRUEorFALSE))) + stopf("'join.many' must be TRUE or FALSE, or a list of such which length must be length(l)-1L") + } ## join.many + { + if (missing(mult)) + mult = NULL + if (!is.list(mult)) + mult = rep(list(mult), n-1L) + if (length(mult)!=n-1L || !all(vapply_1b(mult, function(x) is.null(x) || (is.character(x) && length(x)==1L && !anyNA(x) && x %chin% c("error","all","first","last"))))) + stopf("'mult' must be one of [error, all, first, last] or NULL, or a list of such which length must be length(l)-1L") + } ## mult + { + if (missing(how) || is.null(how)) + how = match.arg(how) + if (!is.list(how)) + how = rep(list(how), n-1L) + if (length(how)!=n-1L || !all(vapply_1b(how, function(x) is.character(x) && length(x)==1L && !anyNA(x) && x %chin% c("left","inner","full","right","semi","anti","cross")))) + stopf("'how' must be one of [left, inner, full, right, semi, anti, cross], or a list of such which length must be length(l)-1L") + } ## how + { + if (missing(cols) || is.null(cols)) { + cols = vector("list", n) + } else { + if (!is.list(cols)) + stopf("'%s' must be a list", "cols") + if (length(cols) != n) + stopf("'cols' must be same length as 'l'") + skip = vapply_1b(cols, is.null) + if (!all(vapply_1b(cols[!skip], function(x) is.character(x) && !anyNA(x) && !anyDuplicated(x)))) + stopf("'cols' must be a list of non-zero length, non-NA, non-duplicated, character vectors, or eventually NULLs (all columns)") + if (any(mapply(function(x, icols) !all(icols %chin% names(x)), l[!skip], cols[!skip]))) + stopf("'cols' specify columns not present in corresponding table") + } + } ## cols + { + if (missing(on) || is.null(on)) { + on = vector("list", n-1L) + } else { + if (!is.list(on)) + on = rep(list(on), n-1L) + if (length(on)!=n-1L || !all(vapply_1b(on, function(x) is.character(x) && !anyNA(x) && !anyDuplicated(x)))) ## length checked in dtmerge + stopf("'on' must be non-NA, non-duplicated, character vector, or a list of such which length must be length(l)-1L") + } + } ## on + + l.mem = lapply(l, vapply, address, "") + out = l[[1L]] + out.cols = cols[[1L]] + for (join.i in seq_len(n-1L)) { + rhs.i = join.i + 1L + out = mergepair( + lhs = out, rhs = l[[rhs.i]], + on = on[[join.i]], + how = how[[join.i]], mult = mult[[join.i]], + lhs.cols = out.cols, rhs.cols = cols[[rhs.i]], + copy = FALSE, ## avoid any copies inside, will copy once below + join.many = join.many[[join.i]], + verbose = verbose + ) + out.cols = copy(names(out)) + } + out.mem = vapply_1c(out, address) + if (copy) + .Call(CcopyCols, out, colnamesInt(out, names(out.mem)[out.mem %chin% unique(unlist(l.mem, recursive=FALSE))])) + if (verbose) + catf("mergelist: merging %d tables, took %.3fs\n", n, proc.time()[[3L]]-p) + out +} + seqexp = function(x) .Call(Cseqexp, x) perhaps.data.table = function(x) .Call(CperhapsDataTableR, x) diff --git a/inst/tests/other.Rraw b/inst/tests/other.Rraw index eb3e461f78..33f1dc2ebf 100644 --- a/inst/tests/other.Rraw +++ b/inst/tests/other.Rraw @@ -1,4 +1,4 @@ -pkgs = c("ggplot2", "hexbin", "plyr", "dplyr", "caret", "zoo", "xts", "gdata", "nlme", "bit64", "knitr", "parallel", "sf", "nanotime", "R.utils", "yaml") +pkgs = c("ggplot2", "hexbin", "plyr", "dplyr", "caret", "zoo", "xts", "gdata", "nlme", "bit64", "knitr", "parallel", "sf", "nanotime", "R.utils", "yaml", "DBI", "RSQLite") # First expression of this file must be as above: .gitlab-ci.yml uses parse(,n=1L) to read one expression from this file and installs pkgs. # So that these dependencies of other.Rraw are maintained in a single place. # TEST_DATA_TABLE_WITH_OTHER_PACKAGES is off by default so this other.Rraw doesn't run on CRAN. It is run by GLCI, locally in dev, and by @@ -761,3 +761,297 @@ if (loaded[["dplyr"]]) { DT = data.table(a = 1, b = 2, c = '1,2,3,4', d = 4) test(30, DT[, c := strsplit(c, ',', fixed = TRUE) %>% lapply(as.integer) %>% as.list]$c, list(1:4)) # nolint: pipe_call_linter. Mimicking MRE as filed. } + +# NB: currently, RSQLite requires DBI, so partially redundant, but future-proof. +if (loaded[["DBI"]] && loaded[["RSQLite"]]) { + # mergelist join tester vs SQLite, based on v1.9.8 non-equi join tester + + # funs ---- + + # produce SQL statement + # ln, rn: lhs names, rhs names, symmult: symmetric mult + mult_all = function(tbl, cols, ...) sprintf( + "(\n SELECT %s FROM %s\n) %s", + paste(setdiff(cols,"row_id"), collapse=", "), tbl, tbl + ) + mult_one = function(tbl, cols, on, mult) sprintf( + "(SELECT %s FROM (\n SELECT *, ROW_NUMBER() OVER (PARTITION BY %s ORDER BY row_id %s) AS rownum FROM %s\n) %s WHERE rownum=1) %s", + paste(setdiff(cols,c("row_id","rownum")), collapse=", "), + paste(on, collapse=", "), + if (mult=="first") "ASC" else "DESC", + tbl, tbl, tbl + ) + sql = function(how, on, mult, ln, rn, symmult=FALSE, notjoin=FALSE) { + stopifnot(length(on)==1L) + # building sql query + if (how=="full") { + return(sprintf( + "%s\nUNION ALL\n%s", + sql("left", on, mult, ln, rn, symmult=mult%in%c("first","last")), + sql("right", on, mult, ln, rn, symmult=mult%in%c("first","last"), notjoin=TRUE) + )) + } + nm = list() + nm[["lhs"]] = ln; nm[["rhs"]] = rn + using = sprintf("USING (%s)", paste(on, collapse=", ")) + lhs = "lhs"; rhs = "rhs" + join = if (how=="inner") { + if (mult=="all") sprintf("%s\nINNER JOIN\n%s\n%s", mult_all(lhs, nm[[lhs]]), mult_all(rhs, nm[[rhs]]), using) + else sprintf("%s\nINNER JOIN\n%s\n%s", mult_one(lhs, nm[[lhs]], on, mult), mult_one(rhs, nm[[rhs]], on, mult), using) + } else if (how=="left") { + if (mult=="all") sprintf("%s\nLEFT JOIN\n%s\n%s", mult_all(lhs, nm[[lhs]]), mult_all(rhs, nm[[rhs]]), using) + else sprintf("%s\nLEFT JOIN\n%s\n%s", (if (symmult) mult_one else mult_all)(lhs, nm[[lhs]], on, mult), mult_one(rhs, nm[[rhs]], on, mult), using) + } else if (how=="right") { ## lhs-rhs swap happens here, mult_one is applied on new rhs + if (mult=="all") sprintf("%s\nLEFT JOIN\n%s\n%s", mult_all(rhs, nm[[rhs]]), mult_all(lhs, nm[[lhs]]), using) + else sprintf("%s\nLEFT JOIN\n%s\n%s", (if (symmult) mult_one else mult_all)(rhs, nm[[rhs]], on, mult), mult_one(lhs, nm[[lhs]], on, mult), using) + } + if (how=="right") {lhs = "rhs"; rhs = "lhs"} ## this name swap is for notjoin and select below + where = if (!notjoin) "" else sprintf("\nWHERE %s IS NULL", paste(rhs, on, sep=".")) + select = sprintf("%s, %s, %s", paste(lhs, on, sep="."), + paste("lhs", setdiff(nm[["lhs"]], c("row_id",on)),sep=".",collapse=", "), + paste("rhs", setdiff(nm[["rhs"]], c("row_id",on)),sep=".",collapse=", ")) + sprintf("SELECT %s FROM\n%s%s", select, join, where) + } + + # .conn SQLite connection, if provided it will use it instead of creating temporary one + # .drop logical TRUE (default) will drop db tables before and after and populate new, when FALSE it expects tables to be populated + join.sql.equal = function(l, on, how="inner", mult="all", allow.cartesian=TRUE, .conn, .drop=TRUE, .debug=interactive(), ans, err=FALSE) { + nm = names(l) + stopifnot(is.null(nm) || identical(nm, c("x","i")) || identical(nm, c("lhs","rhs"))) + names(l) = c("lhs","rhs") + lhs = l[["lhs"]]; rhs = l[["rhs"]] + stopifnot(is.data.table(lhs), is.data.table(rhs), + is.character(how), is.character(mult), length(mult)==1L, + is.character(on), + is.logical(allow.cartesian), is.logical(.drop)) + if (err && mult=="error") { + dt = try(silent=TRUE, mergelist(list(lhs, rhs), on=on, how=how, mult=mult)) + if (!inherits(dt, "try-error")) { + if (.debug) browser() + stop("no error returned from mergelist(mult='error') but err flag set to TRUE in join.sql.equal") + } + err_msg = "mult='error' and multiple matches during merge" + if (!identical(attr(dt, "condition", TRUE)[["message"]], err_msg)) { + if (.debug) browser() + stop("different error returned than expected: ", attr(dt, "condition", TRUE)[["message"]]) + } + return(TRUE) + } + # row_id column required as SQL is not ordered, creating on R side + if (!"row_id" %in% names(lhs)) lhs = copy(lhs)[, "row_id" := seq_len(.N)] + if (!"row_id" %in% names(rhs)) rhs = copy(rhs)[, "row_id" := seq_len(.N)] + # preparing sql environment + conn = if (new.conn <- missing(.conn)) DBI::dbConnect(RSQLite::SQLite()) else .conn + if (.drop) { + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")), silent=TRUE) + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")), silent=TRUE) + DBI::dbWriteTable(conn, name="lhs", value=lhs) + DBI::dbWriteTable(conn, name="rhs", value=rhs) + } + # building sql query + s = sql(how, on, mult, names(lhs), names(rhs)) + s = paste0(s,";\n") + # run data.table and SQLite + dt = mergelist(list(lhs[,!"row_id"], rhs[,!"row_id"]), on=on, how=how, mult=mult) + sq = try(silent=TRUE, as.data.table(DBI::dbGetQuery(conn, s))) + if (inherits(sq, "try-error")) { + if (.debug) {message("error during sql statement"); browser()} + stop("error during sql statement") + } + if (!is.data.table(dt) || !is.data.table(sq)) { + if (.debug) {message("dt and sq must be data.table already"); browser()} + stop("dt and sq must be data.table already") + } + if (how %in% c("inner","full")) { + dt2 = mergelist(list(rhs[,!"row_id"], lhs[,!"row_id"]), on=on, how=how, mult=mult) + setcolorder(dt2, neworder=names(dt)) + setattr(dt, "index", integer()) + setattr(dt2, "index", integer()) + r = all.equal(dt, dt2, ignore.row.order=TRUE) + ## check it is symetric + if (!isTRUE(r)) { + if (.debug) {message("mergelist is not symmetric for ", how); browser()} + stop("mergelist is not symmetric for ", how) + } + } + setattr(sq, "index", integer()) + setattr(dt, "index", integer()) + # compare results + a = all.equal(dt, sq, ignore.row.order=TRUE) + b = all.equal(dt, sq, ignore.row.order=TRUE, ignore.col.order=TRUE) + if (!missing(ans)) { + r = all.equal(ans, sq, ignore.row.order=TRUE) + if (!isTRUE(r)) { + if (.debug) browser() + stop("sql does not match to reference answer") + } + } + if (.drop) { + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")) + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")) + } + if (new.conn) suppressWarnings(DBI::dbDisconnect(conn)) + if (isTRUE(b) && !isTRUE(a)) { + if (.debug) browser() + stop("only column order mismatch") + } + if (!isTRUE(a)) { + if (.debug) browser() + cat(sep="\n",c( + sprintf("# dtq:\nmergelist(l, on='%s', how='%s', mult='%s')", paste(on, collapse=", "), how, mult), + sprintf("# sql:\n%s", s), + a, "\n")) + } + isTRUE(a) + } + + batch.join.sql.equal = function(cases, on, hows=c("inner","left","right","full"), mults=c("all","first","last"), .debug=FALSE) { + if ("error" %in% mults) stop("mult=error is not supported") + p = proc.time()[[3L]] + conn = DBI::dbConnect(RSQLite::SQLite()) + ans = list() + dup_n = 0L + for (case in cases) { + l = data(case) + stopifnot(c("lhs","rhs") %in% names(l)) + case = as.character(case) + lhs = l$lhs; rhs = l$rhs + ans[[case]] = list() + # reuse tables, to test if affects sqlite efficiency + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")), silent = TRUE) + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")), silent = TRUE) + # row_id column required as SQL is not ordered, creating on R side + if (!"row_id" %in% names(lhs)) lhs = copy(lhs)[, "row_id" := seq_len(.N)] + if (!"row_id" %in% names(rhs)) rhs = copy(rhs)[, "row_id" := seq_len(.N)] + DBI::dbWriteTable(conn, name="lhs", value=lhs) + DBI::dbWriteTable(conn, name="rhs", value=rhs) + len = prod(length(cases), length(hows), length(mults)) + if (len > (len.warn <- getOption("tests.length.warning", 1e3))) + warning(sprintf("You are about to run %s number of tests. To suppress this warning use 'tests.length.warning' option, set to numeric threshold or Inf.", len.warn)) + for (how in hows) { + ans[[case]][[how]] = list() + for (mult in mults) { + if (!is.null(ans[[case]][[how]][[mult]])) { + dup_n = dup_n+1L + next #warning("Some tests are duplicated, so far ", dup_n) + } + ans[[case]][[how]][[mult]] = join.sql.equal(list(lhs=lhs, rhs=rhs), on=on, how=how, mult=mult, .conn=conn, .drop=FALSE, .debug=.debug) + } + } + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")) + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")) + } + suppressWarnings(DBI::dbDisconnect(conn)) + cat(sprintf("batch.join.sql.equal: %s%s tests completed in %.1fs\n", + len, if (dup_n) sprintf(" (%s duplicated)", dup_n) else "", proc.time()[[3L]] - p)) + ans + } + data = function(case) { + set.seed(108) + if (case == 1L) { # 2 match + lhs = data.table(id = c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id = c(2L,4L,3L,5L), v2=1:4) + } else if (case == 2L) { # 4 match + lhs = data.table(id = c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id = c(7L,5L,3L,1L), v2=1:4) + } else if (case == 3L) { # 1 match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(1L,2L,4L,6L), v2=1:4) + } else if (case == 4L) { # 0 match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(0L,2L,4L,6L), v2=1:4) + } else if (case == 5L) { # 0 match dup + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(0L,2L,2L,6L), v2=1:4) + } else if (case == 6L) { # 1 match dup + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(1L,2L,2L,6L), v2=1:4) + } else if (case == 7L) { # 1 match dup match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(3L,3L,4L,6L), v2=1:4) + } else if (case == 8L) { # 2 match 2 dup match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id = c(3L,3L,7L,7L), v2=1:4) + } else if (case == 9L) { # 2 dup 2 dup + lhs = data.table(id = c(1L,5L,1L,5L), v1=1:4) + rhs = data.table(id = c(5L,5L,1L,1L), v2=1:4) + } else if (case == 10L) { # 4 dup 4 dup match + lhs = data.table(id = c(1L,1L,1L,1L), v1=1:4) + rhs = data.table(id = c(1L,1L,1L,1L), v2=1:4) + } else if (case == 11L) { # 4 dup 4 dup nomatch + lhs = data.table(id = c(1L,1L,1L,1L), v1=1:4) + rhs = data.table(id = c(2L,2L,2L,2L), v2=1:4) + } else if (case == 12L) { # no match, no overlap + lhs = data.table(id = c(1:4), v1=1:4) + rhs = data.table(id = c(6:9), v2=1:4) + } else if (case == 13L) { # all i matches + lhs = data.table(id = c(1L,5L,3L,7L,9L), v1=1:5) + rhs = data.table(id = c(7L,5L,3L,1L), v2=1:4) + } else if (case == 14L) { # dup match and 1 non-match + ## inner join short circuit test + ## what if some row is excluded but another is duplicated? nrow(i) match + lhs = data.table(id = c(1L,5L,3L,7L,3L), v1=1:5) + rhs = data.table(id = c(7L,5L,3L,2L), v2=1:4) + } else if (case == 15L) { + # does not raise error on mult="error" because dups '13' does not have matching rows! + lhs = data.table(id = as.integer(c(17,14,11,10,5,1,19,7,16,15)), v1=1:10) + rhs = data.table(id = as.integer(c(6,20,13,1,8,13,3,10,17,9)), v2=1:10) + } else if (case == 16L) { + lhs = data.table(id = sample(10L, 10L, TRUE), v1=1:10) + rhs = data.table(id = sample(10L, 10L, TRUE), v2=1:10) + } else if (case == 17L) { + lhs = data.table(id = sample(1e2L, 1e2L, TRUE), v1=1:1e2) + rhs = data.table(id = sample(1e2L, 1e2L, TRUE), v2=1:1e2) + } else if (case == 18L) { + lhs = data.table(id = sample(1e2L, 1e2L, TRUE), v1=1:1e2) + rhs = data.table(id = sample(10L, 20L, TRUE), v2=1:1e2) + } else if (case==19L) { + lhs = as.data.table(list(id=sample(1e3), v1=1:1e3)) + rhs = as.data.table(list(id=sample(1e3), v2=1:1e3)) + } else if (case==20L) { + lhs = as.data.table(list(id=sample(1e3*2L, 1e3), v1=1:1e3)) + rhs = as.data.table(list(id=sample(1e3*2L, 1e3), v2=1:1e3)) + } else if (case==21L) { + lhs = as.data.table(list(id=sample(1e3, 1e3*2L, TRUE), v1=1:1e3)) + rhs = as.data.table(list(id=sample(1e3, 1e3*2L, TRUE), v2=1:1e3)) + } else if (case==22L) { ## LHS equals RHS + lhs = data.table(id=1:2, v1=1:2) + rhs = data.table(id=1:2, v2=1:2) + } else if (case==23L) { ## cross join + lhs = data.table(id=c(1L,1L), v1=1:2) + rhs = data.table(id=c(1L,1L), v2=1:2) + } else if (case==24L) { ## cartesian match, dups on both sides of match + lhs = data.table(id=c(1L,1:2), v1=1:3) + rhs = data.table(id=c(1L,1L,3L), v2=1:3) + } else if (case==25L) { ## duplicates in RHS + lhs = data.table(id=1:2, v1=1:2) + rhs = data.table(id=c(2L,2:3), v2=1:3) + } else if (case==26L) { ## duplicates in RHS and LHS, some RHS dups does not have matches in LHS (merge.data.table+mult fails) + lhs = data.table(id=c(1:3,3L), v1=1:4) + rhs = data.table(id=c(1L,1L,3:4,4L), v2=1:5) + } else if (case==27L) { ## duplicates in RHS and LHS, some LHS dups does not have matches in RHS + lhs = data.table(id=c(1L,1L,3:4,4L), v1=1:5) + rhs = data.table(id=c(1:3,3L), v2=1:4) + } else if (case==28L) { ## duplicates in RHS and LHS + lhs = data.table(id=c(1:3,3L), v1=1:4) + rhs = data.table(id=c(1L,1L,3:4), v2=1:4) + } else if (case==29L) { ## duplicates in RHS + lhs = data.table(id=1:2, v1=1:2) + rhs = data.table(id=c(2L,2:3), v2=1:3) + } else if (case==30L) { ## duplicates in LHS + lhs = data.table(id=c(1:2,2L), v1=1:3) + rhs = data.table(id=2:3, v2=1:2) + } else if (case==31L) { + lhs = data.table(id=integer(), v1=integer()) + rhs = data.table(id=integer(), v2=integer()) + } else stop("case not found") + list(lhs=lhs, rhs=rhs) + } + + # tests ---- + + y = batch.join.sql.equal(cases=1:31, on="id", hows=c("inner","left","right","full"), mults=c("all","first","last"), .debug=interactive()) + y = rapply(y, isTRUE) + if (!all(y)) + stop(sprintf("join tests failed for %s cases:\n%s", sum(!y), paste(" ", names(y)[!y], collapse="\n"))) +} diff --git a/man/mergelist.Rd b/man/mergelist.Rd new file mode 100644 index 0000000000..bfee1aae11 --- /dev/null +++ b/man/mergelist.Rd @@ -0,0 +1,189 @@ +\name{mergelist} +\alias{mergelist} +\title{Merge multiple data.tables} +\description{ + Faster merge of multiple \code{data.table}s. +} +\usage{ + mergelist(l, on, cols, + how = c("left","inner","full","right","semi","anti","cross"), + mult, copy = TRUE, + join.many = getOption("datatable.join.many")) +} +\arguments{ + \item{l}{ \code{list} of \code{data.table}s to merge. } + \item{on}{ \code{character} vector of column names to merge on; when missing, the \emph{key} of \emph{join-to} table is used. } + \item{cols}{ \code{list} of \code{character} column names corresponding to tables in \code{l}, used to subset columns during merges. } + \item{how}{ \code{character} scalar, controls how to merge tables. Allowed values are \code{"left"} (default), \code{"inner"}, \code{"full"}, \code{"right"}, \code{"semi"}, \code{"anti"}, \code{"cross"}. See Details. } + \item{mult}{ \code{character} scalar, controls how to proceed when multiple rows in \emph{join-to} table match to the row in \emph{join-from} table. Allowed values are \code{"error"}, \code{"all"}, \code{"first"}, \code{"last"}. Default depends on \code{how}, described in \emph{details} below. See examples on how to detect duplicated matches. Using \code{"all"} is recommended together with \code{join.many=FALSE}, unless rows explosion or cartesian product are intended. } + \item{copy}{ \code{logical}, defaults to \code{TRUE}, when \code{FALSE}, then resulting object may share columns with tables in \code{l}, depending on matches. } + \item{join.many}{ \code{logical}, defaults to \code{getOption("datatable.join.many")}, which is \code{TRUE} by default; when \code{FALSE} and \code{mult="all"}, then extra check is made to ensure no \emph{many-to-many} matches exist between tables, and if they exist, then exception is raised. Works similarly to \code{allow.cartesian} option in \code{[.data.table} but is more strict. An option \code{"datatable.join.many"} controls that globally for \code{mergelist} and \code{[.data.table}. } +} +\details{ + Function should be considered experimental. Users are encouraged to provide feedback in our issue tracker. + + Merging is performed sequentially, for \code{l} of 3 tables, it will do something like \code{merge(merge(l[[1L]], l[[2L]]), l[[3L]])}. Merging does not support \emph{non-equi joins}, column names to merge on must be common in both tables on each merge. + + Arguments \code{on}, \code{how}, \code{mult}, \code{join.many} could be lists as well, each of length \code{length(l)-1L}, to provide argument to be used for each single tables pair to merge, see examples. + + Terms \emph{join-to} and \emph{join-from} depends on \code{how} argument: + \enumerate{ + \item{ \code{how="left|semi|anti"}: \emph{join-to} is \emph{RHS}, \emph{join-from} is \emph{LHS}. } + \item{ \code{how="inner|full|cross"}: treats \emph{LHS} and \emph{RHS} tables equally, terms applies to both tables. } + \item{ \code{how="right"}: \emph{join-to} is \emph{LHS}, \emph{join-from} is \emph{RHS}. } + } + + Using \code{mult="error"} will raise exception when multiple rows in \emph{join-to} table match to the row in \emph{join-from} table. It should not be used to just detect duplicates, as duplicates might not have matching row, and in such case exception will not be raised. + + Default value for argument \code{mult} depends on \code{how} argument: + \enumerate{ + \item{ \code{how="left|inner|full|right"}: sets \code{mult="error"}. } + \item{ \code{how="semi|anti"}: sets \code{mult="last"}, although works same as \code{mult="first"}. } + \item{ \code{how="cross"}: sets \code{mult="all"}. } + } + + When \code{on} argument is missing, then columns to join on will be decided based on \emph{key} depending on \code{how} argument: + \enumerate{ + \item{ \code{how="left|right|semi|anti"}: key columns of \emph{join-to} table. } + \item{ \code{how="inner|full"}: if only one table has key, then this key is used, if both tables have key, then \code{intersect(key(lhs), key(rhs))}, having its order aligned to shorter key. } + } + + When joining tables that are not directly linked to single table, e.g. snowflake schema, \emph{right} outer join can be used to optimize the sequence of merges, see examples. +} +\value{ + A new \code{data.table} based on the merged objects. +} +\note{ + Using \code{how="inner|full"} together with \code{mult!="all"} is sub-efficient. Unlike during join in \code{[.data.table}, it will apply \code{mult} on both tables. It is to ensure that the join is symmetric so \emph{LHS} and \emph{RHS} tables can be swapped, regardless of \code{mult} argument. It is always possible to apply \code{mult}-like filter manually and join using \code{mult="all"}. + + Using \code{join.many=FALSE} is sub-efficient. Note that it only takes effect when \code{mult="all"}. If input data are verified to not have duplicated matches, then this can safely use the default \code{TRUE}. Otherwise for \code{mult="all"} merges it is recommended to use \code{join.many=FALSE}, unless of course \emph{many-to-many} join, that duplicates rows, is intended. +} +\seealso{ + \code{\link{[.data.table}}, \code{\link{merge.data.table}} +} +\examples{ +l = list( + data.table(id1 = c(1:4,2:5), v1 = 1:8), + data.table(id1 = 2:3, v2 = 1:2), + data.table(id1 = 3:5, v3 = 1:3) +) +mergelist(l, on="id1") + +## using keys +l = list( + data.table(id1 = c(1:4,2:5), v1 = 1:8), + data.table(id1 = 3:5, id2 = 1:3, v2 = 1:3, key="id1"), + data.table(id2 = 1:4, v3 = 4:1, key="id2") +) +mergelist(l) + +## select columns +l = list( + data.table(id1 = c(1:4,2:5), v1 = 1:8, v2 = 8:1), + data.table(id1 = 3:5, v3 = 1:3, v4 = 3:1, v5 = 1L, key="id1") +) +mergelist(l, cols = list(NULL, c("v3","v5"))) + +## different arguments for each merge pair +l = list( + data.table(id1=1:4, id2=4:1), + data.table(id1=c(1:3,1:2), v2=c(1L,1L,1:2,2L)), + data.table(id2=4:5) +) +mergelist(l, + on = list("id1", "id2"), ## first merge on id1, second on id2 + how = list("inner", "anti"), ## first inner join, second anti join + mult = list("last", NULL)) ## use default 'mult' in second join + +## detecting duplicates matches +l = list( + data.table(id1=c(1:4,2:5), v1=1:8), ## dups in LHS are fine + data.table(id1=c(2:3,2L), v2=1:3), ## dups in RHS + data.table(id1=3:5, v3=1:3) +) +#mergelist(l, on="id1") # ERROR: mult='error' and multiple matches during merge +lapply(l[-1L], `[`, j = if (.N>1L) .SD, by = "id1") ## duplicated rows + +## 'star schema' and 'snowflake schema' examples + +### populate fact: US population by state and date + +gt = state.x77[,"Population"] +gt = data.table(state_id=seq_along(state.name), p=gt[state.name]/sum(gt), k=1L) +tt = as.IDate(paste0(as.integer(time(uspop)),"-01-01")) +tt = as.data.table(stats::approx(tt, c(uspop), tt[1L]:tt[length(tt)])) +tt = tt[, .(date=as.IDate(x), date_id=seq_along(x), pop=y, k=1L)] +fact = tt[gt, on="k", allow.cartesian=TRUE, + .(state_id=i.state_id, date_id=x.date_id, population = x.pop * i.p)] +setkeyv(fact, c("state_id","date_id")) + +### populate dimensions: time and geography + +time = data.table(key = "date_id", + date_id = seq_along(tt$date), date = tt$date, + month_id = month(tt$date), month = month.name[month(tt$date)], + year_id = year(tt$date)-1789L, year = as.character(year(tt$date)), + week_id = week(tt$date), week = as.character(week(tt$date)), + weekday_id = wday(tt$date)-1L, weekday = weekdays(tt$date) +)[weekday_id==0L, weekday_id:=7L][] +geog = data.table(key = "state_id", + state_id = seq_along(state.name), state_abb=state.abb, state_name=state.name, + division_id = as.integer(state.division), + division_name = as.character(state.division), + region_id = as.integer(state.region), + region_name = as.character(state.region) +) +rm(gt, tt) + +### denormalize 'star schema' + +l = list(fact, time, geog) +ans = mergelist(l) + +rm(l, ans) + +### turn 'star schema' into 'snowflake schema' + +make.lvl = function(x, cols) { + stopifnot(is.data.table(x)) + lvl = x[, unique(.SD), .SDcols=cols] + setkeyv(lvl, cols[1L]) + setindexv(lvl, as.list(cols)) +} +time = list( + date = make.lvl(time, c("date_id","date","year_id","month_id","week_id", + "weekday_id")), + weekday = make.lvl(time, c("weekday_id","weekday")), + week = make.lvl(time, c("week_id","week")), + month = make.lvl(time, c("month_id","month")), + year = make.lvl(time, c("year_id","year")) +) +geog = list( + state = make.lvl(geog, c("state_id","state_abb","state_name","division_id")), + division = make.lvl(geog, c("division_id","division_name","region_id")), + region = make.lvl(geog, c("region_id","region_name")) +) + +### denormalize 'snowflake schema' + +#### left join all +l = c(list(fact=fact), time, geog) +ans = mergelist(l) + +rm(ans) +#### merge hierarchies alone, reduce sizes in merges of geog dimension +ans = mergelist(list( + fact, + mergelist(time), + mergelist(rev(geog), how="right") +)) + +rm(ans) +#### same but no unnecessary copies +ans = mergelist(list( + fact, + mergelist(time, copy=FALSE), + mergelist(rev(geog), how="right", copy=FALSE) +)) +} +\keyword{ data }