-
Notifications
You must be signed in to change notification settings - Fork 0
/
jaggedSketchImproved.py
executable file
·280 lines (244 loc) · 8.66 KB
/
jaggedSketchImproved.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#!/usr/bin/python3
from random import random
from math import log
# CONSTANTS
# Threshold used by RelativeCompactor.count_protected: when a compactor's
# section size is below this value the compaction schedule is not used and
# the right half of the capacity (plus any overflow) is compacted at once.
SMALLEST_MEANINGFUL_SECTION_SIZE = 4
# Factor in RelativeCompactor.set_section_size:
# section_size = int(capacity / (2 * INIT_SECTIONS * log2(2 + num_compactions)))
INIT_SECTIONS = 1.5
class JaggedSketch:
    """Quantile sketch built from a stack of ``RelativeCompactor`` levels.

    An item stored on level ``h`` carries weight ``2**h`` when ranks are
    computed.  The levels that currently hold the ``important_quantiles``
    are tracked in ``important_levels``; compactors use their distance to
    those levels (together with ``J``) when sizing themselves.
    """

    def __init__(self, epsilon=0.01, delta=0.01, important_quantiles=(0,),
                 constant_J=0.5, improvement_for_high_ranks=True):
        """Create an empty sketch with a single level-0 compactor.

        Args:
            epsilon: relative error for the desired ranks, in (0, 1].
            delta: probability of an error larger than epsilon for a given
                query, in (0, 0.5].
            important_quantiles: iterable of quantiles in [0, 1] that should
                get higher accuracy.  It is copied into a set, so the
                caller's object is never shared with the sketch.
            constant_J: importance of the important quantiles; 0 means all
                ranks have the same importance.  Must be non-negative.
            improvement_for_high_ranks: selects which of the two capacity
                formulas the compactors use.

        Raises:
            ValueError: if any argument is outside its allowed range, or if
                ``constant_J`` is non-zero while no important quantile is
                given.
        """
        if epsilon <= 0 or epsilon > 1:
            raise ValueError("epsilon must be between 0 and 1")
        if delta <= 0 or delta > 0.5:
            raise ValueError("delta must be between 0 and 0.5")
        if constant_J < 0:
            raise ValueError("J must be non-negative")
        # Normalise first: this takes a private copy (no aliasing of the
        # caller's or the default object) and makes the emptiness check below
        # work for lists, tuples and generators, not only for sets.
        important_quantiles = set(important_quantiles)
        if not all(0 <= x <= 1 for x in important_quantiles):
            raise ValueError("All important quantiles must be between 0 and 1")
        if constant_J != 0 and not important_quantiles:
            raise ValueError("with no important quentiles, j must equal 0")

        # Set of quantiles that should be answered with higher accuracy
        self.important_quantiles = important_quantiles
        # Gives importance of ranks in Q;
        # J=0 means all ranks have the same importance
        self.J = constant_J
        # Relative error for the desired rank in Q (guarantee from the theory)
        self.epsilon = epsilon
        # Error improvement for high ranks
        self.improvement_for_high_ranks = improvement_for_high_ranks
        # Delta is the probability of error larger than epsilon for given query
        self.probability_constant = log(1/delta)**0.5
        # Size of the input summarized
        self.N = 0
        # Current number of saved items
        self.size = 0
        # Sum of capacities of all compactors
        self.capacity = 0
        # Levels corresponding to important quantiles
        self.important_levels = set()

        # The compactor reads self.H() in its constructor, so the list must
        # exist before the first compactor is created.
        self.compactors = []
        self.compactors.append(RelativeCompactor(self))
        self.compactors[0].set_capacity_and_section_size()

    def H(self):
        """Return the number of compactors (levels) in the sketch."""
        return len(self.compactors)

    def grow(self):
        """Add a new top compactor and fully compact every lower level."""
        # Add a new compactor
        self.compactors.append(RelativeCompactor(self))
        self.compactors[-1].set_capacity_and_section_size()

        # Do the full compaction for all compactors below the new one
        for h, compactor in enumerate(self.compactors[:self.H() - 1]):
            self.compactors[h + 1].extend(compactor.full_compaction())

        # Keep adding levels while the top one is still over capacity
        while self.compactors[-1].is_full():
            self.compactors.append(RelativeCompactor(self))
            self.compactors[-1].set_capacity_and_section_size()
            self.compactors[-1].extend(self.compactors[-2].full_compaction())

        # Update all the parameters
        self.update_important_levels()
        for c in self.compactors:
            c.set_capacity_and_section_size()

    def update(self, item):
        """Add a new item to the sketch, compressing when it is full."""
        self.compactors[0].append(item)
        self.N += 1
        self.size += 1
        if self.size >= self.capacity:
            self.compress()
            assert self.size < self.capacity

    def compress(self):
        """Compact level zero and, if needed, higher levels.

        Stops as soon as the sketch is back under capacity.  When the top
        level is full the sketch grows instead and the pass ends.
        """
        for h, compactor in enumerate(self.compactors):
            if not compactor.is_full():
                continue
            if h + 1 == self.H():
                self.grow()
                return
            self.compactors[h + 1].extend(compactor.normal_compaction())
            # Be lazy and do not continue under capacity
            if self.size < self.capacity:
                return

    def update_important_levels(self):
        """Recompute the levels corresponding to the important quantiles.

        Assumes that all compactors are sorted when this is called, because
        the first item of a compactor is used as that level's minimum.
        """
        self.important_levels.clear()
        for q in self.important_quantiles:
            x = self.quantile(q)  # item with appropriate quantile
            # binary search for the right level
            i = 0
            j = self.H() - 1
            while i < j - 1:
                m = (i + j) // 2
                level_min = self.compactors[m][0]
                if x >= level_min:
                    i = m
                else:
                    j = m
            # save the calculated level
            self.important_levels.add(i)

    def ranks(self):
        """Return a sorted list of ``(item, cumulative_weight)`` pairs.

        Items on level ``h`` contribute weight ``2**h``.
        """
        items_and_weights = []
        for h, items in enumerate(self.compactors):
            items_and_weights.extend((item, 2**h) for item in items)
        items_and_weights.sort()

        ranks_list = []
        cum_weight = 0
        for item, weight in items_and_weights:
            cum_weight += weight
            ranks_list.append((item, cum_weight))
        return ranks_list

    def cdf(self):
        """Return the cumulative distribution function.

        The result is a list of ``(item, rank)`` pairs with the rank
        expressed as a number in [0, 1].  An empty sketch yields an empty
        list.
        """
        rank_list = self.ranks()
        if not rank_list:
            # Nothing stored yet: there is no total weight to divide by.
            return []
        _, total_weight = rank_list[-1]
        return [(item, cum_weight / total_weight)
                for item, cum_weight in rank_list]

    def rank(self, value):
        """Return an approximate rank of ``value``."""
        return sum(c.rank(value)*2**h for (h, c) in enumerate(self.compactors))

    def quantile(self, q):
        """Return a stored item which is approximately the q-quantile.

        The returned item is the first one whose cumulative weight reaches
        ``q * self.N``.

        Raises:
            AssertionError: if ``q`` is outside [0, 1].
            IndexError: if the sketch holds no items.
        """
        assert (q >= 0 and q <= 1), f"parameter q must be in [0, 1], but q = {q}"
        ranks = self.ranks()
        if not ranks:
            raise IndexError("cannot take a quantile of an empty sketch")

        desired_rank = q*self.N
        # Binary search for the first entry with rank >= desired_rank
        i = 0
        j = len(ranks)
        while i < j:
            m = (i + j) // 2
            _, rank = ranks[m]
            if desired_rank > rank:
                i = m + 1
            else:
                j = m
        item, _ = ranks[i]
        return item
