Skip to content

Commit 7fe5a06

Browse files
[Workflows] Fix run notifications (mlrun#5783)
1 parent 9b591da commit 7fe5a06

File tree

3 files changed

+125
-1
lines changed

3 files changed

+125
-1
lines changed

mlrun/utils/notifications/notification/webhook.py

+23
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,29 @@ async def push(
6969
request_body["custom_html"] = custom_html
7070

7171
if override_body:
72+
list_edit_runs = []
73+
for run in runs:
74+
if hasattr(run, "to_dict"):
75+
run = run.to_dict()
76+
if isinstance(run, dict):
77+
parsed_run = {
78+
"project": run["metadata"]["project"],
79+
"name": run["metadata"]["name"],
80+
"host": run["metadata"]["labels"]["host"],
81+
"status": {"state": run["status"]["state"]},
82+
}
83+
if run["status"].get("error", None):
84+
parsed_run["status"]["error"] = run["status"]["error"]
85+
elif run["status"].get("results", None):
86+
parsed_run["status"]["results"] = run["status"]["results"]
87+
list_edit_runs.append(parsed_run)
88+
runs_value = str(list_edit_runs)
89+
if isinstance(override_body, dict):
90+
for key, value in override_body.items():
91+
if "{{ runs }}" in value:
92+
override_body[key] = value.replace("{{ runs }}", runs_value)
93+
elif "{{runs}}" in value:
94+
override_body[key] = value.replace("{{runs}}", runs_value)
7295
request_body = override_body
7396

7497
# Specify the `verify_ssl` parameter value only for HTTPS urls.

mlrun/utils/notifications/notification_pusher.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ def _add_run_step(_step: mlrun_pipelines.models.PipelineStep):
397397
try:
398398
_run = db.list_runs(
399399
project=run.metadata.project,
400-
labels=f"mlrun_constants.MLRunInternalLabels.runner_pod={_step.node_name}",
400+
labels=f"{mlrun_constants.MLRunInternalLabels.runner_pod}={_step.node_name}",
401401
)[0]
402402
except IndexError:
403403
_run = {

tests/utils/test_notifications.py

+101
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import server.api.api.utils
3030
import server.api.constants
3131
import server.api.crud
32+
from mlrun.utils.notifications.notification.webhook import WebhookNotification
3233

3334

3435
@pytest.mark.parametrize(
@@ -196,6 +197,46 @@ def test_condition_evaluation_timeout():
196197
assert notification_pusher._should_notify(run, notification)
197198

198199

200+
@pytest.mark.parametrize(
201+
"override_body",
202+
[({"message": "runs: {{runs}}"}), ({"message": "runs: {{ runs }}"})],
203+
)
204+
async def test_webhook_overide_body_job_succeed(monkeypatch, override_body):
205+
requests_mock = _mock_async_response(monkeypatch, "post", {"id": "response-id"})
206+
runs = _generate_run_result(state="completed", results={"return": 1})
207+
await WebhookNotification(
208+
params={"override_body": override_body, "url": "http://test.com"}
209+
).push("test-message", "info", [runs])
210+
expected_body = {
211+
"message": "runs: [{'project': 'test-remote-workflow', 'name': 'func-func', 'host': 'func-func-8lvl8', "
212+
"'status': {'state': 'completed', 'results': {'return': 1}}}]"
213+
}
214+
requests_mock.assert_called_once_with(
215+
"http://test.com", headers={}, json=expected_body, ssl=None
216+
)
217+
218+
219+
@pytest.mark.parametrize(
220+
"override_body",
221+
[({"message": "runs: {{runs}}"}), ({"message": "runs: {{ runs }}"})],
222+
)
223+
async def test_webhook_overide_body_job_failed(monkeypatch, override_body):
224+
requests_mock = _mock_async_response(monkeypatch, "post", {"id": "response-id"})
225+
runs = _generate_run_result(
226+
state="error", error='can only concatenate str (not "int") to str'
227+
)
228+
await WebhookNotification(
229+
params={"override_body": override_body, "url": "http://test.com"}
230+
).push("test-message", "info", [runs])
231+
expected_body = {
232+
"message": "runs: [{'project': 'test-remote-workflow', 'name': 'func-func', 'host': 'func-func-8lvl8', "
233+
"'status': {'state': 'error', 'error': 'can only concatenate str (not \"int\") to str'}}]"
234+
}
235+
requests_mock.assert_called_once_with(
236+
"http://test.com", headers={}, json=expected_body, ssl=None
237+
)
238+
239+
199240
@pytest.mark.parametrize(
200241
"runs,expected,is_table",
201242
[
@@ -938,3 +979,63 @@ def _mock_async_response(monkeypatch, method, result):
938979
monkeypatch.setattr(aiohttp.ClientSession, method, requests_mock)
939980

940981
return requests_mock
982+
983+
984+
def _generate_run_result(state: str, error: str = None, results: dict = None):
985+
run_example = {
986+
"status": {
987+
"notifications": {
988+
"Test": {"status": "pending", "sent_time": None, "reason": None}
989+
},
990+
"last_update": "2024-06-18T13:46:37.686443+00:00",
991+
"start_time": "2024-06-18T13:46:37.392158+00:00",
992+
},
993+
"metadata": {
994+
"uid": "b176e54e4ed24b28883aa69dce981601",
995+
"project": "test-remote-workflow",
996+
"name": "func-func",
997+
"labels": {
998+
"v3io_user": "admin",
999+
"kind": "job",
1000+
"owner": "admin",
1001+
"mlrun/client_version": "1.7.0-rc21",
1002+
"mlrun/client_python_version": "3.9.18",
1003+
"host": "func-func-8lvl8",
1004+
},
1005+
"iteration": 0,
1006+
},
1007+
"spec": {
1008+
"function": "test-remote-workflow/func@8e0ddc3926470d5b97733679bb96738fa6dfd01b",
1009+
"parameters": {"x": 1},
1010+
"state_thresholds": {
1011+
"pending_scheduled": "1h",
1012+
"pending_not_scheduled": "-1",
1013+
"image_pull_backoff": "1h",
1014+
"executing": "24h",
1015+
},
1016+
"output_path": "v3io:///projects/test-remote-workflow/artifacts",
1017+
"notifications": [
1018+
{
1019+
"when": ["error", "completed"],
1020+
"name": "Test",
1021+
"params": {
1022+
"url": "https://webhook.site/5da7ac4d-39dc-4896-b18f-e13c5712a96a",
1023+
"method": "POST",
1024+
},
1025+
"message": "",
1026+
"status": "pending",
1027+
"condition": "",
1028+
"kind": "webhook",
1029+
"severity": "info",
1030+
}
1031+
],
1032+
"handler": "func",
1033+
},
1034+
}
1035+
if state == "completed":
1036+
run_example["status"]["results"] = results
1037+
run_example["status"]["state"] = state
1038+
elif state == "error":
1039+
run_example["status"]["error"] = error
1040+
run_example["status"]["state"] = state
1041+
return run_example

0 commit comments

Comments
 (0)