From 98d3c64ddfdd3b4bda470ec190cb069f818115b7 Mon Sep 17 00:00:00 2001 From: Drew Hubl Date: Fri, 1 Jun 2018 04:15:55 -0600 Subject: [PATCH] Replace logshipper.tail with our own tail implementation, and remove logshipper and pyinotify dependencies --- contrib/linux/requirements.txt | 3 +- contrib/linux/sensors/file_watch_sensor.py | 15 +- contrib/linux/sensors/tail/__init__.py | 2 + contrib/linux/sensors/tail/tail.py | 325 +++++++++++++++++++++ contrib/linux/sensors/tail/test_tail.py | 246 ++++++++++++++++ fixed-requirements.txt | 2 +- requirements.txt | 3 +- st2actions/in-requirements.txt | 3 +- 8 files changed, 583 insertions(+), 16 deletions(-) create mode 100644 contrib/linux/sensors/tail/__init__.py create mode 100644 contrib/linux/sensors/tail/tail.py create mode 100644 contrib/linux/sensors/tail/test_tail.py diff --git a/contrib/linux/requirements.txt b/contrib/linux/requirements.txt index 82cfdd69f91..798fea8e0b5 100644 --- a/contrib/linux/requirements.txt +++ b/contrib/linux/requirements.txt @@ -1,3 +1,2 @@ # used by file watcher sensor -pyinotify>=0.9.5,<=0.10 --e git+https://github.com/Kami/logshipper.git@stackstorm_patched#egg=logshipper +inotify==0.2.9 diff --git a/contrib/linux/sensors/file_watch_sensor.py b/contrib/linux/sensors/file_watch_sensor.py index d0a74c71a95..37f1dbb4685 100644 --- a/contrib/linux/sensors/file_watch_sensor.py +++ b/contrib/linux/sensors/file_watch_sensor.py @@ -1,6 +1,6 @@ import os -from logshipper.tail import Tail +from tail import Tail from st2reactor.sensor.base import Sensor @@ -15,18 +15,15 @@ def __init__(self, sensor_service, config=None): def setup(self): self._tail = Tail(filenames=[]) - self._tail.handler = self._handle_line - self._tail.should_run = True + self._tail.set_handler(self._handle_line) def run(self): - self._tail.run() + self._tail.start() def cleanup(self): if self._tail: - self._tail.should_run = False - try: - self._tail.notifier.stop() + self._tail.stop() except Exception: pass @@ -42,7 +39,7 @@ 
def add_trigger(self, trigger): if not self._trigger: raise Exception('Trigger %s did not contain a ref.' % trigger) - self._tail.add_file(filename=file_path) + self._tail.add_file(filepath=file_path) self._logger.info('Added file "%s"' % (file_path)) def update_trigger(self, trigger): @@ -55,7 +52,7 @@ def remove_trigger(self, trigger): self._logger.error('Received trigger type without "file_path" field.') return - self._tail.remove_file(filename=file_path) + self._tail.remove_file(filepath=file_path) self._trigger = None self._logger.info('Removed file "%s"' % (file_path)) diff --git a/contrib/linux/sensors/tail/__init__.py b/contrib/linux/sensors/tail/__init__.py new file mode 100644 index 00000000000..52bb73fc921 --- /dev/null +++ b/contrib/linux/sensors/tail/__init__.py @@ -0,0 +1,2 @@ + +from .tail import Tail # noqa: F401 diff --git a/contrib/linux/sensors/tail/tail.py b/contrib/linux/sensors/tail/tail.py new file mode 100644 index 00000000000..c0b8b41d857 --- /dev/null +++ b/contrib/linux/sensors/tail/tail.py @@ -0,0 +1,325 @@ +# Copyright 2014 Koert van der Veer +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+
+
+import glob
+import logging
+
+import eventlet
+from eventlet.green import os
+import inotify.adapters
+import inotify.constants as in_constants
+import six
+
+LOG = logging.getLogger(__name__)
+
+# Watch mask for tailed files. IN_MODIFY triggers a read; any other event
+# delivered for the file (IN_OPEN) marks the tail for a rescan on the next
+# modification (see Tail._inotify_file).
+INOTIFY_FILE_MASK = (in_constants.IN_MODIFY | in_constants.IN_OPEN)
+# Watch mask for the directories that contain the tailed files / globs.
+INOTIFY_DIR_MASK = (in_constants.IN_CREATE | in_constants.IN_DELETE | in_constants.IN_MOVE)
+
+
+class BaseInput(object):
+    """Base class for inputs that run in their own green thread.
+
+    Subclasses implement ``run()``; callers drive the input through
+    ``set_handler()``, ``start()`` and ``stop()``.
+    """
+
+    handler = None
+    should_run = False
+    thread = None
+
+    def set_handler(self, handler):
+        """Register the callable that receives every processed line."""
+        self.handler = handler
+
+    def start(self):
+        """Spawn the green thread running ``run()`` (once) and return it."""
+        LOG.debug("starting: should_run %r", self.should_run)
+        self.should_run = True
+        if self.thread is None:
+            self.thread = eventlet.spawn(self._run)
+        return self.thread
+
+    def stop(self):
+        """Ask ``run()`` to finish and wait for the green thread to exit.
+
+        Any exception raised inside the green thread is re-raised here by
+        ``wait()``.
+        """
+        LOG.debug("stopping: should_run %r", self.should_run)
+        self.should_run = False
+        thread = self.thread
+        if thread is None:
+            return
+        try:
+            thread.wait()
+        finally:
+            # Drop the reference even when wait() re-raises, so that a later
+            # start() spawns a fresh thread instead of reusing a dead one.
+            self.thread = None
+
+    def _run(self):
+        """Green thread entry point: log any failure of ``run()`` and re-raise."""
+        try:
+            self.run()
+        except Exception:
+            LOG.exception("Encountered exception while running %s", self)
+            # Bare raise keeps the original traceback (``raise e`` loses it on
+            # Python 2).
+            raise
+
+    def run(self):
+        """Main loop of the input; must be provided by subclasses."""
+        raise NotImplementedError  # pragma: no cover
+
+
+class Tail(BaseInput):
+    """Follows files, and processes new lines in those files with a callback
+    handler.
+
+    Glob-style wildcards are supported, and new files matching the wildcard
+    will be discovered.
+
+    Rotated files are automatically discovered, and reopened.
+
+    Example for ``input.yml``:
+
+    .. code:: yaml
+
+        - tail:
+            filename:
+            - /var/log/syslog
+            - /var/log/my_app/*.log
+    """
+
+    class FileTail(object):
+        """State kept for a single followed file."""
+
+        __slots__ = ['file_descriptor', 'path', 'buffer', 'stat', 'rescan',
+                     'watch_descriptor']
+
+        def __init__(self):
+            # Bytes read from the file that are not yet terminated by "\n".
+            self.buffer = b""
+            self.file_descriptor = None
+            self.path = None
+            # Set when the file must be re-examined (rotation check) before
+            # the next read.
+            self.rescan = None
+            # os.stat() result taken when the file was opened.
+            self.stat = None
+            self.watch_descriptor = None
+
+    class Event(object):
+        """Flattened view of one event yielded by the inotify adapter."""
+
+        @classmethod
+        def from_INOTIFY_HEADER(cls, in_header, path, filename=None):
+            """Build an Event from an inotify header plus path and filename.
+
+            Events that carry a filename are flagged as directory events.
+            """
+            return cls(
+                wd=in_header.wd,
+                mask=in_header.mask,
+                cookie=in_header.cookie,
+                len_=in_header.len,
+                dir_=bool(filename),
+                path=path,
+                filename=filename)
+
+        def __init__(self, wd, mask, cookie, len_, dir_, path, filename=None):
+            self.wd = wd
+            self.mask = mask
+            self.cookie = cookie
+            self.len = len_
+            self.path = path
+            self.filename = filename
+            self.dir = dir_
+
+        def __str__(self):
+            return ("{wd} {mask} {path} {filename} {dir}".format(
+                wd=self.wd,
+                mask=self.mask,
+                path=self.path,
+                filename=self.filename,
+                dir=self.dir))
+
+        def __repr__(self):
+            # The format template used to be an empty string, which made every
+            # "%r" debug log of an event print nothing.
+            return ("<Tail.Event wd={wd} mask={mask} path={path!r} "
+                    "filename={filename!r} dir={dir}>".format(
+                        wd=self.wd,
+                        mask=self.mask,
+                        path=self.path,
+                        filename=self.filename,
+                        dir=self.dir))
+
+    def __init__(self, filenames, block_duration_s=None, num_green_threads=10):
+        """Create a tail over ``filenames``.
+
+        :param filenames: a single path/glob or a list of them.
+        :param block_duration_s: forwarded to ``inotify.adapters.Inotify``
+            when given.
+        :param num_green_threads: size of the green pool that handles events.
+        """
+        # Eventlet monkeypatches the os module, so we disable these checks
+        # pylint: disable=no-member
+        if isinstance(filenames, six.string_types):
+            filenames = [filenames]
+
+        self.globs = [os.path.abspath(filename) for filename in filenames]
+        # TODO: Handle expanding the greenpool if necessary
+        kwargs = {}
+        if block_duration_s:
+            kwargs['block_duration_s'] = block_duration_s
+        LOG.debug("creating inotify watch manager")
+        self.watch_manager = inotify.adapters.Inotify(**kwargs)
+        LOG.debug("created inotify watch manager %r", self.watch_manager)
+        # path -> FileTail for every file currently followed
+        self.tails = {}
+        # directory path -> value returned by add_watch()
+        self.dir_watches = {}
+        self.thread = None
+        self.pool = eventlet.greenpool.GreenPool(size=num_green_threads)
+
+    # MAIN API
+
+    def add_file(self, filepath):
+        """Start following ``filepath``; the work runs in the green pool."""
+        self.pool.spawn(self.process_tail, filepath)
+        # Yield briefly so that the spawned green thread gets to run.
+        eventlet.sleep(0.01)
+
+    def remove_file(self, filepath):
+        """Stop following ``filepath``; the work runs in the green pool."""
+        self.pool.spawn(self._remove_file, filepath)
+        # Yield briefly so that the spawned green thread gets to run.
+        eventlet.sleep(0.01)
+
+    def run(self):
+        """Dispatch inotify events to the green pool until stopped.
+
+        Files that already match the globs are opened positioned at their end,
+        so only content written afterwards is reported.
+        """
+        self.update_tails(self.globs, do_read_all=False)
+        try:
+            for event in self.watch_manager.event_gen():
+                if event:
+                    (inotify_header, type_names, path, filename) = event
+                    tail_event = Tail.Event.from_INOTIFY_HEADER(inotify_header, path, filename)
+                    self.pool.spawn_n(self._inotify_event, tail_event)
+                else:
+                    eventlet.sleep()
+                if not self.should_run:
+                    break
+        finally:
+            self.pool.waitall()
+            self.remove_tails()
+
+    def _remove_file(self, filepath):
+        """Close the tail for ``filepath`` and forget about it."""
+        # Pop rather than get: a closed tail left in self.tails would be
+        # closed a second time by remove_tails() (os.close on a dead fd).
+        tail = self.tails.pop(filepath, None)
+        if tail:
+            self.close_tail(tail)
+
+    def _inotify_file(self, event):
+        """Handle an event reported for a followed file."""
+        LOG.debug("file notified %r", event)
+        tail = self.tails.get(event.path)
+        if tail:
+            if event.mask & in_constants.IN_MODIFY:
+                if tail.rescan:
+                    LOG.debug("rescanning %r", event.path)
+                    self.process_tail(event.path)
+                else:
+                    LOG.debug("reading %r", tail)
+                    self.read_tail(tail)
+            else:
+                tail.rescan = True
+
+    def _inotify_dir(self, event):
+        """Handle an event reported for a watched directory."""
+        LOG.debug("dir notified %r", event)
+        tail = self.tails.get(event.path)
+        if tail:
+            self.process_tail(event.path)
+
+        if event.dir and not tail:
+            self.update_tails(self.globs)
+
+    def _inotify_event(self, event):
+        """Route one event to the directory or the file handler."""
+        LOG.debug("change notified %r", event)
+        if event.dir:
+            self._inotify_dir(event)
+        else:
+            self._inotify_file(event)
+        eventlet.sleep()
+
+    def read_tail(self, tail):
+        """Read everything available from ``tail`` and emit complete lines.
+
+        Each line that ends in "\n" is decoded as UTF-8 and handed to the
+        handler without its newline. Whatever follows the last newline stays
+        in ``tail.buffer`` until the rest of the line arrives (or the tail is
+        closed).
+        """
+        while True:
+            chunk = os.read(tail.file_descriptor, 1024)
+            if not chunk:
+                return
+
+            # Split on raw bytes and decode per complete line. Decoding each
+            # 1024 byte chunk on its own fails whenever a multi-byte UTF-8
+            # character straddles a chunk boundary; "\n" can never be part of
+            # a multi-byte sequence, so complete lines are always safe.
+            lines = (tail.buffer + chunk).split(b"\n")
+
+            # The last element is what follows the final newline: b"" when the
+            # data ended on a line boundary, otherwise an incomplete line.
+            tail.buffer = lines.pop()
+
+            for line in lines:
+                self.handler(file_path=tail.path, line=line.decode('utf8'))
+
+    def process_tail(self, path, should_seek=False):
+        """Open ``path`` if it is not followed yet, or re-check it if it is.
+
+        For a file that is already followed, pending data is read first and
+        the file is reopened when it looks rotated (it shrank below the size
+        recorded at open time, or its inode changed).
+
+        :param should_seek: when opening, start at the end of the file
+            instead of the beginning.
+        """
+        # pylint: disable=no-member
+        file_stat = os.stat(path)
+
+        LOG.debug("process_tail for %s", path)
+        # Find or create a tail.
+        tail = self.tails.get(path)
+        if tail:
+            fd_stat = os.fstat(tail.file_descriptor)
+            pos = os.lseek(tail.file_descriptor, 0, os.SEEK_CUR)
+            if fd_stat.st_size > pos:
+                LOG.debug("something to read")
+                self.read_tail(tail)
+            if (tail.stat.st_size > file_stat.st_size or
+                    tail.stat.st_ino != file_stat.st_ino):
+                LOG.info("%s looks rotated. reopening", path)
+                self.close_tail(tail)
+                tail = None
+                # A rotated file is new content: read it from the start.
+                should_seek = False
+
+        if not tail:
+            LOG.info("tailing %s", path)
+            self.tails[path] = tail = self.open_tail(path, should_seek)
+            tail.stat = file_stat
+            self.read_tail(tail)
+
+        tail.rescan = False
+
+    def update_tails(self, globs, do_read_all=True):
+        """Synchronise the followed files and directory watches with ``globs``.
+
+        Every path matching a glob is (re)processed, followed files that no
+        longer match anything are closed, and the parent directories of each
+        glob are watched up to the first one without wildcards.
+
+        :param do_read_all: when False, newly opened files start at their end.
+        """
+        # pylint: disable=no-member
+        watches = set()
+
+        LOG.debug("update tails: %r", globs)
+
+        for fileglob in globs:
+            for path in glob.iglob(fileglob):
+                self.process_tail(path, not do_read_all)
+                watches.add(path)
+
+        for vanished in set(self.tails) - watches:
+            LOG.info("%s vanished, stop tailing", vanished)
+            self.close_tail(self.tails.pop(vanished))
+
+        for path in globs:
+            while len(path) > 1:
+                path = os.path.dirname(path)
+                if path not in self.dir_watches:
+                    LOG.debug("monitoring dir %s", path)
+
+                    self.dir_watches[path] = self.watch_manager.add_watch(
+                        path, INOTIFY_DIR_MASK)
+
+                # Keep climbing only while the directory part itself still
+                # contains wildcards.
+                if '*' not in path and '?' not in path:
+                    break
+
+    def remove_tails(self):
+        """Close every followed file and forget all of them."""
+        LOG.debug("remove tails")
+
+        # Detach the mapping first so that no closed tail stays reachable
+        # through self.tails.
+        tails, self.tails = self.tails, {}
+        for tail in tails.values():
+            self.close_tail(tail)
+
+    def open_tail(self, path, go_to_end=False):
+        """Open ``path`` non-blocking, add a file watch, return the FileTail.
+
+        :param go_to_end: position the descriptor at the end of the file.
+        """
+        # pylint: disable=no-member
+        tail = Tail.FileTail()
+        tail.file_descriptor = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
+        tail.path = path
+
+        if go_to_end:
+            os.lseek(tail.file_descriptor, 0, os.SEEK_END)
+
+        watch_descriptor = self.watch_manager.add_watch(
+            path, INOTIFY_FILE_MASK)
+
+        tail.watch_descriptor = watch_descriptor
+        return tail
+
+    def close_tail(self, tail):
+        """Remove the watch, close the descriptor and flush a partial line.
+
+        Data still buffered (a last line without trailing newline) is handed
+        to the handler as a final line.
+        """
+        # pylint: disable=no-member
+        try:
+            self.watch_manager.remove_watch(tail.path)
+        finally:
+            # Release the descriptor even when removing the watch fails.
+            os.close(tail.file_descriptor)
+
+        if tail.buffer:
+            LOG.debug("triggering from tail buffer")
+            remainder, tail.buffer = tail.buffer, b""
+            # The remainder may end in the middle of a multi-byte character;
+            # replace undecodable bytes instead of failing during teardown.
+            self.handler(file_path=tail.path, line=remainder.decode('utf8', 'replace'))
diff --git a/contrib/linux/sensors/tail/test_tail.py b/contrib/linux/sensors/tail/test_tail.py
new file mode 100644
index 00000000000..f08587233ae
--- /dev/null
+++ b/contrib/linux/sensors/tail/test_tail.py
@@ -0,0 +1,246 @@
+# Copyright 2014 Koert van der Veer
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+try:
+    from StringIO import StringIO  # Python 2
+except ImportError:
+    from io import StringIO  # Python 3
+
+import eventlet
+import inotify
+
+from .tail import Tail
+
+LOG = logging.getLogger(__name__)
+
+
+# https://stackoverflow.com/a/30716207
+class ReplaceStdErr(object):
+    """Context manager that replaces stderr with a StringIO object"""
+    def __init__(self):
+        self.original_stderr = sys.stderr
+
+    def __enter__(self):
+        sys.stderr = StringIO()
+
+    def __exit__(self, type, value, traceback):
+        sys.stderr = self.original_stderr
+
+
+class TailTestCase(unittest.TestCase):
+    """Tests for Tail against real files and real inotify watches."""
+
+    def test_nonexisting_file(self):
+        """Tailing a path inside a missing directory fails with InotifyError."""
+        messages = []
+
+        def message_handler(file_path, line):
+            LOG.debug('event generated file_path: %s, line: %s', file_path, line)
+            messages.append({'file_path': file_path, 'message': line})
+
+        path = os.path.join('/', 'tmp', 'tail_test_case', 'this_file_should_never_exist')
+
+        self.assertFalse(os.path.exists(path))
+
+        tail = Tail(path)
+        tail.set_handler(message_handler)
+        thread = tail.start()
+
+        with self.assertRaises(inotify.calls.InotifyError) as e:
+            with ReplaceStdErr():
+                LOG.debug("Waiting for thread to finish")
+                thread.wait()  # give thread a chance to error out
+
+        # BaseException.message only exists on Python 2; str() works on both.
+        self.assertEqual(str(e.exception), "Call failed (should not be -1): (-1) ERRNO=(0)")
+
+    def test_preexisting_file(self):
+        """Content present before start() is skipped; new lines are reported."""
+        messages = []
+
+        def message_handler(file_path, line):
+            LOG.debug('event generated file_path: %s, line: %s', file_path, line)
+            messages.append({'file_path': file_path, 'message': line})
+
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b"test123\n")
+            f.flush()
+
+            tail = Tail(f.name)
+            tail.set_handler(message_handler)
+            tail.start()
+            eventlet.sleep(0.01)  # give thread a chance to open the file
+
+            f.write(b"second line\n")
+            f.flush()
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+
+            tail.stop()
+            eventlet.sleep(0.01)  # give thread a chance to close the file
+
+            self.assertEqual(len(messages), 1)
+            self.assertEqual(messages[0]["file_path"], f.name)
+            self.assertEqual(messages[0]["message"], "second line")
+
+    def test_write_without_newline(self):
+        """A line is only reported once its terminating newline is written."""
+        messages = []
+
+        def message_handler(file_path, line):
+            LOG.debug('event generated file_path: %s, line: %s', file_path, line)
+            messages.append({'file_path': file_path, 'message': line})
+
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b"test123\n")
+            f.flush()
+
+            self.assertEqual(len(messages), 0)
+
+            tail = Tail(f.name)
+            tail.set_handler(message_handler)
+            tail.start()
+            eventlet.sleep(0.01)  # give thread a chance to open the file
+
+            f.write(b"second line")
+            f.flush()
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+
+            self.assertEqual(len(messages), 0)
+
+            f.write(b"\n")
+            f.flush()
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+
+            tail.stop()
+            eventlet.sleep(0.01)  # give thread a chance to close the file
+
+            self.assertEqual(len(messages), 1)
+            self.assertEqual(messages[0]["file_path"], f.name)
+            self.assertEqual(messages[0]["message"], "second line")
+
+    def test_write_with_newline_and_without_newline(self):
+        """A partial line is joined with the rest written later."""
+        messages = []
+
+        def message_handler(file_path, line):
+            LOG.debug('event generated file_path: %s, line: %s', file_path, line)
+            messages.append({'file_path': file_path, 'message': line})
+
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b"test123\n")
+            f.flush()
+
+            self.assertEqual(len(messages), 0)
+
+            tail = Tail(f.name)
+            tail.set_handler(message_handler)
+            tail.start()
+            eventlet.sleep(0.01)  # give thread a chance to open the file
+
+            f.write(b"second line\nthird ")
+            f.flush()
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+
+            self.assertEqual(len(messages), 1)
+            self.assertEqual(messages[0]["file_path"], f.name)
+            self.assertEqual(messages[0]["message"], "second line")
+
+            f.write(b"line\n")
+            f.flush()
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+
+            tail.stop()
+            eventlet.sleep(0.01)  # give thread a chance to close the file
+
+            self.assertEqual(len(messages), 2)
+            self.assertEqual(messages[0]["file_path"], f.name)
+            self.assertEqual(messages[0]["message"], "second line")
+            self.assertEqual(messages[1]["file_path"], f.name)
+            self.assertEqual(messages[1]["message"], "third line")
+
+    def test_write_after_newline(self):
+        """An unterminated last line is flushed when the tail is stopped."""
+        messages = []
+
+        def message_handler(file_path, line):
+            LOG.debug('event generated file_path: %s, line: %s', file_path, line)
+            messages.append({'file_path': file_path, 'message': line})
+
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b"test123\n")
+            f.flush()
+
+            self.assertEqual(len(messages), 0)
+
+            tail = Tail(f.name)
+            tail.set_handler(message_handler)
+            tail.start()
+            eventlet.sleep(0.01)  # give thread a chance to open the file
+
+            f.write(b"second line\nthird")
+            f.flush()
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+
+            self.assertEqual(len(messages), 1)
+            self.assertEqual(messages[0]["file_path"], f.name)
+            self.assertEqual(messages[0]["message"], "second line")
+
+            tail.stop()
+            eventlet.sleep(0.01)  # give thread a chance to close the file
+
+            self.assertEqual(len(messages), 2)
+            self.assertEqual(messages[0]["file_path"], f.name)
+            self.assertEqual(messages[0]["message"], "second line")
+            self.assertEqual(messages[1]["file_path"], f.name)
+            self.assertEqual(messages[1]["message"], "third")
+
+    def test_wildcard(self):
+        """A file created after start() that matches the glob is picked up."""
+        messages = []
+
+        def message_handler(file_path, line):
+            LOG.debug('event generated file_path: %s, line: %s', file_path, line)
+            messages.append({'file_path': file_path, 'message': line})
+
+        # Create the directory before entering the try block: if mkdtemp()
+        # itself fails there is nothing to remove, and the finally clause
+        # must not hit an unbound ``path``.
+        path = tempfile.mkdtemp()
+        try:
+            tail = Tail(os.path.join(path, "*.log"))
+            tail.set_handler(message_handler)
+            tail.start()
+            eventlet.sleep(0.01)  # give thread a chance to open the file
+
+            LOG.debug("about to write line 1")
+            with open(os.path.join(path, "test.log"), 'w') as f:
+                f.write("line 1\n")
+
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+            self.assertEqual(len(messages), 1)
+            self.assertEqual(messages[0]["message"], "line 1")
+
+            LOG.debug("about to write line 2")
+            with open(os.path.join(path, "test.log"), 'a') as f:
+                f.write("line 2\n")
+
+            eventlet.sleep(0.01)  # give thread a chance to read the line
+            self.assertEqual(len(messages), 2)
+            self.assertEqual(messages[0]["file_path"], os.path.join(path, "test.log"))
+            self.assertEqual(messages[0]["message"], "line 1")
+            self.assertEqual(messages[1]["file_path"], os.path.join(path, "test.log"))
+            self.assertEqual(messages[1]["message"], "line 2")
+
+            tail.stop()
+            eventlet.sleep(0.1)  # give thread a chance to close the file
+
+        finally:
+            shutil.rmtree(path)
diff --git a/fixed-requirements.txt b/fixed-requirements.txt
index 0a6b7f7c5ce..1b3ceacbfad 100644
--- a/fixed-requirements.txt
+++ b/fixed-requirements.txt
@@ -27,7 +27,7 @@ passlib==1.7.1
 lockfile==0.12.2
 python-gnupg==0.4.2
 jsonpath-rw==1.4.0