class RelativeCompactor(list):
    """One level of the sketch: a list of items plus compaction state.

    The compactor keeps its smallest items ("protected") and, when
    compacting, yields every other item of the rest for the next level.
    """

    def __init__(self, sketch):
        """Create an empty compactor on top of ``sketch``'s current levels.

        Capacity and section size stay ``None`` until
        ``set_capacity_and_section_size`` is called.
        """
        super().__init__()
        self.num_compactions = 0  # Number of compaction operations performed
        self.state = 0  # State of the deterministic compaction schedule
        self.offset = 0  # Indicator for taking even or odd items
        self.shift = 0  # Indicator for shifting the compacted part by one item
        self.sketch = sketch
        self.h = sketch.H()  # height (level) of the compactor
        self.capacity = None
        self.section_size = None

    def rank(self, value):
        """Return the number of stored items that are <= ``value``."""
        return sum(1 for v in self if v <= value)

    def is_full(self):
        """Return True when the compactor holds at least ``capacity`` items."""
        return len(self) >= self.capacity

    def reset_compaction_schedule(self):
        """Restart the schedule and recompute capacity and section size."""
        self.state = 0
        self.set_capacity_and_section_size()

    def set_capacity(self):
        """Recompute this compactor's capacity from the sketch parameters.

        The sketch-wide capacity total is adjusted by the difference between
        the new and the previous capacity of this compactor.
        """
        old_capacity = self.capacity if self.capacity is not None else 0
        if self.sketch.improvement_for_high_ranks:
            self.capacity = int(self.sketch.probability_constant *
                                self.sketch.H()**(0.5 + min(1, self.sketch.J)) /
                                (self.scale() * self.sketch.epsilon)
                                )
        else:
            self.capacity = int(self.sketch.probability_constant *
                                self.sketch.H()**min(1, self.sketch.J) *
                                log(2 + self.num_compactions, 2)**0.5 /
                                (self.scale() * self.sketch.epsilon)
                                )
        self.sketch.capacity += self.capacity - old_capacity

    def set_section_size(self):
        """Recompute the section size from capacity and compaction count."""
        self.section_size = int(
            self.capacity /
            (2 * INIT_SECTIONS * log(2 + self.num_compactions, 2))
        )

    def set_capacity_and_section_size(self):
        """Recompute capacity first, then the section size derived from it."""
        self.set_capacity()
        self.set_section_size()

    def scale(self):
        """Return a scaling factor based on distance to an important level.

        The factor is 1 on an important level (or when there are none),
        ``1.5**J`` at distance one and ``dist**J`` further away, capped by the
        number of levels in the sketch.
        """
        # distance from the closest important level
        levels = self.sketch.important_levels
        if len(levels) > 0:
            dist = min(abs(self.h - level) for level in levels)
        else:
            dist = 0

        # choose the scaling factor (based on J parameter)
        if dist == 0:
            scale = 1
        elif dist == 1:
            scale = 1.5**self.sketch.J
        else:
            scale = dist**self.sketch.J
        return min(scale, self.sketch.H())

    def count_protected(self):
        """Return how many of the smallest items the next compaction keeps.

        Advances the compaction schedule as a side effect (and resets it on
        overflow).  The number of compacted items is always made even.
        """
        right_part = self.capacity // 2
        rest = len(self) - self.capacity
        section_size = self.section_size

        # If the section size is too small we do not use the schedule
        if section_size < SMALLEST_MEANINGFUL_SECTION_SIZE:
            compacted = right_part + rest
        else:
            sections_to_compact = trailing_ones_binary(self.state) + 1
            self.state += 1
            right_compacted = sections_to_compact * section_size
            # schedule overflow
            if right_compacted >= right_part:
                right_compacted = right_part
                self.reset_compaction_schedule()
            compacted = right_compacted + rest

        compacted += compacted % 2
        return len(self) - compacted

    def full_compaction(self):
        """Compact everything except the left half and reset the schedule.

        This is a generator: nothing happens until it is iterated.
        """
        protected = self.capacity // 2 + 1
        # Make the number of non-protected items even
        protected -= (len(self) - protected) % 2
        self.reset_compaction_schedule()
        yield from self.compact(protected)

    def normal_compaction(self):
        """Standard compaction by the schedule (generator)."""
        assert self.is_full()
        yield from self.compact(self.count_protected())

    def compact(self, protected):
        """Compact all items except the smallest ``protected`` ones.

        Generator yielding half of the non-protected items.  The compacted
        items are removed and the sketch size is updated only after the
        generator has been fully consumed.

        The number of non-protected items must be even.
        """
        surplus = len(self[protected:])
        assert surplus % 2 == 0
        self.sort()

        # Set the random offset and random shift independently
        # each choice every other time
        if self.num_compactions % 2 == 1:
            self.offset = 1 - self.offset
            self.shift = int(random() < 0.5)
        else:
            self.offset = int(random() < 0.5)
            self.shift = 1 - self.shift

        # The shift moves the compacted window one item to the left, which
        # needs at least one protected item.  With nothing protected the start
        # index would become -1, i.e. wrap around to the last item, so the
        # shift is ignored for this compaction.
        shift = self.shift if protected > 0 else 0
        start = protected - shift
        stop = len(self) - shift

        # yield half of non-protected and delete all of them from self
        for i in range(start + self.offset, stop, 2):
            yield self[i]  # yield selected items

        self.sketch.size -= surplus // 2
        del self[start:stop]

        self.num_compactions += 1
        assert not self.is_full()
# AUXILIARY FUNCTIONS
def trailing_ones_binary(n):
    """Return the number of consecutive '1' digits ending the binary form of n."""
    count = 0
    # Walk the binary digits from the least significant end
    for digit in reversed(format(n, "b")):
        if digit != "1":
            break
        count += 1
    return count