Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

217 add functionality to send callback messages #225

Merged
merged 11 commits into from
Jan 10, 2024
97 changes: 97 additions & 0 deletions doc/source/user_guide/ProgressReporter.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Progress reporting for long running tasks"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from oda_api.api import ProgressReporter\n",
"import time"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialise ProgressReporter"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"pr = ProgressReporter()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Split your task into subtasks to enable progress report\n",
"use ProgressReporter.report_progress method to send progress reports"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"n_steps = 5\n",
"for step in range(n_steps):\n",
" stage = f\"simulation stage {step}\"\n",
" progress = 100 * step // n_steps\n",
" n_substeps = 10\n",
" \n",
" # optionally define subtasks\n",
" for substep in range(n_substeps):\n",
" substage = f\"subtask {substep}\"\n",
" subtask_progress = 100 * substep // n_substeps\n",
"\n",
" time.sleep(0.001) # replace this by actual calculation\n",
"\n",
" # report progress, optionally adding extra message\n",
" message='some message'\n",
" \n",
" pr.report_progress(stage=stage, progress=progress, substage=substage, subprogress=subtask_progress, message=message)"
]
}
],
"metadata": {
"kernel_info": {
"name": "python2"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
},
"nteract": {
"version": "0.15.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
2 changes: 2 additions & 0 deletions doc/source/user_guide/tutorial_main.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The following tutorial covers a large fraction of the code features. The example

Upload a product to the Product Gallery <UploadToGallery.ipynb>

Progress reporting for long running tasks <ProgressReporter.ipynb>

Examples of workflows <https://github.com/cdcihub/oda_api_benchmark>


Expand Down
47 changes: 47 additions & 0 deletions oda_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import requests
import ast
import json
import re

try:
# compatibility in some remaining environments
Expand Down Expand Up @@ -1339,3 +1340,49 @@
p.meta_data = p.meta

return d

class ProgressReporter(object):
"""
The class allows to report task progress to end user
"""
def __init__(self):
self._callback = None
callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
if not os.path.isfile(callback_file):
return
with open(callback_file, 'r') as file:
self._callback = file.read().strip()

@property
def enabled(self):
return self._callback is not None

def report_progress(self, stage: str=None, progress: int=50, substage: str=None, subprogress: int=None, message:str=None):
"""
Report progress via callback URL
:param stage: current stage description string
:param progress: current stage progress in %
:param substage: current substage description string
:param subprogress: current substage progress in %
:param message: message to pass
"""
callback_payload = dict(stage=stage, progress=progress, substage=substage, subprogress=subprogress, message=message)
callback_payload = {k: v for k, v in callback_payload.items() if v is not None}
callback_payload['action'] = 'progress'
if not self.enabled:
logger.info('no callback registered, skipping')
return

logger.info('will perform callback: %s', self._callback)

if re.match('^file://', self._callback):
with open(self._callback.replace('file://', ''), "w") as f:
dsavchenko marked this conversation as resolved.
Show resolved Hide resolved
json.dump(callback_payload, f)
logger.info('stored callback in a file %s', self._callback)

elif re.match('^https?://', self._callback):
r = requests.get(self._callback, params=callback_payload)
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
logger.info('callback %s returns %s : %s', self._callback, r, r.text)

Check warning on line 1385 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1383-L1385

Added lines #L1383 - L1385 were not covered by tests

else:
raise NotImplementedError

Check warning on line 1388 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1388

Added line #L1388 was not covered by tests
43 changes: 43 additions & 0 deletions tests/test_progress_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from oda_api.api import ProgressReporter
import os
import json

callback_file = ".oda_api_callback"
request_params = dict(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')

def test_progress_reporter_disabled():
if os.path.isfile(callback_file):
os.remove(callback_file)

Check warning on line 10 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L10

Added line #L10 was not covered by tests
# if callback is not available
pr = ProgressReporter()
assert not pr.enabled
# the call below should not produce exception
try:
pr.report_progress(**request_params)
except:
assert False, 'report_progress raises exception in case of disabled ProgressReporter'

Check warning on line 18 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L17-L18

Added lines #L17 - L18 were not covered by tests

def test_progress_reporter_enabled():
# if callback is available
try:
dump_file = 'callback'
with open(callback_file, 'w') as file:
print(f'file://{os.getcwd()}/{dump_file}', file=file)

pr = ProgressReporter()
assert pr.enabled

pr.report_progress(**request_params)

# verify that params passed to report_progress were saved to dump_file
with open(dump_file) as json_file:
saved_params = json.load(json_file)
# append extra param befor check
request_params['action'] = 'progress' # this key is added by report_progress
assert saved_params == request_params
finally:
if os.path.isfile(callback_file):
os.remove(callback_file)
if os.path.isfile(dump_file):
os.remove(dump_file)

Loading