-
Notifications
You must be signed in to change notification settings - Fork 1
/
prelude.py
437 lines (347 loc) · 11.1 KB
/
prelude.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
'''
Some common functional programming (and just useful) functions
inspired by the likes of Clojure, Haskell and LISP.
NOTE: There is a naming convention of i<func_name> returning an
iterator and <func_name> returning a collection.
'''
try:
    # The ABCs live in collections.abc; the aliases in `collections`
    # were removed in Python 3.10.
    from collections.abc import Container
except ImportError:
    from collections import Container
import itertools as itools
import functools as ftools
from copy import deepcopy
import operator as op
# Bring in functionality from the other modules
from .dispatch import dispatch_on
from .fmap import fmap
#############################################################
# Make some library functions available without namespacing #
#############################################################
# Short aliases for itertools callables. Every name below is bound to the
# itertools object itself, so behaviour is exactly that of the original.
combs_wr = itools.combinations_with_replacement
combs = itools.combinations
perms = itools.permutations
# The lazy itertools versions get an `i` prefix (see the naming convention
# in the module docstring); the bare names takewhile/dropwhile are used for
# the list-returning wrappers defined later in this module.
itakewhile = itools.takewhile
idropwhile = itools.dropwhile
groupby = itools.groupby
cprod = itools.product  # cartesian product
mask = itools.compress # compress is a terrible name for this!
repeat = itools.repeat
chain = itools.chain
tee = itools.tee
# Functional built-ins also include:
#   map filter lambda
reduce = ftools.reduce
# NOTE: having a shorter name for partial makes it a lot nicer
#       to work with in nested function calls.
partial = fn = ftools.partial
# Arithmetic operators as plain functions. Note that `div` is true
# division; use `floordiv` for integer division.
add = op.add
sub = op.sub
mul = op.mul
div = op.truediv
floordiv = op.floordiv
# The ordering comparisons take their arguments in the OPPOSITE order to the
# operator module: lt(a, b) evaluates `b < a` (likewise le, gt and ge).
#   1) This makes currying/partial application read naturally:
#      fn(lt, 5) is the predicate "is the argument less than 5?"
#   2) Beware when calling them directly with two arguments:
#      lt(x, y) answers "is y less than x?", not "is x less than y?"
lt = lambda a, b: op.lt(b, a)
le = lambda a, b: op.le(b, a)
gt = lambda a, b: op.gt(b, a)
ge = lambda a, b: op.ge(b, a)
# Equality tests are symmetric, so they are used unflipped.
eq = op.eq
neq = op.ne
####################
# Helper functions #
####################
def iscol(x):
    '''
    Report whether `x` is a "true" container.

    str and bytes satisfy the Container ABC but are treated as plain
    values here, so they are reported as non-collections.
    Returns True for any other Container instance and False otherwise.
    '''
    return isinstance(x, Container) and not isinstance(x, (str, bytes))
##########################
# Higher order functions #
##########################
def zipwith(func):
    '''
    Build a function that zips its arguments and folds each tuple with func.

    `func` must be a binary operation. The returned function accepts any
    number of iterables and returns a list holding, for every tuple that
    zip produces, the left-to-right reduction of that tuple with `func`.
    '''
    def zipper(*iterables):
        combined = []
        for elems in zip(*iterables):
            combined.append(reduce(func, elems))
        return combined
    return zipper
def izipwith(func):
    '''
    Lazy counterpart of zipwith.

    `func` must be a binary operation. The returned function accepts any
    number of iterables and gives back a generator yielding, for every
    tuple that zip produces, the left-to-right reduction of that tuple
    with `func`.
    '''
    def izipper(*iterables):
        yield from (reduce(func, elems) for elems in zip(*iterables))
    return izipper
def compose(f, g):
    '''
    Create a new function that computes f(g(*args, **kwargs)).

    `g` is called first with whatever arguments the composition receives
    and its result is handed to `f` as a single positional argument.

    The returned function carries a generated docstring quoting the names
    and docstrings of both callables. Callables without a `__name__`
    (functools.partial objects, instances defining __call__, ...) are
    described by their repr() instead of raising AttributeError.
    '''
    def composition(*args, **kwargs):
        return f(g(*args, **kwargs))

    def describe(func):
        # partial objects and callable instances have no __name__
        name = getattr(func, '__name__', repr(func))
        docstring = getattr(func, '__doc__', None)
        if not docstring:
            docstring = 'No docstring for {}'.format(name)
        return name, docstring

    doc = ('The composition of calling {} followed by {}:\n'
           '>>> {}\n"{}"\n\n>>> {}\n"{}"')
    fname, fdoc = describe(f)
    gname, gdoc = describe(g)
    composition.__doc__ = doc.format(fname, gname, fname, fdoc, gname, gdoc)
    return composition
