Skip to content

Commit

Permalink
Merge pull request #16 from Sage-Bionetworks-Workflows/bwmac/orca-164…
Browse files Browse the repository at this point in the history
…/tower_execution_status

[ORCA-164] adds `get_workflow` and `get_workflow_status` methods to ops
  • Loading branch information
Bruno Grande authored May 1, 2023
2 parents 56a4060 + 0385b89 commit db0785f
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 3 deletions.
16 changes: 16 additions & 0 deletions src/orca/services/nextflowtower/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,19 @@ def list_user_workspaces(self) -> list[dict[str, Any]]:
continue
workspaces.append(workspace)
return workspaces

def get_workflow(self, workspace_id: int, workflow_id: str) -> dict:
"""Gets available information about a workflow run
Attributes:
workspace_id (int): The ID number of the workspace the workflow
exists within.
workflow_id (str): The ID number for a workflow run to get
information about.
Returns:
response (dict): Dictionary containing information about the workflow run
"""
path = f"/workflow/{workflow_id}"
response = self.get(path=path, params={"workspaceId": workspace_id})
return response
18 changes: 18 additions & 0 deletions src/orca/services/nextflowtower/models/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from enum import Enum


class TaskStatus(Enum):
"""enum containing all possible status values for
Nextflow Tower runs. terminal_states set which
statuses result in a run being determined to be
"complete"
"""

SUBMITTED = "SUBMITTED"
RUNNING = "RUNNING"
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
UNKNOWN = "UNKNOWN"

terminal_states = [SUCCEEDED, FAILED, CANCELLED, UNKNOWN]
20 changes: 20 additions & 0 deletions src/orca/services/nextflowtower/ops.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from functools import cached_property
from typing import cast

from pydantic.dataclasses import dataclass

from orca.errors import ConfigError
from orca.services.base.ops import BaseOps
from orca.services.nextflowtower.client_factory import NextflowTowerClientFactory
from orca.services.nextflowtower.config import NextflowTowerConfig
from orca.services.nextflowtower.models.enums import TaskStatus


@dataclass(kw_only=False)
Expand Down Expand Up @@ -41,3 +43,21 @@ def workspace(self) -> str:
message = f"Config ({self.config}) does not specify a workspace."
raise ConfigError(message)
return self.config.workspace

def get_workflow_status(self, workflow_id: str) -> tuple[TaskStatus, bool]:
"""Gets status of workflow run
Args:
workflow_id (str): The ID number for a workflow run to get information about
Returns:
tuple: Tuple containing 1. status (str) and
2. Whether the workflow is done (boolean)
"""
response = self.client.get_workflow(
workspace_id=self.workspace_id, workflow_id=workflow_id
)
task_status = cast(TaskStatus, response["workflow"]["status"])
is_done = task_status in TaskStatus.terminal_states.value
# TODO consider switching return value to a namedtuple
return task_status, is_done
11 changes: 8 additions & 3 deletions tests/services/nextflowtower/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ def config(patch_os_environ):


@pytest.fixture
def ops(config, client, mocker):
mock = mocker.patch.object(NextflowTowerOps, "client")
mock.return_value = client
def ops(config):
yield NextflowTowerOps(config)


@pytest.fixture
def mocked_ops(config, client, mocker):
mocker.patch.object(NextflowTowerOps, "client", return_value=client)
mocker.patch.object(NextflowTowerOps, "workspace_id", return_value=98765)
yield NextflowTowerOps(config)


Expand Down
88 changes: 88 additions & 0 deletions tests/services/nextflowtower/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,91 @@
},
]
}

