forked from MISP/misp-dashboard
-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
124 lines (102 loc) · 3.82 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import datetime
import time
from collections import defaultdict
ONE_DAY = 60*60*24
def getZrange(serv_redis_db, keyCateg, date, topNum, endSubkey=""):
date_str = getDateStrFormat(date)
keyname = "{}:{}{}".format(keyCateg, date_str, endSubkey)
data = serv_redis_db.zrange(keyname, 0, topNum-1, desc=True, withscores=True)
data = [ [record[0], record[1]] for record in data ]
return data
def noSpaceLower(text):
return text.lower().replace(' ', '_')
def push_to_redis_zset(serv_redis_db, mainKey, toAdd, endSubkey="", count=1):
now = datetime.datetime.now()
today_str = getDateStrFormat(now)
keyname = "{}:{}{}".format(mainKey, today_str, endSubkey)
serv_redis_db.zincrby(keyname, count, toAdd)
def getMonthSpan(date):
ds = datetime.datetime(date.year, date.month, 1)
dyear = 1 if ds.month+1 > 12 else 0
dmonth = -12 if ds.month+1 > 12 else 0
de = datetime.datetime(ds.year + dyear, ds.month+1 + dmonth, 1)
delta = de - ds
to_return = []
for i in range(delta.days):
to_return.append(ds + datetime.timedelta(days=i))
return to_return
def getXPrevDaysSpan(date, days):
de = date
ds = de - datetime.timedelta(days=days)
delta = de - ds
to_return = []
for i in range(delta.days+1):
to_return.append(de - datetime.timedelta(days=i))
return to_return
def getXPrevHoursSpan(date, hours):
de = date
de = de.replace(minute=0, second=0, microsecond=0)
ds = de - datetime.timedelta(hours=hours)
delta = de - ds
to_return = []
for i in range(0, int(delta.total_seconds()/3600)+1):
to_return.append(de - datetime.timedelta(hours=i))
return to_return
def getHoursSpanOfDate(date, adaptToFitCurrentTime=True, daySpanned=6):
ds = date
ds = ds.replace(hour=0, minute=0, second=0, microsecond=0)
to_return = []
now = datetime.datetime.now()
for i in range(0, 24):
the_date = ds + datetime.timedelta(hours=i)
if the_date > now or the_date < now - datetime.timedelta(days=daySpanned): # avoid going outside
continue
to_return.append(the_date)
return to_return
def getDateStrFormat(date):
return str(date.year)+str(date.month).zfill(2)+str(date.day).zfill(2)
def getDateHoursStrFormat(date):
return getDateStrFormat(date)+str(date.hour)
def getTimestamp(date):
return int(time.mktime(date.timetuple()))
def sortByTrendingScore(toSort, topNum=5):
scoredLabels = defaultdict(float)
numDay = len(toSort)
baseDecay = 1.0
decayRate = lambda x: baseDecay*((numDay-x**2)/numDay)
for i, arr in enumerate(toSort):
timestamp = arr[0]
dailyData = arr[1]
for item in dailyData:
label = item[0]
occ = item[1]
scoredLabels[label] += occ*decayRate(i)
topList = [[l, s] for l, s in scoredLabels.items()]
topList.sort(key=lambda x: x[1], reverse=True)
topSet = [ l for l, v in topList[:topNum]]
# now that we have the top, filter out poor scored elements
topArray = []
for arr in toSort:
timestamp = arr[0]
dailyData = arr[1]
topDailyArray = list(filter(lambda item: (item[0] in topSet), dailyData))
dailyCombi = [timestamp, topDailyArray]
topArray.append(dailyCombi)
return topArray
def getFields(obj, fields):
jsonWalker = fields.split('.')
itemToExplore = obj
lastName = ""
try:
for i in jsonWalker:
itemToExplore = itemToExplore[i]
lastName = i
if type(itemToExplore) is list:
return {'name': lastName, 'data': itemToExplore}
else:
if i == 'timestamp':
itemToExplore = datetime.datetime.utcfromtimestamp(
int(itemToExplore)).strftime('%Y-%m-%d %H:%M:%S')
return itemToExplore
except KeyError as e:
return None