update docs / collections
emrgnt-cmplxty committed Dec 2, 2024
1 parent b14cdbd commit 6ca9fa3
Showing 9 changed files with 183 additions and 37 deletions.
9 changes: 8 additions & 1 deletion py/core/base/providers/database.py
@@ -1281,7 +1281,14 @@ async def search_documents(
     async def delete(
         self, filters: dict[str, Any]
     ) -> dict[str, dict[str, str]]:
-        return await self.vector_handler.delete(filters)
+        result = await self.vector_handler.delete(filters)
+        await self.graph_handler.entities.delete(
+            parent_id=filters["document_id"]["$eq"]
+        )
+        await self.graph_handler.relationships.delete(
+            parent_id=filters["document_id"]["$eq"]
+        )
+        return result
 
     async def assign_document_to_collection_vector(
         self,
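With this change, deleting a document cascades from the vector store to the document's graph entities and relationships. A minimal usage sketch, assuming a provider object shaped like the one above (`db` and `delete_document` are illustrative names, not part of the commit):

```python
from typing import Any


async def delete_document(db, document_id: str) -> dict[str, dict[str, str]]:
    # The cascade reads filters["document_id"]["$eq"], so the filters dict
    # must carry a document-ID equality clause; other filter shapes would
    # raise a KeyError in the handler above.
    filters: dict[str, Any] = {"document_id": {"$eq": document_id}}
    return await db.delete(filters)
```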
30 changes: 22 additions & 8 deletions py/core/main/api/v3/graph_router.py
@@ -5,7 +5,7 @@
 
 from fastapi import Body, Depends, Path, Query
 
-from core.base import R2RException, RunType
+from core.base import KGEnrichmentStatus, R2RException, RunType
 from core.base.abstractions import KGRunType
 from core.base.api.models import (
     GenericBooleanResponse,
@@ -1646,6 +1646,10 @@ async def pull(
             collection_id: UUID = Path(
                 ..., description="The ID of the graph to initialize."
             ),
+            force: Optional[bool] = Body(
+                False,
+                description="If true, forces a re-pull of all entities and relationships.",
+            ),
             # document_ids: list[UUID] = Body(
             #     ..., description="List of document IDs to add to the graph."
             # ),
@@ -1736,24 +1740,34 @@ async def pull(
                         )
                         continue
                     if len(entities[0]) == 0:
-                        logger.warning(
-                            f"Document {document.id} has no entities, extraction may not have been called, skipping."
-                        )
-                        continue
+                        if not force:
+                            logger.warning(
+                                f"Document {document.id} has no entities, extraction may not have been called, skipping."
+                            )
+                            continue
+                        else:
+                            logger.warning(
+                                f"Document {document.id} has no entities, but force=True, continuing."
+                            )
 
                     success = (
                         await self.providers.database.graph_handler.add_documents(
                             id=collection_id,
-                            document_ids=[
-                                document.id
-                            ],  # [doc.id for doc in documents]
+                            document_ids=[document.id],
                         )
                     )
                     if not success:
                         logger.warning(
                             f"No documents were added to graph {collection_id}, marking as failed."
                         )
 
+                if success:
+                    await self.providers.database.set_workflow_status(
+                        id=collection_id,
+                        status_type="graph_sync_status",
+                        status=KGEnrichmentStatus.SUCCESS,
+                    )
+
                 return GenericBooleanResponse(success=success)  # type: ignore
 
         @self.router.delete(
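The `pull` endpoint now accepts a `force` flag in the request body: documents with no extracted entities are skipped by default but re-pulled when `force=True`, and a successful pull marks the collection's `graph_sync_status` as SUCCESS. A hypothetical client call with `httpx`; the mount path `/v3/graphs/{collection_id}/pull`, the port, and the response shape are assumptions, not confirmed by this diff:

```python
import httpx


async def pull_graph(collection_id: str, force: bool = False) -> bool:
    async with httpx.AsyncClient(base_url="http://localhost:7272") as client:
        # POST the new `force` flag in the JSON body, matching Body(False, ...).
        resp = await client.post(
            f"/v3/graphs/{collection_id}/pull", json={"force": force}
        )
        resp.raise_for_status()
        # GenericBooleanResponse is assumed to serialize as
        # {"results": {"success": true}}.
        return resp.json()["results"]["success"]
```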
42 changes: 41 additions & 1 deletion py/core/main/orchestration/hatchet/ingestion_workflow.py
@@ -11,6 +11,7 @@
 from core.base import (
     DocumentChunk,
     IngestionStatus,
+    KGEnrichmentStatus,
     OrchestrationProvider,
     generate_extraction_id,
     increment_version,
@@ -179,6 +180,16 @@ async def parse(self, context: Context) -> dict:
                     document_id=document_info.id,
                     collection_id=collection_id,
                 )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_sync_status",
+                    status=KGEnrichmentStatus.OUTDATED,
+                )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_cluster_status",  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+                    status=KGEnrichmentStatus.OUTDATED,
+                )
             else:
                 for collection_id in collection_ids:
                     try:
@@ -218,7 +229,16 @@ async def parse(self, context: Context) -> dict:
                             document_id=document_info.id,
                             collection_id=collection_id,
                         )
-
+                        await service.providers.database.set_workflow_status(
+                            id=collection_id,
+                            status_type="graph_sync_status",
+                            status=KGEnrichmentStatus.OUTDATED,
+                        )
+                        await service.providers.database.set_workflow_status(
+                            id=collection_id,
+                            status_type="graph_cluster_status",  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+                            status=KGEnrichmentStatus.OUTDATED,
+                        )
         # get server chunk enrichment settings and override parts of it if provided in the ingestion config
         server_chunk_enrichment_settings = getattr(
             service.providers.ingestion.config,
@@ -525,6 +545,16 @@ async def finalize(self, context: Context) -> dict:
                     document_id=document_info.id,
                     collection_id=collection_id,
                 )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_sync_status",
+                    status=KGEnrichmentStatus.OUTDATED,
+                )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_cluster_status",  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+                    status=KGEnrichmentStatus.OUTDATED,
+                )
             else:
                 for collection_id in collection_ids:
                     try:
@@ -556,6 +586,16 @@ async def finalize(self, context: Context) -> dict:
                         document_id=document_info.id,
                         collection_id=collection_id,
                     )
+                    await service.providers.database.set_workflow_status(
+                        id=collection_id,
+                        status_type="graph_sync_status",
+                        status=KGEnrichmentStatus.OUTDATED,
+                    )
+                    await service.providers.database.set_workflow_status(
+                        id=collection_id,
+                        status_type="graph_cluster_status",
+                        status=KGEnrichmentStatus.OUTDATED,  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+                    )
                 except Exception as e:
                     logger.error(
                         f"Error during assigning document to collection: {str(e)}"
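Both the single-collection and multi-collection branches now flip the collection's `graph_sync_status` and `graph_cluster_status` to OUTDATED after assigning a document. Since the two calls always travel together, a small helper (not part of the commit, sketched under the assumption that `set_workflow_status` keeps the signature shown above) could de-duplicate them:

```python
from uuid import UUID

from core.base import KGEnrichmentStatus


async def mark_graph_outdated(service, collection_id: UUID) -> None:
    # Entity/relationship sync and community clustering are both stale once
    # the collection's document membership changes.
    for status_type in ("graph_sync_status", "graph_cluster_status"):
        await service.providers.database.set_workflow_status(
            id=collection_id,
            status_type=status_type,
            status=KGEnrichmentStatus.OUTDATED,
        )
```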
50 changes: 49 additions & 1 deletion py/core/main/orchestration/simple/ingestion_workflow.py
@@ -5,7 +5,12 @@
 from fastapi import HTTPException
 from litellm import AuthenticationError
 
-from core.base import DocumentChunk, R2RException, increment_version
+from core.base import (
+    DocumentChunk,
+    KGEnrichmentStatus,
+    R2RException,
+    increment_version,
+)
 from core.utils import (
     generate_default_user_collection_id,
     generate_extraction_id,
@@ -89,6 +94,16 @@ async def ingest_files(input_data):
                 document_id=document_info.id,
                 collection_id=collection_id,
             )
+            await service.providers.database.set_workflow_status(
+                id=collection_id,
+                status_type="graph_sync_status",
+                status=KGEnrichmentStatus.OUTDATED,
+            )
+            await service.providers.database.set_workflow_status(
+                id=collection_id,
+                status_type="graph_cluster_status",
+                status=KGEnrichmentStatus.OUTDATED,  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+            )
         else:
             print("collection_ids = ", collection_ids)
 
@@ -134,6 +149,17 @@ async def ingest_files(input_data):
                     document_id=document_info.id,
                     collection_id=collection_id,
                 )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_sync_status",
+                    status=KGEnrichmentStatus.OUTDATED,
+                )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_cluster_status",
+                    status=KGEnrichmentStatus.OUTDATED,  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+                )
+
             except Exception as e:
                 logger.error(
                     f"Error during assigning document to collection: {str(e)}"
@@ -307,6 +333,17 @@ async def ingest_chunks(input_data):
                 document_id=document_info.id,
                 collection_id=collection_id,
             )
+            await service.providers.database.set_workflow_status(
+                id=collection_id,
+                status_type="graph_sync_status",
+                status=KGEnrichmentStatus.OUTDATED,
+            )
+            await service.providers.database.set_workflow_status(
+                id=collection_id,
+                status_type="graph_cluster_status",
+                status=KGEnrichmentStatus.OUTDATED,  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+            )
+
         else:
             print("collection_ids = ", collection_ids)
             for collection_id in collection_ids:
@@ -344,6 +381,17 @@ async def ingest_chunks(input_data):
                     document_id=document_info.id,
                     collection_id=collection_id,
                 )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_sync_status",
+                    status=KGEnrichmentStatus.OUTDATED,
+                )
+                await service.providers.database.set_workflow_status(
+                    id=collection_id,
+                    status_type="graph_cluster_status",
+                    status=KGEnrichmentStatus.OUTDATED,  # NOTE - we should first check that a cluster has been built; if not, the status should remain PENDING
+                )
+
             except Exception as e:
                 logger.error(
                     f"Error during assigning document to collection: {str(e)}"
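The NOTE repeated above says `graph_cluster_status` should only become OUTDATED when a cluster was actually built, and should otherwise stay PENDING. A sketch of that guard, assuming a `get_workflow_status` getter symmetrical to the setter used here (an assumption; only the setter appears in this diff):

```python
from core.base import KGEnrichmentStatus


async def mark_cluster_stale(service, collection_id) -> None:
    current = await service.providers.database.get_workflow_status(
        id=collection_id, status_type="graph_cluster_status"
    )
    if current == KGEnrichmentStatus.SUCCESS:
        # A previously built cluster is now stale.
        await service.providers.database.set_workflow_status(
            id=collection_id,
            status_type="graph_cluster_status",
            status=KGEnrichmentStatus.OUTDATED,
        )
    # Otherwise the status stays PENDING until the first clustering run.
```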
2 changes: 1 addition & 1 deletion py/core/main/orchestration/simple/kg_workflow.py
@@ -120,7 +120,7 @@ async def enrich_graph(input_data):
         print("workflow_status = ", workflow_status)
         if workflow_status == KGEnrichmentStatus.SUCCESS:
             raise R2RException(
-                "Communities have already been built for this collection. To build communities again, first submit a POST request to `graphs/{collection_id}/reset`.",
+                "Communities have already been built for this collection. To build communities again, first submit a POST request to `graphs/{collection_id}/reset` to erase the previously built communities.",
                 400,
             )
 
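The expanded message spells out the recovery path. As a hypothetical client-side sequence (the `/v3` prefix and the community-build route are assumptions; only `graphs/{collection_id}/reset` is named in the message):

```python
import httpx


def rebuild_communities(base_url: str, collection_id: str) -> None:
    with httpx.Client(base_url=base_url) as client:
        # 1. Erase the previously built communities.
        client.post(f"/v3/graphs/{collection_id}/reset").raise_for_status()
        # 2. Kick off a fresh community build.
        client.post(
            f"/v3/graphs/{collection_id}/communities/build"
        ).raise_for_status()
```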
73 changes: 51 additions & 22 deletions py/core/main/services/management_service.py
@@ -11,6 +11,7 @@
     AnalysisTypes,
     CollectionResponse,
     DocumentResponse,
+    KGEnrichmentStatus,
     LogFilterCriteria,
     LogProcessor,
     Message,
@@ -355,29 +356,46 @@ def process_filter(filter_dict: dict[str, Any]):
             )
 
             for document_id in document_ids_to_purge:
-                remaining_chunks = await self.providers.database.list_document_chunks(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
-                    document_id=document_id,
-                    offset=0,
-                    limit=1000,
-                )
-                if remaining_chunks["total_entries"] == 0:
-                    try:
-                        await self.providers.database.delete_from_documents_overview(
-                            document_id
-                        )
-                        logger.info(
-                            f"Deleted document ID {document_id} from documents_overview."
-                        )
-                    except Exception as e:
-                        logger.error(
-                            f"Error deleting document ID {document_id} from documents_overview: {e}"
-                        )
-                await self.providers.database.graph_handler.entities.delete(
-                    parent_id=document_id, store_type="document"
-                )
-                await self.providers.database.graph_handler.relationships.delete(
-                    parent_id=document_id, store_type="document"
-                )
+                # remaining_chunks = await self.providers.database.list_document_chunks(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
+                #     document_id=document_id,
+                #     offset=0,
+                #     limit=1000,
+                # )
+                # if remaining_chunks["total_entries"] == 0:
+                #     try:
+                #         await self.providers.database.delete_from_documents_overview(
+                #             document_id
+                #         )
+                #         logger.info(
+                #             f"Deleted document ID {document_id} from documents_overview."
+                #         )
+                #     except Exception as e:
+                #         logger.error(
+                #             f"Error deleting document ID {document_id} from documents_overview: {e}"
+                #         )
+                # await self.providers.database.graph_handler.entities.delete(
+                #     parent_id=document_id, store_type="document"
+                # )
+                # await self.providers.database.graph_handler.relationships.delete(
+                #     parent_id=document_id, store_type="document"
+                # )
+                collections = (
+                    await self.providers.database.get_collections_overview(
+                        offset=0, limit=1000, filter_document_ids=[document_id]
+                    )
+                )
+                # TODO - Loop over all collections
+                for collection in collections["results"]:
+                    await self.providers.database.set_workflow_status(
+                        id=collection.id,
+                        status_type="graph_sync_status",
+                        status=KGEnrichmentStatus.OUTDATED,
+                    )
+                    await self.providers.database.set_workflow_status(
+                        id=collection.id,
+                        status_type="graph_cluster_status",
+                        status=KGEnrichmentStatus.OUTDATED,
+                    )
 
             return None
 
@@ -435,6 +453,17 @@ async def assign_document_to_collection(
         await self.providers.database.assign_document_to_collection_relational(
             document_id, collection_id
         )
+        await self.providers.database.set_workflow_status(
+            id=collection_id,
+            status_type="graph_sync_status",
+            status=KGEnrichmentStatus.OUTDATED,
+        )
+        await self.providers.database.set_workflow_status(
+            id=collection_id,
+            status_type="graph_cluster_status",
+            status=KGEnrichmentStatus.OUTDATED,
+        )
+
        return {"message": "Document assigned to collection successfully"}
 
     @telemetry_event("RemoveDocumentFromCollection")
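The per-document purge is now commented out in favor of the cascade in the database provider's `delete` (first hunk above), and the new `# TODO - Loop over all collections` flags that a single `get_collections_overview` page of 1000 may miss collections. A paging sketch that would address the TODO, assuming the call returns a dict with `results` and `total_entries` (the same shape the removed `list_document_chunks` code relied on):

```python
async def collections_for_document(db, document_id, page_size: int = 1000):
    # Page through every collection containing the document instead of
    # relying on a single fixed-size page.
    offset = 0
    while True:
        page = await db.get_collections_overview(
            offset=offset,
            limit=page_size,
            filter_document_ids=[document_id],
        )
        for collection in page["results"]:
            yield collection
        offset += page_size
        if offset >= page["total_entries"]:
            break
```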