Skip to content

Commit

Permalink
checkin work (#1444)
Browse files Browse the repository at this point in the history
* checkin work

* finish index functionality extension

* fix concurrency
  • Loading branch information
emrgnt-cmplxty authored Oct 22, 2024
1 parent 5dd3a9d commit a1017e7
Show file tree
Hide file tree
Showing 16 changed files with 574 additions and 157 deletions.
2 changes: 1 addition & 1 deletion docs/api-reference/openapi.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions py/core/base/api/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
CreateVectorIndexResponse,
IngestionResponse,
WrappedCreateVectorIndexResponse,
WrappedDeleteVectorIndexResponse,
WrappedIngestionResponse,
WrappedListVectorIndicesResponse,
WrappedSelectVectorIndexResponse,
WrappedUpdateResponse,
)
from shared.api.models.kg.responses import (
Expand Down Expand Up @@ -80,6 +83,9 @@
"WrappedUpdateResponse",
"CreateVectorIndexResponse",
"WrappedCreateVectorIndexResponse",
"WrappedListVectorIndicesResponse",
"WrappedDeleteVectorIndexResponse",
"WrappedSelectVectorIndexResponse",
# Knowledge Graph Responses
"KGCreationResponse",
"WrappedKGCreationResponse",
Expand Down
42 changes: 36 additions & 6 deletions py/core/base/providers/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ async def get_document_chunks(
async def create_index(
self,
table_name: Optional[VectorTableName] = None,
measure: IndexMeasure = IndexMeasure.cosine_distance,
method: IndexMethod = IndexMethod.auto,
index_measure: IndexMeasure = IndexMeasure.cosine_distance,
index_method: IndexMethod = IndexMethod.auto,
index_arguments: Optional[
Union[IndexArgsIVFFlat, IndexArgsHNSW]
] = None,
Expand All @@ -506,6 +506,21 @@ async def create_index(
) -> None:
pass

@abstractmethod
async def list_indices(
    self, table_name: Optional[VectorTableName] = None
) -> list[dict]:
    """List the indices for a vector table; concrete providers implement this.

    Args:
        table_name: Vector table to inspect. ``None`` leaves the choice of
            table to the implementation.

    Returns:
        A list of dicts describing the indices. The keys of each dict are
        defined by the implementation and are not visible from this
        interface.
    """

@abstractmethod
async def delete_index(
    self,
    index_name: str,
    table_name: Optional[VectorTableName] = None,
    concurrently: bool = True,
) -> None:
    """Delete a vector index; concrete providers implement this.

    Args:
        index_name: Name of the index to delete.
        table_name: Vector table that holds the index. ``None`` leaves the
            choice of table to the implementation.
        concurrently: Whether the implementation should delete the index
            concurrently. Defaults to ``True``.
    """

@abstractmethod
async def get_semantic_neighbors(
self,
Expand Down Expand Up @@ -890,8 +905,8 @@ async def get_document_chunks(
async def create_index(
self,
table_name: Optional[VectorTableName] = None,
measure: IndexMeasure = IndexMeasure.cosine_distance,
method: IndexMethod = IndexMethod.auto,
index_measure: IndexMeasure = IndexMeasure.cosine_distance,
index_method: IndexMethod = IndexMethod.auto,
index_arguments: Optional[
Union[IndexArgsIVFFlat, IndexArgsHNSW]
] = None,
Expand All @@ -900,13 +915,28 @@ async def create_index(
) -> None:
return await self.vector_handler.create_index(
table_name,
measure,
method,
index_measure,
index_method,
index_arguments,
index_name,
concurrently,
)

async def list_indices(
    self, table_name: Optional[VectorTableName] = None
) -> list[dict]:
    """Return the indices reported by the vector handler for ``table_name``.

    This is a thin pass-through: the argument is forwarded unchanged to
    ``self.vector_handler.list_indices`` and its result is returned as-is.
    """
    indices = await self.vector_handler.list_indices(table_name)
    return indices

async def delete_index(
    self,
    index_name: str,
    table_name: Optional[VectorTableName] = None,
    concurrently: bool = True,
) -> None:
    """Delete ``index_name`` by delegating to the vector handler.

    All three arguments are forwarded positionally, in the order
    (index_name, table_name, concurrently), to
    ``self.vector_handler.delete_index``; whatever it returns is passed back.
    """
    handler = self.vector_handler
    outcome = await handler.delete_index(index_name, table_name, concurrently)
    return outcome

async def get_semantic_neighbors(
self,
document_id: UUID,
Expand Down
97 changes: 97 additions & 0 deletions py/core/main/api/data/ingestion_router_openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,100 @@ ingest_chunks:
chunks: "A list of text chunks to ingest into the system."
document_id: "An optional document id to associate the chunks with. If not provided, a unique document id will be generated."
metadata: "Optional JSON metadata to associate with the ingested chunks."

# OpenAPI extras for GET /list_vector_indices.
# The endpoint accepts a single query parameter, `table_name`; it takes no
# request body and has no `concurrently` option, so the samples pass neither.
list_vector_indices:
  openapi_extra:
    x-codeSamples:
      - lang: Python
        source: |
          from r2r import R2RClient
          client = R2RClient("http://localhost:7272")
          # when using auth, do client.login(...)
          result = client.list_vector_indices(
              table_name="vectors"
          )
      - lang: Shell
        source: |
          curl -X GET "http://localhost:7272/v2/list_vector_indices?table_name=vectors"
  input_descriptions:
    table_name: "The name of the table to list indices for. Options: vectors, entities_document, entities_collection, communities"

# OpenAPI extras for POST /create_vector_index.
# All parameters are sent as fields of a single JSON request body.
create_vector_index:
  openapi_extra:
    x-codeSamples:
      - lang: Python
        source: |
          from r2r import R2RClient
          client = R2RClient("http://localhost:7272")
          # when using auth, do client.login(...)
          result = client.create_vector_index(
              table_name="vectors",
              index_method="hnsw",
              index_measure="cosine_distance",
              index_arguments={"m": 16, "ef_construction": 64},
              concurrently=True
          )
      - lang: Shell
        source: |
          curl -X POST "http://localhost:7272/v2/create_vector_index" \
            -H "Content-Type: application/json" \
            -d '{
              "table_name": "vectors",
              "index_method": "hnsw",
              "index_measure": "cosine_distance",
              "index_arguments": {
                "m": 16,
                "ef_construction": 64
              },
              "concurrently": true
            }'
  input_descriptions:
    table_name: "The table to create the index on. Default: vectors"
    index_method: "The indexing method to use. Options: hnsw, ivfflat, auto. Default: hnsw"
    index_measure: "Distance measure for vector comparisons. Options: cosine_distance, l2_distance, max_inner_product. Default: cosine_distance"
    index_name: "Optional custom name for the index. If not provided, one will be auto-generated"
    index_arguments: "Configuration parameters for the chosen index method. For HNSW: {m: int, ef_construction: int}. For IVFFlat: {n_lists: int}"
    concurrently: "Whether to create the index concurrently. Default: true"

# OpenAPI extras for DELETE /delete_vector_index.
# All parameters are sent as fields of a single JSON request body.
delete_vector_index:
  openapi_extra:
    x-codeSamples:
      - lang: Python
        source: |
          from r2r import R2RClient
          client = R2RClient("http://localhost:7272")
          # when using auth, do client.login(...)
          result = client.delete_vector_index(
              index_name="ix_vector_cosine_ops_hnsw__20241021211541",
              table_name="vectors",
              concurrently=True
          )
      - lang: Shell
        source: |
          curl -X DELETE "http://localhost:7272/v2/delete_vector_index" \
            -H "Content-Type: application/json" \
            -d '{
              "index_name": "ix_vector_cosine_ops_hnsw__20241021211541",
              "table_name": "vectors",
              "concurrently": true
            }'
  input_descriptions:
    index_name: "The name of the index to delete"
    table_name: "The name of the table containing the index. Default: vectors"
    concurrently: "Whether to delete the index concurrently. Default: true"
119 changes: 108 additions & 11 deletions py/core/main/api/ingestion_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
from uuid import UUID

import yaml
from fastapi import Body, Depends, File, Form, UploadFile
from fastapi import Body, Depends, File, Form, Query, UploadFile
from pydantic import Json

from core.base import R2RException, RawChunk, generate_document_id
from core.base.api.models import (
CreateVectorIndexResponse,
WrappedCreateVectorIndexResponse,
WrappedDeleteVectorIndexResponse,
WrappedIngestionResponse,
WrappedListVectorIndicesResponse,
WrappedSelectVectorIndexResponse,
WrappedUpdateResponse,
)
from core.base.providers import OrchestrationProvider, Workflow
Expand Down Expand Up @@ -66,6 +69,16 @@ def _register_workflows(self):
if self.orchestration_provider.config.provider != "simple"
else "Vector index creation task completed successfully."
),
"delete-vector-index": (
"Vector index deletion task queued successfully."
if self.orchestration_provider.config.provider != "simple"
else "Vector index deletion task completed successfully."
),
"select-vector-index": (
"Vector index selection task queued successfully."
if self.orchestration_provider.config.provider != "simple"
else "Vector index selection task completed successfully."
),
},
)

Expand Down Expand Up @@ -332,34 +345,44 @@ async def ingest_chunks_app(

return [raw_message] # type: ignore

@self.router.post("/create_vector_index")
create_vector_index_extras = self.openapi_extras.get(
"create_vector_index", {}
)
create_vector_descriptions = create_vector_index_extras.get(
"input_descriptions", {}
)

@self.router.post(
"/create_vector_index",
openapi_extra=create_vector_index_extras.get("openapi_extra"),
)
@self.base_endpoint
async def create_vector_index_app(
table_name: Optional[VectorTableName] = Body(
default=VectorTableName.RAW_CHUNKS,
description="The name of the vector table to create.",
description=create_vector_descriptions.get("table_name"),
),
index_method: IndexMethod = Body(
default=IndexMethod.hnsw,
description="The type of vector index to create. Supported values are 'hnsw' and 'ivfflat'.",
description=create_vector_descriptions.get("index_method"),
),
measure: IndexMeasure = Body(
index_measure: IndexMeasure = Body(
default=IndexMeasure.cosine_distance,
description="The distance measure corresponding to the vector index. Used for calculating the distance between vectors during search.",
description=create_vector_descriptions.get("index_measure"),
),
index_arguments: Optional[
Union[IndexArgsIVFFlat, IndexArgsHNSW]
] = Body(
None,
description="The arguments for the index method.",
description=create_vector_descriptions.get("index_arguments"),
),
index_name: Optional[str] = Body(
None,
description="The name of the index to create.",
description=create_vector_descriptions.get("index_name"),
),
concurrently: bool = Body(
default=True,
description="Whether to create the index concurrently.",
description=create_vector_descriptions.get("concurrently"),
),
auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedCreateVectorIndexResponse:
Expand All @@ -369,7 +392,7 @@ async def create_vector_index_app(
"""

logger.info(
f"Creating vector index for {table_name} with method {index_method}, measure {measure}, concurrently {concurrently}"
f"Creating vector index for {table_name} with method {index_method}, measure {index_measure}, concurrently {concurrently}"
)

raw_message = await self.orchestration_provider.run_workflow(
Expand All @@ -378,7 +401,8 @@ async def create_vector_index_app(
"request": {
"table_name": table_name,
"index_method": index_method,
"measure": measure,
"index_measure": index_measure,
"index_name": index_name,
"index_arguments": index_arguments,
"concurrently": concurrently,
},
Expand All @@ -390,6 +414,79 @@ async def create_vector_index_app(

return raw_message # type: ignore

# Load the OpenAPI extras that belong to the list endpoint. The YAML section
# is keyed "list_vector_indices"; reading "create_vector_index" here would
# attach the create endpoint's code samples and parameter descriptions to
# GET /list_vector_indices.
list_vector_indices_extras = self.openapi_extras.get(
    "list_vector_indices", {}
)
# Per-parameter descriptions; an empty dict makes later .get() calls
# return None when the section is missing.
list_vector_indices_descriptions = list_vector_indices_extras.get(
    "input_descriptions", {}
)

# GET /list_vector_indices: returns the indices the database provider reports
# for the requested vector table. Documented with comments rather than a
# docstring so the generated OpenAPI description stays as it was.
@self.router.get(
    "/list_vector_indices",
    openapi_extra=list_vector_indices_extras.get("openapi_extra"),
)
@self.base_endpoint
async def list_vector_indices_app(
    table_name: Optional[VectorTableName] = Query(
        default=VectorTableName.RAW_CHUNKS,
        description=list_vector_indices_descriptions.get("table_name"),
    ),
    auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedListVectorIndicesResponse:
    # Queried directly on the database provider; no workflow is scheduled.
    database = self.service.providers.database
    found = await database.list_indices(table_name=table_name)
    # The payload is a plain dict with a single "indices" key.
    return {"indices": found}  # type: ignore

# OpenAPI extras for the delete endpoint; both lookups fall back to an empty
# dict so later .get() calls return None when the YAML section is absent.
delete_vector_index_extras = self.openapi_extras.get("delete_vector_index", {})
delete_vector_index_descriptions = delete_vector_index_extras.get(
    "input_descriptions", {}
)

# DELETE /delete_vector_index: hands the deletion to the orchestration
# provider as a "delete-vector-index" workflow and returns its raw message.
# Documented with comments rather than a docstring so the generated OpenAPI
# description stays as it was.
@self.router.delete(
    "/delete_vector_index",
    openapi_extra=delete_vector_index_extras.get("openapi_extra"),
)
@self.base_endpoint
async def delete_vector_index_app(
    index_name: str = Body(
        ...,
        description=delete_vector_index_descriptions.get("index_name"),
    ),
    table_name: Optional[VectorTableName] = Body(
        default=VectorTableName.RAW_CHUNKS,
        description=delete_vector_index_descriptions.get("table_name"),
    ),
    concurrently: bool = Body(
        default=True,
        description=delete_vector_index_descriptions.get("concurrently"),
    ),
    auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedDeleteVectorIndexResponse:
    logger.info(
        f"Deleting vector index {index_name} from table {table_name}"
    )

    # The workflow receives the three request fields under a "request" key.
    workflow_input = {
        "request": {
            "index_name": index_name,
            "table_name": table_name,
            "concurrently": concurrently,
        },
    }
    workflow_options = {"additional_metadata": {}}

    workflow_message = await self.orchestration_provider.run_workflow(
        "delete-vector-index",
        workflow_input,
        options=workflow_options,
    )
    return workflow_message  # type: ignore

@staticmethod
async def _process_files(files):
import base64
Expand Down
Loading

0 comments on commit a1017e7

Please sign in to comment.