Skip to content

Commit

Permalink
Added broadcast invalidate status counts to processes (#751)
Browse files Browse the repository at this point in the history
* Add broadcast invalidate status counts to process fail, resume and delete

* Fixed unit test for websocket

* Document websocket channels, mainly /api/ws/events

Signed-off-by: Mark90 <[email protected]>

* Update docs

Signed-off-by: Mark90 <[email protected]>

* resolved comments

---------

Signed-off-by: Mark90 <[email protected]>
Co-authored-by: Mark90 <[email protected]>
  • Loading branch information
Georgi2704 and Mark90 authored Oct 15, 2024
1 parent 17694c1 commit db35b8a
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 8 deletions.
59 changes: 59 additions & 0 deletions docs/reference-docs/websockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,62 @@ Backend processes are executed in a threadpool and therefore access the same WSM
To solve this there is a dedicated `ProcessDataBroadcastThread` (attached to and managed by the `OrchestratorCore` app) to perform the actual `broadcast_data()` call.

The API endpoints which start/resume/abort a process, call `api_broadcast_process_data(request)` to acquire a function that can be used to submit process updates into a `threading.Queue` on which `ProcessDataBroadcastThread` listens.

## Channel overview

As mentioned in Implementation, messages are organized into channels that clients can listen to.
Each channel has its own usecase and specific message format.

### Events

The `events` channel is designed for the [Orchestrator UI v2](https://github.com/workfloworchestrator/example-orchestrator-ui).
Events are notifications to the UI/user that something happened in the backend API or workers.
They only include the bare minimum of information and can be sent at a high volume.

The API endpoint for this channel is `/api/ws/events` .

Messages in this channel are of the format:
```json
{"name": name, "value": value}
```

Where `name` and `value` are one of the following combinations:

| name | value | Notifies that |
|----------------------|-----------------------------------------------------------|-------------------------------------------------------|
| `"invalidateCache"` | `{"type": "processes", "id": "LIST"} ` | A process was started, updated or finished [1] |
| `"invalidateCache"` | `{"type": "processes", "id": "<process UUID>"}` | A process was started, updated or finished [1] |
| `"invalidateCache"` | `{"type": "subscriptions", "id": "LIST"} ` | A subscription was created, updated or terminated [1] |
| `"invalidateCache"` | `{"type": "subscriptions", "id": "<subscription UUID>"} ` | A subscription was created, updated or terminated [1] |
| `"invalidateCache"` | `{"type": "processStatusCounts"} ` | A process transitioned to/from a failed state [2] |
| `"invalidateCache"` | `{"type": "engineStatus"} ` | The workflow engine has been enabled or disabled |

<!-- Hint: an editor like VSCode/PyCharm makes editing markdown tables very easy -->

Notes:
1. The `LIST` and `<uuid>` combinations currently mean one and the same. The reason to keep them separate is that we may want to implement throttling on the `LIST` event.
2. The process status count event is triggered when a process:
* Transitions to non-failed state -> count may go down:
1. Non-running process is scheduled to be resumed
2. Non-running process is deleted from database
* Transitions to a failed state -> count goes up:
1. Running process finishes with an error

Example of a complete message:

```json
{"name":"invalidateCache","value":{"type":"engineStatus"}}
```

### Engine settings (deprecated)

The `engine-settings` channel was designed for the now deprecated [Orchestrator UI v1](https://github.com/workfloworchestrator/orchestrator-core-gui).

The API endpoint for this channel is `/api/settings/ws-status/` .

### Processes (deprecated)

The `processes` channel was designed for the now deprecated [Orchestrator UI v1](https://github.com/workfloworchestrator/orchestrator-core-gui).
It sent process list/detail data to the client which it would use to directly update the frontend.

The API endpoint for this channel is `/api/processes/all/` .
14 changes: 11 additions & 3 deletions orchestrator/api/api_v1/endpoints/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@
from orchestrator.settings import app_settings
from orchestrator.types import JSON, State
from orchestrator.utils.enrich_process import enrich_process
from orchestrator.websocket import WS_CHANNELS, broadcast_process_update_to_websocket, websocket_manager
from orchestrator.websocket import (
WS_CHANNELS,
broadcast_invalidate_status_counts,
broadcast_process_update_to_websocket,
websocket_manager,
)
from orchestrator.workflow import ProcessStatus

router = APIRouter()
Expand Down Expand Up @@ -115,11 +120,12 @@ def delete(process_id: UUID) -> None:
if not process:
raise_status(HTTPStatus.NOT_FOUND)

broadcast_process_update_to_websocket(process.process_id)

db.session.delete(db.session.get(ProcessTable, process_id))
db.session.commit()

broadcast_invalidate_status_counts()
broadcast_process_update_to_websocket(process.process_id)


@router.post(
"/{workflow_key}",
Expand Down Expand Up @@ -159,7 +165,9 @@ def resume_process_endpoint(
if process.last_status == ProcessStatus.RESUMED:
raise_status(HTTPStatus.CONFLICT, "Resuming a resumed workflow is not possible")

broadcast_invalidate_status_counts()
broadcast_func = api_broadcast_process_data(request)

resume_process(process, user=user, user_inputs=json_data, broadcast_func=broadcast_func)


Expand Down
2 changes: 2 additions & 0 deletions orchestrator/services/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from orchestrator.types import BroadcastFunc, State
from orchestrator.utils.datetime import nowtz
from orchestrator.utils.errors import error_state_to_dict
from orchestrator.websocket import broadcast_invalidate_status_counts
from orchestrator.workflow import (
CALLBACK_TOKEN_KEY,
Failed,
Expand Down Expand Up @@ -117,6 +118,7 @@ def _db_create_process(stat: ProcessStat) -> None:

def delete_process(process_id: UUID) -> None:
db.session.execute(delete(ProcessTable).where(ProcessTable.process_id == process_id))
broadcast_invalidate_status_counts()
db.session.commit()


Expand Down
11 changes: 10 additions & 1 deletion orchestrator/websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ async def invalidate_subscription_cache(subscription_id: UUID | UUIDstr, invalid
await broadcast_invalidate_cache({"type": "subscriptions", "id": str(subscription_id)})


def broadcast_invalidate_status_counts() -> None:
"""Broadcast message to invalidate the status counts of the connected websocket clients."""
if not websocket_manager.enabled:
logger.debug("WebSocketManager is not enabled. Skip broadcasting through websocket.")
return

sync_broadcast_invalidate_cache({"type": "processStatusCounts"})


def broadcast_process_update_to_websocket(
process_id: UUID,
) -> None:
Expand Down Expand Up @@ -135,8 +144,8 @@ async def broadcast_process_update_to_websocket_async(
__all__ = [
"websocket_manager",
"init_websocket_manager",
"is_process_active",
"broadcast_process_update_to_websocket",
"broadcast_process_update_to_websocket_async",
"WS_CHANNELS",
"broadcast_invalidate_status_counts",
]
12 changes: 11 additions & 1 deletion orchestrator/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,13 @@ def errorlogger(error: ErrorDict) -> None:
logger.error("Workflow returned an error.", **error)


def invalidate_status_counts() -> None:
"""Broadcast invalidate status counts to the websocket."""
from orchestrator.websocket import broadcast_invalidate_status_counts

broadcast_invalidate_status_counts()


def _exec_steps(steps: StepList, starting_process: Process, dblogstep: StepLogFuncInternal) -> Process:
"""Execute the workflow steps one by one until a Process state other than Success or Skipped is reached."""
consolelogger = cond_bind(logger, starting_process.unwrap(), "reporter", "created_by")
Expand Down Expand Up @@ -1426,7 +1433,10 @@ def resume_suspend(process: Process) -> Process:
# This enables recursive step execution of sub steps with the same StepLogFunc.
# Should probably be refactored at some point as contextvars is a kind of global state.
step_log_fn_var.set(_logstep)
return _exec_steps(steps, next_state, _logstep)
executed_steps = _exec_steps(steps, next_state, _logstep)
if executed_steps.overall_status == ProcessStatus.FAILED:
invalidate_status_counts()
return executed_steps


def abort_wf(pstat: ProcessStat, logstep: StepLogFunc) -> Process:
Expand Down
7 changes: 4 additions & 3 deletions test/unit_tests/api/test_processes_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,15 @@ def test_websocket_process_detail_with_suspend(test_client, test_workflow):
response = test_client.put(f"/api/processes/{process_id}/resume", json=[user_input])
assert HTTPStatus.NO_CONTENT == response.status_code

def get_ws_messages():
return [websocket.receive_json(), websocket.receive_json()]

expected_cache_invalidation_messages = [
{"name": "invalidateCache", "value": {"type": "processStatusCounts"}},
{"name": "invalidateCache", "value": {"type": "processes", "id": "LIST"}},
{"name": "invalidateCache", "value": {"type": "processes", "id": str(process_id)}},
]

def get_ws_messages():
return [websocket.receive_json() for _ in expected_cache_invalidation_messages]

assert get_ws_messages() == expected_cache_invalidation_messages

# close and call receive_text to check websocket close exception
Expand Down

0 comments on commit db35b8a

Please sign in to comment.