diff --git a/app/views/views.py b/app/views/views.py
index 60b11027..b2756dcd 100644
--- a/app/views/views.py
+++ b/app/views/views.py
@@ -2,7 +2,7 @@
 import logging
 
 from celery.exceptions import TimeoutError
-from celery.result import AsyncResult
+# from celery.result import AsyncResult
 from celery.states import FAILURE, PENDING, SUCCESS
 from django.contrib.auth.decorators import login_required
 from django.core import serializers
@@ -26,6 +26,7 @@
 from app.worker.tasks import receiptor
 from app.worker.tasks.exporter import exporter
 from app.worker.tasks.importers import historical_data_importer
+from reboot.celery import app
 
 logger = logging.getLogger(__name__)
 
@@ -121,8 +122,9 @@ def poll_state(request: HttpRequest):
             request=request,
             err_msg="The task_id query parameter of the request was omitted.")
 
-    task = AsyncResult(task_id)
+    task = app.AsyncResult(task_id)
     res = JsonResponse(_poll_state(PENDING, 0, 200))
+    print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
     if task.state == FAILURE or task.failed():
         res = JsonResponse(_poll_state(FAILURE, 0, 400))
     elif task.state == PROGRESS:
@@ -146,8 +148,9 @@ def download_file(request: HttpRequest):
     while (attempts < ATTEMPT_LIMIT):
         try:
             attempts += 1
-            task = AsyncResult(task_id)
-            result = task.get(timeout=0.5 * attempts)
+            task = app.AsyncResult(task_id)
+            print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
+            result = task.get(timeout=1.0 * attempts)
             print(f"{task} {task_name} success #{attempts}: {result}")
             break
         except TimeoutError:
diff --git a/app/worker/tasks/__init__.py b/app/worker/tasks/__init__.py
index fb4770f6..4bdf8a12 100644
--- a/app/worker/tasks/__init__.py
+++ b/app/worker/tasks/__init__.py
@@ -1,13 +1,14 @@
 '''
 Module for tasks to be sent on task queue
 '''
-from celery import task
-
 from app.worker.app_celery import AppTask
+# from celery import task
+from reboot.celery import app
+
 from .create_receipt import Receiptor
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def receiptor(self, queryset, total_count):
     receiptor = Receiptor(queryset, total_count)
     return receiptor()
diff --git a/app/worker/tasks/exporter.py b/app/worker/tasks/exporter.py
index 9419aec7..28a39714 100644
--- a/app/worker/tasks/exporter.py
+++ b/app/worker/tasks/exporter.py
@@ -1,6 +1,6 @@
 import csv
 
-from celery import task
+# from celery import task
 from celery.utils.log import get_task_logger
 from django.core import serializers
 from django.db.models.query import QuerySet
@@ -8,9 +8,10 @@
 
 from app.constants.field_names import CURRENT_FIELDS
 from app.worker.app_celery import AppTask, update_percent
+from reboot.celery import app
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
     rows = serializers.deserialize('json', qs)
     csv_exporter = CsvExporter(file_name, rows, total_count)
diff --git a/app/worker/tasks/importers/__init__.py b/app/worker/tasks/importers/__init__.py
index cf650561..dc6b7dad 100644
--- a/app/worker/tasks/importers/__init__.py
+++ b/app/worker/tasks/importers/__init__.py
@@ -1,13 +1,15 @@
 """
 Module for csv file importers to be sent to queue
 """
-from celery import task
+# from celery import task
 
 from app.worker.app_celery import AppTask
+from reboot.celery import app
+
 from .historical_data_importer import HistoricalDataImporter
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def historical_data_importer(self, csvpath):
     importer = HistoricalDataImporter(csvpath)
     importer()
diff --git a/reboot/celeryconfig.py b/reboot/celeryconfig.py
index 290d77b9..1639788a 100644
--- a/reboot/celeryconfig.py
+++ b/reboot/celeryconfig.py
@@ -13,6 +13,8 @@
 result_backend = config("REDIS_URL")
 task_serializer = 'pickle'
 result_serializer = 'pickle'
+task_track_started = True
+task_ignore_result = False
 
 # Use PROD settings if valid CLOUDAMQP_URl, else dev
 if config('CLOUDAMQP_URL', default=False):