Merge pull request #3285 from danswer-ai/hotfix/v0.15-manual-indexing
Merge hotfix/v0.15-manual-indexing into release/v0.15
rkuo-danswer authored Nov 28, 2024
2 parents aa05c71 + 33eadbc commit 2b11ddf
Showing 9 changed files with 173 additions and 52 deletions.
@@ -0,0 +1,35 @@
"""add web ui option to slack config
Revision ID: 93560ba1b118
Revises: 6d562f86c78b
Create Date: 2024-11-24 06:36:17.490612
"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "93560ba1b118"
down_revision = "6d562f86c78b"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Add show_continue_in_web_ui with default False to all existing channel_configs
    op.execute(
        """
        UPDATE slack_channel_config
        SET channel_config = channel_config || '{"show_continue_in_web_ui": false}'::jsonb
        WHERE NOT channel_config ? 'show_continue_in_web_ui'
        """
    )


def downgrade() -> None:
    # Remove show_continue_in_web_ui from all channel_configs
    op.execute(
        """
        UPDATE slack_channel_config
        SET channel_config = channel_config - 'show_continue_in_web_ui'
        """
    )
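In Postgres, the || operator merges two JSONB objects and the ? operator tests for key presence, so the upgrade backfills the flag only on rows that do not already carry it. A minimal sketch of how consuming code might read the flag with the same default (the helper below is illustrative, not part of this commit):

def show_continue_in_web_ui(channel_config: dict) -> bool:
    # Rows written before this migration ran may still lack the key at read
    # time, so fall back to False, matching the backfill default.
    return bool(channel_config.get("show_continue_in_web_ui", False))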
@@ -0,0 +1,30 @@
"""add indexing trigger to cc_pair
Revision ID: abe7378b8217
Revises: 6d562f86c78b
Create Date: 2024-11-26 19:09:53.481171
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "abe7378b8217"
down_revision = "93560ba1b118"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.add_column(
        "connector_credential_pair",
        sa.Column(
            "indexing_trigger",
            sa.Enum("UPDATE", "REINDEX", name="indexingmode", native_enum=False),
            nullable=True,
        ),
    )


def downgrade() -> None:
    op.drop_column("connector_credential_pair", "indexing_trigger")
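With native_enum=False, SQLAlchemy stores the value as a plain VARCHAR rather than creating a native Postgres ENUM type, which is why downgrade() only needs drop_column and no DROP TYPE. A quick post-upgrade sanity check (a sketch; the connection string is assumed):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/danswer")  # assumed DSN

with engine.connect() as conn:
    row = conn.execute(text(
        "SELECT data_type FROM information_schema.columns "
        "WHERE table_name = 'connector_credential_pair' "
        "AND column_name = 'indexing_trigger'"
    )).one()
    print(row)  # expect ('character varying',), not a USER-DEFINED enum type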
54 changes: 44 additions & 10 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -25,11 +25,13 @@
from danswer.configs.constants import DanswerCeleryQueues
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import DocumentSource
from danswer.db.connector import mark_ccpair_with_indexing_trigger
from danswer.db.connector_credential_pair import fetch_connector_credential_pairs
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.engine import get_db_current_time
from danswer.db.engine import get_session_with_tenant
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.enums import IndexingMode
from danswer.db.enums import IndexingStatus
from danswer.db.enums import IndexModelStatus
from danswer.db.index_attempt import create_index_attempt
@@ -159,7 +161,7 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
)
def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
    tasks_created = 0

    locked = False
    r = get_redis_client(tenant_id=tenant_id)

    lock_beat: RedisLock = r.lock(
@@ -172,6 +174,8 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
        if not lock_beat.acquire(blocking=False):
            return None

        locked = True

        # check for search settings swap
        with get_session_with_tenant(tenant_id=tenant_id) as db_session:
            old_search_settings = check_index_swap(db_session=db_session)
@@ -231,22 +235,46 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                last_attempt = get_last_attempt_for_cc_pair(
                    cc_pair.id, search_settings_instance.id, db_session
                )

                search_settings_primary = False
                if search_settings_instance.id == primary_search_settings.id:
                    search_settings_primary = True

                if not _should_index(
                    cc_pair=cc_pair,
                    last_index=last_attempt,
                    search_settings_instance=search_settings_instance,
                    search_settings_primary=search_settings_primary,
                    secondary_index_building=len(search_settings) > 1,
                    db_session=db_session,
                ):
                    continue

                reindex = False
                if search_settings_instance.id == primary_search_settings.id:
                    # the indexing trigger is only checked and cleared with the primary search settings
                    if cc_pair.indexing_trigger is not None:
                        if cc_pair.indexing_trigger == IndexingMode.REINDEX:
                            reindex = True

                        task_logger.info(
                            f"Connector indexing manual trigger detected: "
                            f"cc_pair={cc_pair.id} "
                            f"search_settings={search_settings_instance.id} "
                            f"indexing_mode={cc_pair.indexing_trigger}"
                        )

                        mark_ccpair_with_indexing_trigger(
                            cc_pair.id, None, db_session
                        )

                # using a task queue and only allowing one task per cc_pair/search_setting
                # prevents us from starving out certain attempts
                attempt_id = try_creating_indexing_task(
                    self.app,
                    cc_pair,
                    search_settings_instance,
-                   False,
+                   reindex,
                    db_session,
                    r,
                    tenant_id,
@@ -281,21 +309,21 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                mark_attempt_failed(
                    attempt.id, db_session, failure_reason=failure_reason
                )

    except SoftTimeLimitExceeded:
        task_logger.info(
            "Soft time limit exceeded, task is being terminated gracefully."
        )
    except Exception:
        task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
    finally:
-       if lock_beat.owned():
-           lock_beat.release()
-       else:
-           task_logger.error(
-               "check_for_indexing - Lock not owned on completion: "
-               f"tenant={tenant_id}"
-           )
+       if locked:
+           if lock_beat.owned():
+               lock_beat.release()
+           else:
+               task_logger.error(
+                   "check_for_indexing - Lock not owned on completion: "
+                   f"tenant={tenant_id}"
+               )

    return tasks_created

@@ -304,6 +332,7 @@ def _should_index(
    cc_pair: ConnectorCredentialPair,
    last_index: IndexAttempt | None,
    search_settings_instance: SearchSettings,
    search_settings_primary: bool,
    secondary_index_building: bool,
    db_session: Session,
) -> bool:
@@ -368,6 +397,11 @@ def _should_index(
    ):
        return False

    if search_settings_primary:
        if cc_pair.indexing_trigger is not None:
            # if a manual indexing trigger is on the cc pair, honor it for primary search settings
            return True

    # if no attempt has ever occurred, we should index regardless of refresh_freq
    if not last_index:
        return True
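The locked flag is the heart of this hotfix: previously, when acquire() returned False (or an exception fired before acquisition), the finally block still called lock_beat.owned() and logged a spurious "Lock not owned on completion" error. The pattern in isolation (a minimal sketch; the lock name and timeout are invented):

import redis
from redis.lock import Lock as RedisLock

def beat_task(r: redis.Redis) -> None:
    lock: RedisLock = r.lock("example_beat_lock", timeout=60)  # assumed name/timeout
    locked = False
    try:
        if not lock.acquire(blocking=False):
            return  # another instance holds the lock; nothing to release
        locked = True
        # ... periodic work goes here ...
    finally:
        # Only touch the lock if this invocation actually acquired it;
        # checking owned() on a lock we never took produced the false error.
        if locked and lock.owned():
            lock.release()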
4 changes: 3 additions & 1 deletion backend/danswer/background/celery/versioned_apps/beat.py
@@ -1,6 +1,8 @@
"""Factory stub for running celery worker / celery beat."""
from celery import Celery

from danswer.background.celery.apps.beat import celery_app
from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable

set_is_ee_based_on_env_variable()
-app = celery_app
+app: Celery = celery_app
4 changes: 3 additions & 1 deletion backend/danswer/background/celery/versioned_apps/primary.py
@@ -1,8 +1,10 @@
"""Factory stub for running celery worker / celery beat."""
from celery import Celery

from danswer.utils.variable_functionality import fetch_versioned_implementation
from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable

set_is_ee_based_on_env_variable()
-app = fetch_versioned_implementation(
+app: Celery = fetch_versioned_implementation(
    "danswer.background.celery.apps.primary", "celery_app"
)
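fetch_versioned_implementation resolves the enterprise or community implementation at runtime, so its return type is effectively Any; the explicit app: Celery annotation is what gives mypy and IDEs a concrete type again. A simplified stand-in for the loader pattern (illustrative only, not danswer's actual implementation):

from importlib import import_module
from typing import Any

from celery import Celery

def fetch_impl(module_name: str, attr: str) -> Any:
    # Dynamic lookup: the type checker cannot see what comes back.
    return getattr(import_module(module_name), attr)

app: Celery = fetch_impl("danswer.background.celery.apps.primary", "celery_app")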
23 changes: 23 additions & 0 deletions backend/danswer/db/connector.py
@@ -12,6 +12,7 @@
from danswer.configs.app_configs import DEFAULT_PRUNING_FREQ
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.db.enums import IndexingMode
from danswer.db.models import Connector
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import IndexAttempt
@@ -311,3 +312,25 @@ def mark_cc_pair_as_external_group_synced(db_session: Session, cc_pair_id: int)
    # If this changes, we need to update this function.
    cc_pair.last_time_external_group_sync = datetime.now(timezone.utc)
    db_session.commit()


def mark_ccpair_with_indexing_trigger(
    cc_pair_id: int, indexing_mode: IndexingMode | None, db_session: Session
) -> None:
    """indexing_mode sets a field which will be picked up by a background task
    to trigger indexing. Set to None to disable the trigger."""
    try:
        cc_pair = db_session.execute(
            select(ConnectorCredentialPair)
            .where(ConnectorCredentialPair.id == cc_pair_id)
            .with_for_update()
        ).scalar_one()

        if cc_pair is None:
            raise ValueError(f"No cc_pair with ID: {cc_pair_id}")

        cc_pair.indexing_trigger = indexing_mode
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise
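Nothing in this diff calls the new helper yet; a hypothetical "reindex now" entry point (the function below is assumed, not part of this commit) would set the trigger and let check_for_indexing read and clear it on its next beat. Note that scalar_one() raises NoResultFound on a missing row, so the subsequent None check is purely defensive:

from sqlalchemy.orm import Session

from danswer.db.connector import mark_ccpair_with_indexing_trigger
from danswer.db.enums import IndexingMode

def request_manual_reindex(cc_pair_id: int, db_session: Session) -> None:
    # REINDEX makes the beat task pass reindex=True into
    # try_creating_indexing_task; UPDATE forces an attempt without it.
    mark_ccpair_with_indexing_trigger(cc_pair_id, IndexingMode.REINDEX, db_session)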
5 changes: 5 additions & 0 deletions backend/danswer/db/enums.py
@@ -19,6 +19,11 @@ def is_terminal(self) -> bool:
        return self in terminal_states


class IndexingMode(str, PyEnum):
    UPDATE = "update"
    REINDEX = "reindex"


# these may differ in the future, which is why we're okay with this duplication
class DeletionStatus(str, PyEnum):
    NOT_STARTED = "not_started"
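Note the case difference between what Python sees and what the database stores: SQLAlchemy's Enum persists member names by default, which is why the abe7378b8217 migration above declares "UPDATE" and "REINDEX" while the Python values are lowercase. A standalone illustration (stdlib only):

from enum import Enum as PyEnum

class IndexingMode(str, PyEnum):
    UPDATE = "update"
    REINDEX = "reindex"

mode = IndexingMode.REINDEX
print(mode.name)   # 'REINDEX' - the string SQLAlchemy writes to the column
print(mode.value)  # 'reindex' - what json.dumps would emit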
6 changes: 5 additions & 1 deletion backend/danswer/db/models.py
@@ -42,7 +42,7 @@
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import FileOrigin
from danswer.configs.constants import MessageType
-from danswer.db.enums import AccessType
+from danswer.db.enums import AccessType, IndexingMode
from danswer.configs.constants import NotificationType
from danswer.configs.constants import SearchFeedbackType
from danswer.configs.constants import TokenRateLimitScope
@@ -438,6 +438,10 @@ class ConnectorCredentialPair(Base):

    total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)

    indexing_trigger: Mapped[IndexingMode | None] = mapped_column(
        Enum(IndexingMode, native_enum=False), nullable=True
    )

    connector: Mapped["Connector"] = relationship(
        "Connector", back_populates="credentials"
    )
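Because indexing_trigger is nullable, NULL doubles as "no manual request pending". A sketch of how one could query for pending triggers in bulk (check_for_indexing actually reads the field per cc_pair; this is only to illustrate the column's semantics):

from sqlalchemy import select
from sqlalchemy.orm import Session

from danswer.db.models import ConnectorCredentialPair

def pending_trigger_pairs(db_session: Session) -> list[ConnectorCredentialPair]:
    # NULL rows are untouched pairs; non-NULL rows carry UPDATE or REINDEX.
    stmt = select(ConnectorCredentialPair).where(
        ConnectorCredentialPair.indexing_trigger.is_not(None)
    )
    return list(db_session.scalars(stmt).all())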