Skip to content

Commit

Permalink
refactor celery task names to constants (#3296)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuo-danswer authored Dec 3, 2024
1 parent 46315cd commit 6c2269e
Show file tree
Hide file tree
Showing 19 changed files with 84 additions and 37 deletions.
17 changes: 9 additions & 8 deletions backend/danswer/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,55 @@
from typing import Any

from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryTask


tasks_to_schedule = [
{
"name": "check-for-vespa-sync",
"task": "check_for_vespa_sync_task",
"task": DanswerCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-connector-deletion",
"task": "check_for_connector_deletion_task",
"task": DanswerCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-indexing",
"task": "check_for_indexing",
"task": DanswerCeleryTask.CHECK_FOR_INDEXING,
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-prune",
"task": "check_for_pruning",
"task": DanswerCeleryTask.CHECK_FOR_PRUNING,
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "kombu-message-cleanup",
"task": "kombu_message_cleanup_task",
"task": DanswerCeleryTask.KOMBU_MESSAGE_CLEANUP_TASK,
"schedule": timedelta(seconds=3600),
"options": {"priority": DanswerCeleryPriority.LOWEST},
},
{
"name": "monitor-vespa-sync",
"task": "monitor_vespa_sync",
"task": DanswerCeleryTask.MONITOR_VESPA_SYNC,
"schedule": timedelta(seconds=5),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-doc-permissions-sync",
"task": "check_for_doc_permissions_sync",
"task": DanswerCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC,
"schedule": timedelta(seconds=30),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-external-group-sync",
"task": "check_for_external_group_sync",
"task": DanswerCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC,
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from danswer.background.celery.apps.app_base import task_logger
from danswer.configs.app_configs import JOB_TIMEOUT
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.connector_credential_pair import get_connector_credential_pairs
Expand All @@ -28,7 +29,7 @@ class TaskDependencyError(RuntimeError):


@shared_task(
name="check_for_connector_deletion_task",
name=DanswerCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
soft_time_limit=JOB_TIMEOUT,
trail=False,
bind=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import DocumentSource
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
Expand Down Expand Up @@ -82,7 +83,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b


@shared_task(
name="check_for_doc_permissions_sync",
name=DanswerCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC,
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
Expand Down Expand Up @@ -164,7 +165,7 @@ def try_creating_permissions_sync_task(
custom_task_id = f"{redis_connector.permissions.generator_task_key}_{uuid4()}"

result = app.send_task(
"connector_permission_sync_generator_task",
DanswerCeleryTask.CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK,
kwargs=dict(
cc_pair_id=cc_pair_id,
tenant_id=tenant_id,
Expand All @@ -191,7 +192,7 @@ def try_creating_permissions_sync_task(


@shared_task(
name="connector_permission_sync_generator_task",
name=DanswerCeleryTask.CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK,
acks_late=False,
soft_time_limit=JOB_TIMEOUT,
track_started=True,
Expand Down Expand Up @@ -286,7 +287,7 @@ def connector_permission_sync_generator_task(


@shared_task(
name="update_external_document_permissions_task",
name=DanswerCeleryTask.UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK,
soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
time_limit=LIGHT_TIME_LIMIT,
max_retries=DOCUMENT_PERMISSIONS_UPDATE_MAX_RETRIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.db.connector import mark_cc_pair_as_external_group_synced
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
Expand Down Expand Up @@ -85,7 +86,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:


@shared_task(
name="check_for_external_group_sync",
name=DanswerCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC,
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
Expand Down Expand Up @@ -161,7 +162,7 @@ def try_creating_external_group_sync_task(
custom_task_id = f"{redis_connector.external_group_sync.taskset_key}_{uuid4()}"

result = app.send_task(
"connector_external_group_sync_generator_task",
DanswerCeleryTask.CONNECTOR_EXTERNAL_GROUP_SYNC_GENERATOR_TASK,
kwargs=dict(
cc_pair_id=cc_pair_id,
tenant_id=tenant_id,
Expand Down Expand Up @@ -191,7 +192,7 @@ def try_creating_external_group_sync_task(


@shared_task(
name="connector_external_group_sync_generator_task",
name=DanswerCeleryTask.CONNECTOR_EXTERNAL_GROUP_SYNC_GENERATOR_TASK,
acks_late=False,
soft_time_limit=JOB_TIMEOUT,
track_started=True,
Expand Down
10 changes: 7 additions & 3 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import DocumentSource
from danswer.db.connector import mark_ccpair_with_indexing_trigger
Expand Down Expand Up @@ -156,7 +157,7 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[


@shared_task(
name="check_for_indexing",
name=DanswerCeleryTask.CHECK_FOR_INDEXING,
soft_time_limit=300,
bind=True,
)
Expand Down Expand Up @@ -486,7 +487,7 @@ def try_creating_indexing_task(
# when the task is sent, we have yet to finish setting up the fence
# therefore, the task must contain code that blocks until the fence is ready
result = celery_app.send_task(
"connector_indexing_proxy_task",
DanswerCeleryTask.CONNECTOR_INDEXING_PROXY_TASK,
kwargs=dict(
index_attempt_id=index_attempt_id,
cc_pair_id=cc_pair.id,
Expand Down Expand Up @@ -524,7 +525,10 @@ def try_creating_indexing_task(


@shared_task(
name="connector_indexing_proxy_task", bind=True, acks_late=False, track_started=True
name=DanswerCeleryTask.CONNECTOR_INDEXING_PROXY_TASK,
bind=True,
acks_late=False,
track_started=True,
)
def connector_indexing_proxy_task(
self: Task,
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/background/celery/tasks/periodic/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

from danswer.background.celery.apps.app_base import task_logger
from danswer.configs.app_configs import JOB_TIMEOUT
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import PostgresAdvisoryLocks
from danswer.db.engine import get_session_with_tenant


@shared_task(
name="kombu_message_cleanup_task",
name=DanswerCeleryTask.KOMBU_MESSAGE_CLEANUP_TASK,
soft_time_limit=JOB_TIMEOUT,
bind=True,
base=AbortableTask,
Expand Down
7 changes: 4 additions & 3 deletions backend/danswer/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.connectors.factory import instantiate_connector
from danswer.connectors.models import InputType
Expand Down Expand Up @@ -75,7 +76,7 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:


@shared_task(
name="check_for_pruning",
name=DanswerCeleryTask.CHECK_FOR_PRUNING,
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
Expand Down Expand Up @@ -184,7 +185,7 @@ def try_creating_prune_generator_task(
custom_task_id = f"{redis_connector.prune.generator_task_key}_{uuid4()}"

celery_app.send_task(
"connector_pruning_generator_task",
DanswerCeleryTask.CONNECTOR_PRUNING_GENERATOR_TASK,
kwargs=dict(
cc_pair_id=cc_pair.id,
connector_id=cc_pair.connector_id,
Expand All @@ -209,7 +210,7 @@ def try_creating_prune_generator_task(


@shared_task(
name="connector_pruning_generator_task",
name=DanswerCeleryTask.CONNECTOR_PRUNING_GENERATOR_TASK,
acks_late=False,
soft_time_limit=JOB_TIMEOUT,
track_started=True,
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from danswer.access.access import get_access_for_document
from danswer.background.celery.apps.app_base import task_logger
from danswer.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
from danswer.configs.constants import DanswerCeleryTask
from danswer.db.document import delete_document_by_connector_credential_pair__no_commit
from danswer.db.document import delete_documents_complete__no_commit
from danswer.db.document import get_document
Expand All @@ -31,7 +32,7 @@


@shared_task(
name="document_by_cc_pair_cleanup_task",
name=DanswerCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK,
soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
time_limit=LIGHT_TIME_LIMIT,
max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES,
Expand Down
7 changes: 4 additions & 3 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from danswer.configs.app_configs import JOB_TIMEOUT
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.db.connector import fetch_connector_by_id
from danswer.db.connector import mark_cc_pair_as_permissions_synced
Expand Down Expand Up @@ -80,7 +81,7 @@
# celery auto associates tasks created inside another task,
# which bloats the result metadata considerably. trail=False prevents this.
@shared_task(
name="check_for_vespa_sync_task",
name=DanswerCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
soft_time_limit=JOB_TIMEOUT,
trail=False,
bind=True,
Expand Down Expand Up @@ -707,7 +708,7 @@ def monitor_ccpair_indexing_taskset(
redis_connector_index.reset()


@shared_task(name="monitor_vespa_sync", soft_time_limit=300, bind=True)
@shared_task(name=DanswerCeleryTask.MONITOR_VESPA_SYNC, soft_time_limit=300, bind=True)
def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
"""This is a celery beat task that monitors and finalizes metadata sync tasksets.
It scans for fence values and then gets the counts of any associated tasksets.
Expand Down Expand Up @@ -818,7 +819,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:


@shared_task(
name="vespa_metadata_sync_task",
name=DanswerCeleryTask.VESPA_METADATA_SYNC_TASK,
bind=True,
soft_time_limit=LIGHT_SOFT_TIME_LIMIT,
time_limit=LIGHT_TIME_LIMIT,
Expand Down
26 changes: 26 additions & 0 deletions backend/danswer/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,32 @@ class DanswerCeleryPriority(int, Enum):
LOWEST = auto()


class DanswerCeleryTask:
CHECK_FOR_CONNECTOR_DELETION = "check_for_connector_deletion_task"
CHECK_FOR_VESPA_SYNC_TASK = "check_for_vespa_sync_task"
CHECK_FOR_INDEXING = "check_for_indexing"
CHECK_FOR_PRUNING = "check_for_pruning"
CHECK_FOR_DOC_PERMISSIONS_SYNC = "check_for_doc_permissions_sync"
CHECK_FOR_EXTERNAL_GROUP_SYNC = "check_for_external_group_sync"
MONITOR_VESPA_SYNC = "monitor_vespa_sync"
KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task"
CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = (
"connector_permission_sync_generator_task"
)
UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK = (
"update_external_document_permissions_task"
)
CONNECTOR_EXTERNAL_GROUP_SYNC_GENERATOR_TASK = (
"connector_external_group_sync_generator_task"
)
CONNECTOR_INDEXING_PROXY_TASK = "connector_indexing_proxy_task"
CONNECTOR_PRUNING_GENERATOR_TASK = "connector_pruning_generator_task"
DOCUMENT_BY_CC_PAIR_CLEANUP_TASK = "document_by_cc_pair_cleanup_task"
VESPA_METADATA_SYNC_TASK = "vespa_metadata_sync_task"
CHECK_TTL_MANAGEMENT_TASK = "check_ttl_management_task"
AUTOGENERATE_USAGE_REPORT_TASK = "autogenerate_usage_report_task"


REDIS_SOCKET_KEEPALIVE_OPTIONS = {}
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/redis/redis_connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import (
construct_document_select_for_connector_credential_pair_by_needs_sync,
Expand Down Expand Up @@ -105,7 +106,7 @@ def generate_tasks(

# Priority on sync's triggered by new indexing should be medium
result = celery_app.send_task(
"vespa_metadata_sync_task",
DanswerCeleryTask.VESPA_METADATA_SYNC_TASK,
kwargs=dict(document_id=doc.id, tenant_id=tenant_id),
queue=DanswerCeleryQueues.VESPA_METADATA_SYNC,
task_id=custom_task_id,
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/redis/redis_connector_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import construct_document_select_for_connector_credential_pair
from danswer.db.models import Document as DbDocument
Expand Down Expand Up @@ -114,7 +115,7 @@ def generate_tasks(

# Priority on sync's triggered by new indexing should be medium
result = celery_app.send_task(
"document_by_cc_pair_cleanup_task",
DanswerCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK,
kwargs=dict(
document_id=doc.id,
connector_id=cc_pair.connector_id,
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/redis/redis_connector_doc_perm_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from danswer.configs.constants import DanswerCeleryPriority
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerCeleryTask


class RedisConnectorPermissionSyncPayload(BaseModel):
Expand Down Expand Up @@ -149,7 +150,7 @@ def generate_tasks(
self.redis.sadd(self.taskset_key, custom_task_id)

result = celery_app.send_task(
"update_external_document_permissions_task",
DanswerCeleryTask.UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK,
kwargs=dict(
tenant_id=self.tenant_id,
serialized_doc_external_access=doc_perm.to_dict(),
Expand Down
Loading

0 comments on commit 6c2269e

Please sign in to comment.