diff --git a/docs/openapi.json b/docs/openapi.json index 3e3d7cf0..8dd84829 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -1830,12 +1830,13 @@ }, "status": { "title": "Status", - "description": "\"success\" or \"failure\"", + "description": "\"success\", \"failure\", or \"notification_failed\"", "type": "string", "enum": [ 1, 2, - 3 + 3, + 4 ] }, "detail": { diff --git a/src/capabilities/__init__.py b/src/capabilities/__init__.py index 9c2f4806..5197dd4e 100644 --- a/src/capabilities/__init__.py +++ b/src/capabilities/__init__.py @@ -43,5 +43,6 @@ def get_capabilities(): if location: capabilities["sensor"]["location"] = location else: - del capabilities["sensor"]["location"] + if "location" in capabilities['sensor']: + del capabilities["sensor"]["location"] return capabilities diff --git a/src/requirements-dev.txt b/src/requirements-dev.txt index b7eefb8a..a29b909d 100644 --- a/src/requirements-dev.txt +++ b/src/requirements-dev.txt @@ -8,7 +8,7 @@ asgiref==3.5.0 # via # -r requirements.txt # django -aspy.refactor-imports==2.2.1 +aspy-refactor-imports==2.2.1 # via seed-isort-config attrs==21.4.0 # via @@ -25,7 +25,7 @@ black==21.12b0 cached-property==1.5.2 # via # -r requirements.txt - # aspy.refactor-imports + # aspy-refactor-imports # docker-compose certifi==2021.10.8 # via @@ -130,7 +130,7 @@ idna==3.3 # via # -r requirements.txt # requests -importlib-metadata==4.10.1 +importlib-metadata==4.11.2 # via # -r requirements.txt # click @@ -273,30 +273,27 @@ requests==2.27.1 # coreapi # docker # docker-compose - # requests-futures # requests-mock # requests-oauthlib -requests-futures==0.9.9 - # via -r requirements.txt requests-mock==1.6.0 # via -r requirements.txt requests-oauthlib==1.3.1 # via -r requirements.txt -ruamel.yaml==0.16.13 +ruamel-yaml==0.16.13 # via # -r requirements.txt # drf-yasg -ruamel.yaml.clib==0.2.6 +ruamel-yaml-clib==0.2.6 # via # -r requirements.txt - # ruamel.yaml + # ruamel-yaml scipy==1.7.3 # via -r requirements.txt -scos-actions 
@ git+https://github.com/NTIA/scos-actions@bug_fix +scos_actions @ git+https://github.com/NTIA/scos-actions@bug_fix # via # -r requirements.txt # scos-usrp -scos-usrp @ git+https://github.com/NTIA/scos-usrp@fix_dependencies +scos_usrp @ git+https://github.com/NTIA/scos-usrp@fix_dependencies # via -r requirements.txt seed-isort-config==1.9.1 # via -r requirements-dev.in @@ -337,7 +334,7 @@ tox==3.12.1 # via -r requirements-dev.in typed-ast==1.5.2 # via black -typing-extensions==4.0.1 +typing-extensions==4.1.1 # via # -r requirements.txt # asgiref diff --git a/src/requirements.in b/src/requirements.in index 6df1f656..53c40b6b 100644 --- a/src/requirements.in +++ b/src/requirements.in @@ -14,7 +14,6 @@ oauthlib~=3.1.0 psycopg2-binary==2.8.5 pyjwt~=1.7.1 raven==6.10.0 -requests-futures==0.9.9 requests-mock==1.6.0 requests_oauthlib~=1.3.0 #scos_actions @ git+https://github.com/NTIA/scos-actions@bug_fix#egg=scos_actions diff --git a/src/requirements.txt b/src/requirements.txt index ea93af3b..80a48767 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -69,7 +69,7 @@ gunicorn==19.9.0 # via -r requirements.in idna==3.3 # via requests -importlib-metadata==4.10.1 +importlib-metadata==4.11.2 # via jsonschema inflection==0.5.1 # via drf-yasg @@ -129,28 +129,25 @@ requests==2.27.1 # coreapi # docker # docker-compose - # requests-futures # requests-mock # requests-oauthlib -requests-futures==0.9.9 - # via -r requirements.in requests-mock==1.6.0 # via -r requirements.in requests-oauthlib==1.3.1 # via -r requirements.in -ruamel.yaml==0.16.13 +ruamel-yaml==0.16.13 # via # drf-yasg # scos-actions -ruamel.yaml.clib==0.2.6 +ruamel-yaml-clib==0.2.6 # via - # ruamel.yaml + # ruamel-yaml # scos-actions scipy==1.7.3 # via scos-actions -scos-actions @ git+https://github.com/NTIA/scos-actions@bug_fix +scos_actions @ git+https://github.com/NTIA/scos-actions@bug_fix # via scos-usrp -scos-usrp @ git+https://github.com/NTIA/scos-usrp@fix_dependencies +scos_usrp @ 
git+https://github.com/NTIA/scos-usrp@fix_dependencies # via -r requirements.in sigmf @ git+https://github.com/NTIA/SigMF.git@multi-recording-archive # via scos-actions @@ -171,7 +168,7 @@ sqlparse==0.4.2 # django-debug-toolbar texttable==1.6.4 # via docker-compose -typing-extensions==4.0.1 +typing-extensions==4.1.1 # via # asgiref # importlib-metadata diff --git a/src/schedule/models/schedule_entry.py b/src/schedule/models/schedule_entry.py index 36dc96eb..f862d666 100644 --- a/src/schedule/models/schedule_entry.py +++ b/src/schedule/models/schedule_entry.py @@ -1,7 +1,7 @@ import sys from itertools import count -from django.core.validators import MaxValueValidator, MinValueValidator +from django.core.validators import MaxValueValidator, MinValueValidator, ValidationError from django.db import models import actions @@ -69,7 +69,6 @@ class ScheduleEntry(models.Model): help_text="[Required] The unique identifier used in URLs and filenames", ) action = models.CharField( - choices=actions.CHOICES, max_length=actions.MAX_LENGTH, help_text="[Required] The name of the action to be scheduled", ) @@ -158,9 +157,11 @@ def __init__(self, *args, **kwargs): if self.next_task_time is None: self.next_task_time = self.start - # used by .save to detect whether to reset .next_task_time + # used by .save to detect whether to reset .next_task_time self.__start = self.start self.__interval = self.interval + if self.action not in actions.registered_actions: + raise ValidationError(self.action + ' does not exist') def update(self, *args, **kwargs): super(ScheduleEntry, self).update(*args, **kwargs) diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index dfe2db4c..72431ff5 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -5,10 +5,8 @@ import threading from contextlib import contextmanager from pathlib import Path - +import requests from django.utils import timezone -from requests_futures.sessions import FuturesSession - from authentication
import oauth from capabilities import get_capabilities from schedule.models import ScheduleEntry @@ -21,7 +19,7 @@ from . import utils logger = logging.getLogger(__name__) -requests_futures_session = FuturesSession() + class Scheduler(threading.Thread): @@ -141,7 +139,6 @@ def _call_task_action(self): entry_name = self.task.schedule_entry_name task_id = self.task.task_id from schedule.serializers import ScheduleEntrySerializer - from tasks.serializers import TaskResultSerializer schedule_entry = ScheduleEntry.objects.get(name=entry_name) @@ -169,17 +166,17 @@ def _call_task_action(self): return status, detail[:MAX_DETAIL_LEN] def _finalize_task_result(self, started, finished, status, detail): + tr = self.task_result tr.started = started tr.finished = finished tr.duration = finished - started tr.status = status tr.detail = detail - tr.save() if self.entry.callback_url: try: - logger.debug("Trying callback to URL: " + self.entry.callback_url) + logger.info("Trying callback to URL: " + self.entry.callback_url) context = {"request": self.entry.request} result_json = TaskResultSerializer(tr, context=context).data verify_ssl = settings.CALLBACK_SSL_VERIFICATION @@ -194,29 +191,40 @@ def _finalize_task_result(self, started, finished, status, detail): self.entry.callback_url, data=json.dumps(result_json), headers=headers, - verify=verify_ssl, + verify=verify_ssl ) - self._callback_response_handler(client, response) + self._callback_response_handler(response, tr) else: + logger.info('Posting with token') token = self.entry.owner.auth_token headers = {"Authorization": "Token " + str(token)} - requests_futures_session.post( + response = requests.post( self.entry.callback_url, json=result_json, - background_callback=self._callback_response_handler, headers=headers, - verify=verify_ssl, + verify=verify_ssl ) + logger.info('posted') + self._callback_response_handler(response, tr) except Exception as err: logger.error(str(err)) + tr.status = 'notification_failed' + tr.save() + 
else: + tr.save() @staticmethod - def _callback_response_handler(sess, resp): + def _callback_response_handler(resp, task_result): if resp.ok: logger.info("POSTed to {}".format(resp.url)) else: msg = "Failed to POST to {}: {}" logger.warning(msg.format(resp.url, resp.reason)) + task_result.status = 'notification_failed' + + task_result.save() + + def _queue_pending_tasks(self, schedule_snapshot): pending_queue = TaskQueue() diff --git a/src/scheduler/tests/test_scheduler.py b/src/scheduler/tests/test_scheduler.py index d312a0e5..0b13ff79 100644 --- a/src/scheduler/tests/test_scheduler.py +++ b/src/scheduler/tests/test_scheduler.py @@ -4,9 +4,11 @@ import pytest import requests_mock +import requests from django import conf from scheduler.scheduler import Scheduler, minimum_duration +from tasks.models import TaskResult from .utils import ( BAD_ACTION_STR, @@ -319,16 +321,16 @@ def verify_request(request_history, status="success", detail=None): oauth_history = request_history[0] assert oauth_history.verify == conf.settings.PATH_TO_VERIFY_CERT assert ( - oauth_history.text - == f"grant_type=password&username={conf.settings.USER_NAME}&password={conf.settings.PASSWORD}" + oauth_history.text + == f"grant_type=password&username={conf.settings.USER_NAME}&password={conf.settings.PASSWORD}" ) assert oauth_history.cert == conf.settings.PATH_TO_CLIENT_CERT auth_header = oauth_history.headers.get("Authorization") auth_header = auth_header.replace("Basic ", "") auth_header_decoded = base64.b64decode(auth_header).decode("utf-8") assert ( - auth_header_decoded - == f"{conf.settings.CLIENT_ID}:{conf.settings.CLIENT_SECRET}" + auth_header_decoded + == f"{conf.settings.CLIENT_ID}:{conf.settings.CLIENT_SECRET}" ) request_json = request_history[1].json() else: @@ -428,6 +430,35 @@ def cb_request_handler(sess, resp): verify_request(request_history) +@pytest.mark.django_db +def test_notification_failed_status_unknown_host(test_scheduler): + entry = create_entry("t", 1, 1, 100, 5, 
"logger", 'https://badmgr.its.bldrdoc.gov') + entry.save() + entry.refresh_from_db() + print('entry = ' + entry.name) + s = test_scheduler + advance_testclock(s.timefn, 1) + s.run(blocking=False) # queue first 10 tasks + result = TaskResult.objects.first() + assert result.status == 'notification_failed' + + +@pytest.mark.django_db +def test_notification_failed_status_request_ok_false(test_scheduler): + entry = create_entry("t", 1, 1, 100, 5, "logger") + entry.save() + entry.refresh_from_db() + print('entry = ' + entry.name) + s = test_scheduler + advance_testclock(s.timefn, 1) + s.run(blocking=False) # queue first 10 tasks + tr = TaskResult.objects.first() + response = requests.Response() + response.status_code = 400 + s._callback_response_handler(response, tr) + assert tr.status == 'notification_failed' + + @pytest.mark.django_db def test_starvation(test_scheduler): """A recurring high-pri task should not 'starve' a low-pri task.""" diff --git a/src/tasks/migrations/0003_auto_20220302_1504.py b/src/tasks/migrations/0003_auto_20220302_1504.py new file mode 100644 index 00000000..70517c6a --- /dev/null +++ b/src/tasks/migrations/0003_auto_20220302_1504.py @@ -0,0 +1,18 @@ +# Generated by Django 3.1.14 on 2022-03-02 15:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tasks', '0002_auto_20200319_1420'), + ] + + operations = [ + migrations.AlterField( + model_name='taskresult', + name='status', + field=models.CharField(choices=[(1, 'success'), (2, 'failure'), (3, 'in-progress')], default='in-progress', help_text='"success", "failure", or "notification_failed"', max_length=19), + ), + ] diff --git a/src/tasks/migrations/0004_auto_20220302_2250.py b/src/tasks/migrations/0004_auto_20220302_2250.py new file mode 100644 index 00000000..66adee8f --- /dev/null +++ b/src/tasks/migrations/0004_auto_20220302_2250.py @@ -0,0 +1,18 @@ +# Generated by Django 3.1.14 on 2022-03-02 22:50 + +from django.db import 
migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tasks', '0003_auto_20220302_1504'), + ] + + operations = [ + migrations.AlterField( + model_name='taskresult', + name='status', + field=models.CharField(choices=[(1, 'success'), (2, 'failure'), (3, 'in-progress'), (4, 'notification_failed')], default='in-progress', help_text='"success", "failure", or "notification_failed"', max_length=19), + ), + ] diff --git a/src/tasks/models/task_result.py b/src/tasks/models/task_result.py index e9ed49e2..e543ab88 100644 --- a/src/tasks/models/task_result.py +++ b/src/tasks/models/task_result.py @@ -4,7 +4,6 @@ from django.db import models from django.utils import timezone - from schedule.models import ScheduleEntry from sensor.settings import MAX_DISK_USAGE from tasks.consts import MAX_DETAIL_LEN @@ -18,10 +17,12 @@ class TaskResult(models.Model): SUCCESS = 1 FAILURE = 2 IN_PROGRESS = 3 + NOTIFICATION_FAILED = 4 RESULT_CHOICES = ( (SUCCESS, "success"), (FAILURE, "failure"), (IN_PROGRESS, "in-progress"), + (NOTIFICATION_FAILED, "notification_failed") ) schedule_entry = models.ForeignKey( @@ -44,8 +45,8 @@ class TaskResult(models.Model): ) status = models.CharField( default="in-progress", - max_length=11, - help_text='"success" or "failure"', + max_length=19, + help_text='"success", "failure", or "notification_failed"', choices=RESULT_CHOICES, ) detail = models.CharField( @@ -64,9 +65,9 @@ def __init__(self, *args, **kwargs): def save(self): """Limit disk usage to MAX_DISK_USAGE by removing oldest result.""" - all_results = TaskResult.objects.all().order_by("id") filter = {"schedule_entry__name": self.schedule_entry.name} - same_entry_results = all_results.filter(**filter) + + same_entry_results = TaskResult.objects.filter(**filter).order_by("id") if ( same_entry_results.count() > 0 and same_entry_results[0].id != self.id ): # prevent from deleting this task result's acquisition