-
Notifications
You must be signed in to change notification settings - Fork 34
/
dovecot-mail
executable file
·349 lines (309 loc) · 13.1 KB
/
dovecot-mail
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#!/usr/bin/env python
from xml.sax.saxutils import escape as xml_escape
import pathlib as pl, datetime as dt
import email, email.header, email.errors
import os, sys, re, time, json, struct, hashlib
import zmq # pyzmq
def file_follow( src, open_tail=True,
        read_int_min=0.1, read_int_max=20, read_int_mul=1.1,
        rotation_check_interval=20, yield_file=False, **open_kws ):
    '''Tail a file for lines via readline() calls with adaptive interval between those.

    Generator yielding complete (newline-terminated) lines as bytes,
    or (line, file_object) tuples if yield_file is set.
    Sending any non-None value into the generator stops it.
    Underlying file is closed whenever generator finishes, is closed or fails.

    src: file path (str) or an already-opened binary file object.
        Path is opened in 'rb' mode (open_kws are passed to open()), file object
        is read from its current position and its "name" is used for re-opening.
    open_tail: for path src, seek to the end of file on first open.
    read_int_min, read_int_max, read_int_mul: sleep interval on EOF starts
        at read_int_min and is multiplied by read_int_mul on every consecutive
        empty read, capped at read_int_max. It is reset when any data is read.
        With read_int_min=None there are no sleeps - empty bytes
        are yielded on EOF instead, leaving the waiting to the caller.
    rotation_check_interval: min seconds between checks for path pointing
        to a different (st_ino, st_dev) than the opened file, and for truncation.
        Rotated file is re-opened from the start once it is read up to EOF.'''
    open_tail = open_tail and isinstance(src, str)

    def file_id(st):
        return st.st_ino, st.st_dev

    def next_check_ts(ts=None):
        return (ts or time.time()) + rotation_check_interval

    def result(data):
        return (data, src) if yield_file else data

    if isinstance(src, str): src, path = None, src
    else:
        path = src.name
        src_inode, src_inode_ts = file_id(os.fstat(src.fileno())), next_check_ts()
    line, read_chk = b'', read_int_min

    try:
        while True:

            if not src: # (re)open
                src = open(path, 'rb', **open_kws)
                if open_tail:
                    src.seek(0, os.SEEK_END)
                    open_tail = False
                src_inode, src_inode_ts = file_id(os.fstat(src.fileno())), next_check_ts()
                src_inode_chk = None

            ts = time.time()
            if ts > src_inode_ts: # rotation check
                src_inode_ts = next_check_ts(ts)
                # Path can be missing for a moment in the middle of log rotation,
                #  which is not an error here - same check will be repeated later
                try: src_inode_chk = file_id(os.stat(path))
                except FileNotFoundError: src_inode_chk = None
                if os.fstat(src.fileno()).st_size < src.tell(): src.seek(0) # truncated
            else: src_inode_chk = None

            buff = src.readline()
            if not buff: # eof
                if src_inode_chk and src_inode_chk != src_inode: # rotated
                    src.close()
                    src, line = None, b''
                    continue
                if read_chk is None: yield result(buff)
                else:
                    time.sleep(read_chk)
                    read_chk = min(read_chk * read_int_mul, read_int_max)
            else:
                # readline() can return partial line at EOF, so these are accumulated
                line += buff
                read_chk = read_int_min

            if line and line.endswith(b'\n'): # complete line
                val = yield result(line)
                if val is not None: break
                line = b''

    finally:
        # Also runs on generator.close() and errors, so that file is never left open
        if src: src.close()
