Skip to content
This repository has been archived by the owner on Aug 10, 2024. It is now read-only.

Commit

Permalink
feat: improve celery stability
Browse files Browse the repository at this point in the history
  • Loading branch information
bwdmonkey committed Jan 27, 2024
1 parent d2abb22 commit d75fa56
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
11 changes: 7 additions & 4 deletions app/views/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from celery.exceptions import TimeoutError
from celery.result import AsyncResult
# from celery.result import AsyncResult
from celery.states import FAILURE, PENDING, SUCCESS
from django.contrib.auth.decorators import login_required
from django.core import serializers
Expand All @@ -26,6 +26,7 @@
from app.worker.tasks import receiptor
from app.worker.tasks.exporter import exporter
from app.worker.tasks.importers import historical_data_importer
from reboot.celery import app

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,8 +122,9 @@ def poll_state(request: HttpRequest):
request=request,
err_msg="The task_id query parameter of the request was omitted.")

task = AsyncResult(task_id)
task = app.AsyncResult(task_id)
res = JsonResponse(_poll_state(PENDING, 0, 200))
print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
if task.state == FAILURE or task.failed():
res = JsonResponse(_poll_state(FAILURE, 0, 400))
elif task.state == PROGRESS:
Expand All @@ -146,8 +148,9 @@ def download_file(request: HttpRequest):
while (attempts < ATTEMPT_LIMIT):
try:
attempts += 1
task = AsyncResult(task_id)
result = task.get(timeout=0.5 * attempts)
task = app.AsyncResult(task_id)
print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
result = task.get(timeout=1.0 * attempts)
print(f"{task} {task_name} success #{attempts}: {result}")
break
except TimeoutError:
Expand Down
7 changes: 4 additions & 3 deletions app/worker/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'''
Module for tasks to be sent on task queue
'''
from celery import task

from app.worker.app_celery import AppTask
# from celery import task
from reboot.celery import app

from .create_receipt import Receiptor


@task(bind=True, base=AppTask)
@app.task(bind=True, base=AppTask)
def receiptor(self, queryset, total_count):
receiptor = Receiptor(queryset, total_count)
return receiptor()
5 changes: 3 additions & 2 deletions app/worker/tasks/exporter.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import csv

from celery import task
# from celery import task
from celery.utils.log import get_task_logger
from django.core import serializers
from django.db.models.query import QuerySet
from django.http import HttpResponse

from app.constants.field_names import CURRENT_FIELDS
from app.worker.app_celery import AppTask, update_percent
from reboot.celery import app


@task(bind=True, base=AppTask)
@app.task(bind=True, base=AppTask)
def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
rows = serializers.deserialize('json', qs)
csv_exporter = CsvExporter(file_name, rows, total_count)
Expand Down
6 changes: 4 additions & 2 deletions app/worker/tasks/importers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""
Module for csv file importers to be sent to queue
"""
from celery import task
# from celery import task

from app.worker.app_celery import AppTask
from reboot.celery import app

from .historical_data_importer import HistoricalDataImporter


@task(bind=True, base=AppTask)
@app.task(bind=True, base=AppTask)
def historical_data_importer(self, csvpath):
importer = HistoricalDataImporter(csvpath)
importer()
2 changes: 2 additions & 0 deletions reboot/celeryconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
result_backend = config("REDIS_URL")
task_serializer = 'pickle'
result_serializer = 'pickle'
task_track_started = True
task_ignore_result = False

# Use PROD settings if valid CLOUDAMQP_URl, else dev
if config('CLOUDAMQP_URL', default=False):
Expand Down

0 comments on commit d75fa56

Please sign in to comment.