From b874171e4ca2ca03958837641edc21dd058d80e5 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Wed, 27 Nov 2024 17:34:34 -0800 Subject: [PATCH 1/3] use indexing flag in db for manually triggering indexing (#3264) * use indexing flag in db for manually trigger indexing * add comment. * only try to release the lock if we actually succeeded with the lock * ensure we don't trigger manual indexing on anything but the primary search settings * comment usage of primary search settings * run check for indexing immediately after indexing triggers are set * reorder fix --- ...78b8217_add_indexing_trigger_to_cc_pair.py | 30 +++++++++ .../background/celery/tasks/indexing/tasks.py | 54 +++++++++++++--- .../background/celery/versioned_apps/beat.py | 4 +- .../celery/versioned_apps/primary.py | 4 +- backend/danswer/db/connector.py | 23 +++++++ backend/danswer/db/enums.py | 5 ++ backend/danswer/db/models.py | 6 +- backend/danswer/main.py | 1 + backend/danswer/server/documents/connector.py | 64 ++++++++----------- 9 files changed, 139 insertions(+), 52 deletions(-) create mode 100644 backend/alembic/versions/abe7378b8217_add_indexing_trigger_to_cc_pair.py diff --git a/backend/alembic/versions/abe7378b8217_add_indexing_trigger_to_cc_pair.py b/backend/alembic/versions/abe7378b8217_add_indexing_trigger_to_cc_pair.py new file mode 100644 index 00000000000..cc947eef0e0 --- /dev/null +++ b/backend/alembic/versions/abe7378b8217_add_indexing_trigger_to_cc_pair.py @@ -0,0 +1,30 @@ +"""add indexing trigger to cc_pair + +Revision ID: abe7378b8217 +Revises: 93560ba1b118 +Create Date: 2024-11-26 19:09:53.481171 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic.
+revision = "abe7378b8217" +down_revision = "93560ba1b118" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "connector_credential_pair", + sa.Column( + "indexing_trigger", + sa.Enum("UPDATE", "REINDEX", name="indexingmode", native_enum=False), + nullable=True, + ), + ) + + +def downgrade() -> None: + op.drop_column("connector_credential_pair", "indexing_trigger") diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 73b2b20a4e0..9ebab40d0af 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -25,11 +25,13 @@ from danswer.configs.constants import DanswerCeleryQueues from danswer.configs.constants import DanswerRedisLocks from danswer.configs.constants import DocumentSource +from danswer.db.connector import mark_ccpair_with_indexing_trigger from danswer.db.connector_credential_pair import fetch_connector_credential_pairs from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.engine import get_db_current_time from danswer.db.engine import get_session_with_tenant from danswer.db.enums import ConnectorCredentialPairStatus +from danswer.db.enums import IndexingMode from danswer.db.enums import IndexingStatus from danswer.db.enums import IndexModelStatus from danswer.db.index_attempt import create_index_attempt @@ -159,7 +161,7 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[ ) def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: tasks_created = 0 - + locked = False r = get_redis_client(tenant_id=tenant_id) lock_beat: RedisLock = r.lock( @@ -172,6 +174,8 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: if not lock_beat.acquire(blocking=False): return None + locked = True + # check for search settings swap with 
get_session_with_tenant(tenant_id=tenant_id) as db_session: old_search_settings = check_index_swap(db_session=db_session) @@ -231,22 +235,46 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: last_attempt = get_last_attempt_for_cc_pair( cc_pair.id, search_settings_instance.id, db_session ) + + search_settings_primary = False + if search_settings_instance.id == primary_search_settings.id: + search_settings_primary = True + if not _should_index( cc_pair=cc_pair, last_index=last_attempt, search_settings_instance=search_settings_instance, + search_settings_primary=search_settings_primary, secondary_index_building=len(search_settings) > 1, db_session=db_session, ): continue + reindex = False + if search_settings_instance.id == primary_search_settings.id: + # the indexing trigger is only checked and cleared with the primary search settings + if cc_pair.indexing_trigger is not None: + if cc_pair.indexing_trigger == IndexingMode.REINDEX: + reindex = True + + task_logger.info( + f"Connector indexing manual trigger detected: " + f"cc_pair={cc_pair.id} " + f"search_settings={search_settings_instance.id} " + f"indexing_mode={cc_pair.indexing_trigger}" + ) + + mark_ccpair_with_indexing_trigger( + cc_pair.id, None, db_session + ) + # using a task queue and only allowing one task per cc_pair/search_setting # prevents us from starving out certain attempts attempt_id = try_creating_indexing_task( self.app, cc_pair, search_settings_instance, - False, + reindex, db_session, r, tenant_id, @@ -281,7 +309,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: mark_attempt_failed( attempt.id, db_session, failure_reason=failure_reason ) - except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." 
@@ -289,13 +316,14 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: except Exception: task_logger.exception(f"Unexpected exception: tenant={tenant_id}") finally: - if lock_beat.owned(): - lock_beat.release() - else: - task_logger.error( - "check_for_indexing - Lock not owned on completion: " - f"tenant={tenant_id}" - ) + if locked: + if lock_beat.owned(): + lock_beat.release() + else: + task_logger.error( + "check_for_indexing - Lock not owned on completion: " + f"tenant={tenant_id}" + ) return tasks_created @@ -304,6 +332,7 @@ def _should_index( cc_pair: ConnectorCredentialPair, last_index: IndexAttempt | None, search_settings_instance: SearchSettings, + search_settings_primary: bool, secondary_index_building: bool, db_session: Session, ) -> bool: @@ -368,6 +397,11 @@ def _should_index( ): return False + if search_settings_primary: + if cc_pair.indexing_trigger is not None: + # if a manual indexing trigger is on the cc pair, honor it for primary search settings + return True + # if no attempt has ever occurred, we should index regardless of refresh_freq if not last_index: return True diff --git a/backend/danswer/background/celery/versioned_apps/beat.py b/backend/danswer/background/celery/versioned_apps/beat.py index af407f93c64..64bc1112ed3 100644 --- a/backend/danswer/background/celery/versioned_apps/beat.py +++ b/backend/danswer/background/celery/versioned_apps/beat.py @@ -1,6 +1,8 @@ """Factory stub for running celery worker / celery beat.""" +from celery import Celery + from danswer.background.celery.apps.beat import celery_app from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable set_is_ee_based_on_env_variable() -app = celery_app +app: Celery = celery_app diff --git a/backend/danswer/background/celery/versioned_apps/primary.py b/backend/danswer/background/celery/versioned_apps/primary.py index 2d97caa3da5..f07a63b2e1a 100644 --- a/backend/danswer/background/celery/versioned_apps/primary.py +++ 
b/backend/danswer/background/celery/versioned_apps/primary.py @@ -1,8 +1,10 @@ """Factory stub for running celery worker / celery beat.""" +from celery import Celery + from danswer.utils.variable_functionality import fetch_versioned_implementation from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable set_is_ee_based_on_env_variable() -app = fetch_versioned_implementation( +app: Celery = fetch_versioned_implementation( "danswer.background.celery.apps.primary", "celery_app" ) diff --git a/backend/danswer/db/connector.py b/backend/danswer/db/connector.py index 767a722eec4..1bcfe75e4c1 100644 --- a/backend/danswer/db/connector.py +++ b/backend/danswer/db/connector.py @@ -12,6 +12,7 @@ from danswer.configs.app_configs import DEFAULT_PRUNING_FREQ from danswer.configs.constants import DocumentSource from danswer.connectors.models import InputType +from danswer.db.enums import IndexingMode from danswer.db.models import Connector from danswer.db.models import ConnectorCredentialPair from danswer.db.models import IndexAttempt @@ -311,3 +312,25 @@ def mark_cc_pair_as_external_group_synced(db_session: Session, cc_pair_id: int) # If this changes, we need to update this function. cc_pair.last_time_external_group_sync = datetime.now(timezone.utc) db_session.commit() + + +def mark_ccpair_with_indexing_trigger( + cc_pair_id: int, indexing_mode: IndexingMode | None, db_session: Session +) -> None: + """indexing_mode sets a field which will be picked up by a background task + to trigger indexing. 
Set to None to disable the trigger.""" + try: + cc_pair = db_session.execute( + select(ConnectorCredentialPair) + .where(ConnectorCredentialPair.id == cc_pair_id) + .with_for_update() + ).scalar_one() + + if cc_pair is None: + raise ValueError(f"No cc_pair with ID: {cc_pair_id}") + + cc_pair.indexing_trigger = indexing_mode + db_session.commit() + except Exception: + db_session.rollback() + raise diff --git a/backend/danswer/db/enums.py b/backend/danswer/db/enums.py index b1905d4e785..0ccb1470ca7 100644 --- a/backend/danswer/db/enums.py +++ b/backend/danswer/db/enums.py @@ -19,6 +19,11 @@ def is_terminal(self) -> bool: return self in terminal_states +class IndexingMode(str, PyEnum): + UPDATE = "update" + REINDEX = "reindex" + + # these may differ in the future, which is why we're okay with this duplication class DeletionStatus(str, PyEnum): NOT_STARTED = "not_started" diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index 76e70c2d2d9..3cae55a9c66 100644 --- a/backend/danswer/db/models.py +++ b/backend/danswer/db/models.py @@ -42,7 +42,7 @@ from danswer.configs.constants import DocumentSource from danswer.configs.constants import FileOrigin from danswer.configs.constants import MessageType -from danswer.db.enums import AccessType +from danswer.db.enums import AccessType, IndexingMode from danswer.configs.constants import NotificationType from danswer.configs.constants import SearchFeedbackType from danswer.configs.constants import TokenRateLimitScope @@ -438,6 +438,10 @@ class ConnectorCredentialPair(Base): total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0) + indexing_trigger: Mapped[IndexingMode | None] = mapped_column( + Enum(IndexingMode, native_enum=False), nullable=True + ) + connector: Mapped["Connector"] = relationship( "Connector", back_populates="credentials" ) diff --git a/backend/danswer/main.py b/backend/danswer/main.py index 3fd7072bb9a..f72f3addec9 100644 --- a/backend/danswer/main.py +++ 
b/backend/danswer/main.py @@ -44,6 +44,7 @@ from danswer.configs.constants import POSTGRES_WEB_APP_NAME from danswer.db.engine import SqlEngine from danswer.db.engine import warm_up_connections +from danswer.server.api_key.api import router as api_key_router from danswer.server.auth_check import check_router_auth from danswer.server.danswer_api.ingestion import router as danswer_api_router from danswer.server.documents.cc_pair import router as cc_pair_router diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index 9b9da834e05..cdeb4ed16c6 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -17,9 +17,9 @@ from danswer.auth.users import current_curator_or_admin_user from danswer.auth.users import current_user from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot -from danswer.background.celery.tasks.indexing.tasks import try_creating_indexing_task from danswer.background.celery.versioned_apps.primary import app as primary_app from danswer.configs.app_configs import ENABLED_CONNECTOR_TYPES +from danswer.configs.constants import DanswerCeleryPriority from danswer.configs.constants import DocumentSource from danswer.configs.constants import FileOrigin from danswer.connectors.google_utils.google_auth import ( @@ -59,6 +59,7 @@ from danswer.db.connector import fetch_connector_by_id from danswer.db.connector import fetch_connectors from danswer.db.connector import get_connector_credential_ids +from danswer.db.connector import mark_ccpair_with_indexing_trigger from danswer.db.connector import update_connector from danswer.db.connector_credential_pair import add_credential_to_connector from danswer.db.connector_credential_pair import get_cc_pair_groups_for_ids @@ -74,6 +75,7 @@ from danswer.db.engine import get_current_tenant_id from danswer.db.engine import get_session from danswer.db.enums import AccessType +from danswer.db.enums 
import IndexingMode from danswer.db.index_attempt import get_index_attempts_for_cc_pair from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id from danswer.db.index_attempt import get_latest_index_attempts @@ -86,7 +88,6 @@ from danswer.file_store.file_store import get_default_file_store from danswer.key_value_store.interface import KvKeyNotFoundError from danswer.redis.redis_connector import RedisConnector -from danswer.redis.redis_pool import get_redis_client from danswer.server.documents.models import AuthStatus from danswer.server.documents.models import AuthUrl from danswer.server.documents.models import ConnectorCredentialPairIdentifier @@ -792,12 +793,10 @@ def connector_run_once( _: User = Depends(current_curator_or_admin_user), db_session: Session = Depends(get_session), tenant_id: str = Depends(get_current_tenant_id), -) -> StatusResponse[list[int]]: +) -> StatusResponse[int]: """Used to trigger indexing on a set of cc_pairs associated with a single connector.""" - r = get_redis_client(tenant_id=tenant_id) - connector_id = run_info.connector_id specified_credential_ids = run_info.credential_ids @@ -843,54 +842,41 @@ def connector_run_once( ) ] - search_settings = get_current_search_settings(db_session) - connector_credential_pairs = [ get_connector_credential_pair(connector_id, credential_id, db_session) for credential_id in credential_ids if credential_id not in skipped_credentials ] - index_attempt_ids = [] + num_triggers = 0 for cc_pair in connector_credential_pairs: if cc_pair is not None: - attempt_id = try_creating_indexing_task( - primary_app, - cc_pair, - search_settings, - run_info.from_beginning, - db_session, - r, - tenant_id, + indexing_mode = IndexingMode.UPDATE + if run_info.from_beginning: + indexing_mode = IndexingMode.REINDEX + + mark_ccpair_with_indexing_trigger(cc_pair.id, indexing_mode, db_session) + num_triggers += 1 + + logger.info( + f"connector_run_once - marking cc_pair with indexing trigger: " + 
f"connector={run_info.connector_id} " + f"cc_pair={cc_pair.id} " + f"indexing_trigger={indexing_mode}" ) - if attempt_id: - logger.info( - f"connector_run_once - try_creating_indexing_task succeeded: " - f"connector={run_info.connector_id} " - f"cc_pair={cc_pair.id} " - f"attempt={attempt_id} " - ) - index_attempt_ids.append(attempt_id) - else: - logger.info( - f"connector_run_once - try_creating_indexing_task failed: " - f"connector={run_info.connector_id} " - f"cc_pair={cc_pair.id}" - ) - if not index_attempt_ids: - msg = "No new indexing attempts created, indexing jobs are queued or running." - logger.info(msg) - raise HTTPException( - status_code=400, - detail=msg, - ) + # run the beat task to pick up the triggers immediately + primary_app.send_task( + "check_for_indexing", + priority=DanswerCeleryPriority.HIGH, + kwargs={"tenant_id": tenant_id}, + ) - msg = f"Successfully created {len(index_attempt_ids)} index attempts. {index_attempt_ids}" + msg = f"Marked {num_triggers} index attempts with indexing triggers." 
return StatusResponse( success=True, message=msg, - data=index_attempt_ids, + data=num_triggers, ) From e59acb207715dbc56c14093f2086e2ff7123f174 Mon Sep 17 00:00:00 2001 From: Richard Kuo Date: Wed, 27 Nov 2024 22:52:31 -0800 Subject: [PATCH 2/3] remove unused import --- backend/danswer/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/danswer/main.py b/backend/danswer/main.py index f72f3addec9..3fd7072bb9a 100644 --- a/backend/danswer/main.py +++ b/backend/danswer/main.py @@ -44,7 +44,6 @@ from danswer.configs.constants import POSTGRES_WEB_APP_NAME from danswer.db.engine import SqlEngine from danswer.db.engine import warm_up_connections -from danswer.server.api_key.api import router as api_key_router from danswer.server.auth_check import check_router_auth from danswer.server.danswer_api.ingestion import router as danswer_api_router from danswer.server.documents.cc_pair import router as cc_pair_router From 33eadbc0a2bb4b391077857737f312fd602e8dbc Mon Sep 17 00:00:00 2001 From: Richard Kuo Date: Wed, 27 Nov 2024 23:03:42 -0800 Subject: [PATCH 3/3] backport alembic revision --- ...1b118_add_web_ui_option_to_slack_config.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 backend/alembic/versions/93560ba1b118_add_web_ui_option_to_slack_config.py diff --git a/backend/alembic/versions/93560ba1b118_add_web_ui_option_to_slack_config.py b/backend/alembic/versions/93560ba1b118_add_web_ui_option_to_slack_config.py new file mode 100644 index 00000000000..ab084aee314 --- /dev/null +++ b/backend/alembic/versions/93560ba1b118_add_web_ui_option_to_slack_config.py @@ -0,0 +1,35 @@ +"""add web ui option to slack config + +Revision ID: 93560ba1b118 +Revises: 6d562f86c78b +Create Date: 2024-11-24 06:36:17.490612 + +""" +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "93560ba1b118" +down_revision = "6d562f86c78b" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Add show_continue_in_web_ui with default False to all existing channel_configs + op.execute( + """ + UPDATE slack_channel_config + SET channel_config = channel_config || '{"show_continue_in_web_ui": false}'::jsonb + WHERE NOT channel_config ? 'show_continue_in_web_ui' + """ + ) + + +def downgrade() -> None: + # Remove show_continue_in_web_ui from all channel_configs + op.execute( + """ + UPDATE slack_channel_config + SET channel_config = channel_config - 'show_continue_in_web_ui' + """ + )