def file_follow_durable( path, min_dump_interval=10,
        xattr_name='user.mail-notify.pos', xattr_update=True, **follow_kws ):
    '''Records log position into xattrs after reading line every min_dump_interval seconds.
    Checksum of the last line at the position is also recorded
        (so line itself doesn't have to fit into xattr) to make sure file wasn't
        truncated between last xattr dump and re-open.

    Generator yielding lines decoded as utf-8 (with "replace" error handling).
    Sending any true value into it stops underlying file_follow tailer and ends iteration.

    path: log file path, opened here and then passed to file_follow as a file object.
    xattr_name: name of the xattr to keep position in, false value disables it.
    xattr_update: whether to store new positions, stored one is used either way.
    follow_kws: passed to file_follow as-is.'''
    pos_fmt = struct.Struct('>II10s') # position, last line length, last line hash

    def pos_hash(buff):
        return hashlib.blake2s(buff, digest_size=10, key=b'tail-chk').digest()

    # Try to restore position
    src = open(path, 'rb')
    try:
        if not xattr_name: raise OSError
        pos = os.getxattr(src.fileno(), xattr_name)
        pos, line_len, line_hash = pos_fmt.unpack(pos)
    except (OSError, struct.error): pos = None
    if pos:
        try:
            src.seek(pos - line_len)
            if pos_hash(line := src.read(line_len)) != line_hash:
                raise OSError('last log-line does not match hash')
        except (OSError, ValueError) as err: # bogus offsets can be rejected by seek() too
            log.info('Failed to restore log position: %s', err)
            src.seek(0)
    tailer = file_follow(src, yield_file=True, **follow_kws)

    # ...and keep it updated
    pos_dump_ts = 0
    while True:
        line, src_chk = next(tailer)
        if not line: pos_dump_ts = 0 # force-write xattr
        if (ts := time.monotonic()) >= pos_dump_ts:
            if src is not src_chk: src = src_chk # file was re-opened by the tailer
            pos_new = src.tell()
            if pos != pos_new:
                pos = pos_new
                if xattr_update and xattr_name:
                    os.setxattr( src.fileno(), xattr_name,
                        pos_fmt.pack(pos, len(line), pos_hash(line)) )
            pos_dump_ts = (ts or time.monotonic()) + min_dump_interval
        if (yield line.decode('utf-8', 'replace')):
            # Tailer stops and closes its file upon receiving a value, so send() raises
            #  StopIteration, which must not leak from generator code (RuntimeError, PEP 479)
            try: tailer.send(StopIteration)
            except StopIteration: pass
            break
def parse_iso8601(spec, tz_default=dt.timezone.utc, _re=re.compile(
        r'(\d{4})-(\d{2})-(\d{2})(?:[T ](\d{2}):(\d{2}))?'
        r'(?::(?P<s>\d{2})(?:\.(?P<us>\d+))?)?\s*(?P<tz>Z|[-+]\d{2}:\d{2})?' ) ):
    '''Parse first ISO8601-like date/time found in spec string into a posix timestamp.

    Accepted format is "YYYY-MM-DD[(T| )HH:MM[:SS[.frac]]][ ][Z|+HH:MM|-HH:MM]".
    Time defaults to 00:00:00, timezone to tz_default if not specified.
    Fractional seconds can have any number of digits,
        with everything past microsecond precision discarded.
    Returns float timestamp, raises ValueError if
        there is no date in spec or its values are out of range.'''
    if not (m := _re.search(spec)): raise ValueError(spec)

    tz = m.group('tz')
    if not tz: tzinfo = tz_default
    elif tz == 'Z': tzinfo = dt.timezone.utc
    else:
        # Sign applies to both hours and minutes, e.g. -00:30 is minus 30 minutes
        sign = -1 if tz[0] == '-' else 1
        hh, mm = (int(n) * sign for n in tz[1:].split(':', 1))
        tzinfo = dt.timezone(dt.timedelta(hours=hh, minutes=mm), tz)

    year, month, day, hour, minute = (int(n or 0) for n in m.groups()[:5])
    second = int(m.group('s') or 0)
    # Digits after the dot are a decimal fraction, not a count of microseconds,
    #  so these must be right-padded/truncated to 6 digits, i.e. ".5" is 500000 us
    us = int((m.group('us') or '0')[:6].ljust(6, '0'))

    ts = dt.datetime(year, month, day, hour, minute, second, us, tzinfo=tzinfo)
    return ts.timestamp()
def _mail_has_msgid(src, msgid):
    'Check whether headers in opened mail file have Message-ID with specified value.'
    msg_id_line = False # set if previous line was "Message-ID:" without a value
    for line in src:
        line = line.strip()
        if not line: break # end of headers
        if msg_id_line: m = re.search(r'^\s*(<[^>]+>)(\s|$)', line)
        else:
            m = re.search(r'(?i)^message-id:\s*(?:(<[^>]+>)\s*)?$', line)
            if m and not m.group(1): msg_id_line = True; continue
        msg_id_line = False
        if m and m.group(1) == msgid: return True
    return False

