Skip to content

Commit

Permalink
Merge pull request #5 from mrc-ide/mrc-5114
Browse files Browse the repository at this point in the history
Mrc 5114 Add a submit function to the queue object
  • Loading branch information
M-Kusumgar authored Aug 1, 2024
2 parents 0c8dfbd + ca6ec00 commit e15473b
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ jobs:
extra-packages: any::rcmdcheck
needs: check

- name: Start Redis
if: ${{ matrix.config.os != 'macos-latest' }}
run: |
./scripts/redis start
- uses: r-lib/actions/check-r-package@v2
with:
upload-snapshots: true
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Imports:
porcelain,
R6,
redux,
rrq,
rrq (>= 0.7.15),
withr
Suggests:
fs,
Expand Down
18 changes: 16 additions & 2 deletions R/queue.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,28 @@ Queue <- R6::R6Class("Queue", #nolint
}

# Create queue
self$controller <- rrq::rrq_controller2(
self$controller <- rrq::rrq_controller(
queue_id %||% orderly_queue_id()
)
worker_config <- rrq::rrq_worker_config(heartbeat_period = 10)
rrq::rrq_worker_config_save2("localhost", worker_config,
rrq::rrq_worker_config_save("localhost", worker_config,
controller = self$controller)
},

submit = function(reportname, parameters = NULL,
branch = "master", ref = "HEAD") {
run_args <- list(
self$root,
reportname,
parameters,
branch,
ref
)
rrq::rrq_task_create_call(runner_run, run_args,
separate_process = TRUE,
controller = self$controller)
},

# Just until we add queue status for testing
number_of_workers = function() {
rrq::rrq_worker_len(self$controller)
Expand Down
64 changes: 64 additions & 0 deletions R/runner.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
runner_run <- function(orderly_root, reportname, parameters, branch, ref, ...) {
# Setup
worker_id <- Sys.getenv("RRQ_WORKER_ID")
worker_path <- file.path(orderly_root, ".packit", "workers", worker_id)
point_head_to_ref(worker_path, branch, ref)

# Initial cleanup
git_clean(worker_path)

# Run
withr::with_envvar(
c(ORDERLY_SRC_ROOT = file.path(worker_path, "src", reportname)),
orderly2::orderly_run(reportname, parameters = parameters,
root = orderly_root, ...)
)

# Cleanup
git_clean(worker_path)
}

point_head_to_ref <- function(worker_path, branch, ref) {
gert::git_fetch(repo = worker_path)
gert::git_branch_checkout(branch, repo = worker_path)
gert::git_reset_hard(ref, repo = worker_path)
}

add_dir_parent_if_empty <- function(files_to_delete, path) {
contained_files <- list.files(path, full.names = TRUE)
if (length(setdiff(contained_files, files_to_delete)) > 0) {
return(files_to_delete)
}
add_dir_parent_if_empty(c(files_to_delete, path), dirname(path))
}

get_empty_dirs <- function(worker_path) {
dirs <- fs::dir_ls(worker_path, recurse = TRUE, type = "directory")
Reduce(add_dir_parent_if_empty, c(list(character()), dirs))
}

git_clean <- function(worker_path) {
# gert does not have git clean but this should achieve the same thing
tryCatch(
{
gert::git_stash_save(
include_untracked = TRUE,
include_ignored = TRUE,
repo = worker_path
)
gert::git_stash_drop(repo = worker_path)
},
error = function(e) {
# we don't need to rethrow the error here since it doesn't break any
# further report runs
if (e$message != "cannot stash changes - there is nothing to stash.") {
# TODO add logger here
message(e$message)
}
NULL
}
)
# however git ignores all directories, only cares about files, so we may
# have empty directories left
unlink(get_empty_dirs(worker_path), recursive = TRUE)
}
2 changes: 1 addition & 1 deletion scripts/redis
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -e
CONTAINER_NAME=orderly.server_redis
CONTAINER_NAME=orderly.runner_redis
if [ "$1" = "start" ]; then
docker run --rm -d --name=$CONTAINER_NAME -p 127.0.0.1:6379:6379 redis
elif [ "$1" = "stop" ]; then
Expand Down
5 changes: 5 additions & 0 deletions tests/testthat/examples/git-clean/git-clean.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
orderly2::orderly_artefact("Some data", "data.rds")
d <- data.frame(a = 1:10, x = runif(10), y = 1:10 + runif(10))
write.table("test", file = file.path("..", "..", "inside_draft.txt"))
write.table("test", file = file.path("..", "..", "..", "outside_draft.txt"))
saveRDS(d, "data.rds")
73 changes: 66 additions & 7 deletions tests/testthat/helper-orderly-runner.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ new_queue_quietly <- function(root, ...) {
start_queue_workers_quietly <- function(n_workers,
controller, env = parent.frame()) {
suppressMessages(
rrq::rrq_worker_spawn2(n_workers, controller = controller)
rrq::rrq_worker_spawn(n_workers, controller = controller)
)
withr::defer(rrq::rrq_worker_stop(controller = controller), env = env)
}
Expand Down Expand Up @@ -67,15 +67,65 @@ copy_examples <- function(examples, path_src) {
}


helper_add_git <- function(path) {
helper_add_git <- function(path, add = ".") {
gert::git_init(path)
sha <- git_add_and_commit(path)
sha <- git_add_and_commit(path, add)
branch <- gert::git_branch(repo = path)
url <- "https://example.com/git"
gert::git_remote_add(url, repo = path)
list(path = path, branch = branch, sha = sha, url = url)
}

new_queue_quietly <- function(root, ...) {
suppressMessages(Queue$new(root, ...))
}

make_worker_dirs <- function(orderly_root, ids) {
packit_path <- file.path(orderly_root, ".packit")
dir.create(packit_path)
workers <- file.path(packit_path, "workers")
dir.create(workers)
lapply(ids, function(id) {
worker_path <- file.path(workers, id)
dir.create(worker_path)
gert::git_clone(orderly_root, path = worker_path)
gert::git_config_set("user.name", id, repo = worker_path)
gert::git_config_set("user.email", id, repo = worker_path)
})

}

start_queue_workers_quietly <- function(n_workers,
controller, env = parent.frame()) {
worker_manager <- suppressMessages(
rrq::rrq_worker_spawn(n_workers, controller = controller)
)
withr::defer(rrq::rrq_worker_stop(controller = controller), env = env)
worker_manager
}

start_queue_with_workers <- function(root, n_workers, env = parent.frame()) {
q <- new_queue_quietly(root)
worker_manager <- start_queue_workers_quietly(n_workers, q$controller,
env = env)
make_worker_dirs(root, worker_manager$id)
q
}

skip_if_no_redis <- function() {
available <- redux::redis_available()
if (!available) {
testthat::skip("Skipping test as redis is not available")
}
invisible(available)
}

expect_worker_task_complete <- function(task_id, controller, n_tries) {
is_task_successful <- rrq::rrq_task_wait(
task_id, controller = controller, timeout = n_tries
)
expect_true(is_task_successful)
}

initialise_git_repo <- function() {
t <- tempfile()
Expand All @@ -85,16 +135,25 @@ initialise_git_repo <- function() {
}


git_add_and_commit <- function(path) {
gert::git_add(".", repo = path)
create_new_commit <- function(path, new_file = "new", message = "new message",
add = ".") {
writeLines("new file", file.path(path, new_file))
gert::git_add(add, repo = path)
user <- "author <[email protected]>"
gert::git_commit("new commit", author = user, committer = user, repo = path)
}


git_add_and_commit <- function(path, add = ".") {
gert::git_add(add, repo = path)
user <- "author <[email protected]>"
gert::git_commit("new commit", author = user, committer = user, repo = path)
}


create_new_commit <- function(path, new_file = "new") {
create_new_commit <- function(path, new_file = "new", add = ".") {
writeLines("new file", file.path(path, new_file))
git_add_and_commit(path)
git_add_and_commit(path, add)
}


Expand Down
7 changes: 6 additions & 1 deletion tests/testthat/test-api.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ test_that("can list orderly reports", {

## Add a report on a 2nd branch
gert::git_branch_create("other", repo = repo$local)
fs::dir_copy(file.path(repo$local, "src", "parameters"),
fs::dir_copy(file.path(repo$local, "src", "parameters"),
file.path(repo$local, "src", "parameters2"))
# have to rename to parameters2.R to recognise it as orderly report
file.rename(
file.path(repo$local, "src", "parameters2", "parameters.R"),
file.path(repo$local, "src", "parameters2", "parameters2.R")
)
sha <- git_add_and_commit(repo$local)

## Can list items from this sha
Expand Down
6 changes: 3 additions & 3 deletions tests/testthat/test-git.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ test_that("can get files which have been modified", {
expect_equal(git_get_modified(log$commit[[2]], repo = repo$local),
character(0))
expect_equal(git_get_modified(log$commit[[1]], repo = repo$local),
"src/parameters/orderly.R")
"src/parameters/parameters.R")
expect_equal(git_get_modified(log$commit[[1]], relative = "src/",
repo = repo$local),
"parameters/orderly.R")
"parameters/parameters.R")
expect_equal(git_get_modified(log$commit[[1]], base = log$commit[[2]],
repo = repo$local),
"src/parameters/orderly.R")
"src/parameters/parameters.R")
expect_equal(git_get_modified(log$commit[[2]], base = log$commit[[1]],
repo = repo$local),
character(0))
Expand Down
58 changes: 57 additions & 1 deletion tests/testthat/test-queue.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test_that("Can bring up queue", {
q <- new_queue_quietly(root)
expect_equal(q$root, root)
expect_equal(q$config$core$use_file_store, TRUE)
expect_equal(q$controller, rrq::rrq_controller2(q$controller$queue_id))
expect_equal(q$controller, rrq::rrq_controller(q$controller$queue_id))

expect_equal(q$number_of_workers(), 0)
start_queue_workers_quietly(1, q$controller)
Expand Down Expand Up @@ -58,3 +58,59 @@ test_that("Generated namespaced id if no ids exist", {
q <- new_queue_quietly(root)
expect_match(q$controller$queue_id, "orderly.runner")
})


test_that("Can submit task", {
skip_if_no_redis()

root <- test_prepare_orderly_example("data")
git_info <- helper_add_git(root, c("src", "orderly_config.yml"))

q <- start_queue_with_workers(root, 1)

task_id <- q$submit("data", branch = git_info$branch)
expect_worker_task_complete(task_id, q$controller, 10)
})


test_that("Can submit 2 tasks on different branches", {
skip_if_no_redis()

root <- test_prepare_orderly_example("data")
git_info <- helper_add_git(root, c("src", "orderly_config.yml"))

gert::git_branch_create("branch", repo = root)
gert::git_branch_checkout("branch", repo = root)
create_new_commit(root, new_file = "test.txt", add = "test.txt")

q <- start_queue_with_workers(root, 2)

task_id1 <- q$submit("data", branch = git_info$branch)
task_id2 <- q$submit("data", branch = "branch")
expect_worker_task_complete(task_id1, q$controller, 10)
expect_worker_task_complete(task_id2, q$controller, 10)

worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker
worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt")
expect_equal(file.exists(worker2_txt), TRUE)
})


test_that("Can submit 2 tasks on different commit hashes", {
skip_if_no_redis()

root <- test_prepare_orderly_example("data")
git_info <- helper_add_git(root, c("src", "orderly_config.yml"))
sha2 <- create_new_commit(root, new_file = "test.txt", add = "test.txt")

q <- start_queue_with_workers(root, 2)

task_id1 <- q$submit("data", ref = git_info$sha, branch = git_info$branch)
task_id2 <- q$submit("data", ref = sha2, branch = git_info$branch)
expect_worker_task_complete(task_id1, q$controller, 10)
expect_worker_task_complete(task_id2, q$controller, 10)

worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker
worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt")
expect_equal(file.exists(worker2_txt), TRUE)
})
4 changes: 2 additions & 2 deletions tests/testthat/test-reports.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ test_that("can get orderly script name", {
git_info <- helper_add_git(root)
expect_equal(get_orderly_script_path("data", "HEAD", root), "src/data/data.R")
expect_equal(get_orderly_script_path("parameters", "HEAD", root),
"src/parameters/orderly.R")
"src/parameters/parameters.R")

file.copy(file.path(root, "src", "data", "data.R"),
file.path(root, "src", "data", "orderly.R"))
Expand All @@ -28,7 +28,7 @@ test_that("can get report parameters", {
c = NULL))

## Works with a specific git hash
params_src <- file.path(root, "src", "parameters", "orderly.R")
params_src <- file.path(root, "src", "parameters", "parameters.R")
contents <- readLines(params_src)
contents <- c("orderly2::orderly_parameters(a = 'default', b = 2, c = NULL)",
contents[-1])
Expand Down
Loading

0 comments on commit e15473b

Please sign in to comment.