write_to_delta_lake
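
# Word count in PySpark: read a text file, split each line into words,
# count the occurrences of each word, write the result to a Delta Lake
# table, and read it back.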
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import split, col, explode
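# Build a Spark session with the Delta Lake package and the SQL extensions
# needed to read and write Delta tables. delta-core_2.12:2.1.0 is built
# against Spark 3.3.x; the package version must match the Spark version.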
spark = SparkSession \
.builder \
.appName("word_counter") \
.config('spark.jars.packages','io.delta:delta-core_2.12:2.1.0') \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
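# Input: a plain-text file; each line becomes one row in a single
# string column named "word".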
inputPath = "./text.txt"
schema1 = StructType([StructField('word', StringType(), True)])
print('step 1: read raw lines')
# Use a tab delimiter so spaces inside a line do not split it into
# multiple CSV columns; each line stays one string.
inputDF = spark.read.option("delimiter", "\t").schema(schema1).csv(inputPath)
inputDF.show()
print('step 2: split each line into an array of words')
inputDF = inputDF.select(split(col("word"), " ").alias("word"))
inputDF.show()
print('step 3: explode each array into one word per row')
# explode() names its output column "col" by default; rename it.
inputDF = inputDF.select(inputDF.word, explode(inputDF.word)).withColumnRenamed("col", "unique_word")
inputDF.show()
print('step 4: drop empty tokens left by consecutive spaces')
inputDF = inputDF.filter(inputDF["unique_word"] != "")
inputDF.show()
print('step 5: count occurrences of each word')
inputDF = inputDF.groupBy("unique_word").count()
inputDF.show()
print('writing word counts to a Delta table')
inputDF.write.mode("overwrite").format("delta").save("/tmp/delta-table")
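# Read the table back through the Delta reader to verify the write.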
DF_from_Delta = spark.read.format("delta").load("/tmp/delta-table")
DF_from_Delta.show()
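# Optional follow-up sketch: Delta Lake keeps a transaction log, so earlier
# versions of the table stay queryable via the standard "versionAsOf" reader
# option. Version 0 here assumes the overwrite above created the table;
# adjust the number if the table already had prior commits.
print('time travel: reading version 0 of the Delta table')
old_counts = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
old_counts.show()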