From 4bde63e3114ec02861d173178f4150587490a303 Mon Sep 17 00:00:00 2001
From: Tim Sagona
Date: Mon, 19 Jun 2023 17:24:18 -0500
Subject: [PATCH 1/4] Adding week5_app.py which can enrich fake data from DynamoDB using mapPartitions

---
 .../sparkhelloworld/week5_app.py | 89 +++++++++++++++++++
 1 file changed, 89 insertions(+)
 create mode 100644 cloud-streaming-pipeline/sparkhelloworld/week5_app.py

diff --git a/cloud-streaming-pipeline/sparkhelloworld/week5_app.py b/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
new file mode 100644
index 0000000..de45bde
--- /dev/null
+++ b/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+A pyspark program which enriches a dataset from DynamoDB
+"""
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType
+import boto3
+from collections import namedtuple
+
+#Use a namedtuple as a class to represent the data coming back from DynamoDB
+#(Like a Scala case class)
+Person = namedtuple("Person", ["name", "birthdate", "customer_id"])
+
+#Fake input data, this will come from Kafka in reality
+def build_fake_input_data(num_records, num_partitions, spark):
+    keys = get_random_customer_ids(num_records)
+    # Define the schema for the DataFrame
+    # Note: I only want this to have 1 field: customer_id. But I get an error?
+    # If I add a 2nd bogus field everything works fine?
+    schema = StructType([
+        StructField("customer_id", StringType(), nullable=False),
+        StructField("bogus", StringType(), nullable=False)
+    ])
+    keys_with_bogus_data = [(key, "bogus") for key in keys]
+    return spark.createDataFrame(data=keys_with_bogus_data, schema=schema).repartition(num_partitions)
+
+#Get random customer IDs from DynamoDB customers table
+def get_random_customer_ids(num):
+    my_session = boto3.Session(profile_name='hwe_prod')
+    dynamodb = my_session.client('dynamodb')
+    table_name = 'customers'
+    partition_key = 'customer_id'
+    # Scan the table for random values of the partition key
+    scan_params = {
+        'TableName': table_name,
+        'Limit': num
+    }
+    response = dynamodb.scan(**scan_params)
+
+    items = response['Items']
+    random_partition_keys = [item[partition_key]['S'] for item in items]
+    return random_partition_keys
+
+#Query DynamoDB for a given list of customer IDs (representing a Spark partition)
+#This function is meant to be passed into a mapPartitions function in Spark.
+def query_dynamo_for_a_partition(partition):
+    # Create a DynamoDB client
+    my_session = boto3.Session(profile_name='hwe_prod')
+    dynamodb = my_session.client('dynamodb')
+    table_name = 'customers'
+    people = []
+    for item in partition:
+        key = {
+            'customer_id': {'S': item.customer_id}
+        }
+        response = dynamodb.get_item(TableName=table_name,Key=key)
+        item = response['Item']
+        person = Person(item['name']['S'], item['birthdate']['S'], item['customer_id']['S'])
+        people.append(person)
+    return people
+
+
+#     #    #    ###  #    #
+##   ##   # #    #   ##   #
+# # # #  #   #   #   # #  #
+#  #  #  #####   #   #  # #
+#     #  #   #   #   #   ##
+#     #  #   #  ###  #    #
+def main():
+    spark = SparkSession \
+        .builder \
+        .appName("HWE Week 5 App") \
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel('WARN')
+
+    #This will be coming from Kafka in reality...
+    fake_input_data = build_fake_input_data(num_records=500, spark=spark, num_partitions=10)
+
+    #Enrich from DynamoDB using mapPartitions
+    people = fake_input_data.rdd.mapPartitions(lambda partition: query_dynamo_for_a_partition(partition)).toDF(["name", "birthdate", "customer_id"])
+    print(people.collect())
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
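
query_dynamo_for_a_partition above issues one GetItem per record. Because mapPartitions hands the function an entire partition's iterator at once, the same lookups could also be batched. The sketch below is a possible refinement, not something this patch series does; it assumes the same 'customers' table, 'hwe_prod' profile, and attribute names as week5_app.py, and that the customer_ids within a partition are distinct (BatchGetItem rejects duplicate keys).

# Sketch only (not part of the patch): a batched variant of the per-partition lookup.
# Assumes the 'customers' table, 'hwe_prod' profile, and attribute names from week5_app.py,
# and distinct customer_ids within the partition.
import boto3
from collections import namedtuple

Person = namedtuple("Person", ["name", "birthdate", "customer_id"])

def query_dynamo_for_a_partition_batched(partition):
    dynamodb = boto3.Session(profile_name='hwe_prod').client('dynamodb')
    table_name = 'customers'
    keys = [{'customer_id': {'S': row.customer_id}} for row in partition]
    people = []
    # BatchGetItem accepts at most 100 keys per request, so chunk the partition.
    for start in range(0, len(keys), 100):
        chunk = keys[start:start + 100]
        response = dynamodb.batch_get_item(RequestItems={table_name: {'Keys': chunk}})
        for item in response['Responses'][table_name]:
            people.append(Person(item['name']['S'], item['birthdate']['S'], item['customer_id']['S']))
        # A production version would also retry anything left in response['UnprocessedKeys'].
    return people

Swapping this in would only change the callable handed to mapPartitions; the rest of main() stays as written.
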
From 13c0522fa3b9969a20d34384f926532edbae49cb Mon Sep 17 00:00:00 2001
From: Tim Sagona
Date: Tue, 20 Jun 2023 11:51:00 -0500
Subject: [PATCH 2/4] Adding run script

---
 cloud-streaming-pipeline/run_week5_app.sh | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100755 cloud-streaming-pipeline/run_week5_app.sh

diff --git a/cloud-streaming-pipeline/run_week5_app.sh b/cloud-streaming-pipeline/run_week5_app.sh
new file mode 100755
index 0000000..0a034b9
--- /dev/null
+++ b/cloud-streaming-pipeline/run_week5_app.sh
@@ -0,0 +1,2 @@
+spark-submit \
+sparkhelloworld/week5_app.py

From 0924adc41a536a96ec6fce6a3584f0d9545acd38 Mon Sep 17 00:00:00 2001
From: Tim Sagona
Date: Tue, 20 Jun 2023 13:48:47 -0500
Subject: [PATCH 3/4] Fixing struct to contain only 1 value

---
 .../sparkhelloworld/week5_app.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/cloud-streaming-pipeline/sparkhelloworld/week5_app.py b/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
index de45bde..0656504 100644
--- a/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
+++ b/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
@@ -16,16 +16,16 @@
 
 #Fake input data, this will come from Kafka in reality
 def build_fake_input_data(num_records, num_partitions, spark):
-    keys = get_random_customer_ids(num_records)
+    #Have to make sure the element is a 1-item tuple. The trailing comma
+    #that looks unnecessary/dangling in the expression:
+    #    (key,)
+    #is crucial to ensuring it creates a tuple instead of a string. Do not remove it!
+    keys = [(key,) for key in get_random_customer_ids(num_records)]
     # Define the schema for the DataFrame
-    # Note: I only want this to have 1 field: customer_id. But I get an error?
-    # If I add a 2nd bogus field everything works fine?
     schema = StructType([
-        StructField("customer_id", StringType(), nullable=False),
-        StructField("bogus", StringType(), nullable=False)
+        StructField("customer_id", StringType(), nullable=False)
     ])
-    keys_with_bogus_data = [(key, "bogus") for key in keys]
-    return spark.createDataFrame(data=keys_with_bogus_data, schema=schema).repartition(num_partitions)
+    return spark.createDataFrame(data=keys, schema=schema).repartition(num_partitions)
 
 #Get random customer IDs from DynamoDB customers table
 def get_random_customer_ids(num):
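
The comment block added in this patch is doing real work: createDataFrame expects each list element to be row-shaped, and in Python the parentheses alone do not make a tuple, only the trailing comma does. A quick plain-Python illustration (the customer id value is made up):

key = "C123"          # made-up customer id, purely for illustration
print(type((key)))    # <class 'str'>   -- parentheses here are just grouping
print(type((key,)))   # <class 'tuple'> -- the trailing comma makes a 1-item tuple

That one-item tuple is what lines up with the single-field StructType, which is why the bogus second column from patch 1 is no longer needed.
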
From 9d914aa0962e0687edd9bb51fb8b4cc7cb041103 Mon Sep 17 00:00:00 2001
From: Tim Sagona
Date: Tue, 20 Jun 2023 13:52:09 -0500
Subject: [PATCH 4/4] Renaming variable

---
 cloud-streaming-pipeline/sparkhelloworld/week5_app.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cloud-streaming-pipeline/sparkhelloworld/week5_app.py b/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
index 0656504..27291e1 100644
--- a/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
+++ b/cloud-streaming-pipeline/sparkhelloworld/week5_app.py
@@ -78,10 +78,10 @@ def main():
     spark.sparkContext.setLogLevel('WARN')
 
     #This will be coming from Kafka in reality...
-    fake_input_data = build_fake_input_data(num_records=500, spark=spark, num_partitions=10)
+    input_data = build_fake_input_data(num_records=500, spark=spark, num_partitions=10)
 
     #Enrich from DynamoDB using mapPartitions
-    people = fake_input_data.rdd.mapPartitions(lambda partition: query_dynamo_for_a_partition(partition)).toDF(["name", "birthdate", "customer_id"])
+    people = input_data.rdd.mapPartitions(lambda partition: query_dynamo_for_a_partition(partition)).toDF(["name", "birthdate", "customer_id"])
     print(people.collect())
 
 
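
The "#This will be coming from Kafka in reality..." comment marks where this series stops. For orientation only, one possible shape of the streaming version is sketched below; it assumes a Structured Streaming Kafka source with the spark-sql-kafka package available to spark-submit, placeholder broker and topic names, and that the customer id arrives as the Kafka message key. foreachBatch hands the function ordinary DataFrames, so query_dynamo_for_a_partition could be reused unchanged.

# Sketch only: <bootstrap-servers> and <topic> are placeholders, and the job would need
# the spark-sql-kafka-0-10 package supplied to spark-submit (e.g. via --packages).
def enrich_batch(batch_df, batch_id):
    # Each micro-batch is a regular DataFrame, so the same mapPartitions enrichment applies.
    people = (batch_df.rdd
              .mapPartitions(query_dynamo_for_a_partition)
              .toDF(["name", "birthdate", "customer_id"]))
    people.show(truncate=False)   # stand-in for whatever sink the real job uses

def run_streaming(spark):
    # Assumes the customer id rides in the Kafka message key; adjust to the real payload.
    customer_ids = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "<bootstrap-servers>")
                    .option("subscribe", "<topic>")
                    .load()
                    .selectExpr("CAST(key AS STRING) AS customer_id"))
    (customer_ids.writeStream
        .foreachBatch(enrich_batch)
        .start()
        .awaitTermination())
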