# Databricks notebook source
# MAGIC %md
# MAGIC # Ingest Customer Support Emails into the Bronze Delta Table
# COMMAND ----------
# MAGIC %md
# MAGIC The customer support email application is integrated with the Databricks platform. Components such as Azure Logic Apps either drop data into <a href="https://docs.databricks.com/en/connect/unity-catalog/volumes.html" target="_blank">Databricks Volumes</a> or ingest it directly into Bronze Delta tables. In this notebook, we assume that the raw emails are dropped into a Volume, and we ingest them into a Bronze Delta table using Auto Loader.
# MAGIC
# MAGIC The data used in this solution consists of synthetic emails, written manually and modelled on real-world emails that electricity suppliers receive from their business customers.
# MAGIC
# MAGIC Because Auto Loader is built on Structured Streaming, the same ingestion code can run either as a continuous stream or as an incremental batch job (for example, with `trigger(availableNow=True)`).
# MAGIC
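# COMMAND ----------
# MAGIC %md
# MAGIC **Illustration only (hypothetical helper, not Auto Loader's implementation).** Auto Loader keeps track of which files in the landing Volume it has already ingested, so every run, streaming or batch-style, picks up only new files. The plain-Python sketch below mimics that idea with a JSON "checkpoint" file. The function name `ingest_new_files` and the checkpoint layout are assumptions made for illustration; Auto Loader stores its own state in the stream checkpoint.
# COMMAND ----------
import json
import os
import tempfile


def ingest_new_files(landing_dir, checkpoint_file):
    """Hypothetical sketch: return files in landing_dir not seen before and record them as seen."""
    # Load the set of already-processed file names from the checkpoint, if one exists
    seen = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            seen = set(json.load(f))
    # Only files that were not processed in a previous run count as new
    new_files = sorted(name for name in os.listdir(landing_dir) if name not in seen)
    # Persist the updated state so that the next run skips these files
    with open(checkpoint_file, "w") as f:
        json.dump(sorted(seen | set(new_files)), f)
    return new_files


# Usage: two runs over the same landing folder; the second run only sees the newly dropped file
with tempfile.TemporaryDirectory() as landing, tempfile.TemporaryDirectory() as state:
    ckpt = os.path.join(state, "checkpoint.json")
    open(os.path.join(landing, "emails_1.csv"), "w").close()
    first_run = ingest_new_files(landing, ckpt)
    open(os.path.join(landing, "emails_2.csv"), "w").close()
    second_run = ingest_new_files(landing, ckpt)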
# COMMAND ----------
# MAGIC %run ./_resources/00-setup
# COMMAND ----------
# MAGIC %md ## Ingest data from the Volume into the Bronze Delta table
# MAGIC
# COMMAND ----------
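# Incrementally read the raw email CSV files that land in the Volume, using Auto Loader
bronzeDF = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # Where Auto Loader stores the inferred schema and tracks its changes over time
    .option("cloudFiles.schemaLocation", config['schema_path'])
    # Email bodies span several lines and may contain quotes, written as "" inside a quoted field
    .option("multiLine", "true")
    .option("escape", '"')
    .option("quote", '"')
    .option("header", "true")
    # Note: Auto Loader's own option for typed inference of CSV columns is cloudFiles.inferColumnTypes
    .option("inferSchema", "true")
    # Values that cannot be parsed into the schema are kept here instead of being lost
    .option("rescuedDataColumn", "_rescued_data")
    .load(config['vol_data_landing'])
)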
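# COMMAND ----------
# MAGIC %md
# MAGIC **Illustration only.** Email bodies span several lines and can contain quotation marks, which is why the reader above sets `multiLine`, `quote` and `escape`. With `quote` and `escape` both set to `"`, a literal quote inside a quoted field is written as `""` (RFC 4180 style). Python's standard `csv` module follows the same convention by default, so the small sample below (hypothetical data and column names) shows how one multi-line email is parsed back into a single record.
# COMMAND ----------
import csv
import io

# Hypothetical sample: the body field spans two lines and contains a doubled quote
sample = 'id,subject,body\n1,Meter reading,"Hello,\nplease check my ""estimated"" bill."\n'

# quotechar='"' with doublequote=True (the csv defaults) mirrors quote='"' / escape='"'
rows = list(csv.DictReader(io.StringIO(sample)))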
# COMMAND ----------
display(bronzeDF)
# COMMAND ----------
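# Append the ingested emails to the Bronze Delta table; the checkpoint records which
# files have already been processed, so a restarted stream does not ingest them twice
_ = (
    bronzeDF
    .writeStream
    .format('delta')
    .outputMode('append')
    .option('checkpointLocation', f"{config['checkpoint_path']}/emails_bronze")
    .toTable(config['table_emails_bronze'])
)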
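# COMMAND ----------
# MAGIC %md
# MAGIC **Optional, hypothetical helper.** The same pipeline can also run as an incremental batch job. The function below (its name and parameters are assumptions for illustration) writes a streaming DataFrame with `trigger(availableNow=True)`, which processes all data available when the query starts and then stops, a convenient pattern for scheduled jobs. It is only defined here, not called. If you reuse the checkpoint location of the continuous query above, run this helper instead of that query, not at the same time.
# COMMAND ----------
def ingest_bronze_available_now(df, checkpoint_location, table_name):
    """Hypothetical helper: append everything not yet in the checkpoint, then stop."""
    query = (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .trigger(availableNow=True)  # drain the current backlog, then terminate
        .toTable(table_name)
    )
    query.awaitTermination()  # block until the backlog has been written
    return query

# Example (not executed here):
# ingest_bronze_available_now(bronzeDF, f"{config['checkpoint_path']}/emails_bronze", config['table_emails_bronze'])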
# COMMAND ----------