This repository has been archived by the owner on Aug 10, 2024. It is now read-only.

feat: improve celery stability #349

Draft · wants to merge 1 commit into master

app/templates/app/PollState.html (4 changes: 2 additions & 2 deletions)

@@ -82,7 +82,7 @@
       if (result.process_percent >= 0) {
         percent = Math.max(percent, result.process_percent);
         $(".bar").html(percent + "%").css("width", percent + "%");
-        if (result.process_percent === 100) {
+        if (result === "SUCCESS") {
           $("#user-count").text("complete");
         } else {
           $("#user-count").text("still processing");
@@ -91,7 +91,7 @@
       return result;
     }
   }).always(function(result) {
-    if (result === "SUCCESS" || result.process_percent === 100) {
+    if (result === "SUCCESS") {
       clearInterval(refreshIntervalId);
       $(".bar").html("100%").css("width", "100%");
       $("#user-count").text("complete");
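
The template now keys entirely off the server's response shape: while a task is running, the poll endpoint replies with a JSON object carrying process_percent, and once the task finishes it replies with the bare string "SUCCESS", which both the success branch and the .always handler compare against. A minimal Django-side sketch of that contract, following the response shapes in the app/views/views.py diff below (the helper name here is illustrative):

# Sketch of the polling contract PollState.html relies on; response shapes
# follow app/views/views.py below, the function name is illustrative.
from django.http import HttpResponse, JsonResponse

def poll_response(task):
    if task.state == "STARTED" and isinstance(task.result, dict):
        # In flight: JSON with process_percent drives the progress bar.
        return JsonResponse(task.result)
    if task.state == "SUCCESS":
        # Finished: the bare string the JavaScript compares against.
        return HttpResponse("SUCCESS")
    return JsonResponse({"state": task.state, "process_percent": 0, "status": 200})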

app/views/views.py (37 changes: 28 additions & 9 deletions)

@@ -2,8 +2,8 @@
 import logging
 
 from celery.exceptions import TimeoutError
-from celery.result import AsyncResult
-from celery.states import FAILURE, PENDING, SUCCESS
+# from celery.result import AsyncResult
+from celery.states import FAILURE, PENDING, STARTED, SUCCESS
 from django.contrib.auth.decorators import login_required
 from django.core import serializers
 from django.http import (

@@ -22,13 +22,15 @@
 from app.constants.str import PERMISSION_DENIED
 from app.models import Item
-from app.worker.app_celery import ATTEMPT_LIMIT, PROGRESS
+from app.worker.app_celery import ATTEMPT_LIMIT
 from app.worker.tasks import receiptor
 from app.worker.tasks.exporter import exporter
 from app.worker.tasks.importers import historical_data_importer
+from reboot.celery import app
 
 logger = logging.getLogger(__name__)
 
+tasks_cache = {}
+results_cache = {}
 
 @require_GET
 @login_required(login_url="/login")

@@ -121,14 +123,21 @@ def poll_state(request: HttpRequest):
             request=request,
             err_msg="The task_id query parameter of the request was omitted.")
 
-    task = AsyncResult(task_id)
+    task = app.AsyncResult(task_id)
     res = JsonResponse(_poll_state(PENDING, 0, 200))
+    print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
     if task.state == FAILURE or task.failed():
         res = JsonResponse(_poll_state(FAILURE, 0, 400))
-    elif task.state == PROGRESS:
+    elif task.state == STARTED:
         res = JsonResponse(task.result) if isinstance(
             task.result, dict) else HttpResponse(task.result)
     elif task.state == SUCCESS or task.successful() or task.ready():
+        tasks_cache[task_id] = task
+        try:
+            results_cache[task_id] = task.get(timeout=5)
+            print("!!! saved", results_cache[task_id], task.result)
+        except Exception as e:
+            print(f"!!! error", e)
         res = HttpResponse(SUCCESS)
     return res

@@ -142,13 +151,22 @@ def download_file(request: HttpRequest):
     task_id = request.GET.get("task_id")
     task_name = request.GET.get("task_name", "task")
     attempts = 0
+    # CloudAMQP free tier is unstable and must be circuit breakered
+    # if task_id in results_cache:
+    #     return results_cache[task_id]
     while (attempts < ATTEMPT_LIMIT):
         try:
             attempts += 1
-            task = AsyncResult(task_id)
-            result = task.get(timeout=0.5 * attempts)
+            # if tasks_cache[task_id]:
+            #     task = tasks_cache[task_id]
+            #     del tasks_cache[task_id]
+            # else:
+            #     task = app.AsyncResult(task_id)
+            task = tasks_cache[task_id] if task_id in tasks_cache else app.AsyncResult(task_id)
+            print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
+            result = task.get(timeout=1.0 * attempts)
             print(f"{task} {task_name} success #{attempts}: {result}")
+            if task_id in tasks_cache:
+                del tasks_cache[task_id]
             break
         except TimeoutError:
             print(f"{task} {task_name} failed #{attempts}")

@@ -158,6 +176,7 @@ def download_file(request: HttpRequest):
                 err_msg="Download exceeded max attempts")
         return result
     except Exception as e:
+        print(f"!!! error", e)
         return _error(request=request, err_msg=f"Failed to download file: {e}")
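
The two module-level dicts are the heart of the workaround: poll_state materializes and caches a task's result the moment it reports success, so a later download_file request can be answered from process memory even if the broker connection has degraded in the meantime. Because they are plain module globals, the caches are per-process and vanish on restart, so the pattern assumes the poll and download requests reach the same web process. A minimal sketch of the idea:

# Sketch of the cache-then-serve pattern from the diff above (per-process only).
tasks_cache = {}    # task_id -> AsyncResult, stashed while the backend answers
results_cache = {}  # task_id -> materialized result payload

def remember_if_done(app, task_id):
    task = app.AsyncResult(task_id)
    if task.successful():
        tasks_cache[task_id] = task
        try:
            # Fetch the result now; a flaky broker may refuse later.
            results_cache[task_id] = task.get(timeout=5)
        except Exception:
            pass  # leave it to the next poll
    return task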

app/worker/app_celery.py (19 changes: 10 additions & 9 deletions)

@@ -1,33 +1,33 @@
 import traceback
-import celery
-from celery.states import SUCCESS, FAILURE
 from http import HTTPStatus
 
-PROGRESS = 'PROGRESS'
+from celery.states import FAILURE, STARTED, SUCCESS
+
+from reboot.celery import app
 
 ATTEMPT_LIMIT = 5
 
 
 def update_state(state, percent, http_status):
     print('{0!r} state: {1!r}, progress: {2!r}'.format(
-        celery.current_task.request.id, state, percent))
-    celery.current_task.update_state(state=state, meta={
+        app.current_task.request.id, state, percent))
+    app.current_task.update_state(state=state, meta={
         'state': state,
         'process_percent': percent,
         'status': http_status,
     })
 
 
 def update_percent(percent):
-    update_state(PROGRESS, percent, HTTPStatus.ACCEPTED)
+    update_state(STARTED, percent, HTTPStatus.ACCEPTED)
 
 
 def set_success():
     update_state(SUCCESS, 100, HTTPStatus.OK)
 
 
 def set_failure(e):
-    celery.current_task.update_state(
+    app.current_task.update_state(
         state=FAILURE,
         meta={
             'exc_type': type(e).__name__,

@@ -38,17 +38,18 @@ def set_failure(e):
         })
 
 
-class AppTask(celery.Task):
+class AppTask(app.Task):
     max_retries = 0
+    # default_retry_delay = 10
 
     def on_success(self, retval, task_id, args, kwargs):
         set_success()
         print('{0!r} success: {1!r}'.format(task_id, retval))
+        super().on_success(retval, task_id, args, kwargs)
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
-        print('{0!r} failed: {1!r}'.format(task_id, exc))
         set_failure(exc)
+        print('{0!r} failed: {1!r}'.format(task_id, exc))
+        super().on_failure(exc, task_id, args, kwargs, einfo)
 
     def on_retry(self, exc, task_id, args, kwargs, einfo):
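
Replacing the custom PROGRESS string with Celery's built-in STARTED state keeps the lifecycle to the standard PENDING → STARTED → SUCCESS/FAILURE transitions and pairs with the task_track_started flag added in reboot/celeryconfig.py. A minimal sketch of progress reporting against a bound task (long_job is illustrative, not part of this PR):

# Sketch: progress reporting via the built-in STARTED state.
from celery.states import STARTED

from reboot.celery import app

@app.task(bind=True)
def long_job(self, items):  # illustrative task, not in the PR
    for i, item in enumerate(items, start=1):
        # ... do one unit of work, then publish progress metadata
        self.update_state(state=STARTED, meta={
            'state': STARTED,
            'process_percent': int(100 * i / len(items)),
        })
    return 'done'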

app/worker/tasks/__init__.py (7 changes: 4 additions & 3 deletions)

@@ -1,13 +1,14 @@
 '''
 Module for tasks to be sent on task queue
 '''
-from celery import task
-
 from app.worker.app_celery import AppTask
+# from celery import task
+from reboot.celery import app
+
 from .create_receipt import Receiptor
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def receiptor(self, queryset, total_count):
     receiptor = Receiptor(queryset, total_count)
     return receiptor()
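
The decorator swap is the Celery 4-to-5 migration: the old module-level `from celery import task` decorator was deprecated and then removed, and registering via @app.task ties each task to the project's configured app (broker, backend, serializers) rather than an implicit default app. A minimal self-contained sketch of the pattern, with placeholder broker/backend URLs:

# Sketch: binding tasks to an explicit Celery app (Celery 5 style).
# Broker/backend URLs are placeholders, not the project's real settings.
from celery import Celery

app = Celery('reboot', broker='amqp://localhost', backend='redis://localhost')

@app.task(bind=True)
def add(self, x, y):
    return x + y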

app/worker/tasks/exporter.py (5 changes: 3 additions & 2 deletions)

@@ -1,16 +1,17 @@
 import csv
 
-from celery import task
+# from celery import task
 from celery.utils.log import get_task_logger
 from django.core import serializers
 from django.db.models.query import QuerySet
 from django.http import HttpResponse
 
 from app.constants.field_names import CURRENT_FIELDS
 from app.worker.app_celery import AppTask, update_percent
+from reboot.celery import app
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
     rows = serializers.deserialize('json', qs)
     csv_exporter = CsvExporter(file_name, rows, total_count)

app/worker/tasks/importers/__init__.py (6 changes: 4 additions & 2 deletions)

@@ -1,13 +1,15 @@
 """
 Module for csv file importers to be sent to queue
 """
-from celery import task
+# from celery import task
+
 from app.worker.app_celery import AppTask
+from reboot.celery import app
 
 from .historical_data_importer import HistoricalDataImporter
 
 
-@task(bind=True, base=AppTask)
+@app.task(bind=True, base=AppTask)
 def historical_data_importer(self, csvpath):
     importer = HistoricalDataImporter(csvpath)
     importer()

reboot/celeryconfig.py (4 changes: 3 additions & 1 deletion)

@@ -8,11 +8,13 @@
 broker_pool_limit = 1
 event_queue_expires = 60
 worker_prefetch_multiplier = 1
-worker_concurrency = 10
+worker_concurrency = 1
 accept_content = ['json', 'pickle']
 result_backend = config("REDIS_URL")
 task_serializer = 'pickle'
 result_serializer = 'pickle'
+task_track_started = True
+task_ignore_result = False
 
 # Use PROD settings if valid CLOUDAMQP_URl, else dev
 if config('CLOUDAMQP_URL', default=False):
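
Two of these settings cooperate with the views change: task_track_started = True makes workers report the built-in STARTED state (without it, a running task still polls as PENDING), and task_ignore_result = False keeps results flowing into the Redis backend so poll_state can fetch and cache them. Dropping worker_concurrency from 10 to 1 limits the worker to a single consumer, plausibly to stay inside the CloudAMQP free-tier connection limits flagged in the views.py comments. A small sketch of what a client observes once tracking is on:

# Sketch: states a client can observe with task_track_started enabled.
from reboot.celery import app

task_id = "00000000-0000-0000-0000-000000000000"  # placeholder for a queued task's id

res = app.AsyncResult(task_id)
print(res.state)                  # PENDING -> STARTED -> SUCCESS (or FAILURE)
if res.successful():
    payload = res.get(timeout=5)  # safe once the task has already finished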