get_workflow: dict = {
"workflow": {
"id": "123456789",
"submit": "2023-04-28T16:22:31Z",
"start": "2023-04-28T16:30:44Z",
"complete": "2023-04-28T16:30:54Z",
"dateCreated": "2023-04-28T16:22:31Z",
"lastUpdated": "2023-04-28T16:30:54Z",
"runName": "example-run",
"sessionId": "abc-abc-abc-abc-abc",
"profile": "standard",
"workDir": "s3://example-bucket/work",
"commitId": "123",
"userName": "example-user",
"scriptId": "123",
"revision": None,
"commandLine": "nextflow run nextflow-io/example-workflow\
-name example-run -with-tower\
'https://tower.sagebionetworks.org/api'\
-r 123 -resume abc-abc-abc-abc-abc",
"projectName": "nextflow-io/example-workflow",
"scriptName": "main.nf",
"launchId": "abc",
"status": "SUCCEEDED",
"configFiles": [
"/.nextflow/assets/nextflow-io/example-workflow/nextflow.config",
"/nextflow.config",
],
"params": {},
"configText": "example-config",
"manifest": {
"nextflowVersion": None,
"defaultBranch": "master",
"version": None,
"homePage": None,
"gitmodules": None,
"description": None,
"name": None,
"mainScript": "main.nf",
"author": None,
},
"nextflow": {
"version": "22.10.6",
"build": "5843",
"timestamp": "2023-01-23T23:20:00Z",
},
"stats": {
"computeTimeFmt": "(a few seconds)",
"cachedCount": 4,
"failedCount": 0,
"ignoredCount": 0,
"succeedCount": 0,
"cachedCountFmt": "4",
"succeedCountFmt": "0",
"failedCountFmt": "0",
"ignoredCountFmt": "0",
"cachedPct": 100.0,
"failedPct": 0.0,
"succeedPct": 0.0,
"ignoredPct": 0.0,
"cachedDuration": 0,
"failedDuration": 0,
"succeedDuration": 0,
},
"errorMessage": None,
"errorReport": None,
"deleted": None,
"peakLoadCpus": None,
"peakLoadTasks": None,
"peakLoadMemory": None,
"projectDir": "/.nextflow/assets/nextflow-io/example-workflow",
"homeDir": "/root",
"container": "quay.io/nextflow/bash",
"repository": "https://github.com/nextflow-io/example-workflow",
"containerEngine": None,
"scriptFile": "/.nextflow/assets/nextflow-io/example-workflow/main.nf",
"launchDir": "/",
"duration": 10508,
"exitStatus": 0,
"resume": True,
"success": True,
"logFile": None,
"outFile": None,
"operationId": None,
"ownerId": 28,
}
}
9 changes: 9 additions & 0 deletions tests/services/nextflowtower/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ def test_that_list_user_workspaces_fails_with_nonstandard_response(client, mocke
mock.return_value = {"message": "foobar"}
with pytest.raises(HTTPError):
client.list_user_workspaces()


def test_that_get_workflow_returns_expected_response(client, mocker, get_response):
expected = get_response("get_workflow")
mock = mocker.patch.object(client, "get")
mock.return_value = expected
actual = client.get_workflow(workspace_id=98765, workflow_id="123456789")
mock.assert_called_once()
assert actual == expected
17 changes: 17 additions & 0 deletions tests/services/nextflowtower/test_enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from orca.services.nextflowtower.models.enums import TaskStatus


def test_that_TaskStatus_contant_values_are_correct():
assert TaskStatus.SUBMITTED.value == "SUBMITTED"
assert TaskStatus.RUNNING.value == "RUNNING"
assert TaskStatus.SUCCEEDED.value == "SUCCEEDED"
assert TaskStatus.FAILED.value == "FAILED"
assert TaskStatus.CANCELLED.value == "CANCELLED"
assert TaskStatus.UNKNOWN.value == "UNKNOWN"


def test_that_TaskStatus_terminal_states_are_in_terminal_states_list():
assert TaskStatus.SUCCEEDED.value in TaskStatus.terminal_states.value
assert TaskStatus.FAILED.value in TaskStatus.terminal_states.value
assert TaskStatus.CANCELLED.value in TaskStatus.terminal_states.value
assert TaskStatus.UNKNOWN.value in TaskStatus.terminal_states.value
24 changes: 24 additions & 0 deletions tests/services/nextflowtower/test_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,27 @@ def test_for_error_when_the_workspace_id_does_not_exist(ops, mocker, get_respons
mock.list_user_workspaces.return_value = items_filtered
with pytest.raises(ValueError):
ops.workspace_id


def test_that_get_workflow_status_returns_expected_tuple_workflow_is_complete(
mocker, get_response, mocked_ops
):
response = get_response("get_workflow")
mock = mocker.patch.object(mocked_ops, "client")
mock.get_workflow.return_value = response
result = mocked_ops.get_workflow_status(workflow_id="123456789")
mock.get_workflow.assert_called_once()
assert result == ("SUCCEEDED", True)


def test_that_get_workflow_status_returns_expected_tuple_workflow_is_not_complete(
mocked_ops, mocker, get_response
):
response = get_response("get_workflow")
response["workflow"]["complete"] = None
response["workflow"]["status"] = "SUBMITTED"
mock = mocker.patch.object(mocked_ops, "client")
mock.get_workflow.return_value = response
result = mocked_ops.get_workflow_status(workflow_id="123456789")
mock.get_workflow.assert_called_once()
assert result == ("SUBMITTED", False)

0 comments on commit db0785f

Please sign in to comment.