Skip to content

Commit

Permalink
Error handling edge cases for bulk indexing lambda and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonyhashemi committed Jan 8, 2025
1 parent 7a77862 commit e743061
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
get_s3_file,
get_secret_data,
)
from ..text_extraction import add_text_content
from ..text_extraction import TextExtractionStatus, add_text_content

logger = logging.getLogger()
logger.setLevel(logging.INFO)


class ConsignmentBulkIndexError(Exception):
pass


def bulk_index_consignment_from_aws(
consignment_reference: str, secret_id: str
) -> None:
Expand Down Expand Up @@ -79,13 +83,51 @@ def bulk_index_consignment(
"""
files = _fetch_files_in_consignment(consignment_reference, database_url)
documents_to_index = _construct_documents(files, bucket_name)
bulk_index_files_in_opensearch(
documents_to_index,
open_search_host_url,
open_search_http_auth,
open_search_bulk_index_timeout,
open_search_ca_certs,
)

document_text_extraction_exceptions_message = ""
for doc in documents_to_index:
if doc["document"]["text_extraction_status"] not in [
TextExtractionStatus.SKIPPED.value,
TextExtractionStatus.SUCCEEDED.value,
]:
if document_text_extraction_exceptions_message == "":
document_text_extraction_exceptions_message += (
"Text extraction failed on the following documents:"
)
document_text_extraction_exceptions_message += f"\n{doc['file_id']}"

bulk_indexing_exception_message = ""
try:
bulk_index_files_in_opensearch(
documents_to_index,
open_search_host_url,
open_search_http_auth,
open_search_bulk_index_timeout,
open_search_ca_certs,
)
except Exception as bulk_indexing_exception:
bulk_indexing_exception_message = bulk_indexing_exception.text
logger.error("Bulk indexing of files resulted in some errors")

# Combine and raise all errors from failed attempts to extract text or index documents
if (
document_text_extraction_exceptions_message
or bulk_indexing_exception_message
):
consignment_bulk_index_error_message = (
"The following errors occurred when attempting to "
f"bulk index consignment reference: {consignment_reference}"
)
if document_text_extraction_exceptions_message:
consignment_bulk_index_error_message += (
f"\n{document_text_extraction_exceptions_message}"
)
if bulk_indexing_exception_message:
consignment_bulk_index_error_message += (
f"\n{bulk_indexing_exception_message}"
)

raise ConsignmentBulkIndexError(consignment_bulk_index_error_message)


def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import logging
import tempfile
from enum import Enum
from typing import Dict

import textract

logger = logging.getLogger()
logger.setLevel(logging.INFO)


class TextExtractionStatus(Enum):
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
SKIPPED = "SKIPPED"


SUPPORTED_TEXTRACT_FORMATS = [
"csv",
"doc",
Expand Down Expand Up @@ -45,18 +53,20 @@ def add_text_content(file: Dict, file_stream: bytes) -> Dict:
f"Text extraction skipped for unsupported file type: {file_type}"
)
file["content"] = ""
file["text_extraction_status"] = "n/a"
file["text_extraction_status"] = TextExtractionStatus.SKIPPED.value
else:
try:
file["content"] = extract_text(file_stream, file_type)
logger.info(f"Text extraction succeeded for file {file['file_id']}")
file["text_extraction_status"] = "success"
file["text_extraction_status"] = (
TextExtractionStatus.SUCCEEDED.value
)
except Exception as e:
logger.error(
f"Text extraction failed for file {file['file_id']}: {e}"
)
file["content"] = ""
file["text_extraction_status"] = "failed"
file["text_extraction_status"] = TextExtractionStatus.FAILED.value

return file

Expand Down
2 changes: 2 additions & 0 deletions data_management/opensearch_indexer/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ requests-aws4auth==1.3.1
SQLAlchemy==2.0.32
pg8000==1.31.2
textract==1.6.5
testing-postgresql==1.3.0
psycopg2=2.9.10
98 changes: 98 additions & 0 deletions data_management/opensearch_indexer/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import tempfile

import pytest
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
String,
Text,
create_engine,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship
from testing.postgresql import PostgresqlFactory

Base = declarative_base()


class Body(Base):
__tablename__ = "Body"
BodyId = Column(UUID(as_uuid=True), primary_key=True)
Name = Column(Text)
Description = Column(Text)


class Series(Base):
__tablename__ = "Series"
SeriesId = Column(UUID(as_uuid=True), primary_key=True)
BodyId = Column(UUID(as_uuid=True), ForeignKey("Body.BodyId"))
Name = Column(Text)
Description = Column(Text)
body = relationship("Body", foreign_keys="Series.BodyId")


class Consignment(Base):
__tablename__ = "Consignment"
ConsignmentId = Column(UUID(as_uuid=True), primary_key=True)
SeriesId = Column(UUID(as_uuid=True), ForeignKey("Series.SeriesId"))
BodyId = Column(UUID(as_uuid=True), ForeignKey("Body.BodyId"))
ConsignmentReference = Column(Text)
ConsignmentType = Column(String, nullable=False)
IncludeTopLevelFolder = Column(Boolean)
ContactName = Column(Text)
ContactEmail = Column(Text)
TransferStartDatetime = Column(DateTime)
TransferCompleteDatetime = Column(DateTime)
ExportDatetime = Column(DateTime)
CreatedDatetime = Column(DateTime)
series = relationship("Series", foreign_keys="Consignment.SeriesId")


class File(Base):
__tablename__ = "File"
FileId = Column(UUID(as_uuid=True), primary_key=True)
ConsignmentId = Column(
UUID(as_uuid=True), ForeignKey("Consignment.ConsignmentId")
)
FileReference = Column(Text, nullable=False)
FileType = Column(Text, nullable=False)
FileName = Column(Text, nullable=False)
FilePath = Column(Text, nullable=False)
CiteableReference = Column(Text)
Checksum = Column(Text)
CreatedDatetime = Column(DateTime)
consignment = relationship("Consignment", foreign_keys="File.ConsignmentId")


class FileMetadata(Base):
__tablename__ = "FileMetadata"
MetadataId = Column(UUID(as_uuid=True), primary_key=True)
FileId = Column(UUID(as_uuid=True), ForeignKey("File.FileId"))
PropertyName = Column(Text, nullable=False)
Value = Column(Text)
CreatedDatetime = Column(DateTime)
file = relationship("File", foreign_keys="FileMetadata.FileId")


@pytest.fixture()
def temp_db():
temp_db_file = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
temp_db_file.close()
database_url = f"sqlite:///{temp_db_file.name}"
engine = create_engine(database_url)
Base.metadata.create_all(engine)
return engine


@pytest.fixture(scope="session")
def database(request):
# Launch new PostgreSQL server
postgresql = PostgresqlFactory(cache_initialized_db=True)()
yield postgresql

# PostgreSQL server is terminated here
@request.addfinalizer
def drop_database():
postgresql.stop()
Loading

0 comments on commit e743061

Please sign in to comment.