Skip to content

Commit

Permalink
feature:generate_singed_url
Browse files Browse the repository at this point in the history
  • Loading branch information
codernesty committed Aug 26, 2024
1 parent d1dcbf1 commit 37740a8
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 2 deletions.
19 changes: 17 additions & 2 deletions linguaphoto/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from utils.cloudfront_url_signer import CloudFrontUrlSigner

# Load environment variables from .env file
load_dotenv()
Expand All @@ -27,6 +28,8 @@
# Retrieve AWS configuration from environment variables
bucket_name = os.getenv("S3_BUCKET_NAME")
dynamodb_table_name = os.getenv("DYNAMODB_TABLE_NAME")
media_hosting_server = os.getenv("MEDIA_HOSTING_SERVER")
key_pair_id = os.getenv("KEY_PAIR_ID")


class ImageMetadata(BaseModel):
Expand All @@ -50,6 +53,14 @@ async def upload_image(file: UploadFile = File(...)) -> ImageMetadata:
if dynamodb_table_name is None:
raise HTTPException(status_code=500, detail="DynamoDB table name is not set.")

# Create an instance of CloudFrontUrlSigner
private_key_path = os.path.abspath("private_key.pem")
cfs = CloudFrontUrlSigner(str(key_pair_id), private_key_path)
# Generate a signed URL
url = f"{media_hosting_server}/{unique_filename}"
custom_policy = cfs.create_custom_policy(url, expire_days=100)
s3_url = cfs.generate_presigned_url(url, custom_policy)
print(s3_url)
# Create an S3 client with aioboto3
async with aioboto3.Session().client(
"s3",
Expand All @@ -59,7 +70,6 @@ async def upload_image(file: UploadFile = File(...)) -> ImageMetadata:
) as s3_client:
# Upload the file to S3
await s3_client.upload_fileobj(file.file, bucket_name, f"uploads/{unique_filename}")
s3_url = f"https://{bucket_name}.s3.amazonaws.com/uploads/{unique_filename}"