def mail_find(maildir, msgid, msgpath, ts, ts_diff_max=5*60):
    '''Find recently-delivered mail with specified Message-ID in a maildir.

    maildir: pathlib path to a maildir, with non-INBOX
        mailboxes looked up in its ".{msgpath}" subdirectories.
    msgid: Message-ID header value to look for, including angle brackets.
    msgpath: mailbox name, "INBOX" stands for maildir itself.
    ts: expected delivery timestamp - only files with mtime
        within ts_diff_max seconds from it are checked.

    Files in "new" and then "cur" subdirs are checked in reverse-sorted order,
        skipping ones that do not start with digits and a dot.
    Returns parsed email message or None if it was not found,
        including when mailbox or its new/cur subdirs do not exist.'''
    if msgpath != 'INBOX': maildir = maildir / f'.{msgpath}'
    for d in 'new', 'cur':
        try: mails = sorted((maildir / d).iterdir(), reverse=True)
        except (FileNotFoundError, NotADirectoryError): continue # no such mailbox/subdir
        for mail in mails:
            if not re.search(r'^\d+\.', mail.name): continue
            try: ts_mail = mail.stat().st_mtime
            except FileNotFoundError: continue
            if abs(ts - ts_mail) > ts_diff_max: break # newest mails should be first
            # Mail can get moved (e.g. new -> cur) or removed between stat() and open()
            try: src = mail.open(errors='surrogateescape')
            except FileNotFoundError: continue
            with src:
                if not _mail_has_msgid(src, msgid): continue
                src.seek(0)
                return email.message_from_file(src)
    return None
def mail_iter_log(log_tailer, maildir, path_filters=None):
    '''Parse dovecot mail delivery log lines and yield (mailbox, message) tuples for these.

    log_tailer: iterable of log lines (str).
    maildir: maildir path, passed to mail_find to look up delivered messages.
    path_filters: optional list of "[+/-]regexp" patterns to match against mailbox name.
        First matching pattern decides whether mail is processed
        ("+" prefix) or skipped ("-" or no prefix), with processing being the default.
        Empty patterns are ignored, None is same as empty list.

    Lines that do not match expected format, have "unspecified" msgid, are for
        filtered-out mailbox or where message cannot be found are logged and skipped.'''
    # Expected line examples (one line each):
    #  "2013-11-26T22:50:19.455012+06:00 mail.info dovecot[-]:
    #    lda(user): sieve: msgid=<message-id@example.com>:
    #    stored mail into mailbox 'INBOX'\n"
    #  "2013-11-26T22:50:19.455012-05:00 mail.info dovecot[899]:
    #    lda(user)<899><kdxmBqy7F16DAwAAMpFoNw>: sieve: msgid=<message-id@example.com>:
    #    fileinto action: stored mail into mailbox 'reports'"
    # Timezone offset can have either sign, same as what parse_iso8601 accepts
    line_re = re.compile(
        r'(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+[-+]\d{2}:\d{2})'
        r' mail\.info dovecot\[(-|\d+)\]:\s+'
        r'lda\([^)]+\)(<\d+><[^>]+>)?: sieve:'
        r' msgid=(?P<msgid><[^>]+>|[^\s:]+|unspecified)\s*:'
        r'( fileinto action:)? stored mail into mailbox \'(?P<path>[^\']+)\'\s*$' )

    # Filters are parsed once here, path_filters can be None if no options were specified
    filters = list()
    for pat in path_filters or ():
        if not pat: continue
        include = pat[0] == '+'
        if pat[0] in '-+': pat = pat[1:]
        filters.append((re.compile(pat), include))

    path_exclude_cache = set()
    for line in log_tailer:
        if not (m := line_re.search(line)):
            log.warning('Failed to match line: %r', line.strip())
            continue

        ts = parse_iso8601(m.group('ts'))
        msgid, msgpath = m.group('msgid'), m.group('path')
        if msgid == 'unspecified':
            log.info('Skipping message with unspeficied msgid (path: %s)', msgpath)
            continue

        if msgpath in path_exclude_cache: msg_pass = False
        else: # result for the first pattern that matches, pass otherwise
            msg_pass = next(( include
                for pat_re, include in filters if pat_re.search(msgpath) ), True)
        if not msg_pass:
            path_exclude_cache.add(msgpath)
            log.debug('Skipping mail for path: %s', msgpath)
            continue

        if (msg := mail_find(maildir, msgid, msgpath, ts)) is None:
            log.info('Failed to find delivered (path: %s) message: %s', msgpath, msgid)
            continue
        yield msgpath, msg
