-
Notifications
You must be signed in to change notification settings - Fork 1
/
bfs_seq.py
141 lines (113 loc) · 4.11 KB
/
bfs_seq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#from joblib import Parallel, delayed
from collections import defaultdict
from collections import deque
from math import log
from multiprocessing import Pool, cpu_count
import random

import numpy as np
from scipy.stats import sem
#global output_file
#from ic_bfs_eval import print_out
def loadSeedSet(fname):
    """Read a seed set from *fname*: one integer node id per line.

    Blank lines are skipped (the original crashed with ValueError on a
    trailing newline).  Returns a list of ints in file order.
    """
    # "with" guarantees the handle is closed even if int() raises.
    with open(fname, 'r') as f:
        return [int(line.strip()) for line in f if line.strip()]
def IC_Instance(seed_set, threshold, sample):
    """Simulate one Independent Cascade (IC) instance from *seed_set*.

    Performs a randomized BFS over the global graph server ``l_server``:
    each out-edge (u, v) with probability p is traversed iff a fresh
    uniform draw is <= p.  The walk stops when the frontier empties or
    more than *threshold* nodes have been reached.

    *sample* is an unused per-sample index (kept for the Pool.map
    calling convention via IC_Instance2).

    Returns (nReached, max_distance) where max_distance is the largest
    BFS hop count assigned to any reached node.
    """
    # Reseed ONCE per instance so forked pool workers do not share the
    # parent's RNG state.  (Originally this was done per-edge, which is
    # both wasteful and needless.)
    random.seed()
    R = defaultdict(bool)        # reached/visited flags
    distances = defaultdict(int) # BFS hop count per reached node
    Q = deque(seed_set)          # FIFO frontier
    for u in seed_set:
        distances[u] = 0
        R[u] = True
    nReached = len(seed_set)
    while Q and nReached <= threshold:
        # popleft() gives true BFS order; the original list.pop() popped
        # the tail (DFS order), which made distances[u] + 1 below an
        # incorrect hop count.
        u = Q.popleft()
        for v, p in l_server.getOutNeighbourhood(u):
            if not R[v] and random.random() <= p:
                Q.append(v)
                R[v] = True
                nReached += 1
                distances[v] = distances[u] + 1
    # seed_set is assumed non-empty, so distances always has entries.
    return nReached, max(distances.values())
def IC_Instance2(input):
    """Pool.map adapter: unpack a [seed_set, threshold, sample] triple
    (as produced by GenerateParam) into an IC_Instance call."""
    seed_set, threshold, sample = input[0], input[1], input[2]
    return IC_Instance(seed_set, threshold, sample)
def GenerateParam(input_list, times):
    """Yield int(times) argument lists for Pool.map: each is a fresh
    copy of *input_list* with the sample index appended.

    Uses range() instead of the Python-2-only xrange(); behavior is
    identical on both interpreters since the result is only iterated.
    """
    for i in range(int(times)):
        # "+" builds a new list each time, so workers never share one.
        yield input_list + [i]
def IC_Sample(L, seed_set, threshold, nSamples=1, total_steps_cap = pow(10,30), num_cores=-1):
    """Run *nSamples* IC simulations of *seed_set* in parallel on graph
    server *L*.

    Returns (frac, max_rounds): the fraction of samples whose spread
    reached *threshold*, and the largest BFS depth seen.  If the summed
    spread over all samples exceeds *total_steps_cap*, returns the
    sentinel pair ("exceeded", 1) instead.

    num_cores == -1 lets multiprocessing pick the pool size.
    Side effect: rebinds the module global ``l_server`` to L (the pool
    workers read it inside IC_Instance).
    """
    random.seed()
    global l_server
    l_server = L
    pool = Pool() if num_cores == -1 else Pool(num_cores)
    try:
        results = pool.map(IC_Instance2, GenerateParam([seed_set, threshold], nSamples))
    finally:
        # Always release worker processes, even if a worker raised.
        pool.close()
        pool.join()
    if sum(x[0] for x in results) > total_steps_cap:
        return "exceeded", 1
    nExceeded = sum(1 for x in results if x[0] >= threshold)
    max_rounds = max(x[1] for x in results)
    # True division: 1. * int / int is always a float, so the original
    # runtime type-check on frac was dead code and has been dropped.
    frac = 1. * nExceeded / nSamples
    return frac, max_rounds
def sequential_estimation(L, seeds, max_samples_cap = 0, nCores = -1, \
        bReturnValues = False, min_samples = 0, min_relative_standard_error = -1):
    """Estimate the expected IC spread of *seeds* by repeated batches of
    parallel simulations on graph server *L*.

    Batches of nCores samples run until either max_samples are taken
    (default budget n*log2(n)) or, when min_relative_standard_error > 0,
    the relative standard error of the spread values drops below that
    bound (subject to at least *min_samples* samples being taken).

    Returns (mean, samples_total) — or (mean, samples_total,
    spread_values) when bReturnValues is True.  samples_total is a float
    (accumulated as one), matching the original return type.

    Side effect: rebinds the module global ``l_server`` to L.
    """
    global l_server
    l_server = L
    n = l_server.getNumNodes()
    if nCores == -1:
        nCores = int(cpu_count())
    # Default sampling budget when no explicit cap is supplied.
    max_samples = max_samples_cap if max_samples_cap > 0 else int(n * log(n, 2))
    spread_values = []
    total_values = 0
    samples_total = 0.0
    bDone = False
    pool = Pool()
    try:
        while not bDone:
            # Never overshoot the budget with the final batch.
            num_concurrent_process = min(max_samples - samples_total, nCores)
            # threshold = number of nodes, so no simulation stops early.
            ret_values = pool.map(IC_Instance2,
                    GenerateParam([seeds, l_server.getNumNodes()], num_concurrent_process))
            results = [r[0] for r in ret_values]
            assert all(r <= n for r in results)
            assert len(results) == num_concurrent_process
            spread_values += results
            total_values += sum(results)
            samples_total += num_concurrent_process
            bDone = samples_total >= max_samples
            if min_relative_standard_error > 0:
                # Relative standard error of the spread estimates so far.
                std_error = sem(spread_values) / np.mean(spread_values)
                if std_error <= min_relative_standard_error:
                    if min_samples <= 0 or samples_total >= min_samples:
                        bDone = True
    finally:
        # Always release worker processes, even on assertion failure.
        pool.close()
        pool.join()
    mean = 1. * total_values / samples_total
    if bReturnValues:
        return mean, samples_total, spread_values
    else:
        return mean, samples_total