-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjoin_selection_benchmark.py
60 lines (39 loc) · 1.88 KB
/
join_selection_benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from pyspark.sql import SparkSession
import sys, time
# Session shared by every benchmark run in this script; reuses an existing
# session if one is already active (getOrCreate).
spark = (
    SparkSession.builder
    .appName('study-joins')
    .getOrCreate()
)
# HDFS folder read for charts.parquet / regions.parquet and written to by
# every output of this script.
HDFS_PATH = 'hdfs://master:9000/files'
def join_experiment(disabled=False):
    """Run the charts/regions join benchmark and return its execution time.

    The query joins the ``charts`` and ``regions`` parquet datasets found
    under ``HDFS_PATH`` on ``charts._c4 = regions._c0`` and writes the result
    back to HDFS in parquet format. After the timed run, the physical plan of
    the same query is printed so the chosen join strategy can be inspected.

    Args:
        disabled: When True, the optimizer's broadcast-join selection is
            turned off by setting ``spark.sql.autoBroadcastJoinThreshold`` to
            -1 for the duration of this call. When False (the default), the
            session's current threshold is left in effect.

    Returns:
        Elapsed seconds (float) spent executing the join and writing the
        joined result to
        ``HDFS_PATH/joined_with_optimizer_join_selection_<not disabled>.parquet``.
        Only that write is inside the timed window; printing the plan is not.

    Notes:
        The value of ``spark.sql.autoBroadcastJoinThreshold`` seen on entry is
        restored before returning (even on error), so a "disabled" run cannot
        leak its override into a later "enabled" run.
    """
    threshold_key = "spark.sql.autoBroadcastJoinThreshold"
    previous_threshold = spark.conf.get(threshold_key)
    try:
        if disabled:
            # -1 disables automatic broadcast joins, so the planner has to
            # fall back to a non-broadcast join strategy (typically sort-merge).
            spark.conf.set(threshold_key, -1)

        reader = spark.read.format("parquet")
        charts = reader.load(HDFS_PATH + "/charts.parquet")
        regions = reader.load(HDFS_PATH + "/regions.parquet")
        # createOrReplaceTempView replaces the deprecated registerTempTable.
        charts.createOrReplaceTempView("charts")
        regions.createOrReplaceTempView("regions")

        sql_string = 'select c.*, r._c1 as region_name from charts as c, regions as r where c._c4 = r._c0'
        output_path = (
            HDFS_PATH
            + "/joined_with_optimizer_join_selection_"
            + str(not disabled)
            + ".parquet"
        )

        # perf_counter is monotonic, so the measurement is unaffected by
        # system clock adjustments during the run.
        t1 = time.perf_counter()
        spark.sql(sql_string).write.mode('overwrite').parquet(output_path)
        t2 = time.perf_counter()

        print("***************** PHYSICAL PLAN OF JOIN EXPERIMENT WITH OPTIMIZER JOIN SELECTION (DISABLED = " + str(disabled) + ")")
        # Explain while the threshold override is still active so the printed
        # plan corresponds to the configuration that was just timed.
        spark.sql(sql_string).explain()
    finally:
        spark.conf.set(threshold_key, previous_threshold)
    return t2 - t1
# Benchmark the join with the optimizer's join selection on, then off.
# Order matters: the enabled run executes first.
enabled_time = join_experiment()
disabled_time = join_experiment(disabled = True)

# One row per configuration: (setting label, elapsed seconds).
times = [
    ('Enabled', enabled_time),
    ('Disabled', disabled_time),
]

# Persist the comparison as a CSV (with header) next to the input data.
(
    spark.createDataFrame(data=times, schema=['Optimizer Join Selection', 'Execution Time'])
    .write.mode('overwrite')
    .option('header', 'true')
    .csv(HDFS_PATH + "/join_experiment.csv")
)