# sessionize.spark.py (forked from PaytmLabs/WeblogChallenge)
from datetime import datetime
from pyspark import SparkConf, SparkContext
import uuid

conf = SparkConf().setMaster("local").setAppName("PayTM")
sc = SparkContext(conf=conf)

# a session ends after 15 minutes (900 seconds) of inactivity from the same client
SESSION_THRESHOLD = 15 * 60

#INPUT_FILE = 'sample.log'
INPUT_FILE = 'full.log'

def get_data(elem):
    """
    Extract the fields needed for the analysis and convert them to proper types
    for easier and faster processing.
    :param elem: one line of the log
    :return: K-V tuple with the IP address as key and a dict holding the timestamp and URL as value
    """
    fields = elem.split()
    ip = fields[2].split(':')[0]
    ts = datetime.strptime(fields[0], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
    if '"' in fields[12]:
        # a stray quote means the quoted request field is malformed, so there is no usable URL
        url = '<invalid url according to rfc3986>'
    else:
        url = fields[12]
    return (ip, {'ts': ts, 'url': url})
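
# The parsing above assumes a whitespace-separated, AWS ELB-style access log line. A hypothetical
# example line (not taken from the real data set) would look like:
#   2015-01-01T00:00:01.123456Z my-elb 1.2.3.4:56789 10.0.0.1:80 0.00002 0.026 0.00003 200 200 0 699 "GET https://example.com/index.html HTTP/1.1" "Mozilla/5.0" - -
# Field 0 is the ISO timestamp, field 2 the client ip:port and field 12 the URL inside the quoted
# request, so get_data() would return ('1.2.3.4', {'ts': <POSIX timestamp>, 'url': 'https://example.com/index.html'}).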
def avg_user_sessions(elem):
    """
    Get the average length of the sessions associated with a user/IP.
    :param elem: list of dicts of all sessions for one user/IP
    :return: K-V tuple of the average session length and the IP address
    """
    total = 0.0
    for s in elem:
        total += s['length']
    return (total / len(elem), elem[0]['ip'])
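
# Illustration on hypothetical input (only the 'length' and 'ip' keys are consulted):
# avg_user_sessions([{'ip': '1.2.3.4', 'length': 10.0}, {'ip': '1.2.3.4', 'length': 20.0}])
# would return (15.0, '1.2.3.4').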
def tag_sessions(elem):
    """
    Group all URL hits from the same client into sessions and calculate each session's
    length and unique hits. This does all the heavy lifting to sessionize the data.
    :param elem: the groupByKey tuple built from the get_data() output
    :return: list of sessions, each a dict with a uuid and 5 attributes; being a list,
             it is best called from flatMap instead of map
    """
    hits = elem[1]
    ip = elem[0]
    sessions = []
    uniqs = set()
    sorted_hits = sorted(hits, key=lambda x: x['ts'])
    start_tstamp = sorted_hits[0]['ts']
    previous_tstamp = start_tstamp
    for hit in sorted_hits:
        tstamp = hit['ts']
        if tstamp - previous_tstamp < SESSION_THRESHOLD:
            # we are in the same session, just roll over to the next hit
            uniqs.add(hit['url'])
        else:
            # pack the previous session and start a new one
            sessions.append({'id': str(uuid.uuid4()),
                             'ip': ip,
                             'start': start_tstamp,
                             'end': previous_tstamp,
                             'hits': len(uniqs),
                             'length': previous_tstamp - start_tstamp})
            start_tstamp = tstamp
            # the current hit is the first hit of the new session
            uniqs = {hit['url']}
        # roll over to the next hit
        previous_tstamp = tstamp
    # after the for loop, we still have to append the final session
    sessions.append({'id': str(uuid.uuid4()),
                     'ip': ip,
                     'start': start_tstamp,
                     'end': previous_tstamp,
                     'hits': len(uniqs),
                     'length': previous_tstamp - start_tstamp})
    return sessions
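
# Minimal sanity check on hypothetical data (not from the real log): two hits from the same
# client 20 minutes apart exceed the 15-minute threshold, so they should form two sessions.
_demo = tag_sessions(('1.2.3.4', [{'ts': 0.0, 'url': '/a'}, {'ts': 1200.0, 'url': '/b'}]))
assert len(_demo) == 2
assert _demo[0]['end'] == 0.0 and _demo[1]['start'] == 1200.0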
logs = sc.textFile(INPUT_FILE)

# 1) Sessionize the web log by IP
sessions = logs.map(get_data).groupByKey().flatMap(tag_sessions)

# 2a) global average session length
total_time = sessions.map(lambda item: item['length']).reduce(lambda a, b: a + b)
#print("Global session avg: ", total_time / sessions.count())

# 2b) average session length per user/IP
user_avg_session = logs.map(get_data).groupByKey().map(tag_sessions).map(avg_user_sessions).sortByKey(ascending=False)
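# A hypothetical equivalent formulation (a sketch, not part of the original pipeline) that reuses
# the already-sessionized RDD instead of re-reading and re-grouping the raw log; the result
# should match user_avg_session above:
# user_avg_session_alt = (sessions
#                         .map(lambda s: (s['ip'], (s['length'], 1)))
#                         .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
#                         .map(lambda kv: (kv[1][0] / kv[1][1], kv[0]))
#                         .sortByKey(ascending=False))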
# 3) The unique hits per session are already computed in 1); here we just sort the sessions using that value as the key
uniq_hits = sessions.map(lambda item: (item['hits'], item)).sortByKey(ascending=False)

# 4) Most engaged users, i.e. the longest sessions first
engaged_users = sessions.map(lambda item: (item['length'], item)).sortByKey(ascending=False)
## Answer section ##
# limit the answers to 10 entries each, for speed and simplicity
#answer1 = sessions.take(10)
#answer2 = user_avg_session.take(10)
#answer3 = uniq_hits.take(10)
answer4 = engaged_users.take(10)
for out in answer4:
    print(out)
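
# Note: assuming PySpark is available locally, this script can typically be run either with
# `spark-submit sessionize.spark.py` or plain `python sessionize.spark.py`, since the
# SparkContext above is created with a local master.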