Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mrc 5114 Add a submit function to the queue object #5

Merged
merged 38 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9a98d7e
update lintr, add packages and add redis script
M-Kusumgar Feb 22, 2024
c8cb377
add queue R6 class
M-Kusumgar Feb 22, 2024
1e1a990
tests
M-Kusumgar Feb 22, 2024
e3417ae
documentation to finalizer
M-Kusumgar Feb 22, 2024
255f58e
add docs for Queue
M-Kusumgar Feb 22, 2024
fd2e375
refactor object so only has view on queue and does not manage workers
M-Kusumgar Feb 27, 2024
e767d0c
add redis to workflow
M-Kusumgar Feb 27, 2024
0699a33
actually start redis
M-Kusumgar Feb 28, 2024
62a98e6
update docs
M-Kusumgar Feb 28, 2024
cb5de4a
fix cleanup
M-Kusumgar Feb 28, 2024
d696102
add runner and submit function to queue
M-Kusumgar Mar 5, 2024
855877d
change env var names to match other PRs
M-Kusumgar Mar 5, 2024
3a6de8b
got worker running with queue
M-Kusumgar Mar 7, 2024
4979cd5
queue tests done
M-Kusumgar Mar 7, 2024
6b34b85
remove debug
M-Kusumgar Mar 7, 2024
f3d0ecd
Merge branch 'main' into mrc-5114
M-Kusumgar Mar 7, 2024
b1a83c9
clean up diff
M-Kusumgar Mar 7, 2024
ff7a8a3
fix tests by using git helpers
M-Kusumgar Mar 7, 2024
0fab722
fix test
M-Kusumgar Mar 7, 2024
f48f338
rob's comments
M-Kusumgar Mar 8, 2024
74d2ac0
cov, git clean test
M-Kusumgar Mar 8, 2024
cb4a5e5
Merge branch 'main' into mrc-5114
M-Kusumgar Mar 8, 2024
4bc17a2
fix tests referencing orderly.R
M-Kusumgar Mar 8, 2024
7e09a01
fix helper function
M-Kusumgar Mar 8, 2024
70533e1
try to fix test
M-Kusumgar Mar 8, 2024
441f844
tmate
M-Kusumgar Mar 8, 2024
6264648
add user name and email to worker git config
Mar 8, 2024
b1daffd
remove tmate
Mar 8, 2024
c7d03e6
test
Mar 8, 2024
17a32aa
error handling
Mar 8, 2024
f4655eb
refactor git stash for better error handling
Mar 8, 2024
8c88f39
add repo arg
Mar 8, 2024
53db6cb
revert to tryCatch for now
Mar 8, 2024
070805a
not running redis on mac
Mar 8, 2024
11b747b
better if
Mar 8, 2024
b69faac
change identity to me
M-Kusumgar Mar 8, 2024
7f8f52f
Update to work with latest rrq, make branch tests work with any defau…
r-ash May 3, 2024
ca6ec00
cleanup before report run
M-Kusumgar Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions R/queue.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ Queue <- R6::R6Class("Queue", #nolint
controller = self$controller)
},

submit = function(reportname, parameters = NULL,
branch = "master", ref = "HEAD") {
run_args <- list(
self$root,
reportname,
parameters,
branch,
ref
)
rrq::rrq_task_create_call(runner_run, run_args,
separate_process = TRUE,
controller = self$controller)
},

