Skip to content

Commit

Permalink
up (#1557)
Browse files Browse the repository at this point in the history
* up

* bump package
  • Loading branch information
emrgnt-cmplxty authored Nov 4, 2024
1 parent 40233cc commit d50d55e
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 2 deletions.
2 changes: 1 addition & 1 deletion js/sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "r2r-js",
"version": "0.3.13",
"version": "0.3.14",
"description": "",
"main": "dist/index.js",
"browser": "dist/index.browser.js",
Expand Down
17 changes: 17 additions & 0 deletions js/sdk/src/r2rClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,23 @@ export class r2rClient {
});
}

/**
* Update the metadata of an existing document.
* @param documentId The ID of the document to update.
* @param metadata The new metadata to merge with existing metadata.
* @returns A promise that resolves to the response from the server.
*/
@feature("updateDocumentMetadata")
async updateDocumentMetadata(
documentId: string,
metadata: Record<string, any>
): Promise<Record<string, any>> {
this._ensureAuthenticated();
return await this._makeRequest("POST", `update_document_metadata/${documentId}`, {
data: metadata,
});
}

@feature("ingestChunks")
async ingestChunks(
chunks: RawChunk[],
Expand Down
2 changes: 2 additions & 0 deletions py/core/base/api/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
WrappedDeleteVectorIndexResponse,
WrappedIngestionResponse,
WrappedListVectorIndicesResponse,
WrappedMetadataUpdateResponse,
WrappedSelectVectorIndexResponse,
WrappedUpdateResponse,
)
Expand Down Expand Up @@ -89,6 +90,7 @@
"IngestionResponse",
"WrappedIngestionResponse",
"WrappedUpdateResponse",
"WrappedMetadataUpdateResponse",
"CreateVectorIndexResponse",
"WrappedCreateVectorIndexResponse",
"WrappedListVectorIndicesResponse",
Expand Down
55 changes: 55 additions & 0 deletions py/core/main/api/ingestion_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
WrappedDeleteVectorIndexResponse,
WrappedIngestionResponse,
WrappedListVectorIndicesResponse,
WrappedMetadataUpdateResponse,
WrappedUpdateResponse,
)
from core.providers import (
Expand Down Expand Up @@ -73,6 +74,11 @@ def _register_workflows(self):
if self.orchestration_provider.config.provider != "simple"
else "Chunk update completed successfully."
),
"update-document-metadata": (
"Update document metadata task queued successfully."
if self.orchestration_provider.config.provider != "simple"
else "Document metadata update completed successfully."
),
"create-vector-index": (
"Vector index creation task queued successfully."
if self.orchestration_provider.config.provider != "simple"
Expand Down Expand Up @@ -434,6 +440,55 @@ async def ingest_chunks_app(
}
]

@self.router.post(
"/update_document_metadata/{document_id}",
)
@self.base_endpoint
async def update_document_metadata_app(
document_id: UUID = Path(
..., description="The document ID of the document to update"
),
metadata: dict = Body(
...,
description="The new metadata to merge with existing metadata",
),
auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedMetadataUpdateResponse:
"""
Updates the metadata of a previously ingested document and its associated chunks.
A valid user authentication token is required to access this endpoint, as regular users can only update their own documents.
"""

try:
workflow_input = {
"document_id": str(document_id),
"metadata": metadata,
"user": auth_user.model_dump_json(),
}

logger.info(
"Running document metadata update without orchestration."
)
from core.main.orchestration import simple_ingestion_factory

simple_ingestor = simple_ingestion_factory(self.service)
await simple_ingestor["update-document-metadata"](
workflow_input
)

return { # type: ignore
"message": "Update metadata task completed successfully.",
"document_id": str(document_id),
"task_id": None,
}

except Exception as e:
raise R2RException(
status_code=500,
message=f"Error updating document metadata: {str(e)}",
)

@self.router.put(
"/update_chunk/{document_id}/{extraction_id}",
)
Expand Down
48 changes: 48 additions & 0 deletions py/core/main/orchestration/hatchet/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,10 +659,57 @@ async def delete_vector_index(self, context: Context) -> dict:

return {"status": "Vector index deleted successfully."}

@orchestration_provider.workflow(
name="update-document-metadata",
timeout="30m",
)
class HatchetUpdateDocumentMetadataWorkflow:
def __init__(self, ingestion_service: IngestionService):
self.ingestion_service = ingestion_service

@orchestration_provider.step(timeout="30m")
async def update_document_metadata(self, context: Context) -> dict:
try:
input_data = context.workflow_input()["request"]
parsed_data = IngestionServiceAdapter.parse_update_document_metadata_input(
input_data
)

document_id = UUID(parsed_data["document_id"])
metadata = parsed_data["metadata"]
user = parsed_data["user"]

await self.ingestion_service.update_document_metadata(
document_id=document_id,
metadata=metadata,
user=user,
)

return {
"message": "Document metadata update completed successfully.",
"document_id": str(document_id),
"task_id": context.workflow_run_id(),
}

except Exception as e:
raise R2RException(
status_code=500,
message=f"Error during document metadata update: {str(e)}",
)

@orchestration_provider.failure()
async def on_failure(self, context: Context) -> None:
# Handle failure case if necessary
pass

# Add this to the workflows dictionary in hatchet_ingestion_factory
ingest_files_workflow = HatchetIngestFilesWorkflow(service)
update_files_workflow = HatchetUpdateFilesWorkflow(service)
ingest_chunks_workflow = HatchetIngestChunksWorkflow(service)
update_chunks_workflow = HatchetUpdateChunkWorkflow(service)
update_document_metadata_workflow = HatchetUpdateDocumentMetadataWorkflow(
service
)
create_vector_index_workflow = HatchetCreateVectorIndexWorkflow(service)
delete_vector_index_workflow = HatchetDeleteVectorIndexWorkflow(service)

Expand All @@ -671,6 +718,7 @@ async def delete_vector_index(self, context: Context) -> dict:
"update_files": update_files_workflow,
"ingest_chunks": ingest_chunks_workflow,
"update_chunk": update_chunks_workflow,
"update_document_metadata": update_document_metadata_workflow,
"create_vector_index": create_vector_index_workflow,
"delete_vector_index": delete_vector_index_workflow,
}
33 changes: 33 additions & 0 deletions py/core/main/orchestration/simple/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,44 @@ async def delete_vector_index(input_data):
message=f"Error during vector index deletion: {str(e)}",
)

