diff --git a/backend/danswer/background/celery/tasks/beat_schedule.py b/backend/danswer/background/celery/tasks/beat_schedule.py index 3b18f8931e4..6d65bb01654 100644 --- a/backend/danswer/background/celery/tasks/beat_schedule.py +++ b/backend/danswer/background/celery/tasks/beat_schedule.py @@ -2,54 +2,55 @@ from typing import Any from danswer.configs.constants import DanswerCeleryPriority +from danswer.configs.constants import DanswerCeleryTask tasks_to_schedule = [ { "name": "check-for-vespa-sync", - "task": "check_for_vespa_sync_task", + "task": DanswerCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, "schedule": timedelta(seconds=20), "options": {"priority": DanswerCeleryPriority.HIGH}, }, { "name": "check-for-connector-deletion", - "task": "check_for_connector_deletion_task", + "task": DanswerCeleryTask.CHECK_FOR_CONNECTOR_DELETION, "schedule": timedelta(seconds=20), "options": {"priority": DanswerCeleryPriority.HIGH}, }, { "name": "check-for-indexing", - "task": "check_for_indexing", + "task": DanswerCeleryTask.CHECK_FOR_INDEXING, "schedule": timedelta(seconds=15), "options": {"priority": DanswerCeleryPriority.HIGH}, }, { "name": "check-for-prune", - "task": "check_for_pruning", + "task": DanswerCeleryTask.CHECK_FOR_PRUNING, "schedule": timedelta(seconds=15), "options": {"priority": DanswerCeleryPriority.HIGH}, }, { "name": "kombu-message-cleanup", - "task": "kombu_message_cleanup_task", + "task": DanswerCeleryTask.KOMBU_MESSAGE_CLEANUP_TASK, "schedule": timedelta(seconds=3600), "options": {"priority": DanswerCeleryPriority.LOWEST}, }, { "name": "monitor-vespa-sync", - "task": "monitor_vespa_sync", + "task": DanswerCeleryTask.MONITOR_VESPA_SYNC, "schedule": timedelta(seconds=5), "options": {"priority": DanswerCeleryPriority.HIGH}, }, { "name": "check-for-doc-permissions-sync", - "task": "check_for_doc_permissions_sync", + "task": DanswerCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, "schedule": timedelta(seconds=30), "options": {"priority": DanswerCeleryPriority.HIGH}, }, { "name": "check-for-external-group-sync", - "task": "check_for_external_group_sync", + "task": DanswerCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, "schedule": timedelta(seconds=20), "options": {"priority": DanswerCeleryPriority.HIGH}, }, diff --git a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py index caae8be301b..d0298f2dd6a 100644 --- a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py @@ -11,6 +11,7 @@ from danswer.background.celery.apps.app_base import task_logger from danswer.configs.app_configs import JOB_TIMEOUT from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DanswerRedisLocks from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.connector_credential_pair import get_connector_credential_pairs @@ -28,7 +29,7 @@ class TaskDependencyError(RuntimeError): @shared_task( - name="check_for_connector_deletion_task", + name=DanswerCeleryTask.CHECK_FOR_CONNECTOR_DELETION, soft_time_limit=JOB_TIMEOUT, trail=False, bind=True, diff --git a/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py index babf9b69b6f..747197621df 100644 --- a/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py @@ -18,6 +18,7 @@ from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DanswerRedisLocks from danswer.configs.constants import DocumentSource from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id @@ -82,7 +83,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b @shared_task( - name="check_for_doc_permissions_sync", + name=DanswerCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, soft_time_limit=JOB_TIMEOUT, bind=True, ) @@ -164,7 +165,7 @@ def try_creating_permissions_sync_task( custom_task_id = f"{redis_connector.permissions.generator_task_key}_{uuid4()}" result = app.send_task( - "connector_permission_sync_generator_task", + DanswerCeleryTask.CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK, kwargs=dict( cc_pair_id=cc_pair_id, tenant_id=tenant_id, @@ -191,7 +192,7 @@ def try_creating_permissions_sync_task( @shared_task( - name="connector_permission_sync_generator_task", + name=DanswerCeleryTask.CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK, acks_late=False, soft_time_limit=JOB_TIMEOUT, track_started=True, @@ -286,7 +287,7 @@ def connector_permission_sync_generator_task( @shared_task( - name="update_external_document_permissions_task", + name=DanswerCeleryTask.UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK, soft_time_limit=LIGHT_SOFT_TIME_LIMIT, time_limit=LIGHT_TIME_LIMIT, max_retries=DOCUMENT_PERMISSIONS_UPDATE_MAX_RETRIES, diff --git a/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py b/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py index d80b2b518ee..698894c7507 100644 --- a/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py @@ -17,6 +17,7 @@ from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DanswerRedisLocks from danswer.db.connector import mark_cc_pair_as_external_group_synced from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id @@ -85,7 +86,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool: @shared_task( - name="check_for_external_group_sync", + name=DanswerCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, soft_time_limit=JOB_TIMEOUT, bind=True, ) @@ -161,7 +162,7 @@ def try_creating_external_group_sync_task( custom_task_id = f"{redis_connector.external_group_sync.taskset_key}_{uuid4()}" result = app.send_task( - "connector_external_group_sync_generator_task", + DanswerCeleryTask.CONNECTOR_EXTERNAL_GROUP_SYNC_GENERATOR_TASK, kwargs=dict( cc_pair_id=cc_pair_id, tenant_id=tenant_id, @@ -191,7 +192,7 @@ def try_creating_external_group_sync_task( @shared_task( - name="connector_external_group_sync_generator_task", + name=DanswerCeleryTask.CONNECTOR_EXTERNAL_GROUP_SYNC_GENERATOR_TASK, acks_late=False, soft_time_limit=JOB_TIMEOUT, track_started=True, diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 8e445b4f3f4..57bca336f09 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -23,6 +23,7 @@ from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DanswerRedisLocks from danswer.configs.constants import DocumentSource from danswer.db.connector import mark_ccpair_with_indexing_trigger @@ -156,7 +157,7 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[ @shared_task( - name="check_for_indexing", + name=DanswerCeleryTask.CHECK_FOR_INDEXING, soft_time_limit=300, bind=True, ) @@ -486,7 +487,7 @@ def try_creating_indexing_task( # when the task is sent, we have yet to finish setting up the fence # therefore, the task must contain code that blocks until the fence is ready result = celery_app.send_task( - "connector_indexing_proxy_task", + DanswerCeleryTask.CONNECTOR_INDEXING_PROXY_TASK, kwargs=dict( index_attempt_id=index_attempt_id, cc_pair_id=cc_pair.id, @@ -524,7 +525,10 @@ def try_creating_indexing_task( @shared_task( - name="connector_indexing_proxy_task", bind=True, acks_late=False, track_started=True + name=DanswerCeleryTask.CONNECTOR_INDEXING_PROXY_TASK, + bind=True, + acks_late=False, + track_started=True, ) def connector_indexing_proxy_task( self: Task, diff --git a/backend/danswer/background/celery/tasks/periodic/tasks.py b/backend/danswer/background/celery/tasks/periodic/tasks.py index 20baa7c52fa..efef013f5e4 100644 --- a/backend/danswer/background/celery/tasks/periodic/tasks.py +++ b/backend/danswer/background/celery/tasks/periodic/tasks.py @@ -13,12 +13,13 @@ from danswer.background.celery.apps.app_base import task_logger from danswer.configs.app_configs import JOB_TIMEOUT +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import PostgresAdvisoryLocks from danswer.db.engine import get_session_with_tenant @shared_task( - name="kombu_message_cleanup_task", + name=DanswerCeleryTask.KOMBU_MESSAGE_CLEANUP_TASK, soft_time_limit=JOB_TIMEOUT, bind=True, base=AbortableTask, diff --git a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py index 67b781f228f..be2ee4ba418 100644 --- a/backend/danswer/background/celery/tasks/pruning/tasks.py +++ b/backend/danswer/background/celery/tasks/pruning/tasks.py @@ -20,6 +20,7 @@ from danswer.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DanswerRedisLocks from danswer.connectors.factory import instantiate_connector from danswer.connectors.models import InputType @@ -75,7 +76,7 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool: @shared_task( - name="check_for_pruning", + name=DanswerCeleryTask.CHECK_FOR_PRUNING, soft_time_limit=JOB_TIMEOUT, bind=True, ) @@ -184,7 +185,7 @@ def try_creating_prune_generator_task( custom_task_id = f"{redis_connector.prune.generator_task_key}_{uuid4()}" celery_app.send_task( - "connector_pruning_generator_task", + DanswerCeleryTask.CONNECTOR_PRUNING_GENERATOR_TASK, kwargs=dict( cc_pair_id=cc_pair.id, connector_id=cc_pair.connector_id, @@ -209,7 +210,7 @@ def try_creating_prune_generator_task( @shared_task( - name="connector_pruning_generator_task", + name=DanswerCeleryTask.CONNECTOR_PRUNING_GENERATOR_TASK, acks_late=False, soft_time_limit=JOB_TIMEOUT, track_started=True, diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py index 2719a4d0665..2212046c3e9 100644 --- a/backend/danswer/background/celery/tasks/shared/tasks.py +++ b/backend/danswer/background/celery/tasks/shared/tasks.py @@ -9,6 +9,7 @@ from danswer.access.access import get_access_for_document from danswer.background.celery.apps.app_base import task_logger from danswer.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex +from danswer.configs.constants import DanswerCeleryTask from danswer.db.document import delete_document_by_connector_credential_pair__no_commit from danswer.db.document import delete_documents_complete__no_commit from danswer.db.document import get_document @@ -31,7 +32,7 @@ @shared_task( - name="document_by_cc_pair_cleanup_task", + name=DanswerCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, soft_time_limit=LIGHT_SOFT_TIME_LIMIT, time_limit=LIGHT_TIME_LIMIT, max_retries=DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES, diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 13835b5d2c6..1670dcd84c3 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -25,6 +25,7 @@ from danswer.configs.app_configs import JOB_TIMEOUT from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DanswerRedisLocks from danswer.db.connector import fetch_connector_by_id from danswer.db.connector import mark_cc_pair_as_permissions_synced @@ -80,7 +81,7 @@ # celery auto associates tasks created inside another task, # which bloats the result metadata considerably. trail=False prevents this. @shared_task( - name="check_for_vespa_sync_task", + name=DanswerCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, soft_time_limit=JOB_TIMEOUT, trail=False, bind=True, @@ -707,7 +708,7 @@ def monitor_ccpair_indexing_taskset( redis_connector_index.reset() -@shared_task(name="monitor_vespa_sync", soft_time_limit=300, bind=True) +@shared_task(name=DanswerCeleryTask.MONITOR_VESPA_SYNC, soft_time_limit=300, bind=True) def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: """This is a celery beat task that monitors and finalizes metadata sync tasksets. It scans for fence values and then gets the counts of any associated tasksets. @@ -818,7 +819,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: @shared_task( - name="vespa_metadata_sync_task", + name=DanswerCeleryTask.VESPA_METADATA_SYNC_TASK, bind=True, soft_time_limit=LIGHT_SOFT_TIME_LIMIT, time_limit=LIGHT_TIME_LIMIT, diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index e6facc587d7..4913cbf7077 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -259,6 +259,32 @@ class DanswerCeleryPriority(int, Enum): LOWEST = auto() +class DanswerCeleryTask: + CHECK_FOR_CONNECTOR_DELETION = "check_for_connector_deletion_task" + CHECK_FOR_VESPA_SYNC_TASK = "check_for_vespa_sync_task" + CHECK_FOR_INDEXING = "check_for_indexing" + CHECK_FOR_PRUNING = "check_for_pruning" + CHECK_FOR_DOC_PERMISSIONS_SYNC = "check_for_doc_permissions_sync" + CHECK_FOR_EXTERNAL_GROUP_SYNC = "check_for_external_group_sync" + MONITOR_VESPA_SYNC = "monitor_vespa_sync" + KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task" + CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = ( + "connector_permission_sync_generator_task" + ) + UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK = ( + "update_external_document_permissions_task" + ) + CONNECTOR_EXTERNAL_GROUP_SYNC_GENERATOR_TASK = ( + "connector_external_group_sync_generator_task" + ) + CONNECTOR_INDEXING_PROXY_TASK = "connector_indexing_proxy_task" + CONNECTOR_PRUNING_GENERATOR_TASK = "connector_pruning_generator_task" + DOCUMENT_BY_CC_PAIR_CLEANUP_TASK = "document_by_cc_pair_cleanup_task" + VESPA_METADATA_SYNC_TASK = "vespa_metadata_sync_task" + CHECK_TTL_MANAGEMENT_TASK = "check_ttl_management_task" + AUTOGENERATE_USAGE_REPORT_TASK = "autogenerate_usage_report_task" + + REDIS_SOCKET_KEEPALIVE_OPTIONS = {} REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15 REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3 diff --git a/backend/danswer/redis/redis_connector_credential_pair.py b/backend/danswer/redis/redis_connector_credential_pair.py index f624fa1542a..23661bf82c9 100644 --- a/backend/danswer/redis/redis_connector_credential_pair.py +++ b/backend/danswer/redis/redis_connector_credential_pair.py @@ -10,6 +10,7 @@ from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.document import ( construct_document_select_for_connector_credential_pair_by_needs_sync, @@ -105,7 +106,7 @@ def generate_tasks( # Priority on sync's triggered by new indexing should be medium result = celery_app.send_task( - "vespa_metadata_sync_task", + DanswerCeleryTask.VESPA_METADATA_SYNC_TASK, kwargs=dict(document_id=doc.id, tenant_id=tenant_id), queue=DanswerCeleryQueues.VESPA_METADATA_SYNC, task_id=custom_task_id, diff --git a/backend/danswer/redis/redis_connector_delete.py b/backend/danswer/redis/redis_connector_delete.py index 1b7a440b2e5..4ab42ee65a7 100644 --- a/backend/danswer/redis/redis_connector_delete.py +++ b/backend/danswer/redis/redis_connector_delete.py @@ -12,6 +12,7 @@ from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.document import construct_document_select_for_connector_credential_pair from danswer.db.models import Document as DbDocument @@ -114,7 +115,7 @@ def generate_tasks( # Priority on sync's triggered by new indexing should be medium result = celery_app.send_task( - "document_by_cc_pair_cleanup_task", + DanswerCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, kwargs=dict( document_id=doc.id, connector_id=cc_pair.connector_id, diff --git a/backend/danswer/redis/redis_connector_doc_perm_sync.py b/backend/danswer/redis/redis_connector_doc_perm_sync.py index 7b3748fcc2d..2e5eb69b533 100644 --- a/backend/danswer/redis/redis_connector_doc_perm_sync.py +++ b/backend/danswer/redis/redis_connector_doc_perm_sync.py @@ -12,6 +12,7 @@ from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask class RedisConnectorPermissionSyncPayload(BaseModel): @@ -149,7 +150,7 @@ def generate_tasks( self.redis.sadd(self.taskset_key, custom_task_id) result = celery_app.send_task( - "update_external_document_permissions_task", + DanswerCeleryTask.UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK, kwargs=dict( tenant_id=self.tenant_id, serialized_doc_external_access=doc_perm.to_dict(), diff --git a/backend/danswer/redis/redis_connector_prune.py b/backend/danswer/redis/redis_connector_prune.py index f8e6f372619..9739d2f9832 100644 --- a/backend/danswer/redis/redis_connector_prune.py +++ b/backend/danswer/redis/redis_connector_prune.py @@ -10,6 +10,7 @@ from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id @@ -134,7 +135,7 @@ def generate_tasks( # Priority on sync's triggered by new indexing should be medium result = celery_app.send_task( - "document_by_cc_pair_cleanup_task", + DanswerCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, kwargs=dict( document_id=doc_id, connector_id=cc_pair.connector_id, diff --git a/backend/danswer/redis/redis_document_set.py b/backend/danswer/redis/redis_document_set.py index 879d955eb88..ff92c30a4e5 100644 --- a/backend/danswer/redis/redis_document_set.py +++ b/backend/danswer/redis/redis_document_set.py @@ -11,6 +11,7 @@ from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.db.document_set import construct_document_select_by_docset from danswer.redis.redis_object_helper import RedisObjectHelper @@ -76,7 +77,7 @@ def generate_tasks( redis_client.sadd(self.taskset_key, custom_task_id) result = celery_app.send_task( - "vespa_metadata_sync_task", + DanswerCeleryTask.VESPA_METADATA_SYNC_TASK, kwargs=dict(document_id=doc.id, tenant_id=tenant_id), queue=DanswerCeleryQueues.VESPA_METADATA_SYNC, task_id=custom_task_id, diff --git a/backend/danswer/redis/redis_usergroup.py b/backend/danswer/redis/redis_usergroup.py index 7c49b9c7fb8..83bd8859632 100644 --- a/backend/danswer/redis/redis_usergroup.py +++ b/backend/danswer/redis/redis_usergroup.py @@ -11,6 +11,7 @@ from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DanswerCeleryQueues +from danswer.configs.constants import DanswerCeleryTask from danswer.redis.redis_object_helper import RedisObjectHelper from danswer.utils.variable_functionality import fetch_versioned_implementation from danswer.utils.variable_functionality import global_version @@ -89,7 +90,7 @@ def generate_tasks( redis_client.sadd(self.taskset_key, custom_task_id) result = celery_app.send_task( - "vespa_metadata_sync_task", + DanswerCeleryTask.VESPA_METADATA_SYNC_TASK, kwargs=dict(document_id=doc.id, tenant_id=tenant_id), queue=DanswerCeleryQueues.VESPA_METADATA_SYNC, task_id=custom_task_id, diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index cdeb4ed16c6..36af9d2baa2 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -20,6 +20,7 @@ from danswer.background.celery.versioned_apps.primary import app as primary_app from danswer.configs.app_configs import ENABLED_CONNECTOR_TYPES from danswer.configs.constants import DanswerCeleryPriority +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DocumentSource from danswer.configs.constants import FileOrigin from danswer.connectors.google_utils.google_auth import ( @@ -867,7 +868,7 @@ def connector_run_once( # run the beat task to pick up the triggers immediately primary_app.send_task( - "check_for_indexing", + DanswerCeleryTask.CHECK_FOR_INDEXING, priority=DanswerCeleryPriority.HIGH, kwargs={"tenant_id": tenant_id}, ) diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py index 1ceeb776abc..cbf744500d4 100644 --- a/backend/danswer/server/manage/administrative.py +++ b/backend/danswer/server/manage/administrative.py @@ -13,6 +13,7 @@ from danswer.background.celery.versioned_apps.primary import app as primary_app from danswer.configs.app_configs import GENERATIVE_MODEL_ACCESS_CHECK_FREQ from danswer.configs.constants import DanswerCeleryPriority +from danswer.configs.constants import DanswerCeleryTask from danswer.configs.constants import DocumentSource from danswer.configs.constants import KV_GEN_AI_KEY_CHECK_TIME from danswer.db.connector_credential_pair import get_connector_credential_pair @@ -199,7 +200,7 @@ def create_deletion_attempt_for_connector_id( # run the beat task to pick up this deletion from the db immediately primary_app.send_task( - "check_for_connector_deletion_task", + DanswerCeleryTask.CHECK_FOR_CONNECTOR_DELETION, priority=DanswerCeleryPriority.HIGH, kwargs={"tenant_id": tenant_id}, ) diff --git a/backend/ee/danswer/background/celery/tasks/beat_schedule.py b/backend/ee/danswer/background/celery/tasks/beat_schedule.py index 86680e60c7f..4444d73544f 100644 --- a/backend/ee/danswer/background/celery/tasks/beat_schedule.py +++ b/backend/ee/danswer/background/celery/tasks/beat_schedule.py @@ -4,16 +4,17 @@ from danswer.background.celery.tasks.beat_schedule import ( tasks_to_schedule as base_tasks_to_schedule, ) +from danswer.configs.constants import DanswerCeleryTask ee_tasks_to_schedule = [ { "name": "autogenerate_usage_report", - "task": "autogenerate_usage_report_task", + "task": DanswerCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, "schedule": timedelta(days=30), # TODO: change this to config flag }, { "name": "check-ttl-management", - "task": "check_ttl_management_task", + "task": DanswerCeleryTask.CHECK_TTL_MANAGEMENT_TASK, "schedule": timedelta(hours=1), }, ]