# Just until we add queue status for testing
number_of_workers = function() {
rrq::rrq_worker_len(self$controller)
Expand Down
59 changes: 59 additions & 0 deletions R/runner.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
runner_run <- function(orderly_root, reportname, parameters, branch, ref, ...) {
# Helper functions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are here because there is a bug in rrq_task_create_call and it doesn't seem to capture them. Made a ticket about this: https://mrc-ide.myjetbrains.com/youtrack/issue/mrc-5153/Investigate-rrqtaskcreatecall-bug

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work if you call rrq::rrq_task_create_call with the package qualifier? i.e. as orderly.runner:::runner_run?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i tried it, didnt seem to even pick up orderly:::runner_run like the task completed successfully but it didnt do anything

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhh nevermind, i didnt install it, it did work!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it actually works without it too, i just didnt do an install all and so it didnt pick it up from the orderly.runner package 😭 i was avoiding that cos I was relying on specific branches of the orderly2 and rrq repos and didnt know if install all would overwrite them with the head of the github repo, but yh that was the issue!

# ==================
point_head_to_ref <- function(worker_path, branch, ref) {
gert::git_fetch(repo = worker_path)
gert::git_branch_checkout(branch, repo = worker_path)
gert::git_reset_hard(ref, repo = worker_path)
}

add_dir_parent_if_empty <- function(files_to_delete, path) {
contained_files <- list.files(path, full.names = TRUE)
if (length(setdiff(contained_files, files_to_delete)) > 0) {
return(files_to_delete)
}
add_dir_parent_if_empty(c(files_to_delete, path), dirname(path))
}

get_empty_dirs <- function(worker_path) {
dirs <- fs::dir_ls(worker_path, recurse = TRUE, type = "directory")
Reduce(add_dir_parent_if_empty, c(list(character()), dirs))
}

git_clean <- function(worker_path) {
# gert does not have git clean but this should achieve the same thing
res <- tryCatch(
gert::git_stash_save(
include_untracked = TRUE,
include_ignored = TRUE,
repo = worker_path
),
error = function(e) NULL
)
if (!is.null(res)) {
gert::git_stash_drop(repo = worker_path)

Check warning on line 34 in R/runner.R

View check run for this annotation

Codecov / codecov/patch

R/runner.R#L34

Added line #L34 was not covered by tests
}
# however git ignores all directories, only cares about files, so we may
# have empty directories left
unlink(get_empty_dirs(worker_path), recursive = TRUE)
}
# ==================


# Actual runner code
# ==================
# Setup
worker_id <- Sys.getenv("RRQ_WORKER_ID")
worker_path <- file.path(orderly_root, ".packit", "workers", worker_id)
point_head_to_ref(worker_path, branch, ref)

# Run
withr::with_envvar(
c(ORDERLY_SRC_ROOT = file.path(worker_path, "src", reportname)),
orderly2::orderly_run(reportname, parameters = parameters,
root = orderly_root, ...)
)

# Cleanup
git_clean(worker_path)
Comment on lines +17 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what case do we need to cleanup?

Just tried locally and I was able to use point_head_to_ref to switch between commits without cleaning up anything in between.

Is this for if orderly fails to run and writes to the draft dir? I think that will be gitignored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the end of a successful run, orderly deletes the file from the draft folder and leaves an empty folder in the directory, the most important bit of this git clean function is the removing of those empty dirs, just to keep a clean worktree after every run, otherwise that draft folder would just keep accumulating lots of empty folders after many runs

the stash and drop is just a safety in case someone has somehow produced something that is untracked by git, we can guarantee that it is removed and doesnt accumulate, people could also change their gitignore files which could also lead to these problems i think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it might also be worth (or worth doing in addition) the clean before running the report?

}
2 changes: 1 addition & 1 deletion scripts/redis
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -e
CONTAINER_NAME=orderly.server_redis
CONTAINER_NAME=orderly.runner_redis
if [ "$1" = "start" ]; then
docker run --rm -d --name=$CONTAINER_NAME -p 127.0.0.1:6379:6379 redis
elif [ "$1" = "stop" ]; then
Expand Down
56 changes: 52 additions & 4 deletions tests/testthat/helper-orderly-runner.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ copy_examples <- function(examples, path_src) {
}


helper_add_git <- function(path) {
helper_add_git <- function(path, add = ".") {
gert::git_init(path)
gert::git_add(".", repo = path)
gert::git_add(add, repo = path)
user <- "author <[email protected]>"
sha <- gert::git_commit("initial", author = user, committer = user,
repo = path)
Expand All @@ -79,6 +79,53 @@ helper_add_git <- function(path) {
list(path = path, user = user, branch = branch, sha = sha, url = url)
}

new_queue_quietly <- function(root, ...) {
suppressMessages(Queue$new(root, ...))
}

make_worker_dirs <- function(orderly_root, ids) {
packit_path <- file.path(orderly_root, ".packit")
dir.create(packit_path)
workers <- file.path(packit_path, "workers")
dir.create(workers)
lapply(ids, function(id) {
worker_path <- file.path(workers, id)
dir.create(worker_path)
gert::git_clone(orderly_root, path = worker_path)
})

}

start_queue_workers_quietly <- function(n_workers,
controller, env = parent.frame()) {
worker_manager <- suppressMessages(
rrq::rrq_worker_spawn2(n_workers, controller = controller)
)
withr::defer(rrq::rrq_worker_stop(controller = controller), env = env)
worker_manager
}

skip_if_no_redis <- function() {
available <- redux::redis_available()
if (!available) {
testthat::skip("Skipping test as redis is not available")
}
invisible(available)
}

expect_worker_task_complete <- function(task_id, controller, n_tries) {
is_completed <- FALSE
for (i in seq_len(n_tries)) {
is_completed <- rrq::rrq_task_status(
task_id, controller = controller
) == "COMPLETE"
if (is_completed == TRUE) {
break
}
Sys.sleep(1)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use rrq_task_wait for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooo thats neat thanks

expect_equal(is_completed, TRUE)
}

initialise_git_repo <- function() {
t <- tempfile()
Expand All @@ -88,9 +135,10 @@ initialise_git_repo <- function() {
}


create_new_commit <- function(path, new_file = "new", message = "new message") {
create_new_commit <- function(path, new_file = "new", message = "new message",
add = ".") {
writeLines("new file", file.path(path, new_file))
gert::git_add(".", repo = path)
gert::git_add(add, repo = path)
user <- "author <[email protected]>"
gert::git_commit(message, author = user, committer = user, repo = path)
}
Expand Down
7 changes: 6 additions & 1 deletion tests/testthat/test-api.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ test_that("can list orderly reports", {

## Add a report on a 2nd branch
gert::git_branch_create("other", repo = repo$local)
fs::dir_copy(file.path(repo$local, "src", "parameters"),
fs::dir_copy(file.path(repo$local, "src", "parameters"),
file.path(repo$local, "src", "parameters2"))
# have to rename to parameters2.R to recognise it as orderly report
file.rename(
file.path(repo$local, "src", "parameters2", "parameters.R"),
file.path(repo$local, "src", "parameters2", "parameters2.R")
)
gert::git_add(".", repo = repo$local)
sha <- gert::git_commit("Add report data2", repo = repo$local,
author = "Test User <[email protected]>")
Expand Down
6 changes: 3 additions & 3 deletions tests/testthat/test-git.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ test_that("can get files which have been modified", {
expect_equal(git_get_modified(log$commit[[2]], repo = repo$local),
character(0))
expect_equal(git_get_modified(log$commit[[1]], repo = repo$local),
"src/parameters/orderly.R")
"src/parameters/parameters.R")
expect_equal(git_get_modified(log$commit[[1]], relative = "src/",
repo = repo$local),
"parameters/orderly.R")
"parameters/parameters.R")
expect_equal(git_get_modified(log$commit[[1]], base = log$commit[[2]],
repo = repo$local),
"src/parameters/orderly.R")
"src/parameters/parameters.R")
expect_equal(git_get_modified(log$commit[[2]], base = log$commit[[1]],
repo = repo$local),
character(0))
Expand Down
62 changes: 62 additions & 0 deletions tests/testthat/test-queue.R
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,65 @@ test_that("Generated namespaced id if no ids exist", {
q <- new_queue_quietly(root)
expect_match(q$controller$queue_id, "orderly.runner")
})


test_that("Can submit task", {
skip_if_no_redis()

root <- test_prepare_orderly_example("data")
helper_add_git(root, c("src", "orderly_config.yml"))

q <- new_queue_quietly(root)
worker_manager <- start_queue_workers_quietly(1, q$controller)
make_worker_dirs(root, worker_manager$id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could wrap this in a helper to get a queue with specified number of workers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


task_id <- q$submit("data")
expect_worker_task_complete(task_id, q$controller, 10)
})


test_that("Can submit 2 tasks on different branches", {
skip_if_no_redis()

root <- test_prepare_orderly_example("data")
helper_add_git(root, c("src", "orderly_config.yml"))

gert::git_branch_create("branch", repo = root)
gert::git_branch_checkout("branch", repo = root)
create_new_commit(root, new_file = "test.txt", add = "test.txt")

q <- new_queue_quietly(root)
worker_manager <- start_queue_workers_quietly(2, q$controller)
make_worker_dirs(root, worker_manager$id)

task_id1 <- q$submit("data", branch = "master")
task_id2 <- q$submit("data", branch = "branch")
expect_worker_task_complete(task_id1, q$controller, 10)
expect_worker_task_complete(task_id2, q$controller, 10)

worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker
worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt")
expect_equal(file.exists(worker2_txt), TRUE)
})


test_that("Can submit 2 tasks on different commit hashes", {
skip_if_no_redis()

root <- test_prepare_orderly_example("data")
sha1 <- helper_add_git(root, c("src", "orderly_config.yml"))$sha
sha2 <- create_new_commit(root, new_file = "test.txt", add = "test.txt")

q <- new_queue_quietly(root)
worker_manager <- start_queue_workers_quietly(2, q$controller)
make_worker_dirs(root, worker_manager$id)

task_id1 <- q$submit("data", ref = sha1)
task_id2 <- q$submit("data", ref = sha2)
expect_worker_task_complete(task_id1, q$controller, 10)
expect_worker_task_complete(task_id2, q$controller, 10)

worker_id2 <- rrq::rrq_task_info(task_id2, controller = q$controller)$worker
worker2_txt <- file.path(root, ".packit", "workers", worker_id2, "test.txt")
expect_equal(file.exists(worker2_txt), TRUE)
})
41 changes: 41 additions & 0 deletions tests/testthat/test-runner.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
test_that("runner runs as expected", {
orderly_root <- test_prepare_orderly_example("data")
helper_add_git(orderly_root, c("src", "orderly_config.yml"))

worker_id <- "worker1"
make_worker_dirs(orderly_root, worker_id)
worker_root <- file.path(orderly_root, ".packit", "workers", worker_id)

suppressMessages(withr::with_envvar(
c(RRQ_WORKER_ID = worker_id),
runner_run(orderly_root, "data", NULL, "master", "HEAD", echo = FALSE)
))

# report has been run with data in archive
expect_equal(length(list.files(file.path(orderly_root, "archive"))), 1)
# cleanup has deleted draft folder
expect_equal(file.exists(file.path(worker_root, "draft")), FALSE)
})

test_that("runner runs as expected with parameters", {
orderly_root <- test_prepare_orderly_example("parameters")
helper_add_git(orderly_root, c("src", "orderly_config.yml"))

worker_id <- "worker1"
make_worker_dirs(orderly_root, worker_id)
worker_root <- file.path(orderly_root, ".packit", "workers", worker_id)

parameters <- list(a = -1, b = -2, c = -3)
suppressMessages(withr::with_envvar(
c(RRQ_WORKER_ID = worker_id),
runner_run(orderly_root, "parameters", parameters,
"master", "HEAD", echo = FALSE)
))

report_archive <- file.path(orderly_root, "archive", "parameters")
rds_path <- file.path(report_archive, list.files(report_archive), "data.rds")
output <- readRDS(rds_path)

expect_equal(output, parameters)
expect_equal(file.exists(file.path(worker_root, "draft")), FALSE)
})
Loading