def mail_header_decode(line):
    '''Decode MIME encoded-words in a header value, returning plain string.

    Chunks returned by email.header.decode_header are decoded using their
        charset (utf-8 if there is none) with "replace" error handling and
        joined by spaces, falling back to escaped bytes representation for
        a chunk if its charset cannot be used.
    Errors from decode_header itself are not handled here.'''
    dec = list()
    for chunk, enc in email.header.decode_header(line):
        if isinstance(chunk, str):
            dec.append(chunk)
            continue
        try: chunk = chunk.decode(enc or 'utf-8', 'replace')
        # Charset name comes from the mail and can be bogus/unusable in various ways,
        #  but stuff like KeyboardInterrupt should never be suppressed here
        except Exception: chunk = repr(chunk)[2:-1]
        dec.append(chunk)
    return ' '.join(dec)
def mail_header(msg, k):
    '''Return decoded value of header k from message msg, empty string if it is missing.

    Value is decoded via mail_header_decode first, and then any encoded-words
        that are still left there are decoded individually.
    Parts that cannot be parsed as encoded-words are left as-is.'''
    raw = msg.get(k, '')
    try: header = mail_header_decode(raw)
    except email.errors.HeaderParseError: header = str(raw)

    # email.header.decode_header misses some badly-concatenated stuff
    res = list()
    while True:
        m = re.search(r'(?i)=\?[\w\d-]+(\*[\w\d-]+)?\?[QB]\?[^?]+\?=', header)
        if not m:
            res.append(header)
            break
        start, end = m.span(0)
        word = header[start:end]
        # Broken encoded-word is passed through as-is, so that result is always a string
        try: word = mail_header_decode(word)
        except email.errors.HeaderParseError: pass
        res.extend([header[:start], word])
        header = header[end:]
    return ''.join(res)
