Feature/query dynamo from spark #6

Open · wants to merge 4 commits into base: main

2 changes: 2 additions & 0 deletions cloud-streaming-pipeline/run_week5_app.sh
@@ -0,0 +1,2 @@
spark-submit \
sparkhelloworld/week5_app.py
89 changes: 89 additions & 0 deletions cloud-streaming-pipeline/sparkhelloworld/week5_app.py
@@ -0,0 +1,89 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
A PySpark program that enriches a dataset of customer IDs with data from DynamoDB
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import boto3
from collections import namedtuple

# Use a namedtuple as a class to represent the data coming back from DynamoDB
# (like a Scala case class).
Person = namedtuple("Person", ["name", "birthdate", "customer_id"])


# Fake input data; in reality this will come from Kafka.
def build_fake_input_data(num_records, num_partitions, spark):
    # Each element must be a 1-item tuple. The trailing comma that looks
    # unnecessary/dangling in the expression:
    #     (key,)
    # is crucial: it creates a tuple instead of a string. Do not remove it!
    keys = [(key,) for key in get_random_customer_ids(num_records)]
    # Define the schema for the single-column DataFrame of customer IDs
    schema = StructType([
        StructField("customer_id", StringType(), nullable=False)
    ])
    return spark.createDataFrame(data=keys, schema=schema).repartition(num_partitions)

# Get random customer IDs from the DynamoDB customers table.
def get_random_customer_ids(num):
    my_session = boto3.Session(profile_name='hwe_prod')
    dynamodb = my_session.client('dynamodb')
    table_name = 'customers'
    partition_key = 'customer_id'
    # Scan the table for random values of the partition key
    scan_params = {
        'TableName': table_name,
        'Limit': num
    }
    response = dynamodb.scan(**scan_params)

    items = response['Items']
    random_partition_keys = [item[partition_key]['S'] for item in items]
    return random_partition_keys

# Query DynamoDB for a given list of customer IDs (representing a Spark partition).
# This function is meant to be passed into a mapPartitions call in Spark.
def query_dynamo_for_a_partition(partition):
    # Create a DynamoDB client
    my_session = boto3.Session(profile_name='hwe_prod')
    dynamodb = my_session.client('dynamodb')
    table_name = 'customers'
    people = []
    for row in partition:
        # Look up each customer_id and build a Person from the returned item
        key = {
            'customer_id': {'S': row.customer_id}
        }
        response = dynamodb.get_item(TableName=table_name, Key=key)
        item = response['Item']
        person = Person(item['name']['S'], item['birthdate']['S'], item['customer_id']['S'])
        people.append(person)
    return people


# ============================ MAIN ============================
def main():
    spark = SparkSession \
        .builder \
        .appName("HWE Week 5 App") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    # This will be coming from Kafka in reality...
    input_data = build_fake_input_data(num_records=500, spark=spark, num_partitions=10)

    # Enrich from DynamoDB using mapPartitions
    people = input_data.rdd \
        .mapPartitions(query_dynamo_for_a_partition) \
        .toDF(["name", "birthdate", "customer_id"])
    print(people.collect())


if __name__ == "__main__":
    main()
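
The comments above note that the input will eventually come from Kafka rather than build_fake_input_data. A minimal sketch of what that swap might look like, assuming a Kafka source with placeholder broker and topic names (none of this is part of the PR):

# Hypothetical replacement for build_fake_input_data; broker and topic are placeholders.
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "customer-ids") \
    .load()

# Kafka keys/values arrive as binary, so cast to string to recover customer_id.
input_data = kafka_df.selectExpr("CAST(value AS STRING) AS customer_id")

# A streaming DataFrame has no .rdd, so the mapPartitions enrichment above would
# run inside foreachBatch on each micro-batch instead of directly on input_data.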