diff --git a/.github/workflows/R-CMD-check.yaml b/.github/workflows/R-CMD-check.yaml index 758f866..a808d97 100644 --- a/.github/workflows/R-CMD-check.yaml +++ b/.github/workflows/R-CMD-check.yaml @@ -47,6 +47,11 @@ jobs: extra-packages: any::rcmdcheck needs: check + - name: Start Redis + if: ${{ matrix.config.os != 'macos-latest' }} + run: | + ./scripts/redis start + - uses: r-lib/actions/check-r-package@v2 with: upload-snapshots: true diff --git a/DESCRIPTION b/DESCRIPTION index 2e8b3dc..34add46 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -22,7 +22,7 @@ Imports: porcelain, R6, redux, - rrq, + rrq (>= 0.7.15), withr Suggests: fs, diff --git a/R/queue.R b/R/queue.R index 9e9ff4f..09a37f4 100644 --- a/R/queue.R +++ b/R/queue.R @@ -26,14 +26,28 @@ Queue <- R6::R6Class("Queue", #nolint } # Create queue - self$controller <- rrq::rrq_controller2( + self$controller <- rrq::rrq_controller( queue_id %||% orderly_queue_id() ) worker_config <- rrq::rrq_worker_config(heartbeat_period = 10) - rrq::rrq_worker_config_save2("localhost", worker_config, + rrq::rrq_worker_config_save("localhost", worker_config, controller = self$controller) }, + submit = function(reportname, parameters = NULL, + branch = "master", ref = "HEAD") { + run_args <- list( + self$root, + reportname, + parameters, + branch, + ref + ) + rrq::rrq_task_create_call(runner_run, run_args, + separate_process = TRUE, + controller = self$controller) + }, + # Just until we add queue status for testing number_of_workers = function() { rrq::rrq_worker_len(self$controller) diff --git a/R/runner.R b/R/runner.R new file mode 100644 index 0000000..a08694a --- /dev/null +++ b/R/runner.R @@ -0,0 +1,64 @@ +runner_run <- function(orderly_root, reportname, parameters, branch, ref, ...) { + # Setup + worker_id <- Sys.getenv("RRQ_WORKER_ID") + worker_path <- file.path(orderly_root, ".packit", "workers", worker_id) + point_head_to_ref(worker_path, branch, ref) + + # Initial cleanup + git_clean(worker_path) + + # Run + withr::with_envvar( + c(ORDERLY_SRC_ROOT = file.path(worker_path, "src", reportname)), + orderly2::orderly_run(reportname, parameters = parameters, + root = orderly_root, ...) + ) + + # Cleanup + git_clean(worker_path) +} + +point_head_to_ref <- function(worker_path, branch, ref) { + gert::git_fetch(repo = worker_path) + gert::git_branch_checkout(branch, repo = worker_path) + gert::git_reset_hard(ref, repo = worker_path) +} + +add_dir_parent_if_empty <- function(files_to_delete, path) { + contained_files <- list.files(path, full.names = TRUE) + if (length(setdiff(contained_files, files_to_delete)) > 0) { + return(files_to_delete) + } + add_dir_parent_if_empty(c(files_to_delete, path), dirname(path)) +} + +get_empty_dirs <- function(worker_path) { + dirs <- fs::dir_ls(worker_path, recurse = TRUE, type = "directory") + Reduce(add_dir_parent_if_empty, c(list(character()), dirs)) +} + +git_clean <- function(worker_path) { + # gert does not have git clean but this should achieve the same thing + tryCatch( + { + gert::git_stash_save( + include_untracked = TRUE, + include_ignored = TRUE, + repo = worker_path + ) + gert::git_stash_drop(repo = worker_path) + }, + error = function(e) { + # we don't need to rethrow the error here since it doesn't break any + # further report runs + if (e$message != "cannot stash changes - there is nothing to stash.") { + # TODO add logger here + message(e$message) + } + NULL + } + ) + # however git ignores all directories, only cares about files, so we may + # have empty directories left + unlink(get_empty_dirs(worker_path), recursive = TRUE) +} diff --git a/scripts/redis b/scripts/redis index 202021e..7510fd8 100755 --- a/scripts/redis +++ b/scripts/redis @@ -1,6 +1,6 @@ #!/usr/bin/env bash set -e -CONTAINER_NAME=orderly.server_redis +CONTAINER_NAME=orderly.runner_redis if [ "$1" = "start" ]; then docker run --rm -d --name=$CONTAINER_NAME -p 127.0.0.1:6379:6379 redis elif [ "$1" = "stop" ]; then diff --git a/tests/testthat/examples/git-clean/git-clean.R b/tests/testthat/examples/git-clean/git-clean.R new file mode 100644 index 0000000..5115f7b --- /dev/null +++ b/tests/testthat/examples/git-clean/git-clean.R @@ -0,0 +1,5 @@ +orderly2::orderly_artefact("Some data", "data.rds") +d <- data.frame(a = 1:10, x = runif(10), y = 1:10 + runif(10)) +write.table("test", file = file.path("..", "..", "inside_draft.txt")) +write.table("test", file = file.path("..", "..", "..", "outside_draft.txt")) +saveRDS(d, "data.rds") diff --git a/tests/testthat/examples/parameters/orderly.R b/tests/testthat/examples/parameters/parameters.R similarity index 100% rename from tests/testthat/examples/parameters/orderly.R rename to tests/testthat/examples/parameters/parameters.R diff --git a/tests/testthat/helper-orderly-runner.R b/tests/testthat/helper-orderly-runner.R index 6eb76ed..20cda70 100644 --- a/tests/testthat/helper-orderly-runner.R +++ b/tests/testthat/helper-orderly-runner.R @@ -20,7 +20,7 @@ new_queue_quietly <- function(root, ...) { start_queue_workers_quietly <- function(n_workers, controller, env = parent.frame()) { suppressMessages( - rrq::rrq_worker_spawn2(n_workers, controller = controller) + rrq::rrq_worker_spawn(n_workers, controller = controller) ) withr::defer(rrq::rrq_worker_stop(controller = controller), env = env) } @@ -67,15 +67,65 @@ copy_examples <- function(examples, path_src) { } -helper_add_git <- function(path) { +helper_add_git <- function(path, add = ".") { gert::git_init(path) - sha <- git_add_and_commit(path) + sha <- git_add_and_commit(path, add) branch <- gert::git_branch(repo = path) url <- "https://example.com/git" gert::git_remote_add(url, repo = path) list(path = path, branch = branch, sha = sha, url = url) } +new_queue_quietly <- function(root, ...) { + suppressMessages(Queue$new(root, ...)) +} + +make_worker_dirs <- function(orderly_root, ids) { + packit_path <- file.path(orderly_root, ".packit") + dir.create(packit_path) + workers <- file.path(packit_path, "workers") + dir.create(workers) + lapply(ids, function(id) { + worker_path <- file.path(workers, id) + dir.create(worker_path) + gert::git_clone(orderly_root, path = worker_path) + gert::git_config_set("user.name", id, repo = worker_path) + gert::git_config_set("user.email", id, repo = worker_path) + }) + +} + +start_queue_workers_quietly <- function(n_workers, + controller, env = parent.frame()) { + worker_manager <- suppressMessages( + rrq::rrq_worker_spawn(n_workers, controller = controller) + ) + withr::defer(rrq::rrq_worker_stop(controller = controller), env = env) + worker_manager +} + +start_queue_with_workers <- function(root, n_workers, env = parent.frame()) { + q <- new_queue_quietly(root) + worker_manager <- start_queue_workers_quietly(n_workers, q$controller, + env = env) + make_worker_dirs(root, worker_manager$id) + q +} + +skip_if_no_redis <- function() { + available <- redux::redis_available() + if (!available) { + testthat::skip("Skipping test as redis is not available") + } + invisible(available) +} + +expect_worker_task_complete <- function(task_id, controller, n_tries) { + is_task_successful <- rrq::rrq_task_wait( + task_id, controller = controller, timeout = n_tries + ) + expect_true(is_task_successful) +} initialise_git_repo <- function() { t <- tempfile() @@ -85,16 +135,25 @@ initialise_git_repo <- function() { } -git_add_and_commit <- function(path) { - gert::git_add(".", repo = path) +create_new_commit <- function(path, new_file = "new", message = "new message", + add = ".") { + writeLines("new file", file.path(path, new_file)) + gert::git_add(add, repo = path) + user <- "author " + gert::git_commit("new commit", author = user, committer = user, repo = path) +} + + +git_add_and_commit <- function(path, add = ".") { + gert::git_add(add, repo = path) user <- "author " gert::git_commit("new commit", author = user, committer = user, repo = path) } -create_new_commit <- function(path, new_file = "new") { +create_new_commit <- function(path, new_file = "new", add = ".") { writeLines("new file", file.path(path, new_file)) - git_add_and_commit(path) + git_add_and_commit(path, add) } diff --git a/tests/testthat/test-api.R b/tests/testthat/test-api.R index 97e1e4c..998ec1b 100644 --- a/tests/testthat/test-api.R +++ b/tests/testthat/test-api.R @@ -32,8 +32,13 @@ test_that("can list orderly reports", { ## Add a report on a 2nd branch gert::git_branch_create("other", repo = repo$local) - fs::dir_copy(file.path(repo$local, "src", "parameters"), + fs::dir_copy(file.path(repo$local, "src", "parameters"), file.path(repo$local, "src", "parameters2")) + # have to rename to parameters2.R to recognise it as orderly report + file.rename( + file.path(repo$local, "src", "parameters2", "parameters.R"), + file.path(repo$local, "src", "parameters2", "parameters2.R") + ) sha <- git_add_and_commit(repo$local) ## Can list items from this sha diff --git a/tests/testthat/test-git.R b/tests/testthat/test-git.R index 1524cde..2d49688 100644 --- a/tests/testthat/test-git.R +++ b/tests/testthat/test-git.R @@ -31,13 +31,13 @@ test_that("can get files which have been modified", { expect_equal(git_get_modified(log$commit[[2]], repo = repo$local), character(0)) expect_equal(git_get_modified(log$commit[[1]], repo = repo$local), - "src/parameters/orderly.R") + "src/parameters/parameters.R") expect_equal(git_get_modified(log$commit[[1]], relative = "src/", repo = repo$local), - "parameters/orderly.R") + "parameters/parameters.R") expect_equal(git_get_modified(log$commit[[1]], base = log$commit[[2]], repo = repo$local), - "src/parameters/orderly.R") + "src/parameters/parameters.R") expect_equal(git_get_modified(log$commit[[2]], base = log$commit[[1]], repo = repo$local), character(0)) diff --git a/tests/testthat/test-queue.R b/tests/testthat/test-queue.R index ea8b8a7..cb3e078 100644 --- a/tests/testthat/test-queue.R +++ b/tests/testthat/test-queue.R @@ -6,7 +6,7 @@ test_that("Can bring up queue", { q <- new_queue_quietly(root) expect_equal(q$root, root) expect_equal(q$config$core$use_file_store, TRUE) - expect_equal(q$controller, rrq::rrq_controller2(q$controller$queue_id)) + expect_equal(q$controller, rrq::rrq_controller(q$controller$queue_id)) expect_equal(q$number_of_workers(), 0) start_queue_workers_quietly(1, q$controller) @@ -58,3 +58,59 @@ test_that("Generated namespaced id if no ids exist", { q <- new_queue_quietly(root) expect_match(q$controller$queue_id, "orderly.runner") }) + + +test_that("Can submit task", { + skip_if_no_redis() + + root <- test_prepare_orderly_example("data") + git_info <- helper_add_git(root, c("src", "orderly_config.yml")) + + q <- start_queue_with_workers(root, 1) + + task_id <- q$submit("data", branch = git_info$branch) + expect_worker_task_complete(task_id, q$controller, 10) +}) + + +test_that("Can submit 2 tasks on different branches", { + skip_if_no_redis() + + root <- test_prepare_orderly_example("data") + git_info <- helper_add_git(root, c("src", "orderly_config.yml")) + + gert::git_branch_create("branch", repo = root) + gert::git_branch_checkout("branch", repo = root) + create_new_commit(root, new_file = "test.txt", add = "test.txt") + + q <- start_queue_with_workers(root, 2) + + task_id1 <- q$submit("data", branch = git_info$branch) + task_id2 <- q$submit("data", branch = "branch") + expect_worker_task_complete(task_id1, q$controller, 10) + expect_worker_task_complete(task_id2, q$controller, 10) + + worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker + worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt") + expect_equal(file.exists(worker2_txt), TRUE) +}) + + +test_that("Can submit 2 tasks on different commit hashes", { + skip_if_no_redis() + + root <- test_prepare_orderly_example("data") + git_info <- helper_add_git(root, c("src", "orderly_config.yml")) + sha2 <- create_new_commit(root, new_file = "test.txt", add = "test.txt") + + q <- start_queue_with_workers(root, 2) + + task_id1 <- q$submit("data", ref = git_info$sha, branch = git_info$branch) + task_id2 <- q$submit("data", ref = sha2, branch = git_info$branch) + expect_worker_task_complete(task_id1, q$controller, 10) + expect_worker_task_complete(task_id2, q$controller, 10) + + worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker + worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt") + expect_equal(file.exists(worker2_txt), TRUE) +}) diff --git a/tests/testthat/test-reports.R b/tests/testthat/test-reports.R index 2d9fbe2..acba6df 100644 --- a/tests/testthat/test-reports.R +++ b/tests/testthat/test-reports.R @@ -3,7 +3,7 @@ test_that("can get orderly script name", { git_info <- helper_add_git(root) expect_equal(get_orderly_script_path("data", "HEAD", root), "src/data/data.R") expect_equal(get_orderly_script_path("parameters", "HEAD", root), - "src/parameters/orderly.R") + "src/parameters/parameters.R") file.copy(file.path(root, "src", "data", "data.R"), file.path(root, "src", "data", "orderly.R")) @@ -28,7 +28,7 @@ test_that("can get report parameters", { c = NULL)) ## Works with a specific git hash - params_src <- file.path(root, "src", "parameters", "orderly.R") + params_src <- file.path(root, "src", "parameters", "parameters.R") contents <- readLines(params_src) contents <- c("orderly2::orderly_parameters(a = 'default', b = 2, c = NULL)", contents[-1]) diff --git a/tests/testthat/test-runner.R b/tests/testthat/test-runner.R new file mode 100644 index 0000000..0de32bf --- /dev/null +++ b/tests/testthat/test-runner.R @@ -0,0 +1,64 @@ +test_that("runner runs as expected", { + orderly_root <- test_prepare_orderly_example("data") + git_info <- helper_add_git(orderly_root, c("src", "orderly_config.yml")) + + worker_id <- ids::adjective_animal() + make_worker_dirs(orderly_root, worker_id) + worker_root <- file.path(orderly_root, ".packit", "workers", worker_id) + + suppressMessages(withr::with_envvar( + c(RRQ_WORKER_ID = worker_id), + runner_run(orderly_root, "data", NULL, git_info$branch, + "HEAD", echo = FALSE) + )) + + # report has been run with data in archive + expect_equal(length(list.files(file.path(orderly_root, "archive"))), 1) + # cleanup has deleted draft folder + expect_equal(file.exists(file.path(worker_root, "draft")), FALSE) +}) + +test_that("runner runs as expected with parameters", { + orderly_root <- test_prepare_orderly_example("parameters") + git_info <- helper_add_git(orderly_root, c("src", "orderly_config.yml")) + + worker_id <- ids::adjective_animal() + make_worker_dirs(orderly_root, worker_id) + worker_root <- file.path(orderly_root, ".packit", "workers", worker_id) + + parameters <- list(a = -1, b = -2, c = -3) + suppressMessages(withr::with_envvar( + c(RRQ_WORKER_ID = worker_id), + runner_run(orderly_root, "parameters", parameters, + git_info$branch, "HEAD", echo = FALSE) + )) + + report_archive <- file.path(orderly_root, "archive", "parameters") + rds_path <- file.path(report_archive, list.files(report_archive), "data.rds") + output <- readRDS(rds_path) + + expect_equal(output, parameters) + expect_equal(file.exists(file.path(worker_root, "draft")), FALSE) +}) + +test_that("git clean clears unnecessary files", { + # git-clean.R spawns a file in draft folder and one in worker root folder + # and there will also be an empty folder draft/git-clean so we test + # all components of git_clean + orderly_root <- test_prepare_orderly_example("git-clean") + git_info <- helper_add_git(orderly_root, c("src", "orderly_config.yml")) + + worker_id <- ids::adjective_animal() + make_worker_dirs(orderly_root, worker_id) + worker_root <- file.path(orderly_root, ".packit", "workers", worker_id) + + suppressMessages(withr::with_envvar( + c(RRQ_WORKER_ID = worker_id), + runner_run(orderly_root, "git-clean", NULL, git_info$branch, + "HEAD", echo = FALSE) + )) + + expect_equal(length(list.files(file.path(orderly_root, "archive"))), 1) + expect_equal(file.exists(file.path(worker_root, "draft")), FALSE) + expect_equal(file.exists(file.path(worker_root, "outside_draft.txt")), FALSE) +})