-
Notifications
You must be signed in to change notification settings - Fork 34
/
hz
executable file
·83 lines (65 loc) · 2.75 KB
/
hz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python
import os, sys, io, re
bs_start = 2**7
bs_max = 20 * 2**20
def main(args=None):
args = list(sys.argv[1:] if args is None else args)
for n, arg in enumerate(args):
if re.search(r'^-\d+$', arg): args[n] = '-n{}'.format(arg.lstrip('-'))
import argparse
parser = argparse.ArgumentParser(
description='"head", but with \\x00 delimeters.'
' Ensures that final chunk ends with delimeter by default.')
parser.add_argument('file', nargs='?', metavar='path',
help='Path to a file to process instead of stdin.')
parser.add_argument('-n', '--count', metavar='number', type=int,
help='Max number of entries to read. Default is to read ALL of them.')
parser.add_argument('-r', '--replace-with-newlines',
action='store_true', help='Replace zero bytes with newlines in the output.')
parser.add_argument('-z', '--replace-with-nulls',
action='store_true', help='Handle newlines in the'
' input and replace them with zero bytes in the output.'
'If --replace-with-newlines is also specified, behaves same as "head".')
parser.add_argument('-b', '--max-buffer-size',
metavar='bytes', type=int, default=2*2**20,
help='Max size of a single entry (in bytes),'
f' i.e. a peek buffer. Cannot be larger than {bs_max}B.')
parser.add_argument('--no-final-delimiter',
action='store_true', help='Do not ensure that final chunk ends with delimeter (e.g. \\x00).')
opts = parser.parse_args(args)
if opts.max_buffer_size > bs_max:
parser.error(f'--max-buffer-size cannot be larger than {bs_max} (hardcoded limit).')
sep = b'\0' if not opts.replace_with_nulls else b'\n'
glue = b'\0' if not opts.replace_with_newlines else b'\n'
src = sys.stdin.fileno() if not opts.file else opts.file
dst = sys.stdout.buffer
with open(src, 'rb', opts.max_buffer_size) as src:
bs, bs_last, count = None, None, 0
while True:
if bs is None: bs = bs_start
buff = src.peek(bs)
bs_now = len(buff)
if buff:
if bs_now < bs and bs_now == bs_last: pos = None
else: pos = buff.find(sep)
if pos == -1:
if bs == opts.max_buffer_size:
raise RuntimeError( 'Failed to find next'
f' \\x00 byte with buffer size: {opts.max_buffer_size}' )
bs, bs_last = max(bs * 2, opts.max_buffer_size), bs_now
continue
bs_last = None
if count and (buff or not opts.no_final_delimiter): dst.write(glue)
if opts.count and count >= opts.count: break
if not buff: break
count += 1
if pos is not None:
buff = src.read(pos)
assert sep not in buff and src.read(1) == sep, 'bad chunk read'
dst.write(buff)
else: # "read until the end" command
for buff in iter(lambda: src.read(bs_max), b''):
assert sep not in buff, 'bad unlimited read'
dst.write(buff)
bs = None
if __name__ == '__main__': sys.exit(main())