-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlambda_function.py
87 lines (73 loc) · 2.9 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import boto3
import json
import logging
from botocore.exceptions import ClientError
from datetime import datetime
import time
from decimal import Decimal
import uuid
# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
# Initialize AWS clients
rekognition = boto3.client('rekognition', region_name='us-west-2')
kinesis_video = boto3.client('kinesisvideo', region_name='us-west-2')
sns = boto3.client('sns', region_name='us-west-2')
def lambda_handler(event, context):
"""Lambda handler to process video streams through Rekognition Content Moderation"""
logger.info("Received event: %s", json.dumps(event))
try:
# Extract stream name from the event
stream_name = event['detail']['requestParameters']['StreamName']
# Get stream endpoint
endpoint = kinesis_video.get_data_endpoint(
StreamName=stream_name,
APIName='GET_MEDIA'
)['DataEndpoint']
# Create Kinesis Video Media client
kvs_client = boto3.client('kinesis-video-media',
endpoint_url=endpoint,
region_name='us-west-2')
# Get the media stream
stream = kvs_client.get_media(
StreamName=stream_name,
StartSelector={'StartSelectorType': 'NOW'}
)
# Process frames through Rekognition
response = rekognition.detect_moderation_labels(
Image={'Bytes': stream['Payload'].read()},
MinConfidence=80.0
)
# Process moderation results
if response['ModerationLabels']:
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table('content-moderation-table')
# Format labels
formatted_labels = [{
'Name': label['Name'],
'Confidence': Decimal(str(label['Confidence']))
} for label in response['ModerationLabels']]
# Store in DynamoDB
table.put_item(Item={
'ImageId': str(uuid.uuid4()),
'stream_name': stream_name,
'timestamp': datetime.now().isoformat(),
'labels': formatted_labels,
'status': 'flagged'
})
# Send SNS notification
sns.publish(
TopicArn='arn:aws:sns:us-west-2:794038244518:content-moderation-alerts',
Message=json.dumps({
'stream_name': stream_name,
'labels': formatted_labels,
'timestamp': datetime.now().isoformat()
})
)
return {
'statusCode': 200,
'body': json.dumps('Content moderation completed')
}
except Exception as e:
logger.error(f"Error: {e}")
raise