-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mrc 5114 Add a submit function to the queue object #5
Changes from 19 commits
9a98d7e
c8cb377
1e1a990
e3417ae
255f58e
fd2e375
e767d0c
0699a33
62a98e6
cb5de4a
d696102
855877d
3a6de8b
4979cd5
6b34b85
f3d0ecd
b1a83c9
ff7a8a3
0fab722
f48f338
74d2ac0
cb4a5e5
4bc17a2
7e09a01
70533e1
441f844
6264648
b1daffd
c7d03e6
17a32aa
f4655eb
8c88f39
53db6cb
070805a
11b747b
b69faac
7f8f52f
ca6ec00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
runner_run <- function(orderly_root, reportname, parameters, branch, ref, ...) { | ||
# Helper functions | ||
# ================== | ||
point_head_to_ref <- function(worker_path, branch, ref) { | ||
gert::git_fetch(repo = worker_path) | ||
gert::git_branch_checkout(branch, repo = worker_path) | ||
gert::git_reset_hard(ref, repo = worker_path) | ||
} | ||
|
||
add_dir_parent_if_empty <- function(files_to_delete, path) { | ||
contained_files <- list.files(path, full.names = TRUE) | ||
if (length(setdiff(contained_files, files_to_delete)) > 0) { | ||
return(files_to_delete) | ||
} | ||
add_dir_parent_if_empty(c(files_to_delete, path), dirname(path)) | ||
} | ||
|
||
get_empty_dirs <- function(worker_path) { | ||
dirs <- fs::dir_ls(worker_path, recurse = TRUE, type = "directory") | ||
Reduce(add_dir_parent_if_empty, c(list(character()), dirs)) | ||
} | ||
|
||
git_clean <- function(worker_path) { | ||
# gert does not have git clean but this should achieve the same thing | ||
res <- tryCatch( | ||
gert::git_stash_save( | ||
include_untracked = TRUE, | ||
include_ignored = TRUE, | ||
repo = worker_path | ||
), | ||
error = function(e) NULL | ||
) | ||
if (!is.null(res)) { | ||
gert::git_stash_drop(repo = worker_path) | ||
} | ||
# however git ignores all directories, only cares about files, so we may | ||
# have empty directories left | ||
unlink(get_empty_dirs(worker_path), recursive = TRUE) | ||
} | ||
# ================== | ||
|
||
|
||
# Actual runner code | ||
# ================== | ||
# Setup | ||
worker_id <- Sys.getenv("RRQ_WORKER_ID") | ||
worker_path <- file.path(orderly_root, ".packit", "workers", worker_id) | ||
point_head_to_ref(worker_path, branch, ref) | ||
|
||
# Run | ||
withr::with_envvar( | ||
c(ORDERLY_SRC_ROOT = file.path(worker_path, "src", reportname)), | ||
orderly2::orderly_run(reportname, parameters = parameters, | ||
root = orderly_root, ...) | ||
) | ||
|
||
# Cleanup | ||
git_clean(worker_path) | ||
Comment on lines
+17
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what case do we need to cleanup? Just tried locally and I was able to use Is this for if orderly fails to run and writes to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. at the end of a successful run, orderly deletes the file from the draft folder and leaves an empty folder in the directory, the most important bit of this git clean function is the removing of those empty dirs, just to keep a clean worktree after every run, otherwise that draft folder would just keep accumulating lots of empty folders after many runs the stash and drop is just a safety in case someone has somehow produced something that is untracked by git, we can guarantee that it is removed and doesnt accumulate, people could also change their gitignore files which could also lead to these problems i think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it might also be worth (or worth doing in addition) the clean before running the report? |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,9 +67,9 @@ copy_examples <- function(examples, path_src) { | |
} | ||
|
||
|
||
helper_add_git <- function(path) { | ||
helper_add_git <- function(path, add = ".") { | ||
gert::git_init(path) | ||
gert::git_add(".", repo = path) | ||
gert::git_add(add, repo = path) | ||
user <- "author <[email protected]>" | ||
sha <- gert::git_commit("initial", author = user, committer = user, | ||
repo = path) | ||
|
@@ -79,6 +79,53 @@ helper_add_git <- function(path) { | |
list(path = path, user = user, branch = branch, sha = sha, url = url) | ||
} | ||
|
||
new_queue_quietly <- function(root, ...) { | ||
suppressMessages(Queue$new(root, ...)) | ||
} | ||
|
||
make_worker_dirs <- function(orderly_root, ids) { | ||
packit_path <- file.path(orderly_root, ".packit") | ||
dir.create(packit_path) | ||
workers <- file.path(packit_path, "workers") | ||
dir.create(workers) | ||
lapply(ids, function(id) { | ||
worker_path <- file.path(workers, id) | ||
dir.create(worker_path) | ||
gert::git_clone(orderly_root, path = worker_path) | ||
}) | ||
|
||
} | ||
|
||
start_queue_workers_quietly <- function(n_workers, | ||
controller, env = parent.frame()) { | ||
worker_manager <- suppressMessages( | ||
rrq::rrq_worker_spawn2(n_workers, controller = controller) | ||
) | ||
withr::defer(rrq::rrq_worker_stop(controller = controller), env = env) | ||
worker_manager | ||
} | ||
|
||
skip_if_no_redis <- function() { | ||
available <- redux::redis_available() | ||
if (!available) { | ||
testthat::skip("Skipping test as redis is not available") | ||
} | ||
invisible(available) | ||
} | ||
|
||
expect_worker_task_complete <- function(task_id, controller, n_tries) { | ||
is_completed <- FALSE | ||
for (i in seq_len(n_tries)) { | ||
is_completed <- rrq::rrq_task_status( | ||
task_id, controller = controller | ||
) == "COMPLETE" | ||
if (is_completed == TRUE) { | ||
break | ||
} | ||
Sys.sleep(1) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ooo thats neat thanks |
||
expect_equal(is_completed, TRUE) | ||
} | ||
|
||
initialise_git_repo <- function() { | ||
t <- tempfile() | ||
|
@@ -88,9 +135,10 @@ initialise_git_repo <- function() { | |
} | ||
|
||
|
||
create_new_commit <- function(path, new_file = "new", message = "new message") { | ||
create_new_commit <- function(path, new_file = "new", message = "new message", | ||
add = ".") { | ||
writeLines("new file", file.path(path, new_file)) | ||
gert::git_add(".", repo = path) | ||
gert::git_add(add, repo = path) | ||
user <- "author <[email protected]>" | ||
gert::git_commit(message, author = user, committer = user, repo = path) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,8 +32,13 @@ test_that("can list orderly reports", { | |
|
||
## Add a report on a 2nd branch | ||
gert::git_branch_create("other", repo = repo$local) | ||
fs::dir_copy(file.path(repo$local, "src", "parameters"), | ||
fs::dir_copy(file.path(repo$local, "src", "parameters"), | ||
file.path(repo$local, "src", "parameters2")) | ||
# have to rename to parameters2.R to recognise it as orderly report | ||
file.rename( | ||
file.path(repo$local, "src", "parameters2", "parameters.R"), | ||
file.path(repo$local, "src", "parameters2", "parameters2.R") | ||
) | ||
gert::git_add(".", repo = repo$local) | ||
sha <- gert::git_commit("Add report data2", repo = repo$local, | ||
author = "Test User <[email protected]>") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,3 +58,65 @@ test_that("Generated namespaced id if no ids exist", { | |
q <- new_queue_quietly(root) | ||
expect_match(q$controller$queue_id, "orderly.runner") | ||
}) | ||
|
||
|
||
test_that("Can submit task", { | ||
skip_if_no_redis() | ||
|
||
root <- test_prepare_orderly_example("data") | ||
helper_add_git(root, c("src", "orderly_config.yml")) | ||
|
||
q <- new_queue_quietly(root) | ||
worker_manager <- start_queue_workers_quietly(1, q$controller) | ||
make_worker_dirs(root, worker_manager$id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could wrap this in a helper to get a queue with specified number of workers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
task_id <- q$submit("data") | ||
expect_worker_task_complete(task_id, q$controller, 10) | ||
}) | ||
|
||
|
||
test_that("Can submit 2 tasks on different branches", { | ||
skip_if_no_redis() | ||
|
||
root <- test_prepare_orderly_example("data") | ||
helper_add_git(root, c("src", "orderly_config.yml")) | ||
|
||
gert::git_branch_create("branch", repo = root) | ||
gert::git_branch_checkout("branch", repo = root) | ||
create_new_commit(root, new_file = "test.txt", add = "test.txt") | ||
|
||
q <- new_queue_quietly(root) | ||
worker_manager <- start_queue_workers_quietly(2, q$controller) | ||
make_worker_dirs(root, worker_manager$id) | ||
|
||
task_id1 <- q$submit("data", branch = "master") | ||
task_id2 <- q$submit("data", branch = "branch") | ||
expect_worker_task_complete(task_id1, q$controller, 10) | ||
expect_worker_task_complete(task_id2, q$controller, 10) | ||
|
||
worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker | ||
worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt") | ||
expect_equal(file.exists(worker2_txt), TRUE) | ||
}) | ||
|
||
|
||
test_that("Can submit 2 tasks on different commit hashes", { | ||
skip_if_no_redis() | ||
|
||
root <- test_prepare_orderly_example("data") | ||
sha1 <- helper_add_git(root, c("src", "orderly_config.yml"))$sha | ||
sha2 <- create_new_commit(root, new_file = "test.txt", add = "test.txt") | ||
|
||
q <- new_queue_quietly(root) | ||
worker_manager <- start_queue_workers_quietly(2, q$controller) | ||
make_worker_dirs(root, worker_manager$id) | ||
|
||
task_id1 <- q$submit("data", ref = sha1) | ||
task_id2 <- q$submit("data", ref = sha2) | ||
expect_worker_task_complete(task_id1, q$controller, 10) | ||
expect_worker_task_complete(task_id2, q$controller, 10) | ||
|
||
worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker | ||
worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt") | ||
expect_equal(file.exists(worker2_txt), TRUE) | ||
}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
test_that("runner runs as expected", { | ||
orderly_root <- test_prepare_orderly_example("data") | ||
helper_add_git(orderly_root, c("src", "orderly_config.yml")) | ||
|
||
worker_id <- "worker1" | ||
make_worker_dirs(orderly_root, worker_id) | ||
worker_root <- file.path(orderly_root, ".packit", "workers", worker_id) | ||
|
||
suppressMessages(withr::with_envvar( | ||
c(RRQ_WORKER_ID = worker_id), | ||
runner_run(orderly_root, "data", NULL, "master", "HEAD", echo = FALSE) | ||
)) | ||
|
||
# report has been run with data in archive | ||
expect_equal(length(list.files(file.path(orderly_root, "archive"))), 1) | ||
# cleanup has deleted draft folder | ||
expect_equal(file.exists(file.path(worker_root, "draft")), FALSE) | ||
}) | ||
|
||
test_that("runner runs as expected with parameters", { | ||
orderly_root <- test_prepare_orderly_example("parameters") | ||
helper_add_git(orderly_root, c("src", "orderly_config.yml")) | ||
|
||
worker_id <- "worker1" | ||
make_worker_dirs(orderly_root, worker_id) | ||
worker_root <- file.path(orderly_root, ".packit", "workers", worker_id) | ||
|
||
parameters <- list(a = -1, b = -2, c = -3) | ||
suppressMessages(withr::with_envvar( | ||
c(RRQ_WORKER_ID = worker_id), | ||
runner_run(orderly_root, "parameters", parameters, | ||
"master", "HEAD", echo = FALSE) | ||
)) | ||
|
||
report_archive <- file.path(orderly_root, "archive", "parameters") | ||
rds_path <- file.path(report_archive, list.files(report_archive), "data.rds") | ||
output <- readRDS(rds_path) | ||
|
||
expect_equal(output, parameters) | ||
expect_equal(file.exists(file.path(worker_root, "draft")), FALSE) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are here because there is a bug in
rrq_task_create_call
and it doesn't seem to capture them. Made a ticket about this: https://mrc-ide.myjetbrains.com/youtrack/issue/mrc-5153/Investigate-rrqtaskcreatecall-bugThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work if you call
rrq::rrq_task_create_call
with the package qualifier? i.e. asorderly.runner:::runner_run
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i tried it, didnt seem to even pick up
orderly:::runner_run
like the task completed successfully but it didnt do anythingThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ohhh nevermind, i didnt install it, it did work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it actually works without it too, i just didnt do an install all and so it didnt pick it up from the
orderly.runner
package 😭 i was avoiding that cos I was relying on specific branches of the orderly2 and rrq repos and didnt know if install all would overwrite them with the head of the github repo, but yh that was the issue!