Merge pull request #9 from mrc-ide/mrc-5613-status-accept-list-ids
Mrc-5613 status accept list ids
absternator authored Aug 21, 2024
2 parents 0f0d2ca + 1f37b79 commit f5794bd
Showing 14 changed files with 317 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/R-CMD-check.yaml
@@ -20,7 +20,7 @@ jobs:
config:
- {os: macos-latest, r: 'release'}
- {os: windows-latest, r: 'release'}
- {os: ubuntu-latest, r: 'devel', http-user-agent: 'release'}
# - {os: ubuntu-latest, r: 'devel', http-user-agent: 'release'} # TODO: fix with issue #mrc-5693
- {os: ubuntu-latest, r: 'release'}
- {os: ubuntu-latest, r: 'oldrel-1'}

4 changes: 2 additions & 2 deletions .github/workflows/build-and-push.yaml
@@ -2,9 +2,9 @@
# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help
on:
push:
branches: [main, master]
branches: [main, master]
pull_request:
branches: [main, master]
branches: [main, master]

name: Build-and-push

3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
"editor.formatOnSave": false
}
19 changes: 11 additions & 8 deletions R/api.R
@@ -60,16 +60,16 @@ report_list <- function(root, ref) {
last_changed <- function(nm) {
max(contents$modified[startsWith(contents$path, sprintf("src/%s", nm))])
}
updated_time <- vnapply(nms, last_changed, USE.NAMES = FALSE)
updatedTime <- vnapply(nms, last_changed, USE.NAMES = FALSE)
modified_sources <- git_get_modified(ref, relative_dir = "src/", repo = root)
modified_reports <- unique(first_dirname(modified_sources))
has_modifications <- vlapply(nms, function(report_name) {
hasModifications <- vlapply(nms, function(report_name) {
report_name %in% modified_reports
}, USE.NAMES = FALSE)
data.frame(
name = nms,
updated_time = updated_time,
has_modifications = has_modifications
updatedTime = updatedTime,
hasModifications = hasModifications
)
}

@@ -102,12 +102,15 @@ submit_report_run <- function(root, queue, data) {
ref = data$hash,
parameters = data$parameters
)
list(task_id = scalar(task_id))
list(taskId = scalar(task_id))
}

##' @porcelain
##' GET /report/status/<task_id:string> => json(report_run_status_response)
##' POST /report/status => json(report_run_status_response)
##' state queue :: queue
report_run_status <- function(queue, task_id) {
queue$get_status(task_id)
##' query include_logs :: logical
##' body data :: json(report_run_status_request)
report_run_status <- function(queue, include_logs, data) {
task_ids <- jsonlite::fromJSON(data)
queue$get_status(task_ids, include_logs)
}
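
For orientation only (not part of the diff): the updated handler reads a JSON array of task ids from the request body, decodes it with jsonlite, and passes the ids together with the include_logs query parameter to the queue. A minimal sketch, with made-up ids and an assumed, already-constructed queue object:

# Hypothetical call into the new handler; `queue` and the task ids are assumptions.
body <- '["task-id-1", "task-id-2"]'   # shape defined by report_run_status_request.json
include_logs <- FALSE
statuses <- report_run_status(queue, include_logs, body)
# equivalent to: queue$get_status(jsonlite::fromJSON(body), include_logs)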
8 changes: 5 additions & 3 deletions R/porcelain.R

Generated file; the diff is not rendered by default.

37 changes: 21 additions & 16 deletions R/queue.R
@@ -17,7 +17,7 @@ Queue <- R6::R6Class("Queue", # nolint
#' @param root Orderly root.
#' @param queue_id ID of an existing queue to connect to, creates a new one
#' if NULL (default NULL)
#' @param logs_dir directory to store worker logs
#' @param logs_dir directory to store worker logs
initialize = function(root, queue_id = NULL, logs_dir = "logs/worker") {
self$root <- root
self$config <- orderly2::orderly_config(self$root)
@@ -75,24 +75,29 @@ Queue <- R6::R6Class("Queue", # nolint
#' @description
#' Gets status of packet run
#'
#' @param task_id Id of redis queue job to get status of.
#' @param task_ids Task ids to get status of.
#' @param include_logs Whether to include logs in response or not
#' @return status of redis queue job
get_status = function(task_id) {
if (!rrq::rrq_task_exists(task_id, controller = self$controller)) {
porcelain::porcelain_stop("Job ID does not exist")
get_status = function(task_ids, include_logs = TRUE) {
invalid_task_ids <- task_ids[!rrq::rrq_task_exists(task_ids, controller = self$controller)]
if (length(invalid_task_ids) > 0) {
porcelain::porcelain_stop(sprintf("Job ids [%s] do not exist in the queue", paste(invalid_task_ids, collapse = ", ")))
}
status <- rrq::rrq_task_status(task_id, controller = self$controller)
times <- rrq::rrq_task_times(task_id, controller = self$controller)
statuses <- rrq::rrq_task_status(task_ids, controller = self$controller)
tasks_times <- rrq::rrq_task_times(task_ids, controller = self$controller)
queuePositions <- rrq::rrq_task_position(task_ids, controller = self$controller)

list(
status = scalar(status),
queue_position = if (status == "PENDING") scalar(rrq::rrq_task_position(task_id, controller = self$controller)) else NULL,
time_queued = scalar(times[1]),
time_started = scalar(times[2]),
time_complete = scalar(times[3]),
packet_id = if (status == "COMPLETE") scalar(rrq::rrq_task_result(task_id, controller = self$controller)) else NULL,
logs = rrq::rrq_task_log(task_id, controller = self$controller)
)
lapply(seq_along(task_ids), function(index) {
list(
status = scalar(statuses[index]),
queuePosition = if (statuses[index] == "PENDING") scalar(queuePositions[index]) else NULL,
timeQueued = scalar(tasks_times[task_ids[index], 1]),
timeStarted = scalar(tasks_times[task_ids[index], 2]),
timeComplete = scalar(tasks_times[task_ids[index], 3]),
packetId = if (statuses[index] == "COMPLETE") scalar(rrq::rrq_task_result(task_ids[index], controller = self$controller)) else NULL,
logs = if (include_logs) rrq::rrq_task_log(task_ids[index], controller = self$controller) else NULL
)
})
},

#' @description Destroy queue
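As a rough sketch of the new return shape: each element of the list returned by get_status() describes one task. The field names come from the diff above; the values below are invented for illustration.

# Illustrative element of queue$get_status(task_ids, include_logs = FALSE)
# for a single COMPLETE task; all values are made up.
list(
  status = scalar("COMPLETE"),
  queuePosition = NULL,              # only populated while the task is PENDING
  timeQueued = scalar(1724230000),
  timeStarted = scalar(1724230001),
  timeComplete = scalar(1724230005),
  packetId = scalar("20240821-120000-abcd1234"),
  logs = NULL                        # task log lines when include_logs = TRUE
)
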
6 changes: 3 additions & 3 deletions inst/schema/report_list.json
@@ -5,10 +5,10 @@
"type": "object",
"properties": {
"name": { "type": "string" },
"updated_time": { "type": "number" },
"has_modifications": { "type": "boolean" }
"updatedTime": { "type": "number" },
"hasModifications": { "type": "boolean" }
},
"additionalPropertes": false,
"required": [ "name", "updated_time", "has_modifications" ]
"required": [ "name", "updatedTime", "hasModifications" ]
}
}
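
For reference, a hypothetical row of the report list response under the renamed camelCase fields (values invented):

# Hypothetical report_list() row after the schema rename
data.frame(
  name = "data",
  updatedTime = 1724230000,    # numeric last-modified time
  hasModifications = FALSE
)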
4 changes: 2 additions & 2 deletions inst/schema/report_run_response.json
@@ -2,10 +2,10 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"task_id": {
"taskId": {
"type": "string"
}
},
"required": ["task_id"],
"required": ["taskId"],
"additionalProperties": false
}
7 changes: 7 additions & 0 deletions inst/schema/report_run_status_request.json
@@ -0,0 +1,7 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "string"
}
}
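An example body that would validate against this new request schema, built with jsonlite (the ids are hypothetical):

# Hypothetical request body for POST /report/status
jsonlite::toJSON(c("task-id-1", "task-id-2"))
# => ["task-id-1","task-id-2"]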
66 changes: 35 additions & 31 deletions inst/schema/report_run_status_response.json
@@ -1,35 +1,39 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"time_queued": {
"type": "number",
"description": "time queued in UTC time."
"type": "array",
"items": {
"type": "object",
"properties": {
"timeQueued": {
"type": "number",
"description": "time queued in UTC time."
},
"timeStarted": {
"type": ["number", "null"],
"description": "time started run in UTC time."
},
"timeComplete": {
"type": ["number", "null"],
"description": "time completed in UTC time."
},
"queuePosition": {
"type": ["number", "null"]
},
"logs": {
"type": ["array", "null"],
"items": {
"type": "string"
}
},
"status": {
"type": "string",
"enum": ["PENDING", "RUNNING","COMPLETE", "ERROR", "CANCELLED", "DIED", "TIMEOUT", "IMPOSSIBLE","DEFERRED", "MOVED"]
},
"packetId": {
"type": ["string", "null"]
}
},
"time_started": {
"type": ["number", "null"],
"description": "time started run in UTC time."
},
"time_complete": {
"type": ["number", "null"],
"description": "time completed in UTC time."
},
"queue_position": {
"type": ["number", "null"]
},
"logs": {
"type": ["array", "null"],
"items": {
"type": "string"
}
},
"status": {
"type": "string"
},
"packet_id": {
"type": ["string", "null"]
}
},
"required": ["time_queued", "time_started", "queue_position", "logs", "status", "packet_id"],
"additionalProperties": false
"required": ["timeQueued", "timeStarted", "queuePosition", "logs", "status", "packetId"],
"additionalProperties": false
}
}
10 changes: 7 additions & 3 deletions man/Queue.Rd

Generated file; the diff is not rendered by default.

63 changes: 55 additions & 8 deletions tests/testthat/test-api.R
@@ -29,12 +29,11 @@ test_that("can list orderly reports", {
repo$local,
skip_queue_creation = TRUE
)

res <- endpoint$run(gert::git_branch(repo$local))
expect_equal(res$status_code, 200)
expect_setequal(res$data$name, c("data", "parameters"))
expect_true(all(res$data$updated_time > (Sys.time() - 100)))
expect_false(all(res$data$has_modifications))
expect_true(all(res$data$updatedTime > (Sys.time() - 100)))
expect_false(all(res$data$hasModifications))

## Add a report on a 2nd branch
gert::git_branch_create("other", repo = repo$local)
@@ -56,7 +55,7 @@ test_that("can list orderly reports", {
existing <- other_res$data[other_res$data$name != "parameters2", ]
expect_equal(existing, res$data)
expect_equal(nrow(params2), 1)
expect_true(params2$has_modifications)
expect_true(params2$hasModifications)

## We can still see all reports on main branch
commits <- gert::git_log(repo = repo$local)$commit
@@ -115,9 +114,9 @@ test_that("can run orderly reports", {
)

res <- endpoint$run(jsonlite::toJSON(req))
rrq::rrq_task_wait(res$data$task_id, controller = queue$controller)
rrq::rrq_task_wait(res$data$taskId, controller = queue$controller)
expect_equal(
rrq::rrq_task_status(res$data$task_id, controller = queue$controller),
rrq::rrq_task_status(res$data$taskId, controller = queue$controller),
"COMPLETE"
)

@@ -129,9 +128,57 @@ test_that("can run orderly reports", {
)

res <- endpoint$run(jsonlite::toJSON(req))
rrq::rrq_task_wait(res$data$task_id, controller = queue$controller)
rrq::rrq_task_wait(res$data$taskId, controller = queue$controller)
expect_equal(
rrq::rrq_task_status(res$data$task_id, controller = queue$controller),
rrq::rrq_task_status(res$data$taskId, controller = queue$controller),
"COMPLETE"
)
})

test_that("can get statuses of jobs", {
# run 2 jobs first and wait for finish
skip_if_no_redis()
queue_id <- "orderly.runner:bad-animal"
repo <- test_prepare_orderly_example(c("data", "parameters"))
gert::git_init(repo)
orderly2::orderly_gitignore_update("(root)", root = repo)
git_add_and_commit(repo)
queue <- Queue$new(repo, queue_id = queue_id, logs_dir = tempfile())
worker_manager <- start_queue_workers_quietly(
1, queue$controller
)
make_worker_dirs(repo, worker_manager$id)
endpoint <- withr::with_envvar(
c(ORDERLY_RUNNER_QUEUE_ID = queue_id),
orderly_runner_endpoint("POST", "/report/run", repo)
)
req <- list(
name = scalar("data"),
branch = scalar(gert::git_branch(repo = repo)),
hash = scalar(gert::git_commit_id(repo = repo)),
parameters = scalar(NULL)
)
dat1 <- endpoint$run(jsonlite::toJSON(req))
dat2 <- endpoint$run(jsonlite::toJSON(req))
task_ids <- c(dat1$data$taskId, dat2$data$taskId)
rrq::rrq_task_wait(task_ids, controller = queue$controller)

# status endpoint
endpoint <- withr::with_envvar(
c(ORDERLY_RUNNER_QUEUE_ID = queue_id),
orderly_runner_endpoint("POST", "/report/status", repo)
)
dat <- endpoint$run(TRUE, jsonlite::toJSON(task_ids))$data

for (i in seq_along(task_ids)) {
task_status <- dat[[i]]
task_times <- get_task_times(task_ids[[i]], queue$controller)
expect_equal(task_status$status, scalar("COMPLETE"))
expect_null(scalar(task_status$queuePosition))
expect_equal(task_status$packetId, scalar(get_task_result(task_ids[[i]], queue$controller)))
expect_equal(scalar(task_times[1]), task_status$timeQueued)
expect_equal(scalar(task_times[2]), task_status$timeStarted)
expect_equal(scalar(task_times[3]), task_status$timeComplete)
expect_equal(get_task_logs(task_ids[[i]], queue$controller), unlist(task_status$logs))
}
})