diff --git a/docs/reference-docs/websockets.md b/docs/reference-docs/websockets.md index 5bfed1f09..7662587ee 100644 --- a/docs/reference-docs/websockets.md +++ b/docs/reference-docs/websockets.md @@ -46,3 +46,62 @@ Backend processes are executed in a threadpool and therefore access the same WSM To solve this there is a dedicated `ProcessDataBroadcastThread` (attached to and managed by the `OrchestratorCore` app) to perform the actual `broadcast_data()` call. The API endpoints which start/resume/abort a process, call `api_broadcast_process_data(request)` to acquire a function that can be used to submit process updates into a `threading.Queue` on which `ProcessDataBroadcastThread` listens. + +## Channel overview + +As mentioned in Implementation, messages are organized into channels that clients can listen to. +Each channel has its own usecase and specific message format. + +### Events + +The `events` channel is designed for the [Orchestrator UI v2](https://github.com/workfloworchestrator/example-orchestrator-ui). +Events are notifications to the UI/user that something happened in the backend API or workers. +They only include the bare minimum of information and can be sent at a high volume. + +The API endpoint for this channel is `/api/ws/events` . + +Messages in this channel are of the format: +```json +{"name": name, "value": value} +``` + +Where `name` and `value` are one of the following combinations: + +| name | value | Notifies that | +|----------------------|-----------------------------------------------------------|-------------------------------------------------------| +| `"invalidateCache"` | `{"type": "processes", "id": "LIST"} ` | A process was started, updated or finished [1] | +| `"invalidateCache"` | `{"type": "processes", "id": ""}` | A process was started, updated or finished [1] | +| `"invalidateCache"` | `{"type": "subscriptions", "id": "LIST"} ` | A subscription was created, updated or terminated [1] | +| `"invalidateCache"` | `{"type": "subscriptions", "id": ""} ` | A subscription was created, updated or terminated [1] | +| `"invalidateCache"` | `{"type": "processStatusCounts"} ` | A process transitioned to/from a failed state [2] | +| `"invalidateCache"` | `{"type": "engineStatus"} ` | The workflow engine has been enabled or disabled | + + + +Notes: +1. The `LIST` and `` combinations currently mean one and the same. The reason to keep them separate is that we may want to implement throttling on the `LIST` event. +2. The process status count event is triggered when a process: + * Transitions to non-failed state -> count may go down: + 1. Non-running process is scheduled to be resumed + 2. Non-running process is deleted from database + * Transitions to a failed state -> count goes up: + 1. Running process finishes with an error + +Example of a complete message: + +```json +{"name":"invalidateCache","value":{"type":"engineStatus"}} +``` + +### Engine settings (deprecated) + +The `engine-settings` channel was designed for the now deprecated [Orchestrator UI v1](https://github.com/workfloworchestrator/orchestrator-core-gui). + +The API endpoint for this channel is `/api/settings/ws-status/` . + +### Processes (deprecated) + +The `processes` channel was designed for the now deprecated [Orchestrator UI v1](https://github.com/workfloworchestrator/orchestrator-core-gui). +It sent process list/detail data to the client which it would use to directly update the frontend. + +The API endpoint for this channel is `/api/processes/all/` . diff --git a/orchestrator/api/api_v1/endpoints/processes.py b/orchestrator/api/api_v1/endpoints/processes.py index d12ceb93d..e0277e855 100644 --- a/orchestrator/api/api_v1/endpoints/processes.py +++ b/orchestrator/api/api_v1/endpoints/processes.py @@ -63,7 +63,12 @@ from orchestrator.settings import app_settings from orchestrator.types import JSON, State from orchestrator.utils.enrich_process import enrich_process -from orchestrator.websocket import WS_CHANNELS, broadcast_process_update_to_websocket, websocket_manager +from orchestrator.websocket import ( + WS_CHANNELS, + broadcast_invalidate_status_counts, + broadcast_process_update_to_websocket, + websocket_manager, +) from orchestrator.workflow import ProcessStatus router = APIRouter() @@ -115,11 +120,12 @@ def delete(process_id: UUID) -> None: if not process: raise_status(HTTPStatus.NOT_FOUND) - broadcast_process_update_to_websocket(process.process_id) - db.session.delete(db.session.get(ProcessTable, process_id)) db.session.commit() + broadcast_invalidate_status_counts() + broadcast_process_update_to_websocket(process.process_id) + @router.post( "/{workflow_key}", @@ -159,7 +165,9 @@ def resume_process_endpoint( if process.last_status == ProcessStatus.RESUMED: raise_status(HTTPStatus.CONFLICT, "Resuming a resumed workflow is not possible") + broadcast_invalidate_status_counts() broadcast_func = api_broadcast_process_data(request) + resume_process(process, user=user, user_inputs=json_data, broadcast_func=broadcast_func) diff --git a/orchestrator/services/processes.py b/orchestrator/services/processes.py index de7a3ec57..d73779630 100644 --- a/orchestrator/services/processes.py +++ b/orchestrator/services/processes.py @@ -42,6 +42,7 @@ from orchestrator.types import BroadcastFunc, State from orchestrator.utils.datetime import nowtz from orchestrator.utils.errors import error_state_to_dict +from orchestrator.websocket import broadcast_invalidate_status_counts from orchestrator.workflow import ( CALLBACK_TOKEN_KEY, Failed, @@ -117,6 +118,7 @@ def _db_create_process(stat: ProcessStat) -> None: def delete_process(process_id: UUID) -> None: db.session.execute(delete(ProcessTable).where(ProcessTable.process_id == process_id)) + broadcast_invalidate_status_counts() db.session.commit() diff --git a/orchestrator/websocket/__init__.py b/orchestrator/websocket/__init__.py index 9d0d2c23f..065608848 100644 --- a/orchestrator/websocket/__init__.py +++ b/orchestrator/websocket/__init__.py @@ -105,6 +105,15 @@ async def invalidate_subscription_cache(subscription_id: UUID | UUIDstr, invalid await broadcast_invalidate_cache({"type": "subscriptions", "id": str(subscription_id)}) +def broadcast_invalidate_status_counts() -> None: + """Broadcast message to invalidate the status counts of the connected websocket clients.""" + if not websocket_manager.enabled: + logger.debug("WebSocketManager is not enabled. Skip broadcasting through websocket.") + return + + sync_broadcast_invalidate_cache({"type": "processStatusCounts"}) + + def broadcast_process_update_to_websocket( process_id: UUID, ) -> None: @@ -135,8 +144,8 @@ async def broadcast_process_update_to_websocket_async( __all__ = [ "websocket_manager", "init_websocket_manager", - "is_process_active", "broadcast_process_update_to_websocket", "broadcast_process_update_to_websocket_async", "WS_CHANNELS", + "broadcast_invalidate_status_counts", ] diff --git a/orchestrator/workflow.py b/orchestrator/workflow.py index 8445c51c6..198bf2596 100644 --- a/orchestrator/workflow.py +++ b/orchestrator/workflow.py @@ -1360,6 +1360,13 @@ def errorlogger(error: ErrorDict) -> None: logger.error("Workflow returned an error.", **error) +def invalidate_status_counts() -> None: + """Broadcast invalidate status counts to the websocket.""" + from orchestrator.websocket import broadcast_invalidate_status_counts + + broadcast_invalidate_status_counts() + + def _exec_steps(steps: StepList, starting_process: Process, dblogstep: StepLogFuncInternal) -> Process: """Execute the workflow steps one by one until a Process state other than Success or Skipped is reached.""" consolelogger = cond_bind(logger, starting_process.unwrap(), "reporter", "created_by") @@ -1426,7 +1433,10 @@ def resume_suspend(process: Process) -> Process: # This enables recursive step execution of sub steps with the same StepLogFunc. # Should probably be refactored at some point as contextvars is a kind of global state. step_log_fn_var.set(_logstep) - return _exec_steps(steps, next_state, _logstep) + executed_steps = _exec_steps(steps, next_state, _logstep) + if executed_steps.overall_status == ProcessStatus.FAILED: + invalidate_status_counts() + return executed_steps def abort_wf(pstat: ProcessStat, logstep: StepLogFunc) -> Process: diff --git a/test/unit_tests/api/test_processes_ws.py b/test/unit_tests/api/test_processes_ws.py index 4124fce38..9e91ac0b3 100644 --- a/test/unit_tests/api/test_processes_ws.py +++ b/test/unit_tests/api/test_processes_ws.py @@ -200,14 +200,15 @@ def test_websocket_process_detail_with_suspend(test_client, test_workflow): response = test_client.put(f"/api/processes/{process_id}/resume", json=[user_input]) assert HTTPStatus.NO_CONTENT == response.status_code - def get_ws_messages(): - return [websocket.receive_json(), websocket.receive_json()] - expected_cache_invalidation_messages = [ + {"name": "invalidateCache", "value": {"type": "processStatusCounts"}}, {"name": "invalidateCache", "value": {"type": "processes", "id": "LIST"}}, {"name": "invalidateCache", "value": {"type": "processes", "id": str(process_id)}}, ] + def get_ws_messages(): + return [websocket.receive_json() for _ in expected_cache_invalidation_messages] + assert get_ws_messages() == expected_cache_invalidation_messages # close and call receive_text to check websocket close exception