def flip(func):
    '''
    Swap the two arguments of a binary operation.

    Returns a function that computes func(b, a) when called as (a, b).
    The wrapper remembers the function it was built from, so flipping an
    already flipped function hands back the original instead of stacking
    a second wrapper on top.
    '''
    unflipped = getattr(func, '_before_flip', None)
    if unflipped:
        # flip(flip(func)) unwraps rather than nesting calls
        return unflipped

    def flipped(a, b):
        return func(b, a)

    flipped._before_flip = func
    return flipped
def revargs(func):
    '''
    Reverse the positional argument order of a function.

    Returns a wrapper that calls `func` with the positional arguments it
    receives in reverse order. Keyword arguments are passed through
    unchanged.
    '''
    def reversed_args(*args, **kwargs):
        return func(*args[::-1], **kwargs)
    return reversed_args
################################################
# Reductions and functions that return a value #
################################################
def nth(n, col):
    '''
    Return the nth element of an iterable, counting from 1.

    nth(1, col) is the first element. Works on collections as well as on
    generators and other iterators, which are consumed up to and
    including the nth element.

    Raises IndexError when n is less than 1 or when `col` holds fewer
    than n elements.
    '''
    if n < 1:
        # Without this guard no element is ever bound and the function
        # would fail with an UnboundLocalError instead.
        raise IndexError('n must be at least 1')
    # islice skips the first n - 1 elements and yields at most one more
    for element in itools.islice(col, n - 1, n):
        return element
    raise IndexError('iterable has fewer than {} elements'.format(n))
def foldl(col, func=add, acc=None):
    '''
    Reduce an iterable to a single value, working from the left.

    This is functools.reduce with the arguments reordered so that the
    collection comes first (reduce's own order is reduce(func, col, acc)).
    `func` is a binary function, addition by default. When `acc` is
    anything other than None it seeds the reduction; otherwise the first
    element of `col` is used as the seed.
    '''
    if acc is None:
        return reduce(func, col)
    return reduce(func, col, acc)
def foldr(col, func=add, acc=None):
    '''
    Reduce an iterable to a single value, working from the right.

    The elements are visited last to first and combined by calling
    func(accumulated, element). When `acc` is anything other than None it
    seeds the reduction; otherwise the last element is used as the seed.

    NOTE: Right folds and scans will blow up for infinite generators!
          Inputs that cannot be reversed directly are first copied into
          a list.
    '''
    try:
        backwards = reversed(col)
    except TypeError:
        # Generators and the like: materialise before reversing
        backwards = reversed(list(col))
    seed = () if acc is None else (acc,)
    return reduce(func, backwards, *seed)
def dotprod(v1, v2):
    '''
    Compute the dot product of two "vectors".

    Both arguments must support len(). The result is the sum of the
    pairwise products of their elements.

    Raises IndexError when the two vectors differ in length.
    '''
    if len(v1) != len(v2):
        raise IndexError('v1 and v2 must be the same length')
    return sum(mul(a, b) for a, b in zip(v1, v2))
def all_equal(iterable):
    '''
    Returns True if all the elements in the iterable are the same.

    An empty iterable counts as "all equal".
    '''
    # Based on the Itertools Recipes section in the docs: groupby
    # collapses runs of equal elements, so anything beyond a single
    # group means at least two elements differ.
    groups = groupby(iterable)
    next(groups, None)  # discard the first group, if there is one
    return next(groups, None) is None
##################################################
# Functions that return a collection or iterator #
##################################################
def take(n, col):
    '''
    Return a list of up to the first n items of an iterable.

    Fewer than n items are returned when the iterable runs out early.
    '''
    return [item for item in itools.islice(col, n)]
def itake(n, col):
    '''
    Lazily yield up to the first n items of an iterable.

    Fewer than n items are produced when the iterable runs out early.
    '''
    yield from itools.islice(col, n)
def takewhile(predicate, col):
    '''
    Collect the leading elements of a collection for which the predicate
    holds, stopping at the first element that fails it.
    Returns those elements as a list.
    '''
    leading = itakewhile(predicate, col)
    return [item for item in leading]
