Skip to content

Commit 080852f

Browse files
committed
MPI method for Apriori
1 parent bc6d4b1 commit 080852f

File tree

1 file changed

+126
-19
lines changed

1 file changed

+126
-19
lines changed

apriori.py apriori_mpi.py

+126-19
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,18 @@
22
import sys
33
import operator
44
import time
5+
from math import floor
6+
from mpi4py import MPI
7+
8+
from os import getcwd, walk, system, path
9+
10+
11+
comm = MPI.COMM_WORLD
12+
rank = comm.Get_rank()
513

614
start_time = time.clock()
715

8-
def find_frequent_1_itemsets(D, min_sup, row_count):
16+
def find_frequent_1_itemsets(D, min_sup):
917

1018
dataset = None
1119
itemset = {}
@@ -26,7 +34,7 @@ def find_frequent_1_itemsets(D, min_sup, row_count):
2634
Calculate frequent itemsets
2735
"""
2836
for item in itemset.copy():
29-
if itemset[item]/float(row_count) < min_sup:
37+
if itemset[item] < min_sup:
3038
itemset.pop(item, None)
3139

3240
return sorted(itemset.items(), key=operator.itemgetter(0))
@@ -61,15 +69,15 @@ def has_frequent_subset(c, L, k):
6169

6270
return True
6371

64-
def apriori_gen(L, k, row_count):
72+
def apriori_gen(L, k):
6573

6674
C = []
6775
for l1 in L:
6876
for l2 in L:
6977

7078
first_itemlist = l1[0].split(",")
7179
second_itemlist = l2[0].split(",")
72-
#print first_itemlist, second_itemlist
80+
7381
i = 0
7482
flag = True
7583
while i <= k-2-1:
@@ -111,15 +119,14 @@ def generate_association_rules(itemset, min_conf, row_count):
111119
if item_sup is not None and pair[1]/float(item_sup) >= min_conf:
112120
print ",".join(item), "=>", ",".join(list(set(pair[0].split(',')) - set(item))), "Support: ", float("{0:.2f}".format(float(item_sup)/row_count))*100, "%", "Confidence: ", float("{0:.2f}".format(pair[1]/float(item_sup)*100)), "%"
113121

114-
def main():
122+
def main(D):
115123

116124
"""
117125
Input: D, a dataset of transaction
118126
min_sup, the minimum support count threshold
119127
min_conf, the minimum confidence threshold
120128
"""
121129

122-
D = str(sys.argv[1])
123130
min_sup = float(sys.argv[2])
124131
min_conf = float(sys.argv[3])
125132

@@ -129,7 +136,10 @@ def main():
129136
dataset = csv.reader(f)
130137
row_count = sum(1 for row in dataset)
131138

132-
L1 = find_frequent_1_itemsets(D, min_sup, row_count)
139+
min_sup = min_sup * row_count
140+
min_conf = min_conf * row_count
141+
142+
L1 = find_frequent_1_itemsets(D, min_sup)
133143
itemset = [L1]
134144

135145

@@ -140,10 +150,10 @@ def main():
140150
if not itemset[k-2]:
141151
break
142152

143-
C = apriori_gen(itemset[k-2], k, row_count)
153+
C = apriori_gen(itemset[k-2], k)
144154
L = {}
145155

146-
156+
147157
with open(D, 'rb') as f:
148158
dataset = csv.reader(f)
149159
for t in dataset:
@@ -153,25 +163,122 @@ def main():
153163
L[c] += 1
154164
else:
155165
L[c] = 1
156-
166+
157167
for item in L.copy():
158-
if L[item]/float(row_count) < min_sup:
168+
if L[item] < min_sup:
159169
L.pop(item, None)
160170

161171
itemset.append(sorted(L.items(), key=operator.itemgetter(0)))
162172
k += 1
163173

164174
itemset.pop()
165-
generate_association_rules(itemset, min_conf, row_count)
166-
print "\nResultant Item sets:"
167175

168-
for k in range(1, len(itemset)):
169-
print "\n", k, "-itemsets:\n"
170-
for item in itemset[k]:
171-
print item[0], "| Support ", float("{0:.2f}".format(item[1]/float(row_count)))*100, "%"
176+
return itemset
172177

173178

174179
if __name__ == "__main__":
175-
main()
176180

177-
print "\nProgram Execution Time: ",time.clock() - start_time, " seconds"
181+
onlyfiles = []
182+
183+
if rank == 0:
184+
185+
"""
186+
Make a directory called "temp"
187+
to split given dataset with the number of processes
188+
"""
189+
190+
system("mkdir temp")
191+
dataset = str(sys.argv[1])
192+
num_process = comm.Get_size()
193+
file_size = int(floor(path.getsize(dataset)/(float(1000000) * num_process)))
194+
system("split --bytes=" + str(file_size)+"M " + dataset + " temp/retail")
195+
196+
197+
# Get current working directory
198+
cwd = getcwd()
199+
200+
"""
201+
Get list of files
202+
"""
203+
204+
for (dirpath, dirnames, filenames) in walk(cwd+"/temp"):
205+
onlyfiles.extend(filenames)
206+
break
207+
208+
# Get the dataset partition name
209+
dataset = comm.scatter(onlyfiles, root=0)
210+
211+
# Generate local frequent itemsets
212+
itemset = main("temp/"+dataset)
213+
214+
# Root process collects all the local frequent itemsets
215+
set_itemsets = comm.gather(itemset, root=0)
216+
217+
if rank == 0:
218+
219+
"""
220+
Merge all the local frequent itemset gathered from processes according to their size
221+
"""
222+
223+
itemsetsi = []
224+
225+
max_itemsets_length = max([len(itemsets) for itemsets in set_itemsets])
226+
227+
for i in xrange(0, max_itemsets_length):
228+
iset = set()
229+
230+
for j in xrange(0, num_process):
231+
temp_set = []
232+
if(i <= (len(set_itemsets[j])-1)):
233+
for item in set_itemsets[j][i]:
234+
temp_set.append(list(item)[0])
235+
236+
iset = iset.union(list(temp_set))
237+
238+
itemsetsi.append(dict((k,0) for k in list(iset)))
239+
240+
# Remove the non-empty temp directory
241+
system("rm -rf temp")
242+
243+
# Get the original dataset name
244+
D = str(sys.argv[1])
245+
246+
# Find candidate global frequent itemsets
247+
row_count = 0
248+
with open(D, 'rb') as f:
249+
dataset = csv.reader(f)
250+
for t in dataset:
251+
for itemset in itemsetsi:
252+
for item in itemset:
253+
if set(item.split(",")).issubset(set(t)):
254+
itemset[item] += 1
255+
row_count += 1
256+
257+
# Remove non-frequent global itemsets
258+
min_sup = float(sys.argv[2])
259+
for itemset in itemsetsi:
260+
for item in itemset.copy():
261+
if (itemset[item]/float(row_count)) < min_sup:
262+
itemset.pop(item, None)
263+
264+
# Display Itemsets
265+
print "\nResultant Item sets:"
266+
267+
k = 1
268+
for itemset in itemsetsi:
269+
if bool(itemset):
270+
print "\n", k, "-itemsets:\n"
271+
k += 1
272+
for item in itemset:
273+
print item, "| Support ", float("{0:.2f}".format(itemset[item]/float(row_count)))*100, "%"
274+
275+
# Convert list of dictionaries into multi-dimensional list
276+
list_itemsets = [(sorted(itemset.items(), key=operator.itemgetter(0))) for itemset in itemsetsi if bool(itemset)]
277+
278+
# Get minimum confidence
279+
min_conf = float(sys.argv[3])
280+
281+
# Generate association rules
282+
generate_association_rules(list_itemsets, min_conf, row_count)
283+
284+
print "\nRank : ",rank, " - Program Execution Time: ",time.clock() - start_time, " seconds"

0 commit comments

Comments
 (0)