# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC The GitHub repo contains a small dataset, which we will load into a database called "emr_sample". This is enough to run through the exercises, but it is not really large enough to train a good ML model. A single-node cluster is sufficient for working with this dataset.
# MAGIC
# MAGIC We will provide a SAS token to people who want to stay after the workshop and re-run the model on the larger "missouri" dataset. If you want to work with the larger dataset, set up a multi-node cluster, then run the "Load big dataset from storage container" section below. The only modification you should have to make in the other notebooks is to change 'use emr_sample' to 'use missouri', as shown in the example below.
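# MAGIC
# MAGIC For example, if one of the other notebooks selects its database with a cell along these lines (the exact cell may vary), the only change is the database name:
# MAGIC
# MAGIC     %sql
# MAGIC     -- use emr_sample
# MAGIC     use missouri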
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC # Load sample data from github repo
# COMMAND ----------
displayHTML(f'''
  <iframe
    src="https://view.officeapps.live.com/op/view.aspx?src=https%3A%2F%2Fraw.githubusercontent.com%2Frmhorton%2FEMR-data-science%2Fmain%2FML_with_simulated_EMR.pptx&slide=8&wdSlideId=260"
    frameborder="0"
    width="80%"
    height="640"
  ></iframe>
''')
# wdSlideIndex=8
# COMMAND ----------
import zipfile

# DBFS path where the sample data will live, and its local FUSE-mounted equivalent
data_path = '/FileStore/emr_sample'
local_path = '/dbfs' + data_path

dbutils.fs.mkdirs(data_path)

# Extract the zipped sample data from the repo into DBFS via the /dbfs local path
with zipfile.ZipFile("sample_data.zip", "r") as zip_ref:
    zip_ref.extractall(local_path)

## If you change your mind:
# dbutils.fs.rm(data_path, recurse=True)
# COMMAND ----------
# MAGIC %fs
# MAGIC
# MAGIC ls /FileStore
# COMMAND ----------
!ls -R /dbfs/FileStore
# COMMAND ----------
import re

DB_NAME = "emr_sample"

spark.sql(f"create database if not exists {DB_NAME}")
spark.sql(f"use {DB_NAME}")

# Register each CSV file in the sample data as a table named after the file
for file_info in dbutils.fs.ls('/FileStore/emr_sample/csv'):
    table_name = re.sub('(.*)\\.csv$', '\\1', file_info.name).lower()
    print(f"creating table '{DB_NAME}.{table_name}' from file {file_info.path}")
    spark.read.options(header=True).csv(file_info.path).write.mode('overwrite').saveAsTable(table_name)

## If you change your mind:
# spark.sql(f"drop database {DB_NAME} cascade")
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC # Load big dataset from storage container
# COMMAND ----------
# DBTITLE 1,Enter your data connection information here
secrets = {'storage_account_name': 'syntheauploadsa',
           'container_name': 'syntheadata1',
           'data_path': '/missouri/2021_07_11T17_42_12Z_parquet',
           'sas_token': 'PUT_YOUR_SAS_TOKEN_HERE'}
# COMMAND ----------
if secrets['sas_token'] == 'PUT_YOUR_SAS_TOKEN_HERE':
    displayHTML('''<blink><font color="red"><h1>You need to enter your connection info in the 'secrets' dict!</h1></font></blink>''')
# COMMAND ----------
# DBTITLE 1,Mount storage container to DBFS
DATA_SOURCE = "wasbs://{container_name}@{storage_account_name}.blob.core.windows.net".format(**secrets)
DATA_PATH = secrets['data_path']
DB_NAME = 'missouri'
MOUNT_POINT = f"/mnt/{DB_NAME}"
EXTRA_CONFIGS = {"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net".format(**secrets): secrets['sas_token']}
CURRENTLY_MOUNTED = {mount_info.mountPoint for mount_info in dbutils.fs.mounts()}
if MOUNT_POINT in CURRENTLY_MOUNTED:
dbutils.fs.unmount(MOUNT_POINT)
dbutils.fs.mount(
source = DATA_SOURCE + DATA_PATH,
mount_point = MOUNT_POINT,
extra_configs = EXTRA_CONFIGS
)
[f.name for f in dbutils.fs.ls(MOUNT_POINT)]
# COMMAND ----------
# DBTITLE 1,Create new database and tables
import re

spark.sql(f"create database if not exists {DB_NAME}")
spark.sql(f"use {DB_NAME}")

for file_info in dbutils.fs.ls(MOUNT_POINT):
    table_name = re.sub('(.*)\\.parquet/$', '\\1', file_info.name).lower()
    print(f"creating table '{DB_NAME}.{table_name}' from file {file_info.path}")
    spark.read.parquet(file_info.path).write.mode('overwrite').saveAsTable(table_name)

## If you change your mind:
# spark.sql(f"drop database {DB_NAME} cascade")
# COMMAND ----------