
core[minor],langchain[patch]: Move base indexing interface and logic to core #20667

Merged · 15 commits · Apr 24, 2024
15 changes: 15 additions & 0 deletions libs/core/langchain_core/indexing/__init__.py
@@ -0,0 +1,15 @@
"""Code to help indexing data into a vectorstore.

This package contains helper logic to help deal with indexing data into
a vectorstore while avoiding duplicated content and over-writing content
if it's unchanged.
"""
from langchain_core.indexing.api import IndexingResult, aindex, index
from langchain_core.indexing.base import RecordManager

__all__ = [
"aindex",
"index",
"IndexingResult",
"RecordManager",
]
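Since this `__init__.py` defines the public surface being moved to core, here is a minimal sketch of how the exported pieces fit together. It assumes placeholder `record_manager` and `vectorstore` objects standing in for any concrete `RecordManager` and `VectorStore`; only `index` and `IndexingResult` come from the package above.

```python
from langchain_core.documents import Document
from langchain_core.indexing import IndexingResult, index

# Placeholders: any concrete RecordManager and any VectorStore that can
# add documents and delete them by id.
record_manager = ...
vectorstore = ...

docs = [
    Document(page_content="hello", metadata={"source": "a.txt"}),
    Document(page_content="world", metadata={"source": "b.txt"}),
]

# index() de-duplicates against the record manager before writing to the
# vectorstore, so re-running it on unchanged content is a no-op.
result: IndexingResult = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="incremental",   # or "full" or None; semantics described below
    source_id_key="source",  # metadata key identifying a document's source
)
# IndexingResult is a TypedDict with num_added / num_updated /
# num_skipped / num_deleted counts.
```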
@@ -24,12 +24,14 @@
cast,
)

- from langchain_community.document_loaders.base import BaseLoader
+ from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
+ from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore

- from langchain.indexes.base import NAMESPACE_UUID, RecordManager
+ NAMESPACE_UUID = uuid.UUID(int=1984)
Member: I know this is just a migration, but a comment explaining what this is doing would be great.
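For context, the constant is the fixed UUIDv5 namespace used when hashing document content into deterministic record keys. A sketch of the kind of helper that consumes it; the actual helper in this module may differ in details:

```python
import hashlib
import uuid

# Fixed namespace so hashing is deterministic across runs and processes;
# int=1984 is an arbitrary but stable constant.
NAMESPACE_UUID = uuid.UUID(int=1984)


def _hash_string_to_uuid(input_string: str) -> uuid.UUID:
    """Hash a string into a stable UUID (illustrative helper)."""
    # Same content -> same sha1 -> same UUIDv5. This is what lets the
    # indexing logic recognize and skip unchanged documents.
    hash_value = hashlib.sha1(input_string.encode("utf-8")).hexdigest()
    return uuid.uuid5(NAMESPACE_UUID, hash_value)
```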



T = TypeVar("T")

@@ -227,7 +229,8 @@ def index(
Clean up is done continuously during indexing, helping
to minimize the probability of users seeing duplicated
content.
- - Full: Delete all documents that haven to been returned by the loader.
+ - Full: Delete all documents that have not been returned by the loader
+   during this run of indexing.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- None: Do not delete any documents.
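To make the cleanup semantics concrete, a small illustrative sequence, again with placeholder `record_manager` and `vectorstore` objects:

```python
first_run = [
    Document(page_content="a", metadata={"source": "a.txt"}),
    Document(page_content="b", metadata={"source": "b.txt"}),
]
# The second run no longer returns b.txt (say the file was deleted upstream).
second_run = [Document(page_content="a", metadata={"source": "a.txt"})]

index(first_run, record_manager, vectorstore, cleanup="full", source_id_key="source")
result = index(second_run, record_manager, vectorstore, cleanup="full", source_id_key="source")

# With cleanup="full", anything not returned by this run is deleted once
# indexing finishes: a.txt is unchanged and b.txt is cleaned up, so
# result["num_skipped"] == 1 and result["num_deleted"] == 1.
```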
@@ -1,11 +1,8 @@
from __future__ import annotations

- import uuid
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence

- NAMESPACE_UUID = uuid.UUID(int=1984)


class RecordManager(ABC):
"""An abstract base class representing the interface for a record manager."""
@@ -64,8 +61,16 @@ def update(
Args:
keys: A list of record keys to upsert.
group_ids: A list of group IDs corresponding to the keys.
- time_at_least: if provided, updates should only happen if the
-   updated_at field is at least this time.
+ time_at_least: Optional timestamp. Implementations can use it
+   to verify that the clock of the system that stores the data
+   is at least this time.
+
+   E.g., use it to validate that the time in the Postgres database
+   is equal to or larger than the given timestamp; if not,
+   raise an error.
+
+   This is meant to help prevent time-drift issues, since
+   time may not be monotonically increasing!

Raises:
ValueError: If the length of keys doesn't match the length of group_ids.
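A sketch of how a concrete implementation might honor `time_at_least`; the `AssertionError` and the elided persistence step are assumptions, not part of the interface:

```python
def update(
    self,
    keys: Sequence[str],
    *,
    group_ids: Optional[Sequence[Optional[str]]] = None,
    time_at_least: Optional[float] = None,
) -> None:
    if group_ids is not None and len(group_ids) != len(keys):
        raise ValueError("Length of keys must match length of group_ids")
    # Read the clock of the system that stores the data, not the client.
    update_time = self.get_time()
    if time_at_least is not None and update_time < time_at_least:
        # The storage clock lags the caller's clock; refuse to write
        # records stamped earlier than times the caller has already seen.
        raise AssertionError(f"Time sync issue: {update_time} < {time_at_least}")
    ...  # upsert (key, group_id, update_time) rows into storage
```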
@@ -84,8 +89,16 @@ async def aupdate(
Args:
keys: A list of record keys to upsert.
group_ids: A list of group IDs corresponding to the keys.
- time_at_least: if provided, updates should only happen if the
-   updated_at field is at least this time.
+ time_at_least: Optional timestamp. Implementations can use it
+   to verify that the clock of the system that stores the data
+   is at least this time.
+
+   E.g., use it to validate that the time in the Postgres database
+   is equal to or larger than the given timestamp; if not,
+   raise an error.
+
+   This is meant to help prevent time-drift issues, since
+   time may not be monotonically increasing!

Raises:
ValueError: If the length of keys doesn't match the length of group_ids.
Empty file.
105 changes: 105 additions & 0 deletions libs/core/tests/unit_tests/indexing/in_memory.py
@@ -0,0 +1,105 @@
import time
from typing import Dict, List, Optional, Sequence, TypedDict

from langchain_core.indexing.base import RecordManager


class _Record(TypedDict):
group_id: Optional[str]
updated_at: float


class InMemoryRecordManager(RecordManager):
"""An in-memory record manager for testing purposes."""

def __init__(self, namespace: str) -> None:
super().__init__(namespace)
# Each key points to a dictionary
# of {'group_id': group_id, 'updated_at': timestamp}
self.records: Dict[str, _Record] = {}
self.namespace = namespace

def create_schema(self) -> None:
"""In-memory schema creation is simply ensuring the structure is initialized."""

async def acreate_schema(self) -> None:
"""In-memory schema creation is simply ensuring the structure is initialized."""

def get_time(self) -> float:
"""Get the current server time as a high resolution timestamp!"""
return time.time()

async def aget_time(self) -> float:
"""Get the current server time as a high resolution timestamp!"""
return self.get_time()

def update(
self,
keys: Sequence[str],
*,
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
if group_ids and len(keys) != len(group_ids):
raise ValueError("Length of keys must match length of group_ids")
for index, key in enumerate(keys):
group_id = group_ids[index] if group_ids else None
if time_at_least and time_at_least > self.get_time():
raise ValueError("time_at_least must be in the past")
self.records[key] = {"group_id": group_id, "updated_at": self.get_time()}

async def aupdate(
self,
keys: Sequence[str],
*,
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
self.update(keys, group_ids=group_ids, time_at_least=time_at_least)

def exists(self, keys: Sequence[str]) -> List[bool]:
return [key in self.records for key in keys]

async def aexists(self, keys: Sequence[str]) -> List[bool]:
return self.exists(keys)

def list_keys(
self,
*,
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
result = []
for key, data in self.records.items():
if before and data["updated_at"] >= before:
continue
if after and data["updated_at"] <= after:
continue
if group_ids and data["group_id"] not in group_ids:
continue
result.append(key)
if limit:
return result[:limit]
return result

async def alist_keys(
self,
*,
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
return self.list_keys(
before=before, after=after, group_ids=group_ids, limit=limit
)

def delete_keys(self, keys: Sequence[str]) -> None:
for key in keys:
if key in self.records:
del self.records[key]

async def adelete_keys(self, keys: Sequence[str]) -> None:
self.delete_keys(keys)
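A quick exercise of the manager above, the sort of behavior the accompanying unit tests presumably pin down:

```python
manager = InMemoryRecordManager(namespace="test")
manager.update(["doc-1", "doc-2"], group_ids=["a.txt", "b.txt"])

assert manager.exists(["doc-1", "doc-3"]) == [True, False]
assert sorted(manager.list_keys()) == ["doc-1", "doc-2"]
assert manager.list_keys(group_ids=["a.txt"]) == ["doc-1"]

manager.delete_keys(["doc-1"])
assert manager.list_keys() == ["doc-2"]
```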
Member: Might be worth keeping some form of the langchain tests here, just to confirm we don't break anything for existing users for now (maybe with a note at the top to remove the test file for 0.2).

@@ -1,7 +1,7 @@
import pytest
- from langchain_core.documents import Document

- from langchain.indexes._api import _HashedDocument
+ from langchain_core.documents import Document
+ from langchain_core.indexing.api import _HashedDocument


def test_hashed_document_hashing() -> None:
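For reference, the kind of property this test file checks, sketched against the `_HashedDocument` API as it exists in langchain; the method and field names (`from_document`, `hash_`) are taken from that implementation and may drift:

```python
def test_hashing_is_deterministic_and_content_sensitive() -> None:
    doc = Document(page_content="hello", metadata={"source": "a.txt"})
    same = Document(page_content="hello", metadata={"source": "a.txt"})
    other = Document(page_content="goodbye", metadata={"source": "a.txt"})

    # Equal content and metadata must hash identically ...
    hashed = _HashedDocument.from_document(doc)
    assert hashed.hash_ == _HashedDocument.from_document(same).hash_
    # ... and changing the content must change the hash.
    assert hashed.hash_ != _HashedDocument.from_document(other).hash_
```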