data_pipeline.py
import csv
import json
import os
import uuid
from datetime import datetime

import boto3
from pytz import timezone

# AWS resources and configuration are resolved once per Lambda container (cold start).
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['DYNAMO_TABLE'])

local_file = '/tmp/world_cup.csv'

# Note: the date is captured at module load time, so it reflects the container's
# cold-start date in the configured local timezone (e.g. Australia/Sydney).
tz_sydney = timezone(os.environ['TZ_LOCAL'])
date = datetime.now(tz_sydney).strftime("%Y-%m-%d")


def run(event, context):
    """Lambda handler: process every S3 object referenced in the incoming event."""
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        process_data(bucket, key, local_file)


def process_data(bucket, key, file):
    """Download the CSV from S3 and write each row to DynamoDB as a JSON string."""
    s3get(bucket, key, file)
    with open(file, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            row_json = json.dumps(row)
            print(row_json)
            table.put_item(
                Item={
                    'id': str(uuid.uuid4()),
                    'date': date,
                    'content': row_json
                }
            )


def s3get(bucket, key, file):
    """Download an S3 object to a local file path."""
    s3.Object(bucket, key).download_file(file)
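

# Usage sketch (not part of the original file): a minimal local invocation of run()
# with a hand-built S3 "ObjectCreated"-style event. The bucket and key names below
# are placeholders; in production the event is delivered by the S3 trigger. This
# assumes DYNAMO_TABLE and TZ_LOCAL are set and AWS credentials are available,
# since the handler still talks to real S3 and DynamoDB.
if __name__ == '__main__':
    sample_event = {
        'Records': [
            {
                's3': {
                    'bucket': {'name': 'example-bucket'},
                    'object': {'key': 'world_cup.csv'}
                }
            }
        ]
    }
    run(sample_event, None)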