def dropwhile(predicate, col):
    '''
    Skip the leading elements of a collection for which the predicate
    holds; everything from the first failing element onwards is kept.
    Returns the remaining elements as a list.
    '''
    remainder = idropwhile(predicate, col)
    return [item for item in remainder]
def drop(n, col):
    '''
    Drop the first n items from a collection and then return the rest.

    Sliceable collections are sliced, so the result has whatever type
    `col[n:]` produces (list -> list, str -> str, tuple -> tuple, ...).
    Anything else (iterators, generators, sets, dicts, ...) is iterated
    and the remaining items are returned as a list; the list is empty
    when there are n or fewer items.
    '''
    try:
        # Allows for the same call to run against an iterator or collection
        return col[n:]
    except (TypeError, KeyError):
        # Not sliceable. KeyError covers dicts on Python versions where
        # slice objects are hashable. islice accepts any iterable, so
        # sets and dicts work here as well as true iterators; a negative
        # n drops nothing.
        return list(itools.islice(col, max(n, 0), None))
def idrop(n, col):
    '''
    Drop the first n items from a collection and then return an iterator
    that yields the rest of the elements.

    Sliceable collections are sliced and a generator over the slice is
    returned. Anything else (iterators, generators, sets, dicts, ...) is
    turned into an iterator, advanced past its first n items right away,
    and that iterator is returned. When an iterator is passed in, the
    returned object is that same iterator.
    '''
    try:
        # Allows for the same call to run against an iterator or collection
        rest = col[n:]
    except (TypeError, KeyError):
        # Not sliceable. KeyError covers dicts on Python versions where
        # slice objects are hashable. iter() makes this work for sets
        # and dicts as well as for true iterators.
        remaining = iter(col)
        skip = max(n, 0)
        # Eagerly discard `skip` items; stops quietly if there are fewer
        next(itools.islice(remaining, skip, skip), None)
        return remaining
    return (c for c in rest)
def scanl(col, func=add, acc=None):
    '''
    Left scan: return the list of running results obtained by folding a
    collection from the left with a binary function.

    When `acc` is anything other than None it is used as the starting
    value and is also the first entry of the result; otherwise the scan
    starts from the first element of `col`.
    '''
    source = col if acc is None else chain([acc], col)
    return list(itools.accumulate(source, func))
def iscanl(col, func=add, acc=None):
    '''
    Lazy left scan: yield the running results obtained by folding a
    collection from the left with a binary function.

    When `acc` is anything other than None it is used as the starting
    value and is also the first value yielded; otherwise the scan starts
    from the first element of `col`.
    '''
    source = col if acc is None else chain([acc], col)
    yield from itools.accumulate(source, func)
def scanr(col, func=add, acc=None):
    '''
    Right scan: return the list of running results obtained by walking a
    collection from its last element to its first and repeatedly applying
    acc = func(acc, element).

    When `acc` is anything other than None it is used as the starting
    value and is also the first entry of the result; otherwise the scan
    starts from the last element of `col`.

    WARNING: Right folds and scans will blow up for infinite generators!
             Inputs that cannot be reversed directly are first copied
             into a list.
    '''
    try:
        backwards = reversed(col)
    except TypeError:
        # Generators and the like: materialise before reversing
        backwards = reversed(list(col))
    source = backwards if acc is None else chain([acc], backwards)
    return list(itools.accumulate(source, func))
def iscanr(col, func=add, acc=None):
    '''
    Lazy right scan: yield the running results obtained by walking a
    collection from its last element to its first and repeatedly applying
    acc = func(acc, element).

    When `acc` is anything other than None it is used as the starting
    value and is also the first value yielded; otherwise the scan starts
    from the last element of `col`.

    WARNING: Right folds and scans will blow up for infinite generators!
             Inputs that cannot be reversed directly are first copied
             into a list.
    '''
    try:
        backwards = reversed(col)
    except TypeError:
        # Generators and the like: materialise before reversing
        backwards = reversed(list(col))
    source = backwards if acc is None else chain([acc], backwards)
    yield from itools.accumulate(source, func)
