-
Notifications
You must be signed in to change notification settings - Fork 0
/
degrees-of-separation.py
130 lines (96 loc) · 3.61 KB
/
degrees-of-separation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#Boilerplate stuff:
from pyspark import SparkConf, SparkContext
# Run Spark with the "local" master under the application name below.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("DegreesOfSeparation")
)
sc = SparkContext(conf=conf)

# Endpoints of the search: the BFS starts at startCharacterID and stops once
# targetCharacterID is reached.
startCharacterID = 5306  # SpiderMan
targetCharacterID = 14  # ADAM 3,031 (who?)

# Shared counter that bfsMap() bumps each time a GRAY node lists the target
# among its connections; a non-zero value after an action ends the search.
hitCounter = sc.accumulator(0)
def convertToBFS(line):
    """Parse one adjacency line into an initial BFS node record.

    The line holds whitespace-separated integers: a hero ID followed by the
    IDs of the heroes it is connected to.

    Returns a ``(heroID, (connections, distance, color))`` pair. The start
    character is seeded GRAY at distance 0 so the first BFS iteration expands
    it; every other hero starts WHITE (unvisited) at the sentinel distance
    9999, which stands in for "not reached yet".
    """
    fields = line.split()
    heroID = int(fields[0])
    connections = [int(field) for field in fields[1:]]

    if heroID == startCharacterID:
        # The search origin is the only node on the frontier before round 1.
        return heroID, (connections, 0, 'GRAY')
    return heroID, (connections, 9999, 'WHITE')
def createStartingRdd(path="marvel-graph.txt"):
    """Load the hero graph and turn every line into an initial BFS node.

    :param path: text file of whitespace-separated hero IDs, one adjacency
        list per line; defaults to ``marvel-graph.txt``.
    :returns: RDD of ``(heroID, (connections, distance, color))`` pairs as
        produced by ``convertToBFS``.
    """
    inputFile = sc.textFile(path)
    # A blank line has no hero ID, so convertToBFS would fail on fields[0]
    # and abort the whole job; drop such lines before parsing.
    nonBlankLines = inputFile.filter(lambda line: line.strip() != "")
    return nonBlankLines.map(convertToBFS)
def bfsMap(node):
    """Expand one BFS node and return the records it contributes this round.

    ``node`` is ``(characterID, (connections, distance, color))``.

    A GRAY node is on the current frontier. For each neighbour it emits a new
    GRAY record one hop further away with an empty edge list, and it adds 1
    to ``hitCounter`` whenever that neighbour is the target. The node itself
    is then re-emitted as BLACK.

    Any other node (WHITE or BLACK) is passed through unchanged. In every
    case the node's own record is the last element of the returned list, so
    the graph structure is never lost.
    """
    characterID, (connections, distance, color) = node

    if color != 'GRAY':
        # Not on the frontier: nothing to expand this round.
        return [(characterID, (connections, distance, color))]

    emitted = []
    for neighbourID in connections:
        if targetCharacterID == neighbourID:
            hitCounter.add(1)
        emitted.append((neighbourID, ([], distance + 1, 'GRAY')))

    # Every neighbour has been queued, so this node is finished.
    emitted.append((characterID, (connections, distance, 'BLACK')))
    return emitted
def bfsReduce(data1, data2):
    """Merge two ``(edges, distance, color)`` records for the same character.

    - Edges: both edge lists are concatenated. Records that bfsMap emits for
      newly reached neighbours carry an empty list, so the original node's
      connections survive the merge.
    - Distance: the smaller distance wins, capped at the 9999 "unreached"
      sentinel.
    - Color: the darker color wins, in the order WHITE < GRAY < BLACK. On a
      tie the first record's color is kept.
    """
    edges1, distance1, color1 = data1
    edges2, distance2, color2 = data2

    mergedEdges = list(edges1) + list(edges2)
    mergedDistance = min(9999, distance1, distance2)

    # The only (first, second) pairings in which the second color is darker.
    darkerSecond = {('WHITE', 'GRAY'), ('WHITE', 'BLACK'), ('GRAY', 'BLACK')}
    mergedColor = color2 if (color1, color2) in darkerSecond else color1

    return mergedEdges, mergedDistance, mergedColor
# Main program here:
# Run the breadth-first search one frontier per round, for at most
# maxIterations rounds, stopping as soon as bfsMap reports reaching the target.
maxIterations = 10
iterationRdd = createStartingRdd()
previousMapped = None
for iteration in range(maxIterations):
    print("Running BFS iteration# " + str(iteration + 1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    # cache() lets the reduceByKey below reuse the records that count()
    # computes, instead of re-running bfsMap when that shuffle executes
    # (Spark may re-apply accumulator updates from re-executed transformations).
    mapped = iterationRdd.flatMap(bfsMap).cache()

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    # That count() also ran the shuffle that consumed the previous round's
    # cached records, so they no longer need to be held in memory.
    if previousMapped is not None:
        previousMapped.unpersist()
    previousMapped = mapped

    if hitCounter.value > 0:
        print("Hit the target character! From " + str(hitCounter.value)
              + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)
else:
    # The loop exhausted its rounds without a single hit.
    print("Target character not found within " + str(maxIterations)
          + " BFS iterations.")