This repository has been archived by the owner on Jul 23, 2024. It is now read-only.
forked from bcolloran/deorphaning_mrjob
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path naiveHeadDocIdExtraction.py
67 lines (48 loc) · 2.76 KB
/
naiveHeadDocIdExtraction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from mrjob.job import MRJob
import mrjob
import sys, codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
import random
# Lower bound for an eligible tie-break key. A record can become the head only
# if its parsed key compares strictly greater than this, so a part whose records
# all sit at or below it (e.g. because thisPingDate is malformed) emits nothing.
_TIE_BREAK_FLOOR = ("0000-00-00", 0, 0)


def _to_number(text):
    """Parse a numeric tie-break field, preferring int and falling back to float.

    Raises ValueError if ``text`` is not a number.
    """
    try:
        return int(text)
    except ValueError:
        return float(text)


def _parse_doc_id_and_tie_break(value):
    """Split a reducer value into ``(docId, tieBreakKey)``, or return None.

    ``value`` is expected to be
    ``"{docId}|{thisPingDate}_{numAppSessionsPreviousOnThisPingDate}_{currentSessionTime}"``.

    The date stays a string and is compared as text. NOTE(review): that only
    orders correctly for zero-padded YYYY-MM-DD dates; confirm the upstream
    format. The two trailing fields are converted to numbers so they order
    numerically. Compared as strings, "10" would sort before "9".

    Returns None when the value does not have that layout or a numeric field
    does not parse.
    """
    # rpartition: everything before the last "|" is the docId.
    docId, sep, tieBreakInfoStr = value.rpartition("|")
    if not sep:
        return None
    fields = tieBreakInfoStr.split("_")
    if len(fields) != 3:
        return None
    pingDate, numSessions, sessionTime = fields
    try:
        return docId, (pingDate, _to_number(numSessions), _to_number(sessionTime))
    except ValueError:
        return None


class headRecordExtractionJob(MRJob):
    """Pick one "head" docId per partId from ``docId|tieBreakInfo`` records.

    For each chosen head record the job emits ``(docId, "h")``.
    """

    HADOOP_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"
    # Raw text key/value in and out; JSON between the mapper and the reducer.
    INPUT_PROTOCOL = mrjob.protocol.RawProtocol
    INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol
    OUTPUT_PROTOCOL = mrjob.protocol.RawProtocol

    def mapper(self, partId, docId_tieBreakInfo):
        """Pass each input record through unchanged, keyed by partId.

        Expected input record:
            k: "partId"
            v: "{docId}|{thisPingDate}_{numAppSessionsPreviousOnThisPingDate}_{currentSessionTime}"
        """
        self.increment_counter("MAPPER", "input: docIds with tieBreakInfo (and partId)")
        yield partId, docId_tieBreakInfo

    def reducer(self, partId, iter_docIdAndTieBreakInfo):
        """Emit the naive head docId for one partId.

        The head is the record with the greatest key
        ``(thisPingDate, numAppSessionsPreviousOnThisPingDate, currentSessionTime)``.
        The two numeric fields are compared as numbers. If several records
        tie for that key, one of them is chosen with ``random.choice``.

        Some records are never chosen:
        - values that cannot be parsed (these are counted and skipped);
        - records whose key is not strictly greater than ``_TIE_BREAK_FLOOR``.

        If every record for the part falls into these cases, the part is
        discarded and nothing is emitted.

        Yields:
            ``(docId, "h")`` at most once.
        """
        self.increment_counter("REDUCER", "number of parts")
        maxRecordTieBreakInfo = _TIE_BREAK_FLOOR
        maxRecordDocIdList = []
        for docIdAndTieBreakInfo in iter_docIdAndTieBreakInfo:
            parsed = _parse_doc_id_and_tie_break(docIdAndTieBreakInfo)
            if parsed is None:
                self.increment_counter("REDUCER", "malformed docId|tieBreakInfo values skipped")
                continue
            docId, tieBreakInfo = parsed
            if tieBreakInfo > maxRecordTieBreakInfo:
                # New strict maximum: restart the list of tied docIds.
                maxRecordDocIdList = [docId]
                maxRecordTieBreakInfo = tieBreakInfo
            elif maxRecordDocIdList and tieBreakInfo == maxRecordTieBreakInfo:
                # Tied with the current maximum. The non-empty guard keeps a key
                # exactly equal to the floor from being accepted.
                maxRecordDocIdList.append(docId)

        if not maxRecordDocIdList:
            # No eligible record, e.g. only bad thisPingDate values: drop the part.
            self.increment_counter("REDUCER", "parts discarded: no record above tie-break floor")
            return

        if len(maxRecordDocIdList) == 1:
            docIdOut = maxRecordDocIdList[0]
            self.increment_counter("REDUCER", "unique naive head records")
        else:
            docIdOut = random.choice(maxRecordDocIdList)
            self.increment_counter("REDUCER", "parts with records tied for naive head")
            self.increment_counter("REDUCER", "records tied for naive head", len(maxRecordDocIdList))
        self.increment_counter("REDUCER", "FINAL HEAD RECORD docIds OUT")
        yield docIdOut, "h"
def _main():
    """Hand control to mrjob's command-line runner for this job."""
    headRecordExtractionJob.run()


if __name__ == "__main__":
    _main()