async def update_document_metadata(input_data):
try:
from core.main import IngestionServiceAdapter

parsed_data = (
IngestionServiceAdapter.parse_update_document_metadata_input(
input_data
)
)

document_id = parsed_data["document_id"]
metadata = parsed_data["metadata"]
user = parsed_data["user"]

await service.update_document_metadata(
document_id=document_id,
metadata=metadata,
user=user,
)

return {
"message": "Document metadata update completed successfully.",
"document_id": str(document_id),
"task_id": None,
}

except Exception as e:
raise R2RException(
status_code=500,
message=f"Error during document metadata update: {str(e)}",
)

return {
"ingest-files": ingest_files,
"update-files": update_files,
"ingest-chunks": ingest_chunks,
"update-chunk": update_chunk,
"update-document-metadata": update_document_metadata,
"create-vector-index": create_vector_index,
"delete-vector-index": delete_vector_index,
}
42 changes: 42 additions & 0 deletions py/core/main/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,40 @@ async def chunk_enrichment(

return len(new_vector_entries)

async def update_document_metadata(
self,
document_id: UUID,
metadata: dict,
user: UserResponse,
) -> None:
# Verify document exists and user has access
existing_document = (
await self.providers.database.get_documents_overview(
filter_document_ids=[document_id],
filter_user_ids=[user.id],
)
)

if not existing_document["results"]:
raise R2RException(
status_code=404,
message=f"Document with id {document_id} not found or you don't have access.",
)

existing_document = existing_document["results"][0]

# Merge metadata
merged_metadata = {
**existing_document.metadata, # type: ignore
**metadata,
}

# Update document metadata
existing_document.metadata = merged_metadata # type: ignore
await self.providers.database.upsert_documents_overview(
existing_document # type: ignore
)


class IngestionServiceAdapter:
@staticmethod
Expand Down Expand Up @@ -699,3 +733,11 @@ def parse_select_vector_index_input(input_data: dict) -> dict:
"index_name": input_data["index_name"],
"table_name": input_data.get("table_name"),
}

@staticmethod
def parse_update_document_metadata_input(data: dict) -> dict:
return {
"document_id": data["document_id"],
"metadata": data["metadata"],
"user": IngestionServiceAdapter._parse_user_data(data["user"]),
}
2 changes: 1 addition & 1 deletion py/r2r.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ provider = "r2r"
access_token_lifetime_in_minutes = 60
refresh_token_lifetime_in_days = 7
require_authentication = false
require_email_verification = true
require_email_verification = false
default_admin_email = "[email protected]"
default_admin_password = "change_me_immediately"

Expand Down
27 changes: 27 additions & 0 deletions py/sdk/mixins/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,30 @@ async def delete_vector_index(
return await self._make_request( # type: ignore
"DELETE", "delete_vector_index", json=data
)

async def update_document_metadata(
self,
document_id: Union[str, UUID],
metadata: dict,
) -> dict:
"""
Update the metadata of an existing document.
Args:
document_id (Union[str, UUID]): The ID of the document to update.
metadata (dict): The new metadata to merge with existing metadata.
run_with_orchestration (Optional[bool]): Whether to run the update through orchestration.
Returns:
dict: Update results containing the status of the metadata update.
"""
data = {
"metadata": metadata,
}

# Remove None values from payload
data = {k: v for k, v in data.items() if v is not None}

return await self._make_request( # type: ignore
"POST", f"update_document_metadata/{document_id}", json=metadata
)
2 changes: 2 additions & 0 deletions py/shared/api/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from shared.api.models.ingestion.responses import (
IngestionResponse,
WrappedIngestionResponse,
WrappedMetadataUpdateResponse,
WrappedUpdateResponse,
)
from shared.api.models.kg.responses import (
Expand Down Expand Up @@ -72,6 +73,7 @@
"IngestionResponse",
"WrappedIngestionResponse",
"WrappedUpdateResponse",
"WrappedMetadataUpdateResponse",
# Restructure Responses
"KGCreationResponse",
"WrappedKGCreationResponse",
Expand Down
1 change: 1 addition & 0 deletions py/shared/api/models/ingestion/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class SelectVectorIndexResponse(BaseModel):


WrappedIngestionResponse = ResultsWrapper[list[IngestionResponse]]
WrappedMetadataUpdateResponse = ResultsWrapper[IngestionResponse]
WrappedUpdateResponse = ResultsWrapper[UpdateResponse]
WrappedCreateVectorIndexResponse = ResultsWrapper[CreateVectorIndexResponse]
WrappedListVectorIndicesResponse = ResultsWrapper[ListVectorIndicesResponse]
Expand Down

0 comments on commit d50d55e

Please sign in to comment.