-pyinotify==0.9.6
+inotify==0.2.9
 semver==2.8.0
 pytz==2018.4
 stevedore==1.28.0
diff --git a/requirements.txt b/requirements.txt
index 312856a703d..671ca70bfd8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,12 +5,12 @@ argcomplete
 bcrypt
 eventlet==0.23.0
 flex==6.13.1
-git+https://github.com/Kami/logshipper.git@stackstorm_patched#egg=logshipper
 git+https://github.com/StackStorm/python-mistralclient.git#egg=python-mistralclient
 git+https://github.com/StackStorm/st2-auth-backend-flat-file.git@master#egg=st2-auth-backend-flat-file
 gitpython==2.1.10
 greenlet==0.4.13
 gunicorn==19.8.1
+inotify==0.2.9
 ipaddr
 jinja2
 jsonpath-rw==1.4.0
@@ -30,7 +30,6 @@ prettytable
 prometheus_client==0.1.1
 prompt-toolkit==1.0.15
 psutil==5.4.5
-pyinotify==0.9.6
 pymongo==3.6.1
 pyrabbit
 python-dateutil
diff --git a/st2actions/in-requirements.txt b/st2actions/in-requirements.txt
index ca4c2aa74b2..6db3c48998a 100644
--- a/st2actions/in-requirements.txt
+++ b/st2actions/in-requirements.txt
@@ -15,7 +15,6 @@ python-json-logger
 gitpython
 lockfile
 # needed by core "linux" pack - TODO: create virtualenv for linux pack on postinst
-pyinotify
-git+https://github.com/Kami/logshipper.git@stackstorm_patched#egg=logshipper
+inotify
 # required by pack_mgmt/setup_virtualenv.py#L135
 virtualenv