Skip to content

Commit

Permalink
Merge branch 'main' into bgrande/ORCA-163/tower-launch-workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
BrunoGrandePhD committed May 1, 2023
2 parents a906559 + db0785f commit 1069c75
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 2 deletions.
16 changes: 16 additions & 0 deletions src/orca/services/nextflowtower/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,19 @@ def launch_workflow(
payload = launch_info.to_dict()
json = self.post(path, params=params, json=payload)
return self.unwrap(json, "workflowId")

def get_workflow(self, workspace_id: int, workflow_id: str) -> dict:
"""Gets available information about a workflow run
Attributes:
workspace_id (int): The ID number of the workspace the workflow
exists within.
workflow_id (str): The ID number for a workflow run to get
information about.
Returns:
response (dict): Dictionary containing information about the workflow run
"""
path = f"/workflow/{workflow_id}"
json = self.get(path=path, params={"workspaceId": workspace_id})
return json
18 changes: 18 additions & 0 deletions src/orca/services/nextflowtower/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections.abc import Collection
from dataclasses import field
from datetime import datetime
from enum import Enum
from typing import Any, Optional

from pydantic.dataclasses import dataclass
Expand All @@ -10,6 +11,23 @@
from orca.services.nextflowtower.utils import parse_datetime


class TaskStatus(Enum):
"""enum containing all possible status values for
Nextflow Tower runs. terminal_states set which
statuses result in a run being determined to be
"complete"
"""

SUBMITTED = "SUBMITTED"
RUNNING = "RUNNING"
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
UNKNOWN = "UNKNOWN"

terminal_states = [SUCCEEDED, FAILED, CANCELLED, UNKNOWN]


@dataclass(kw_only=False)
class User:
"""Nextflow Tower user."""
Expand Down
22 changes: 20 additions & 2 deletions src/orca/services/nextflowtower/ops.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import cached_property
from typing import ClassVar, Optional
from typing import ClassVar, Optional, cast

from pydantic.dataclasses import dataclass

Expand All @@ -8,7 +8,7 @@
from orca.services.nextflowtower.client import NextflowTowerClient
from orca.services.nextflowtower.client_factory import NextflowTowerClientFactory
from orca.services.nextflowtower.config import NextflowTowerConfig
from orca.services.nextflowtower.models import LaunchInfo
from orca.services.nextflowtower.models import LaunchInfo, TaskStatus


@dataclass(kw_only=False)
Expand Down Expand Up @@ -130,3 +130,21 @@ def launch_workflow(
launch_info.fill_in("label_ids", label_ids)

return self.client.launch_workflow(launch_info, self.workspace_id)

def get_workflow_status(self, workflow_id: str) -> tuple[TaskStatus, bool]:
"""Gets status of workflow run
Args:
workflow_id (str): The ID number for a workflow run to get information about
Returns:
tuple: Tuple containing 1. status (str) and
2. Whether the workflow is done (boolean)
"""
response = self.client.get_workflow(
workspace_id=self.workspace_id, workflow_id=workflow_id
)
task_status = cast(TaskStatus, response["workflow"]["status"])
is_done = task_status in TaskStatus.terminal_states.value
# TODO: Consider switching return value to a namedtuple
return task_status, is_done
89 changes: 89 additions & 0 deletions tests/services/nextflowtower/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,92 @@


launch_workflow = {"workflowId": "23LNH"}


get_workflow: dict = {
"workflow": {
"id": "123456789",
"submit": "2023-04-28T16:22:31Z",
"start": "2023-04-28T16:30:44Z",
"complete": "2023-04-28T16:30:54Z",
"dateCreated": "2023-04-28T16:22:31Z",
"lastUpdated": "2023-04-28T16:30:54Z",
"runName": "example-run",
"sessionId": "abc-abc-abc-abc-abc",
"profile": "standard",
"workDir": "s3://example-bucket/work",
"commitId": "123",
"userName": "example-user",
"scriptId": "123",
"revision": None,
"commandLine": "nextflow run nextflow-io/example-workflow\
-name example-run -with-tower\
'https://tower.sagebionetworks.org/api'\
-r 123 -resume abc-abc-abc-abc-abc",
"projectName": "nextflow-io/example-workflow",
"scriptName": "main.nf",
"launchId": "abc",
"status": "SUCCEEDED",
"configFiles": [
"/.nextflow/assets/nextflow-io/example-workflow/nextflow.config",
"/nextflow.config",
],
"params": {},
"configText": "example-config",
"manifest": {
"nextflowVersion": None,
"defaultBranch": "master",
"version": None,
"homePage": None,
"gitmodules": None,
"description": None,
"name": None,
"mainScript": "main.nf",
"author": None,
},
"nextflow": {
"version": "22.10.6",
"build": "5843",
"timestamp": "2023-01-23T23:20:00Z",
},
"stats": {
"computeTimeFmt": "(a few seconds)",
"cachedCount": 4,
"failedCount": 0,
"ignoredCount": 0,
"succeedCount": 0,
"cachedCountFmt": "4",
"succeedCountFmt": "0",
"failedCountFmt": "0",
"ignoredCountFmt": "0",
"cachedPct": 100.0,
"failedPct": 0.0,
"succeedPct": 0.0,
"ignoredPct": 0.0,
"cachedDuration": 0,
"failedDuration": 0,
"succeedDuration": 0,
},
"errorMessage": None,
"errorReport": None,
"deleted": None,
"peakLoadCpus": None,
"peakLoadTasks": None,
"peakLoadMemory": None,
"projectDir": "/.nextflow/assets/nextflow-io/example-workflow",
"homeDir": "/root",
"container": "quay.io/nextflow/bash",
"repository": "https://github.com/nextflow-io/example-workflow",
"containerEngine": None,
"scriptFile": "/.nextflow/assets/nextflow-io/example-workflow/main.nf",
"launchDir": "/",
"duration": 10508,
"exitStatus": 0,
"resume": True,
"success": True,
"logFile": None,
"outFile": None,
"operationId": None,
"ownerId": 28,
}
}
9 changes: 9 additions & 0 deletions tests/services/nextflowtower/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,12 @@ def test_that_launch_workflow_works(client, mocker, get_response):
)
client.launch_workflow(launch_spec)
mock.assert_called_once()


def test_that_get_workflow_returns_expected_response(client, mocker, get_response):
expected = get_response("get_workflow")
mock = mocker.patch.object(client, "get")
mock.return_value = expected
actual = client.get_workflow(workspace_id=98765, workflow_id="123456789")
mock.assert_called_once()
assert actual == expected
17 changes: 17 additions & 0 deletions tests/services/nextflowtower/test_enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from orca.services.nextflowtower.models import TaskStatus


def test_that_TaskStatus_contant_values_are_correct():
assert TaskStatus.SUBMITTED.value == "SUBMITTED"
assert TaskStatus.RUNNING.value == "RUNNING"
assert TaskStatus.SUCCEEDED.value == "SUCCEEDED"
assert TaskStatus.FAILED.value == "FAILED"
assert TaskStatus.CANCELLED.value == "CANCELLED"
assert TaskStatus.UNKNOWN.value == "UNKNOWN"


def test_that_TaskStatus_terminal_states_are_in_terminal_states_list():
assert TaskStatus.SUCCEEDED.value in TaskStatus.terminal_states.value
assert TaskStatus.FAILED.value in TaskStatus.terminal_states.value
assert TaskStatus.CANCELLED.value in TaskStatus.terminal_states.value
assert TaskStatus.UNKNOWN.value in TaskStatus.terminal_states.value
24 changes: 24 additions & 0 deletions tests/services/nextflowtower/test_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,27 @@ def test_that_launch_workflow_works(mocked_ops, get_response, mocker):
mocked_ops.launch_workflow(launch_info, "ondemand")
mocked_ops.client.launch_workflow.assert_called_once()
assert launch_info.compute_env_id == compute_env.id


def test_that_get_workflow_status_returns_expected_tuple_workflow_is_complete(
mocker, get_response, mocked_ops
):
response = get_response("get_workflow")
mock = mocker.patch.object(mocked_ops, "client")
mock.get_workflow.return_value = response
result = mocked_ops.get_workflow_status(workflow_id="123456789")
mock.get_workflow.assert_called_once()
assert result == ("SUCCEEDED", True)


def test_that_get_workflow_status_returns_expected_tuple_workflow_is_not_complete(
mocked_ops, mocker, get_response
):
response = get_response("get_workflow")
response["workflow"]["complete"] = None
response["workflow"]["status"] = "SUBMITTED"
mock = mocker.patch.object(mocked_ops, "client")
mock.get_workflow.return_value = response
result = mocked_ops.get_workflow_status(workflow_id="123456789")
mock.get_workflow.assert_called_once()
assert result == ("SUBMITTED", False)

0 comments on commit 1069c75

Please sign in to comment.