def windowed(iterable, n):
    '''
    Take successive n-tuples from an iterable using a sliding window.

    Returns a list of tuples; consecutive tuples overlap in n - 1
    elements. The list is empty when the iterable holds fewer than n
    elements.
    '''
    # Take n copies of the iterable
    iterables = tee(iterable, n)
    # Advance each to the correct starting position
    for step, it in enumerate(iterables):
        for _ in range(step):
            # The default stops a short input from leaking StopIteration
            next(it, None)
    # Zip the modified iterables and build a list of the result
    # NOTE: not using zip longest as we want to stop when we reach the end
    return list(zip(*iterables))
def iwindowed(iterable, n):
    '''
    Lazily take successive n-tuples from an iterable using a sliding
    window.

    Yields tuples; consecutive tuples overlap in n - 1 elements. Nothing
    is yielded when the iterable holds fewer than n elements.
    '''
    # Take n copies of the iterable
    iterables = tee(iterable, n)
    # Advance each to the correct starting position
    for step, it in enumerate(iterables):
        for _ in range(step):
            # A bare next() would raise StopIteration on short input,
            # which inside a generator surfaces as RuntimeError (PEP 479)
            next(it, None)
    # Zip the modified iterables and yield the elements as a generator
    # NOTE: not using zip longest as we want to stop when we reach the end
    yield from zip(*iterables)
def chunked(iterable, n, fillvalue=None):
    '''
    Split an iterable into consecutive fixed-length chunks.

    Returns a list of n-tuples. When the number of elements is not a
    multiple of n, the final tuple is padded with `fillvalue`.
    '''
    shared = iter(iterable)
    # n references to the SAME iterator: each tuple zip_longest builds
    # therefore pulls n consecutive items from the input
    return list(itools.zip_longest(*([shared] * n), fillvalue=fillvalue))
def ichunked(iterable, n, fillvalue=None):
    '''
    Lazily split an iterable into consecutive fixed-length chunks.

    Yields n-tuples. When the number of elements is not a multiple of n,
    the final tuple is padded with `fillvalue`.
    '''
    shared = iter(iterable)
    # n references to the SAME iterator: each tuple zip_longest builds
    # therefore pulls n consecutive items from the input
    yield from itools.zip_longest(*([shared] * n), fillvalue=fillvalue)
def cmap(func, col):
    '''
    Concat-Map: map a function that takes a value and returns a list over
    an iterable and concatenate the results.

    The per-element results are joined with `+`, in order. An empty
    input produces an empty list rather than an error.
    '''
    results = list(map(func, col))
    if not results:
        # reduce() has nothing to start from when there are no results
        return []
    return reduce(add, results)
def flatten(col):
    '''
    Flatten an arbitrarily nested list of lists into a single list.

    A value that is not a collection (as judged by iscol, so strings and
    bytes count as plain values) is returned wrapped in a one-element
    list. Empty collections, at any depth, contribute nothing to the
    result.
    '''
    if not iscol(col):
        return [col]
    flat = []
    for item in col:
        # extend() copes with empty sub-results and avoids re-copying the
        # accumulated list on every step
        flat.extend(flatten(item))
    return flat
def iflatten(col):
    '''
    Flatten an arbitrarily nested list of lists into an iterator of
    single values.

    A value that is not a collection (as judged by iscol, so strings and
    bytes count as plain values) is yielded as is.
    '''
    if not iscol(col):
        yield col
        return
    for sub_col in col:
        # The recursive call yields sub_col itself when it is a plain
        # value, so no separate check is needed here
        yield from iflatten(sub_col)
#################################
# Overloaded dispatch functions #
#################################
@dispatch_on(index=1)
def conj(head, tail):
    '''
    Prepend an element to a collection, returning a new copy.

    This is the default implementation: `head` is wrapped in a
    one-element collection built with the type of `tail` and the two are
    concatenated. Other implementations can be registered per collection
    type, so exact behaviour will differ depending on the collection.
    '''
    singleton = type(tail)([head])
    return op.concat(singleton, tail)
@conj.add(dict)
def _conj_dict(head, tail):
    '''
    conj for dicts: `head` must unpack into a (key, value) pair, which is
    set on a deep copy of `tail`. The original dict is left untouched.
    '''
    # Allow the exception to raise here if head is not a pair
    key, value = head
    extended = deepcopy(tail)
    extended.update({key: value})
    return extended
@conj.add(set)
def _conj_set(head, tail):
    '''
    conj for sets: add `head` to a deep copy of `tail`. The original set
    is left untouched.
    '''
    extended = deepcopy(tail)
    extended.update({head})
    return extended