-
Notifications
You must be signed in to change notification settings - Fork 27
/
container_convertir_a_parquet.py
38 lines (28 loc) · 1.11 KB
/
container_convertir_a_parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from helpers import container_fields, parse_container

# Local single-process Spark context for this small demo dataset.
sc = SparkContext('local', 'barcos')
sq = SQLContext(sc)

try:
    # Read the CSV, skip the header row (it starts with the name of the
    # first field), parse each line into a namedtuple and turn it into a
    # Row so Spark can infer a schema from the field names.
    csv_source = sc \
        .textFile('data/containers_tiny.csv') \
        .filter(lambda s: not s.startswith(container_fields[0])) \
        .map(parse_container) \
        .map(lambda c: Row(**c._asdict()))  # _asdict() already returns a mapping
    print(csv_source.count())

    # Convert the RDD to a DataFrame (in Scala terms, a DataSet[Row]).
    # Column types are taken from the RDD values; since the namedtuple
    # fields are plain strings, every column ends up as a string column.
    container_df = sq.createDataFrame(csv_source)
    container_df.createOrReplaceTempView('container')
    container_df.printSchema()

    # Sample query against the registered temp view.
    denmark_only = sq.sql("SELECT ship_name FROM container WHERE country = 'DK'")
    print(denmark_only.first())

    todo_df = sq.sql("SELECT * FROM container")
    todo_df.printSchema()

    outpath = 'data/containers_tiny.parquet'
    # 'overwrite' mode makes re-runs idempotent instead of failing when
    # the output directory already exists.
    todo_df.write.mode('overwrite').parquet(outpath)
    print("\nDatos guardados en", outpath)
finally:
    # Always release the local Spark resources, even if a stage fails.
    sc.stop()