def main(args=None):
    '''Script entry point - tail dovecot log and publish mail notifications to zmq socket.

    args: list of command-line arguments to use instead of sys.argv[1:].
    Sets module-level "log" logger that other functions here are using.
    Runs until log tailer stops or an error is raised, closing zmq socket/context on exit.'''
    global log
    import argparse, logging

    parser = argparse.ArgumentParser(
        usage='%(prog)s [options] log-path maildir-path',
        description='Monitor (tail) specified dovecot log file'
            ' and dispatch new mail delivery notifications via zeromq pub socket.')
    parser.add_argument('log_path', help='Path to dovecot log file.')
    parser.add_argument('maildir_path',
        help='Path to maildir where mails are delivered to.')

    group = parser.add_argument_group('ZeroMQ socket options')
    group.add_argument('-b', '--bind',
        action='append', metavar='ip:port',
        help='zmq socket bind address - can be either ip:port (assumed to be'
                ' tcp socket, e.g. 1.2.3.4:5678) or full zmq url (tcp://1.2.3.4:5678).'
            ' Can be specified multiple times to bind to more than one address/port.')
    group.add_argument('-d', '--dst',
        action='append', metavar='ip:port',
        help='Peer address to connect zmq socket to - can be either ip:port'
                ' (assumed to be tcp socket, like 1.2.3.4:5678)'
                ' or full zmq url (tcp://1.2.3.4:5678).'
            ' Can be specified multiple times to deliver message to more than one peer.')
    group.add_argument('-w', '--wait-connect',
        type=float, metavar='seconds', default=1.0,
        help='Timeout to wait for peer connections to establish'
            ' (default: %(default)s) and unsent messages to linger on app stop.')

    group = parser.add_argument_group('Filtering')
    group.add_argument('-f', '--filter', action='append', metavar='spec',
        help='Filter-pattern (format: [+/-]regexp) for mail path.'
            ' Patterns are evaluated in same order'
                ' as they are specified with "pass" as a default action.'
            ' Optional "-" or "+" sign before regexp explicitly marks pattern as'
                ' include or exclude one, otherwise patterns are treated as "exclude".')

    group = parser.add_argument_group('Message formatting')
    group.add_argument('-n', '--hostname',
        metavar='network_name', default=os.uname()[1],
        help='Source name to use for dispatched message.')
    group.add_argument('-a', '--app-name', metavar='name', default='mail_notify',
        help='App name for the icon (default: %(default)s).')
    group.add_argument('-i', '--icon', action='append', metavar='icon',
        help='Icon name, path or alias.'
            ' Can be specified multiple times (for fallback icon names).')
    group.add_argument('--pango-markup', action='store_true',
        help='Use pango markup in issued notifications and escape other stuff.')

    group = parser.add_argument_group('Testing/debug options')
    group.add_argument('--test-message', action='store_true',
        help='Issue test notification right after start.')
    group.add_argument('--dry-run', action='store_true',
        help='Do not store/update log position - just parse and generate notifications.')
    group.add_argument('--debug', action='store_true', help='Verbose operation mode.')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    if not opts.bind and not opts.dst:
        parser.error('At least one of either --bind or --dst must be specified.')
    p_maildir, p_log = map(pl.Path, [opts.maildir_path, opts.log_path])
    if not all(p.exists() for p in [p_maildir, p_log]):
        parser.error('Missing maildir/log path(s)')

    logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
    log = logging.getLogger()

    log_tailer = file_follow_durable(opts.log_path, xattr_update=not opts.dry_run)

    ctx, sock = zmq.Context(), None
    try:
        sock = ctx.socket(zmq.PUB)
        if hasattr(zmq, 'IPV4ONLY'): sock.setsockopt(zmq.IPV4ONLY, False)
        sock.setsockopt(zmq.RECONNECT_IVL_MAX, int(300 * 1000))
        sock.setsockopt(zmq.LINGER, int(opts.wait_connect * 1000))
        sock.setsockopt(zmq.SNDHWM, 50)

        # Addresses without url scheme are assumed to be for tcp sockets
        zmq_addr = lambda addr: ( f'tcp://{addr}'
            if not re.search(r'^\w+://', addr) else addr )
        if opts.bind:
            for dst in map(zmq_addr, opts.bind):
                log.debug('Binding to %s', dst)
                sock.bind(dst)
        if opts.dst:
            log.debug('Connecting to %s peer(s)', len(opts.dst))
            for dst in map(zmq_addr, opts.dst): sock.connect(dst)
            time.sleep(opts.wait_connect)

        def zmq_send(summary, body):
            'Publish notification without blocking, dropping it if it cannot be queued.'
            msg = dict(
                summary=summary, body=body,
                app_name=opts.app_name, icon=','.join(opts.icon or []) )
            msg = '\1' + json.dumps([opts.hostname, time.time(), msg])
            try: sock.send_string(msg, zmq.DONTWAIT)
            except zmq.ZMQError as err:
                if err.errno != zmq.EAGAIN: raise

        if opts.test_message:
            log.debug('Dispatching test notification')
            # Only one of --bind/--dst is required, with the other one left as None
            host, pub, sub = os.uname()[1], opts.bind or [], opts.dst or []
            if opts.pango_markup:
                host = f'<b>{xml_escape(host)}</b>'
                pub = (f'<u>{xml_escape(v)}</u>' for v in pub)
                sub = (f'<u>{xml_escape(v)}</u>' for v in sub)
            pub, sub = ', '.join(pub), ', '.join(sub)
            zmq_send( 'Mail notification test',
                'Mail notification daemon started successfully on host:'
                f' {host}\nPub socket(s): {pub}\nConnecting to: {sub}' )

        # opts.filter is None if no -f/--filter options were specified
        for path, msg in mail_iter_log(log_tailer, p_maildir, opts.filter or []):
            msg_from, msg_subj = (mail_header(msg, k) for k in ['from', 'subject'])
            log.debug('Dispatching notification: (%s) %s - %s', path, msg_from, msg_subj)
            if not opts.pango_markup: msg_path = path
            else:
                path, msg_from, msg_subj = map(xml_escape, [path, msg_from, msg_subj])
                msg_path = f'<b>{path}</b>'
            zmq_send( f'New mail ({path})',
                f'Path: {msg_path}\nFrom: {msg_from}\nSubject: {msg_subj}' )

    finally:
        log.debug('Terminating zmq context...')
        if sock: sock.close()
        ctx.term()
# Run as a script - exit code is whatever main() returns
if __name__ == '__main__':
    sys.exit(main())