RAG Data Ingestion Benchmark (#318)
* First commit, partially working benchmark.

* Completed RAG Data ingestion benchmark.

* Updated tag readme.

* Fix typo in benchmark URL

* Updated doc again to add documentation on the optional input parameter `embedding_model_id`.

* Cleaned up code; also allow for scanning text from images.

* Created a version of the benchmark that also extracts images.

* Consolidated into one application; updated documentation.

---------

Co-authored-by: Viktor Gsteiger <[email protected]>
Danidite and vGsteiger authored Sep 17, 2024
1 parent 9995f32 commit 4e70bc2
Showing 12 changed files with 1,343 additions and 5 deletions.
5 changes: 1 addition & 4 deletions .gitignore
@@ -96,7 +96,4 @@ benchmarks/solver_benchmarks/images/
 # Benchmark results
 benchmarks/solver_benchmarks/results/
 
-Dockerfile
-
-# Framework deployment packages
-.caribou/
+Dockerfile
39 changes: 39 additions & 0 deletions benchmarks/rag_data_ingestion/.caribou/config.yml
@@ -0,0 +1,39 @@
workflow_name: "rag_data_ingestion"
workflow_version: "0.0.1"
iam_policy_file: "iam_policy.json"
home_region:
  provider: "aws"
  region: "us-east-1"
estimated_invocations_per_month: 1000000
constraints:
  hard_resource_constraints:
    cost: null
    runtime: null
    carbon: null
  soft_resource_constraints:
    cost: null
    runtime: null
    carbon: null
  priority_order:
    - carbon
    - cost
    - runtime
regions_and_providers:
  allowed_regions:
    - provider: "aws"
      region: "us-east-1"
    - provider: "aws"
      region: "us-west-1"
    - provider: "aws"
      region: "us-west-2"
    - provider: "aws"
      region: "ca-central-1"
  disallowed_regions:
  providers:
    aws:
      config:
        timeout: 300
        memory: 1024
        additional_docker_commands:
          - "yum update -y"
          - "yum install -y libGL"
41 changes: 41 additions & 0 deletions benchmarks/rag_data_ingestion/.caribou/iam_policy.json
@@ -0,0 +1,41 @@
{
  "aws": {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Action": [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource": "arn:aws:logs:*:*:*",
        "Effect": "Allow"
      },
      {
        "Action": ["sns:Publish"],
        "Resource": "arn:aws:sns:*:*:*",
        "Effect": "Allow"
      },
      {
        "Action": ["dynamodb:GetItem", "dynamodb:UpdateItem", "dynamodb:PutItem"],
        "Resource": "arn:aws:dynamodb:*:*:*",
        "Effect": "Allow"
      },
      {
        "Action": ["s3:GetObject", "s3:PutObject"],
        "Resource": "arn:aws:s3:::*",
        "Effect": "Allow"
      },
      {
        "Action": ["secretsmanager:GetSecretValue", "secretsmanager:ListSecrets"],
        "Effect": "Allow",
        "Resource": "*"
      },
      {
        "Action": ["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
        "Effect": "Allow",
        "Resource": "*"
      }
    ]
  }
}
1 change: 1 addition & 0 deletions benchmarks/rag_data_ingestion/.gitignore
@@ -0,0 +1 @@
.caribou/deployment-packages/
Empty file.
230 changes: 230 additions & 0 deletions benchmarks/rag_data_ingestion/app.py
@@ -0,0 +1,230 @@
from datetime import datetime, timezone
import os
from typing import Any
import boto3
import PyPDF2
import shortuuid
import json
from langchain_community.embeddings import BedrockEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain_postgres.vectorstores import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sqlalchemy import create_engine
import rapidocr_onnxruntime  # Imported only to force the onnxruntime OCR package into the deployment bundle

from caribou.deployment.client.caribou_workflow import CaribouWorkflow
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Constants
EMBEDDING_MODEL_ID_DEFAULT = "amazon.titan-embed-text-v2:0"
UPLOADED = "UPLOADED"
ALL_DOCUMENTS = "ALL_DOCUMENTS"
PROCESSING = "PROCESSING"
READY = "READY"

# Configuration if you wish to extract images from PDFs
EXTRACT_IMAGES = True

# Change the following bucket and dynamodb name and region to match your setup
desired_region = "us-east-1"
secrets_manager_region = desired_region

s3_bucket_name = "caribou-document-embedding-benchmark"
s3_bucket_region_name = desired_region
dynamodb_document_table = "caribou-document-embedding-benchmark-document"
dynamodb_memory_table = "caribou-document-embedding-benchmark-memory"
dynamodb_region_name = s3_bucket_region_name

# Change the following RDS and Postgres details to match your setup
bedrock_runtime_region_name = desired_region
postgresql_secret_name = "full_aws_secret_name_for_postgress"  # Replace with your secret name (if PostgreSQL credentials are managed by AWS Secrets Manager)
postgresql_host = "some_db.somecharacters.us-east-1.rds.amazonaws.com" # Replace with your RDS endpoint
postgresql_dbname = "postgres" # Replace with your RDS database name
postgresql_port = "5432" # Replace with your RDS port

workflow = CaribouWorkflow(name="rag_data_ingestion", version="0.0.1")

@workflow.serverless_function(
    name="upload_trigger",
    entry_point=True,
)
def upload_trigger(event: dict[str, Any]) -> dict[str, Any]:
    if isinstance(event, str):
        event = json.loads(event)

    if "user_id" in event:
        user_id: str = event["user_id"]
    else:
        raise ValueError("No user ID provided")
    if "file_name" in event:
        file_name: str = event["file_name"]
    else:
        raise ValueError("No file name provided")
    if "embedding_model_id" in event:
        embedding_model_id: str = event["embedding_model_id"]
    else:
        embedding_model_id = EMBEDDING_MODEL_ID_DEFAULT  # Default Embedding Model ID

    s3 = boto3.client("s3", region_name=s3_bucket_region_name)
    s3.download_file(s3_bucket_name, f"input/{file_name}", f"/tmp/{file_name}")
    with open(f"/tmp/{file_name}", "rb") as f:
        reader = PyPDF2.PdfReader(f)
        pages = str(len(reader.pages))
        filesize = str(os.path.getsize(f"/tmp/{file_name}"))

    ### Create new document & conversation history
    document, conversation = create_document_and_conversation(user_id, file_name, pages, filesize)

    document_table = boto3.resource("dynamodb", region_name=dynamodb_region_name).Table(dynamodb_document_table)
    memory_table = boto3.resource("dynamodb", region_name=dynamodb_region_name).Table(dynamodb_memory_table)

    document_table.put_item(Item=document)
    memory_table.put_item(Item=conversation)

    ### Create/Update ALL_DOCUMENTS document
    response = document_table.get_item(Key={"userid": user_id, "documentid": ALL_DOCUMENTS})
    if "Item" not in response:
        documents_all, conversation_all = create_document_and_conversation(user_id, ALL_DOCUMENTS, pages, filesize)
        memory_table.put_item(Item=conversation_all)
    else:
        documents_all = response["Item"]
        documents_all["docstatus"] = UPLOADED
        documents_all["pages"] = str(int(documents_all["pages"]) + int(pages))
        documents_all["filesize"] = str(int(documents_all["filesize"]) + int(filesize))

    document_table.put_item(Item=documents_all)

    payload = {
        "document_id": document["documentid"],
        "user": user_id,
        "file_name": file_name,
        "embedding_model_id": embedding_model_id,
    }

    # Fan out to the embedding stage
    workflow.invoke_serverless_function(generate_embeddings, payload)

    return {"status": 200}

@workflow.serverless_function(name="generate_embeddings")
def generate_embeddings(event):
    document_id = event["document_id"]
    user_id = event["user"]
    file_name = event["file_name"]
    embedding_model_id = event["embedding_model_id"]

    document_table = boto3.resource("dynamodb", region_name=dynamodb_region_name).Table(dynamodb_document_table)

    set_doc_status(document_table, user_id, document_id, PROCESSING)
    set_doc_status(document_table, user_id, ALL_DOCUMENTS, PROCESSING)

    s3 = boto3.client("s3", region_name=s3_bucket_region_name)
    s3.download_file(s3_bucket_name, f"input/{file_name}", f"/tmp/{file_name}")

    # Load the PDF; optionally OCR embedded images so their text is indexed too
    loader = PyPDFLoader(f"/tmp/{file_name}", extract_images=EXTRACT_IMAGES)
    data = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
    split_document = text_splitter.split_documents(data)

    bedrock_runtime = boto3.client(
        service_name="bedrock-runtime",
        region_name=bedrock_runtime_region_name,
    )

    embeddings = BedrockEmbeddings(
        model_id=embedding_model_id,
        client=bedrock_runtime,
        region_name=bedrock_runtime_region_name,
    )

    db_secret = get_db_secret()
    connection_string = f"postgresql://{db_secret['username']}:{db_secret['password']}@{postgresql_host}:{postgresql_port}/{postgresql_dbname}"
    postgresql_connection = create_engine(connection_string)

    # Each chunk is indexed twice: under the document's own collection and
    # under the user's ALL_DOCUMENTS collection
    collection_names = [f"{user_id}_{ALL_DOCUMENTS}", f"{user_id}_{file_name}"]
    ids = {
        f"{user_id}_{file_name}": [f"{user_id}_{file_name}_{i}" for i in range(len(split_document))],
        f"{user_id}_{ALL_DOCUMENTS}": [f"{user_id}_{file_name}_{i}_{ALL_DOCUMENTS}" for i in range(len(split_document))],
    }
    for collection_name in collection_names:
        vector_store = PGVector(
            embeddings=embeddings,
            collection_name=collection_name,
            connection=postgresql_connection,
            use_jsonb=True,
        )

        vector_store.add_documents(split_document, ids=ids[collection_name])

    set_doc_status(document_table, user_id, document_id, READY, ids[f"{user_id}_{file_name}"])
    set_doc_status(document_table, user_id, ALL_DOCUMENTS, READY, ids[f"{user_id}_{ALL_DOCUMENTS}"])

    # Clean up
    postgresql_connection.dispose(close=True)

    return {"status": 200}

# Helper functions for upload_trigger
def create_document_and_conversation(user_id, filename, pages, filesize):
    timestamp = datetime.now(timezone.utc)
    timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    document_id = shortuuid.uuid()
    conversation_id = shortuuid.uuid()

    document = {
        "userid": user_id,
        "documentid": ALL_DOCUMENTS if (filename == ALL_DOCUMENTS) else document_id,
        "filename": filename,
        "created": timestamp_str,
        "pages": pages,
        "filesize": filesize,
        "docstatus": UPLOADED,
        "conversations": [],
        "document_split_ids": [],
    }

    conversation = {"conversationid": conversation_id, "created": timestamp_str}
    document["conversations"].append(conversation)

    conversation = {"SessionId": conversation_id, "History": []}

    return [document, conversation]

# Helper functions for generate_embeddings
def set_doc_status(document_table, user_id, document_id, status, ids=None):
    if ids:
        update_expression = """
        SET docstatus = :docstatus,
            document_split_ids = list_append(if_not_exists(document_split_ids, :empty_list), :ids)
        """
        expression_attribute_values = {
            ":docstatus": status,
            ":ids": ids,
            ":empty_list": [],
        }
    else:
        update_expression = "SET docstatus = :docstatus"
        expression_attribute_values = {":docstatus": status}

    document_table.update_item(
        Key={"userid": user_id, "documentid": document_id},
        UpdateExpression=update_expression,
        ExpressionAttributeValues=expression_attribute_values,
    )

def get_db_secret():
    sm_client = boto3.client(
        service_name="secretsmanager",
        region_name=secrets_manager_region,
    )
    response = sm_client.get_secret_value(
        SecretId=postgresql_secret_name
    )["SecretString"]
    secret = json.loads(response)
    return secret
41 changes: 41 additions & 0 deletions benchmarks/rag_data_ingestion/readme.md
@@ -0,0 +1,41 @@
# RAG Data Ingestion Benchmark

This benchmark is the `Data Ingestion` part of the larger document chat application
benchmark: https://github.com/UBC-CIC/document-chat/

This benchmark requires access to an S3 bucket named `caribou-document-embedding-benchmark`,
a valid `PostgreSQL` database with its credentials saved in `AWS Secrets Manager`, two DynamoDB
tables named `caribou-document-embedding-benchmark-document` (Partition key: `userid` (String),
Sort key: `documentid` (String)) and `caribou-document-embedding-benchmark-memory`
(Partition key: `SessionId` (String)), and permission to use the `Titan Text Embeddings V2` model in AWS Bedrock.
All resources must be located in the `us-east-1` (N. Virginia) AWS Region.

Alternatively, adjust the names and regions of the aforementioned resources in `app.py` to match your own setup; a sketch of the table setup follows below.
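
For reference, here is a minimal sketch of how the two DynamoDB tables could be created with `boto3`. The table names, key schemas, and region follow the defaults in `app.py`; the on-demand billing mode is an assumption, not a requirement:

```python
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Document table: one item per (user, document) pair
dynamodb.create_table(
    TableName="caribou-document-embedding-benchmark-document",
    AttributeDefinitions=[
        {"AttributeName": "userid", "AttributeType": "S"},
        {"AttributeName": "documentid", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "userid", "KeyType": "HASH"},
        {"AttributeName": "documentid", "KeyType": "RANGE"},
    ],
    BillingMode="PAY_PER_REQUEST",  # Assumption; provisioned capacity also works
)

# Memory (conversation history) table
dynamodb.create_table(
    TableName="caribou-document-embedding-benchmark-memory",
    AttributeDefinitions=[{"AttributeName": "SessionId", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "SessionId", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)

# Note: get_db_secret() in app.py expects the Secrets Manager secret to be a
# JSON string with at least "username" and "password" keys.
```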

You can also enable or disable extraction of text from images embedded in PDFs by changing the `EXTRACT_IMAGES` constant in `app.py`.

The S3 bucket must contain a valid PDF file, for example `example.pdf`, in a folder
called `input`.

This benchmark supports custom AWS Bedrock embedding models, which
can be configured with the optional input argument `embedding_model_id`
(https://docs.aws.amazon.com/bedrock/latest/userguide/titan-embedding-models.html).
If this argument is not set, the benchmark defaults to `amazon.titan-embed-text-v2:0`.

You can deploy the benchmark with the following command while inside the poetry environment:

```bash
caribou deploy
```

And then run the benchmark with the following command:

```bash
caribou run rag_data_ingestion-version_number -a '{"user_id": "example_user_1", "file_name": "example.pdf"}'
```
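
To use a different Bedrock embedding model, pass `embedding_model_id` in the run arguments. The model ID shown here, `amazon.titan-embed-text-v1`, is illustrative; any embedding model your account is permitted to invoke should work:

```bash
caribou run rag_data_ingestion-version_number -a '{"user_id": "example_user_1", "file_name": "example.pdf", "embedding_model_id": "amazon.titan-embed-text-v1"}'
```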

To remove the benchmark, you can use the following command:

```bash
caribou remove rag_data_ingestion-version_number
```
16 changes: 16 additions & 0 deletions benchmarks/rag_data_ingestion/requirements.txt
@@ -0,0 +1,16 @@
boto3==1.34.131
botocore==1.34.131
pyyaml==6.0.1
pytz==2024.1
PyPDF2==3.0.1
shortuuid==1.0.11
faiss-cpu==1.7.4
langchain==0.2.12
langchain-community==0.2.10
langchain-aws==0.1.13
langchain_postgres==0.0.9
pypdf==3.17.0
urllib3
psycopg[binary,pool]
psycopg2-binary
rapidocr-onnxruntime
Empty file.
Empty file.