-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathship_routes.py
38 lines (29 loc) · 1.25 KB
/
ship_routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# RUN: ./graphframes.sh ship_routes.py
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import lead, col, explode
from pyspark.sql.window import Window
from graphframes import *
from graphframes.examples import Graphs
# Spark entry point: master 'local', application name 'barcos'.
sc = SparkContext(master='local', appName='barcos')
# SQL context built on top of it; used below to create DataFrames from RDDs.
sq = SQLContext(sc)
def _route_fields(line):
    """Split one pipe-delimited line and keep fields 0, 1 and 4."""
    parts = line.split("|")
    return (parts[0], parts[1], parts[4])


# One tuple per line of the routes file: (order, ship_imo, country_code).
route_rows = sc.textFile("data/ship_routes.csv").map(_route_fields)
sequential_route = sq.createDataFrame(
    route_rows, ["order", "ship_imo", "country_code"]
)
sequential_route.orderBy("ship_imo", "order").show()

# Within each ship, rows are ordered by "order" so that lead() can look one
# row ahead and fetch the next country on that ship's route.
# NOTE(review): "order" comes straight from split text, so it is sorted as a
# string here ("10" sorts before "2") -- confirm whether a numeric cast is
# needed for routes with ten or more stops.
per_ship = Window.partitionBy("ship_imo").orderBy("order")
next_country = lead("country_code").over(per_ship)

# Rows containing a null are dropped; that includes each ship's last row,
# which has no following row and therefore no "dst".
routes = sequential_route.withColumn("dst", next_country).dropna()
routes.orderBy("ship_imo", "order").show()

# Graph edges: current country -> next country, tagged with the ship.
edges = routes.select(routes["country_code"].alias("src"), "dst", "ship_imo")
# Debug aid: edges.show(100)
# Each line of the country file is split on commas and its fields are put in
# reverse order, so the file's last field becomes "id" and the one before it
# becomes "country_label".
country_lines = sc.textFile('./data/country_codes.csv')
country_pairs = country_lines.map(lambda line: tuple(line.split(',')[::-1]))
vertices = sq.createDataFrame(country_pairs, ["id", "country_label"])
# Debug aid: vertices.show(100)
# Build the graph from the country vertices and the ship-route edges.
g = GraphFrame(vertices, edges)

# Shortest paths from every vertex to the landmark vertices "AT" and "GS";
# explode() then expands the "distances" column into one output row per entry.
landmark_paths = g.shortestPaths(landmarks=["AT", "GS"])
results = landmark_paths.select("id", "country_label", explode("distances"))
results.show(200)