# Create a DynamoDB resource with aioboto3
async with aioboto3.Session().resource(
Expand All @@ -75,6 +85,7 @@ async def upload_image(file: UploadFile = File(...)) -> ImageMetadata:
return ImageMetadata(filename=unique_filename, s3_url=s3_url)

except Exception as e:
print(str(e))
raise HTTPException(status_code=500, detail=str(e))


Expand Down Expand Up @@ -108,4 +119,8 @@ async def root() -> dict[str, str]:

if __name__ == "__main__":
print("Starting webserver...")
uvicorn.run(app, port=8080, host="0.0.0.0")
uvicorn.run(
app, # Replace with the module name and app instance
host="0.0.0.0",
port=8080,
)
27 changes: 27 additions & 0 deletions linguaphoto/private_key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAxoAXnKN5nJlYe44iFBJJXgd+nH8WCJ/q+7yLE5iRAf5ylCHH
/Vai6ogLOPrQQzSyKLRAIBRxlgFw/U60H0wyiD8bS/iRy5Dlne4Vp2WHc/Lw6M+o
E0Fb/YiAM1dpPFFpHOKy/OL0CN5/eh0i7cvUht1Ssz+KI5Zgi4i7KuI67p2LdSZd
+rsqmBipyrTPTONPADgj1wuVq3J/6TX8ltMPiJgCMNblddDtn5OArWCZVsGydKmP
kHM051q4wJQn9Y2eI7ho6Qd3i4iuV7rK0T/gKu9kv9gdb6k1J/Le/jaEMcXuUSj8
nu2RsfkMotxH/Rm6XH5LdjrcVWY4ulF7WgJk9QIDAQABAoIBAF6S2+0Y5BGs6//e
HbVzavo+VuAIGr7cNnBfCeI5v+jzyrJyD99PfkqAq9wnf79tZW7IRn8iTmXaZPOD
IoWA39iTPJWrJgeXjxb6Pt3lHS6ssoQxn9Igw0vd07ribDS9UvfcuMuM9BkfrRvI
swIaKbVh0Iuve8jt1izU5dwOMmbJ171MZHMiILsKH5Xf3UmkDTX1cLU3GoSBpBa+
oq5ngo9MiWoBgBA7sexm6kQIcRJqJJGrRIL7sAYFJQd2C9kV4rnTcC8Vw/mD4G/U
oe5TuSQ98T+SAjgso+xyqA9uYgVRfE37h5ctYVf4t2zPD1LrkxV6gljk2VizSFg3
4S/XUrcCgYEA/cs3L6lOcbxvFxHdrE6LL6c2hIiiJxUFGq/I4J/WYwMMDtMPgqNw
uwbhK+cQkRyO5YcinV7CRTON7LLFhNJZug1nAcVCVH7OOMUM6xef+6bIy8SVf/gT
0x69+x3oRMILHF5BLBpO+DsvTgsFSF2cDsN6S7LlJ+J2Xg2YlP7BRI8CgYEAyDnU
FAAD4elgOxDVTps+bt3fgZSmzSaAaVdL2hNgpA3SBvcGfq/tRntKCt0oFUn9z//y
Y2+7rTN5A6cJNdPQGA2HE1YtECIO97ir3gDypGxZatCGywR90KTOBnQVWndDqddT
lOQxIFRfokEPmNtQP+D4fVKZ51mifKTXa5RK6DsCgYEA5SkhY3/UvQ4QoRwYtQUW
2Kh7qaBWCkQIn9gp4elxg2W09Y3Oa394wuerWiEB7IWE4evrbX2qnSG3/QpPH2dw
bXa8k/Sxt+nn+4qx53Ull+05UgTnmO0/uVoA4UZX+/3aWnshDdmThMCsLiP1WSpt
R0dqnf+iuyjZCIPuSlrd2DECgYAfggIel5YE0d3DzbVTZlifx6hpUsQg2mMwsH0O
NyhpCIe5ctwByZt0EOio5v5swzT+q08wWJ/W9JehfIKVhtxjPJW59ECpHkLuto/N
IqcMOsSja2cawX0u/RAysce+cbAjJPBRKMuWQ9C8zrIuoqxxMOzJg9sWLePE64e+
tRpIiwKBgQCctJAzR66e4RsBIY5AaxSLbP17qPbQKeuwbFrG6xbQbQQ8hqPuxPW9
dDTflbkoSK+paqHsd1yiLX+IwylTZWPdTBUdpO9in0f/WYm1DGZEXyR0MtFv7NFG
bETRchnR3ss+6ZYIbn/AtZWEcGW3YwX4Ys3+5RGfRM6TeKT+cGj08A==
-----END RSA PRIVATE KEY-----
1 change: 1 addition & 0 deletions linguaphoto/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.0.1"
83 changes: 83 additions & 0 deletions linguaphoto/utils/cloudfront_url_signer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""This module provides a class to generate signed URLs for AWS CloudFront using RSA keys.
The `CloudFrontUrlSigner` class allows you to create and sign CloudFront URLs with optional custom policies.
"""

import json
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import rsa
from botocore.signers import CloudFrontSigner


class CloudFrontUrlSigner:
"""A class to generate signed URLs for AWS CloudFront using RSA keys."""

def __init__(self, key_id: str, private_key_path: str) -> None:
"""Initialize the CloudFrontUrlSigner with a key ID and the path to the private key file.
:param key_id: The CloudFront key ID associated with the public key in your CloudFront key group.
:param private_key_path: The path to the private key PEM file.
"""
self.key_id = key_id
self.private_key_path = private_key_path
self.cf_signer = CloudFrontSigner(key_id, self._rsa_signer)

def _rsa_signer(self, message: str) -> bytes:
"""RSA signer function that signs a message using the private key.
:param message: The message to be signed.
:return: The RSA signature of the message as bytes.
"""
with open(self.private_key_path, "r") as key_file:
private_key = key_file.read()
return rsa.sign(
message, # Ensure message is in bytes
rsa.PrivateKey.load_pkcs1(private_key.encode("utf8")),
"SHA-1", # CloudFront requires SHA-1 hash
)

def generate_presigned_url(self, url: str, policy: Optional[str] = None) -> str:
"""Generate a presigned URL for CloudFront using an optional custom policy.
:param url: The URL to sign.
:param policy: (Optional) A custom policy for the URL.
:return: The signed URL.
"""
return self.cf_signer.generate_presigned_url(url, policy=policy)

def create_custom_policy(self, url: str, expire_days: int = 1, ip_range: Optional[str] = None) -> str:
"""Create a custom policy for CloudFront signed URLs.
:param url: The URL to be signed.
:param expire_days: Number of days until the policy expires.
:param ip_range: Optional IP range to restrict access.
:return: The custom policy in JSON format.
"""
expiration_time = int((datetime.utcnow() + timedelta(days=expire_days)).timestamp())
policy: Dict[str, Any] = {
"Statement": [
{
"Resource": url,
"Condition": {
"DateLessThan": {"AWS:EpochTime": expiration_time},
},
}
]
}
if ip_range:
policy["Statement"][0]["Condition"]["IpAddress"] = {"AWS:SourceIp": ip_range}

return json.dumps(policy, separators=(",", ":")) # Minified JSON


# Example usage:
# key_id = 'your_cloudfront_key_id'
# private_key_path = 'path/to/your/private_key.pem'
# url = 'https://your-distribution.cloudfront.net/your-file'
# my_policy = None # Replace with your custom policy if needed

# signer = CloudFrontUrlSigner(key_id, private_key_path)
# signed_url = signer.generate_presigned_url(url, policy=my_policy)
# print(signed_url)
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[mypy]
ignore_missing_imports = True

0 comments on commit 37740a8

Please sign in to comment.