Skip to content

Commit

Permalink
Merge pull request #1370 from bcgov/chore/alex-dependancy-update-241202
Browse files Browse the repository at this point in the history
Feat: Async S3 aioboto3
  • Loading branch information
AlexZorkin authored Dec 5, 2024
2 parents 505f78c + 4bfd86e commit 65f131c
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 57 deletions.
59 changes: 35 additions & 24 deletions backend/lcfs/services/redis/lifetime.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
from fastapi import FastAPI
from redis.asyncio import Redis
from redis.exceptions import RedisError

from redis.exceptions import RedisError, TimeoutError
import asyncio
from lcfs.settings import settings

logger = logging.getLogger(__name__)
Expand All @@ -14,27 +14,38 @@ async def init_redis(app: FastAPI) -> None:
:param app: current FastAPI application.
"""
try:
# Initialize Redis client directly
app.state.redis_client = Redis(
host=settings.redis_host,
port=settings.redis_port,
password=settings.redis_pass,
db=settings.redis_base or 0,
decode_responses=True,
socket_timeout=5, # Timeout for socket read/write (seconds)
socket_connect_timeout=5, # Timeout for connection establishment (seconds)
)

# Test the connection
await app.state.redis_client.ping()
logger.info("Redis client initialized and connection successful.")
except RedisError as e:
logger.error(f"Redis error during initialization: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error during Redis initialization: {e}")
raise
retries = 5 # Retry logic in case Redis is unavailable initially
for i in range(retries):
try:
# Initialize Redis client
app.state.redis_client = Redis(
host=settings.redis_host,
port=settings.redis_port,
password=settings.redis_pass,
db=settings.redis_base or 0,
decode_responses=True,
max_connections=10,
socket_timeout=5,
socket_connect_timeout=5,
)

# Test the connection
await app.state.redis_client.ping()
logger.info("Redis client initialized and connection successful.")
break
except TimeoutError as e:
logger.error(f"Redis timeout during initialization attempt {i + 1}: {e}")
if i == retries - 1:
raise
await asyncio.sleep(2**i) # Exponential backoff
except RedisError as e:
logger.error(f"Redis error during initialization attempt {i + 1}: {e}")
if i == retries - 1:
raise
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
logger.error(f"Unexpected error during Redis initialization: {e}")
raise


async def shutdown_redis(app: FastAPI) -> None:
Expand All @@ -46,7 +57,7 @@ async def shutdown_redis(app: FastAPI) -> None:
try:
if hasattr(app.state, "redis_client") and app.state.redis_client:
await app.state.redis_client.close()
logger.info("Redis client closed successfully.")
logger.info("Redis client closed successfully.")
except RedisError as e:
logger.error(f"Redis error during shutdown: {e}")
except Exception as e:
Expand Down
36 changes: 20 additions & 16 deletions backend/lcfs/services/s3/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import uuid

import boto3
from fastapi import Depends, Request
from fastapi import Depends
from pydantic.v1 import ValidationError
from sqlalchemy import select
from sqlalchemy.exc import InvalidRequestError
Expand Down Expand Up @@ -53,12 +51,13 @@ async def upload_file(self, file, parent_id: str, parent_type="compliance_report
self.clamav_service.scan_file(file)

# Upload file to S3
self.s3_client.upload_fileobj(
Fileobj=file.file,
Bucket=BUCKET_NAME,
Key=file_key,
ExtraArgs={"ContentType": file.content_type},
)
async with self.s3_client as client:
await client.upload_fileobj(
Fileobj=file.file,
Bucket=BUCKET_NAME,
Key=file_key,
ExtraArgs={"ContentType": file.content_type},
)

document = Document(
file_key=file_key,
Expand Down Expand Up @@ -97,11 +96,12 @@ async def generate_presigned_url(self, document_id: int):
if not document:
raise Exception("Document not found")

presigned_url = self.s3_client.generate_presigned_url(
"get_object",
Params={"Bucket": BUCKET_NAME, "Key": document.file_key},
ExpiresIn=60, # URL expiration in seconds
)
async with self.s3_client as client:
presigned_url = await client.generate_presigned_url(
"get_object",
Params={"Bucket": BUCKET_NAME, "Key": document.file_key},
ExpiresIn=60, # URL expiration in seconds
)
return presigned_url

# Delete a file from S3 and remove the entry from the database
Expand All @@ -113,7 +113,8 @@ async def delete_file(self, document_id: int):
raise Exception("Document not found")

# Delete the file from S3
self.s3_client.delete_object(Bucket=BUCKET_NAME, Key=document.file_key)
async with self.s3_client as client:
await client.delete_object(Bucket=BUCKET_NAME, Key=document.file_key)

# Delete the entry from the database
await self.db.delete(document)
Expand Down Expand Up @@ -144,5 +145,8 @@ async def get_object(self, document_id: int):
if not document:
raise Exception("Document not found")

response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=document.file_key)
async with self.s3_client as client:
response = await client.get_object(
Bucket=BUCKET_NAME, Key=document.file_key
)
return response, document
7 changes: 4 additions & 3 deletions backend/lcfs/services/s3/dependency.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from starlette.requests import Request
import boto3
from aioboto3 import Session


# S3 Client Dependency
async def get_s3_client(
request: Request,
) -> boto3.client:
) -> Session.client:
"""
Returns the S3 client from the application state.
Usage:
>>> async def handler(s3_client = Depends(get_s3_client)):
>>> s3_client.upload_file('file.txt', 'my-bucket', 'file.txt')
>>> async with s3_client as client:
>>> await client.upload_fileobj('file.txt', 'my-bucket', 'file.txt')
:param request: Current request object.
:returns: S3 client.
Expand Down
13 changes: 7 additions & 6 deletions backend/lcfs/services/s3/lifetime.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import boto3
from fastapi import FastAPI
from aioboto3 import Session
from lcfs.settings import settings


Expand All @@ -9,14 +9,15 @@ async def init_s3(app: FastAPI) -> None:
:param app: FastAPI application.
"""
app.state.s3_client = boto3.client(
session = Session()
app.state.s3_client = session.client(
"s3",
aws_access_key_id=settings.s3_access_key,
aws_secret_access_key=settings.s3_secret_key,
endpoint_url=settings.s3_endpoint,
region_name="us-east-1",
)
print("S3 client initialized.")
print("Async S3 client initialized.")


async def shutdown_s3(app: FastAPI) -> None:
Expand All @@ -25,6 +26,6 @@ async def shutdown_s3(app: FastAPI) -> None:
:param app: FastAPI application.
"""
if hasattr(app.state, "s3_client"):
del app.state.s3_client
print("S3 client shutdown.")
if hasattr(app.state, "s3_client") and app.state.s3_client:
await app.state.s3_client.close()
print("Async S3 client shutdown.")
Loading

0 comments on commit 65f131c

Please sign in to comment.