#! /usr/bin/env python3
'''utils_spark.py: Spark DataFrame helpers (forked from feng-li/dlsa).'''
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, DoubleType
def convert_schema(usecols_x, dummy_info, fit_intercept):
    '''Convert schema types for a large data frame.

    Returns a list of StructFields, one DoubleType field per model column.
    When dummy variables are used, the continuous columns come first,
    followed by one field per retained factor level; with an intercept the
    first (reference) level of each factor is skipped.
    '''
    schema_fields = []
    if len(dummy_info) == 0:  # No dummy variables are used
        for j in usecols_x:
            schema_fields.append(StructField(j, DoubleType(), True))
    else:
        # Continuous columns: everything not selected as a factor
        convert_dummies = list(dummy_info['factor_selected'].keys())
        for x in set(usecols_x) - set(convert_dummies):
            schema_fields.append(StructField(x, DoubleType(), True))
        # One field per retained factor level; fit_intercept == True slices
        # off the first (reference) level of each factor.
        for i in convert_dummies:
            for j in dummy_info["factor_selected_names"][i][fit_intercept:]:
                schema_fields.append(StructField(j, DoubleType(), True))
    if fit_intercept:
        schema_fields.insert(0, StructField('intercept', DoubleType(), True))
    return schema_fields
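# Usage sketch (illustrative, not part of the original module): build a full
# StructType from the returned fields and use it to read a CSV with typed
# columns. The active session `spark` and the path are assumptions.
#
#     from pyspark.sql.types import StructType
#     fields = convert_schema(['Distance', 'AirTime'], dummy_info={},
#                             fit_intercept=False)
#     sdf = spark.read.csv('hdfs:///data/airline.csv', header=True,
#                          schema=StructType(fields))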
def clean_airlinedata_sdf(spark, file_path):
    '''Read one airline-data CSV, keep the modelling columns, and drop rows
    with missing values. Returns the cleaned Spark DataFrame.
    '''
    usecols = ['Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime',
               'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'ActualElapsedTime',
               'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance']
    sdf0 = spark.read.csv(file_path, header=True).select(usecols)
    sdf0 = sdf0.dropna()  # dropna() returns a new DataFrame
    return sdf0
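# Usage sketch (illustrative; the app name and HDFS path are placeholders):
#
#     spark = SparkSession.builder.appName('airline').getOrCreate()
#     sdf = clean_airlinedata_sdf(spark, 'hdfs:///data/airline/1990.csv')
#     sdf.show(5)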
def insert_partition_id_sdf(data_sdf, partition_num, partition_method):
    '''Insert an arbitrary index into a Spark DataFrame for partitioning.

    Assigns a random row ID with a window function, then derives a partition
    ID as row_id % partition_num. `partition_method` is accepted for API
    compatibility, but only random ordering is implemented here.

    FIXME: WARN WindowExec: No Partition Defined for Window operation!
    Moving all data to a single partition can cause serious performance
    degradation.
    https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
    '''
    spark = SparkSession.builder.getOrCreate()
    data_sdf.createOrReplaceTempView("data_sdf")
    data_sdf = spark.sql("""
        select *, row_id % {0:d} as partition_id
        from (
            select *, row_number() over (order by rand()) as row_id
            from data_sdf
        )
        """.format(partition_num))
    return data_sdf
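if __name__ == '__main__':
    # Minimal hedged self-test: exercises insert_partition_id_sdf on a tiny
    # in-memory DataFrame instead of the airline data, so it runs anywhere
    # pyspark is installed (e.g. `spark-submit utils_spark.py`).
    spark = SparkSession.builder.appName('utils_spark_demo').getOrCreate()
    demo_sdf = spark.createDataFrame([(float(i),) for i in range(100)], ['x'])
    demo_sdf = insert_partition_id_sdf(demo_sdf, partition_num=5,
                                       partition_method='random')
    demo_sdf.groupBy('partition_id').count().show()
    spark.stop()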