Skip to content

Commit

Permalink
Merge pull request #8 from mrc-ide/mrc-5600-status-endpoint
Browse files Browse the repository at this point in the history
Mrc 5600 - status endpoint
  • Loading branch information
absternator authored Aug 20, 2024
2 parents 9985f51 + d781a99 commit 0f0d2ca
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 40 deletions.
37 changes: 24 additions & 13 deletions R/api.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
##'
##' @export
api <- function(
root, validate = NULL, log_level = "info",
skip_queue_creation = FALSE
) {
root, validate = NULL, log_level = "info",
skip_queue_creation = FALSE) {
logger <- porcelain::porcelain_logger(log_level)

# Set ORDERLY_RUNNER_QUEUE_ID to specify existing queue id
Expand All @@ -39,22 +38,25 @@ api <- function(

##' @porcelain GET / => json(root)
root <- function() {
versions <- list(orderly2 = package_version_string("orderly2"),
orderly.runner = package_version_string("orderly.runner"))
versions <- list(
orderly2 = package_version_string("orderly2"),
orderly.runner = package_version_string("orderly.runner")
)
lapply(versions, scalar)
}


##' @porcelain
##' @porcelain
##' GET /report/list => json(report_list)
##' query ref :: string
##' state root :: root
report_list <- function(root, ref) {
contents <- gert::git_ls(root, ref = ref)
re <- "^src/([^/]+)/(\\1|orderly)\\.R$"
nms <- sub(re, "\\1",
grep(re, contents$path, value = TRUE, perl = TRUE),
perl = TRUE)
nms <- sub(re, "\\1",
grep(re, contents$path, value = TRUE, perl = TRUE),
perl = TRUE
)
last_changed <- function(nm) {
max(contents$modified[startsWith(contents$path, sprintf("src/%s", nm))])
}
Expand All @@ -64,9 +66,11 @@ report_list <- function(root, ref) {
has_modifications <- vlapply(nms, function(report_name) {
report_name %in% modified_reports
}, USE.NAMES = FALSE)
data.frame(name = nms,
updated_time = updated_time,
has_modifications = has_modifications)
data.frame(
name = nms,
updated_time = updated_time,
has_modifications = has_modifications
)
}


Expand Down Expand Up @@ -98,5 +102,12 @@ submit_report_run <- function(root, queue, data) {
ref = data$hash,
parameters = data$parameters
)
list(taskId = scalar(task_id))
list(task_id = scalar(task_id))
}

##' @porcelain
##' GET /report/status/<task_id:string> => json(report_run_status_response)
##' state queue :: queue
report_run_status <- function(queue, task_id) {
queue$get_status(task_id)
}
9 changes: 9 additions & 0 deletions R/porcelain.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 37 additions & 8 deletions R/queue.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#' Object for managing running jobs on the redis queue
#'
#' @keywords internal
Queue <- R6::R6Class("Queue", #nolint
Queue <- R6::R6Class("Queue", # nolint
cloneable = FALSE,
public = list(
#' @field root Orderly root
Expand All @@ -17,12 +17,15 @@ Queue <- R6::R6Class("Queue", #nolint
#' @param root Orderly root.
#' @param queue_id ID of an existing queue to connect to, creates a new one
#' if NULL (default NULL)
initialize = function(root, queue_id = NULL) {
#' @param logs_dir directory to store worker logs
initialize = function(root, queue_id = NULL, logs_dir = "logs/worker") {
self$root <- root
self$config <- orderly2::orderly_config(self$root)
if (!runner_has_git(self$root)) {
cli::cli_abort(paste("Not starting server as orderly",
"root is not version controlled."))
cli::cli_abort(paste(
"Not starting server as orderly",
"root is not version controlled."
))
}

# Connect to Redis
Expand All @@ -33,9 +36,11 @@ Queue <- R6::R6Class("Queue", #nolint
queue_id %||% orderly_queue_id(),
con = con
)
worker_config <- rrq::rrq_worker_config(heartbeat_period = 10)
dir.create(logs_dir, showWarnings = FALSE)
worker_config <- rrq::rrq_worker_config(heartbeat_period = 10, logdir = logs_dir)
rrq::rrq_worker_config_save("localhost", worker_config,
controller = self$controller)
controller = self$controller
)
},

#' @description
Expand All @@ -57,15 +62,39 @@ Queue <- R6::R6Class("Queue", #nolint
ref
)
rrq::rrq_task_create_call(runner_run, run_args,
separate_process = TRUE,
controller = self$controller)
separate_process = TRUE,
controller = self$controller
)
},

# Just until we add queue status for testing
number_of_workers = function() {
rrq::rrq_worker_len(self$controller)
},

#' @description
#' Gets status of packet run
#'
#' @param task_id Id of redis queue job to get status of.
#' @return status of redis queue job
get_status = function(task_id) {
if (!rrq::rrq_task_exists(task_id, controller = self$controller)) {
porcelain::porcelain_stop("Job ID does not exist")
}
status <- rrq::rrq_task_status(task_id, controller = self$controller)
times <- rrq::rrq_task_times(task_id, controller = self$controller)

list(
status = scalar(status),
queue_position = if (status == "PENDING") scalar(rrq::rrq_task_position(task_id, controller = self$controller)) else NULL,
time_queued = scalar(times[1]),
time_started = scalar(times[2]),
time_complete = scalar(times[3]),
packet_id = if (status == "COMPLETE") scalar(rrq::rrq_task_result(task_id, controller = self$controller)) else NULL,
logs = rrq::rrq_task_log(task_id, controller = self$controller)
)
},

#' @description Destroy queue
finalize = function() {
rrq::rrq_destroy(controller = self$controller)
Expand Down
4 changes: 3 additions & 1 deletion R/runner.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ runner_run <- function(orderly_root, reportname, parameters, branch, ref, ...) {
git_clean(worker_path)

# Run
withr::with_envvar(
id <- withr::with_envvar(
c(ORDERLY_SRC_ROOT = file.path(worker_path, "src", reportname)),
orderly2::orderly_run(reportname, parameters = parameters,
root = orderly_root, ...)
)

# Cleanup
git_clean(worker_path)

id
}

point_head_to_ref <- function(worker_path, branch, ref) {
Expand Down
1 change: 1 addition & 0 deletions docker/test/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
test-repo
orderly-root-volume
logs-volume
4 changes: 4 additions & 0 deletions docker/test/common
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ export ORDERLY_VOLUME=$CONTAINER_NAMESPACE-orderly-root
export ORDERLY_RUNNER_IMAGE=ghcr.io/mrc-ide/orderly.runner:main

export CONTAINER_ORDERLY_ROOT_PATH=/orderly-root

export LOGS_DIR=/logs

export ORDERLY_LOGS_VOLUME=$CONTAINER_NAMESPACE-logs
2 changes: 2 additions & 0 deletions docker/test/copy-orderly-root
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ rm -rf $HERE/orderly-root-volume
mkdir -p $HERE/orderly-root-volume

docker cp $DEBUG:$CONTAINER_ORDERLY_ROOT_PATH/. $HERE/orderly-root-volume

docker cp $DEBUG:$LOGS_DIR/. $HERE/logs-volume
4 changes: 4 additions & 0 deletions docker/test/run-test
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ docker run --rm -d \
redis

docker volume create $ORDERLY_VOLUME
docker volume create $ORDERLY_LOGS_VOLUME

docker run --rm -d --pull=missing \
--name=$DEBUG \
-v $ORDERLY_VOLUME:$CONTAINER_ORDERLY_ROOT_PATH \
-v $ORDERLY_LOGS_VOLUME:$LOGS_DIR \
ubuntu \
sleep infinity

Expand All @@ -33,6 +35,7 @@ docker run --rm -d --pull=always \
--env=REDIS_CONTAINER_NAME=$REDIS \
-p 127.0.0.1:8001:8001 \
-v $ORDERLY_VOLUME:$CONTAINER_ORDERLY_ROOT_PATH \
-v $ORDERLY_LOGS_VOLUME:$LOGS_DIR \
$ORDERLY_RUNNER_IMAGE \
$CONTAINER_ORDERLY_ROOT_PATH

Expand All @@ -43,5 +46,6 @@ docker run --rm -d --pull=always \
--env=ORDERLY_RUNNER_QUEUE_ID=$ORDERLY_RUNNER_QUEUE_ID \
--env=REDIS_CONTAINER_NAME=$REDIS \
-v $ORDERLY_VOLUME:$CONTAINER_ORDERLY_ROOT_PATH \
-v $ORDERLY_LOGS_VOLUME:$LOGS_DIR \
$ORDERLY_RUNNER_IMAGE \
$CONTAINER_ORDERLY_ROOT_PATH
4 changes: 2 additions & 2 deletions inst/schema/report_run_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"taskId": {
"task_id": {
"type": "string"
}
},
"required": ["taskId"],
"required": ["task_id"],
"additionalProperties": false
}
35 changes: 35 additions & 0 deletions inst/schema/report_run_status_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"time_queued": {
"type": "number",
"description": "time queued in UTC time."
},
"time_started": {
"type": ["number", "null"],
"description": "time started run in UTC time."
},
"time_complete": {
"type": ["number", "null"],
"description": "time completed in UTC time."
},
"queue_position": {
"type": ["number", "null"]
},
"logs": {
"type": ["array", "null"],
"items": {
"type": "string"
}
},
"status": {
"type": "string"
},
"packet_id": {
"type": ["string", "null"]
}
},
"required": ["time_queued", "time_started", "queue_position", "logs", "status", "packet_id"],
"additionalProperties": false
}
21 changes: 21 additions & 0 deletions man/Queue.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions tests/testthat/helper-orderly-runner.R
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ start_queue_workers_quietly <- function(n_workers,
start_queue_with_workers <- function(
root, n_workers, env = parent.frame(), queue_id = NULL
) {
q <- new_queue_quietly(root, queue_id = queue_id)
q <- new_queue_quietly(root, queue_id = queue_id, logs_dir = tempfile())
worker_manager <- start_queue_workers_quietly(n_workers, q$controller,
env = env)
make_worker_dirs(root, worker_manager$id)
Expand All @@ -138,10 +138,26 @@ skip_if_no_redis <- function() {
}

expect_worker_task_complete <- function(task_id, controller, n_tries) {
is_task_successful <- rrq::rrq_task_wait(
is_task_successful <- wait_for_task_complete(task_id, controller, n_tries)
expect_true(is_task_successful)
}

wait_for_task_complete <- function(task_id, controller, n_tries) {
rrq::rrq_task_wait(
task_id, controller = controller, timeout = n_tries
)
expect_true(is_task_successful)
}

get_task_times <- function(task_id, controller) {
rrq::rrq_task_times(task_id, controller = controller)
}

get_task_result <- function(task_id, controller) {
rrq::rrq_task_result(task_id, controller = controller)
}

get_task_logs <- function(task_id, controller) {
rrq::rrq_task_log(task_id, controller = controller)
}

initialise_git_repo <- function() {
Expand Down
Loading

0 comments on commit 0f0d2ca

Please sign in to comment.