From 7537de0068ef36d6fb0316db06d6694bd720257f Mon Sep 17 00:00:00 2001 From: Drew Hubl Date: Wed, 11 Jul 2018 16:16:38 -0700 Subject: [PATCH] Use watchdog3 instead of inotify directly --- contrib/linux/requirements.txt | 3 +- contrib/linux/sensors/file_watch_sensor.py | 5 +- contrib/linux/sensors/tail.py | 213 ++++++++++++ contrib/linux/sensors/tail/__init__.py | 2 - contrib/linux/sensors/tail/tail.py | 329 ------------------ .../linux/sensors/test_file_watch_sensor.py | 194 +++++++++++ contrib/linux/sensors/{tail => }/test_tail.py | 116 ++---- fixed-requirements.txt | 2 +- requirements.txt | 3 +- 9 files changed, 442 insertions(+), 425 deletions(-) create mode 100644 contrib/linux/sensors/tail.py delete mode 100644 contrib/linux/sensors/tail/__init__.py delete mode 100644 contrib/linux/sensors/tail/tail.py create mode 100644 contrib/linux/sensors/test_file_watch_sensor.py rename contrib/linux/sensors/{tail => }/test_tail.py (55%) diff --git a/contrib/linux/requirements.txt b/contrib/linux/requirements.txt index 798fea8e0b5..4fbce41d35c 100644 --- a/contrib/linux/requirements.txt +++ b/contrib/linux/requirements.txt @@ -1,2 +1,3 @@ # used by file watcher sensor -inotify==0.2.9 +git+https://github.com/blag/watchdog3.git#python2.7-support +pathtools3>=0.2.0 diff --git a/contrib/linux/sensors/file_watch_sensor.py b/contrib/linux/sensors/file_watch_sensor.py index 37f1dbb4685..a0454bdaa5a 100644 --- a/contrib/linux/sensors/file_watch_sensor.py +++ b/contrib/linux/sensors/file_watch_sensor.py @@ -14,7 +14,7 @@ def __init__(self, sensor_service, config=None): self._tail = None def setup(self): - self._tail = Tail(filenames=[]) + self._tail = Tail() self._tail.set_handler(self._handle_line) def run(self): @@ -35,11 +35,12 @@ def add_trigger(self, trigger): return self._trigger = trigger.get('ref', None) + seek_to_end = trigger.get('seek_to_end', True) if not self._trigger: raise Exception('Trigger %s did not contain a ref.' % trigger) - self._tail.add_file(filepath=file_path) + self._tail.add_file(filepath=file_path, seek_to_end=seek_to_end) self._logger.info('Added file "%s"' % (file_path)) def update_trigger(self, trigger): diff --git a/contrib/linux/sensors/tail.py b/contrib/linux/sensors/tail.py new file mode 100644 index 00000000000..614f9480c2f --- /dev/null +++ b/contrib/linux/sensors/tail.py @@ -0,0 +1,213 @@ +# Copyright 2014 Koert van der Veer +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import logging +import os + +import six +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + + +LOG = logging.getLogger(__name__) + + +class FileTail(object): + __slots__ = ['file_descriptor', 'path', 'recursive', 'watch', 'buffer'] + + def __init__(self, file_descriptor=None, recursive=None, path=None, + watch=None, buffer=None): + if buffer is None: + self.buffer = b"" + else: + self.buffer = buffer + self.file_descriptor = file_descriptor + self.recursive = recursive + self.path = path + self.watch = watch + + +class Tail(FileSystemEventHandler): + """Follows files, and processes new lines in those files with a callback + handler. + + Directories are supported, and new files matching the wildcard will be + discovered. + + Rotated files are automatically discovered, and reopened. + + Example for ``input.yml``: + + .. code:: yaml + + - tail: + filename: + - /var/log/syslog + - /var/log/my_app/*.log + """ + log = None + + def __init__(self, handler=None, filenames=None, log=LOG, *args, **kwargs): + # pylint: disable=no-member + self.handler = handler + + if isinstance(filenames, six.string_types): + filenames = [filenames] + + self.tails = {} + self.log = log + self.observer = Observer() + + if filenames: + for filename in filenames: + self.add_file(filename) + + def set_handler(self, handler): + self.handler = handler + + def add_file(self, filepath, recursive=True, seek_to_end=True): + self.log.debug("adding tail %s", filepath) + if os.path.isdir(filepath): + seek_to_end = False + else: + recursive = False + self.tails[filepath] = self.open_tail(filepath, recursive, seek_to_end) + + def remove_file(self, filepath): + self.log.debug("removing tail %s", filepath) + tail = self.tails.get(filepath) + if tail is not None: + self.close_tail(tail) + + def remove_tails(self): + self.log.debug("remove tails") + + for filepath in self.tails.keys(): + self.remove_file(filepath) + + def start(self): + self.log.debug("starting tails") + self.run() + + def stop(self): + self.log.debug("stopping tails") + self.remove_tails() + + def run(self): + self.observer.start() + + def on_modified(self, event): + self.log.info("%s modified. processing") + self.process_tail(event.src_path) + + def on_moved(self, event): + self.log.info("%s looks rotated. reopening", event.src_path) + tail = self.tails.get(event.src_path) + if tail: + self.log.info("closing (old) rotated file") + self.close_tail(tail) + self.log.info("opening (new) rotated file") + self.reprocess_tail(event.src_path, event.dest_path) + + def process_tail(self, path, seek_to_end=False): + # pylint: disable=no-member + self.log.debug("process_tail for %s", path) + # Find or create a tail. + tail = self.tails.get(path) + if tail: + fd_stat = os.fstat(tail.file_descriptor) + pos = os.lseek(tail.file_descriptor, 0, os.SEEK_CUR) + if fd_stat.st_size > pos: + self.log.debug("something to read") + self.read_tail(tail) + elif fd_stat.st_size < pos: + self.log.debug("file shrunk, seeking to new end") + os.lseek(tail.file_descriptor, 0, os.SEEK_END) + else: + self.log.info("tailing %s", path) + self.tails[path] = tail = self.open_tail(path, seek_to_end=seek_to_end) + + def reprocess_tail(self, src_path, dest_path): + self.log.debug("reprocess_tail for %s moved to %s", src_path, dest_path) + # pylint: disable=no-member + file_stat = os.stat(dest_path) + + # Find or create a tail. + tail = self.tails.get(dest_path) + if not tail: + self.log.info("tailing %s", dest_path) + self.tails[dest_path] = tail = self.open_tail(dest_path) + + def read_tail(self, tail): + self.log.debug("reading tail %s", tail.path) + while True: + buff = os.read(tail.file_descriptor, 1024) + if not buff: + return + + buff = buff.decode('utf8') + + # Append to last buffer + if tail.buffer: + buff = tail.buffer + buff + tail.buffer = "" + + lines = buff.splitlines(True) + if lines[-1][-1] != "\n": # incomplete line in buffer + # Save the remainder of the last line as the buffer + # This fixes a bug in the original logshipper.Tail + # implementation, which only assigned the last character: + # tail.buffer = lines[-1][-1] + # The last [-1] was unnecessary + tail.buffer = lines[-1] + # Only return lines with newlines + lines = lines[:-1] + + for line in lines: + self.handler(file_path=tail.path, line=line[:-1]) + + def open_tail(self, path, recursive=True, seek_to_end=False): + self.log.info("Opening tail %s", path) + # pylint: disable=no-member + fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK) + watch = self.observer.schedule(self, path, recursive) + tail = FileTail(file_descriptor=fd, path=path, recursive=recursive, + watch=watch) + + if seek_to_end: + os.lseek(tail.file_descriptor, 0, os.SEEK_END) + + return tail + + def close_tail(self, tail): + # pylint: disable=no-member + self.log.info("Closing tail %s", tail.path) + try: + self.observer.unschedule(tail.watch) + except KeyError: + pass + try: + os.close(tail.file_descriptor) + except OSError: + pass + self.observer.stop() + self.observer.join() + if tail.buffer: + self.log.debug("triggering from tail buffer") + self.handler(file_path=tail.path, line=tail.buffer) + # Empty out tail.buffer so closing the same tail multiple times + # doesn't dispatch multiple times + tail.buffer = "" diff --git a/contrib/linux/sensors/tail/__init__.py b/contrib/linux/sensors/tail/__init__.py deleted file mode 100644 index 52bb73fc921..00000000000 --- a/contrib/linux/sensors/tail/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ - -from .tail import Tail # noqa: F401 diff --git a/contrib/linux/sensors/tail/tail.py b/contrib/linux/sensors/tail/tail.py deleted file mode 100644 index bf30da656de..00000000000 --- a/contrib/linux/sensors/tail/tail.py +++ /dev/null @@ -1,329 +0,0 @@ -# Copyright 2014 Koert van der Veer -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import glob -import logging - -import eventlet -from eventlet.green import os -import inotify.adapters -import inotify.constants as in_constants -import six - -LOG = logging.getLogger(__name__) - -INOTIFY_FILE_MASK = (in_constants.IN_MODIFY | in_constants.IN_OPEN) -INOTIFY_DIR_MASK = (in_constants.IN_CREATE | in_constants.IN_DELETE | in_constants.IN_MOVE) - - -class BaseInput(object): - handler = None - should_run = False - thread = None - - def set_handler(self, handler): - self.handler = handler - - def start(self): - LOG.debug("starting: should_run %r", self.should_run) - self.should_run = True - if self.thread is None: - self.thread = eventlet.spawn(self._run) - return self.thread - - def stop(self): - LOG.debug("stopping: should_run %r", self.should_run) - self.should_run = False - thread = self.thread - if thread is not None: - thread.wait() - self.thread = None - - def _run(self): - try: - self.run() - except Exception as e: - LOG.exception("Encountered exception while running %s", self) - raise e - - def run(self): - raise NotImplementedError # pragma: no cover - - -class Tail(BaseInput): - """Follows files, and processes new lines in those files with a callback - handler. - - Glob-style wildcards are supported, and new files matching the wildcard - will be discovered. - - Rotated files are automatically discovered, and reopened. - - Example for ``input.yml``: - - .. code:: yaml - - - tail: - filename: - - /var/log/syslog - - /var/log/my_app/*.log - """ - - class FileTail(object): - __slots__ = ['file_descriptor', 'path', 'buffer', 'stat', 'rescan', - 'watch_descriptor'] - - def __init__(self): - self.buffer = b"" - self.file_descriptor = None - self.path = None - self.rescan = None - self.stat = None - self.watch_descriptor = None - - class Event(object): - @classmethod - def from_INOTIFY_HEADER(cls, in_header, path, filename=None): - return cls( - wd=in_header.wd, - mask=in_header.mask, - cookie=in_header.cookie, - len_=in_header.len, - dir_=bool(filename), - path=path, - filename=filename) - - def __init__(self, wd, mask, cookie, len_, dir_, path, filename=None): - self.wd = wd - self.mask = mask - self.cookie = cookie - self.len = len_ - self.path = path - self.filename = filename - self.dir = dir_ - - def __str__(self): - return ("{wd} {mask} {path} {filename} {dir}".format( - wd=self.wd, - mask=self.mask, - path=self.path, - filename=self.filename, - dir=self.dir)) - - def __repr__(self): - return ("".format( - wd=self.wd, - mask=self.mask, - path=self.path, - filename=self.filename, - dir=self.dir)) - - def __init__(self, filenames, block_duration_s=None, num_green_threads=10): - # Eventlet monkeypatches the os module, so we disable these checks - # pylint: disable=no-member - if isinstance(filenames, six.string_types): - filenames = [filenames] - - self.globs = [os.path.abspath(filename) for filename in filenames] - # TODO: Handle expanding the greenpool if necessary - kwargs = {} - if block_duration_s: - kwargs['block_duration_s'] = block_duration_s - LOG.debug("creating inotify watch manager") - self.watch_manager = inotify.adapters.Inotify(**kwargs) - LOG.debug("") - self.tails = {} - self.dir_watches = {} - self.thread = None - self.pool = eventlet.greenpool.GreenPool(size=num_green_threads) - - # MAIN API - - def add_file(self, filepath): - self.pool.spawn(self.process_tail, filepath) - eventlet.sleep(0.01) - - def remove_file(self, filepath): - self.pool.spawn(self._remove_file, filepath) - eventlet.sleep(0.01) - - def run(self): - self.update_tails(self.globs, do_read_all=False) - try: - for event in self.watch_manager.event_gen(): - if event: - (inotify_header, type_names, path, filename) = event - tail_event = Tail.Event.from_INOTIFY_HEADER(inotify_header, path, filename) - self.pool.spawn_n(self._inotify_event, tail_event) - else: - eventlet.sleep() - if not self.should_run: - break - finally: - self.pool.waitall() - self.remove_tails() - - # PRIVATE API - - def _remove_file(self, filepath): - tail = self.tails.get(filepath) - if tail: - self.close_tail(tail) - - def _inotify_file(self, event): - LOG.debug("file notified %r", event) - tail = self.tails.get(event.path) - if tail: - if event.mask & in_constants.IN_MODIFY: - if tail.rescan: - LOG.debug("rescanning %r", event.path) - self.process_tail(event.path) - else: - LOG.debug("reading %r", tail) - self.read_tail(tail) - else: - tail.rescan = True - - def _inotify_dir(self, event): - LOG.debug("dir notified %r", event) - tail = self.tails.get(event.path) - if tail: - self.process_tail(event.path) - - if event.dir and not tail: - self.update_tails(self.globs) - - def _inotify_event(self, event): - LOG.debug("change notified %r", event) - if event.dir: - self._inotify_dir(event) - else: - self._inotify_file(event) - eventlet.sleep() - - # INTERNAL TAIL API - - def read_tail(self, tail): - while True: - buff = os.read(tail.file_descriptor, 1024) - if not buff: - return - - buff = buff.decode('utf8') - - # Append to last buffer - if tail.buffer: - buff = tail.buffer + buff - tail.buffer = "" - - lines = buff.splitlines(True) - if lines[-1][-1] != "\n": # incomplete line in buffer - # Save the remainder of the last line as the buffer - # This fixes a bug in the original logshipper.Tail - # implementation, which only assigned the last character: - # tail.buffer = lines[-1][-1] - # The last [-1] was unnecessary - tail.buffer = lines[-1] - # Only return lines with newlines - lines = lines[:-1] - - for line in lines: - self.handler(file_path=tail.path, line=line[:-1]) - - def process_tail(self, path, should_seek=False): - # pylint: disable=no-member - file_stat = os.stat(path) - - LOG.debug("process_tail for %s", path) - # Find or create a tail. - tail = self.tails.get(path) - if tail: - fd_stat = os.fstat(tail.file_descriptor) - pos = os.lseek(tail.file_descriptor, 0, os.SEEK_CUR) - if fd_stat.st_size > pos: - LOG.debug("something to read") - self.read_tail(tail) - if (tail.stat.st_size > file_stat.st_size or - tail.stat.st_ino != file_stat.st_ino): - LOG.info("%s looks rotated. reopening", path) - self.close_tail(tail) - tail = None - should_seek = False - - if not tail: - LOG.info("tailing %s", path) - self.tails[path] = tail = self.open_tail(path, should_seek) - tail.stat = file_stat - self.read_tail(tail) - - tail.rescan = False - - def update_tails(self, globs, do_read_all=True): - # pylint: disable=no-member - watches = set() - - LOG.debug("update tails: %r", globs) - - for fileglob in globs: - for path in glob.iglob(fileglob): - self.process_tail(path, not do_read_all) - watches.add(path) - - for vanished in set(self.tails) - watches: - LOG.info("%s vanished, stop tailing", vanished) - self.close_tail(self.tails.pop(vanished)) - - for path in globs: - while len(path) > 1: - path = os.path.dirname(path) - if path not in self.dir_watches: - LOG.debug("monitoring dir %s", path) - - self.dir_watches[path] = self.watch_manager.add_watch( - path, INOTIFY_DIR_MASK) - - if '*' not in path and '?' not in path: - break - - def remove_tails(self): - LOG.debug("remove tails") - - for path, tail in self.tails.items(): - self.close_tail(tail) - - def open_tail(self, path, go_to_end=False): - # pylint: disable=no-member - tail = Tail.FileTail() - tail.file_descriptor = os.open(path, os.O_RDONLY | os.O_NONBLOCK) - tail.path = path - - if go_to_end: - os.lseek(tail.file_descriptor, 0, os.SEEK_END) - - watch_descriptor = self.watch_manager.add_watch( - path, INOTIFY_FILE_MASK) - - tail.watch_descriptor = watch_descriptor - return tail - - def close_tail(self, tail): - # pylint: disable=no-member - self.watch_manager.remove_watch(tail.path) - os.close(tail.file_descriptor) - if tail.buffer: - LOG.debug("triggering from tail buffer") - self.handler(file_path=tail.path, line=tail.buffer) diff --git a/contrib/linux/sensors/test_file_watch_sensor.py b/contrib/linux/sensors/test_file_watch_sensor.py new file mode 100644 index 00000000000..33351d97c9d --- /dev/null +++ b/contrib/linux/sensors/test_file_watch_sensor.py @@ -0,0 +1,194 @@ + +import tempfile +import time + +import mock +import unittest2 + +from file_watch_sensor import FileWatchSensor + + +class TailTestCase(unittest2.TestCase): + def test_add_trigger_missing_file_path(self): + sensor_service = mock.MagicMock() + fws = FileWatchSensor(sensor_service) + + trigger = { + 'parameters': { + 'file_path': None, + }, + } + + fws_logger = mock.MagicMock() + fws_tail = mock.MagicMock() + + fws._logger = fws_logger + fws._tail = fws_tail + fws.add_trigger(trigger) + + self.assertEqual(fws_logger.error.call_args[0][0], + 'Received trigger type without "file_path" field.') + + def test_add_trigger_missing_ref(self): + sensor_service = mock.MagicMock() + fws = FileWatchSensor(sensor_service) + + trigger = { + 'parameters': { + 'file_path': 'trigger_file_path', + }, + } + + with self.assertRaises(Exception) as cm: + fws.add_trigger(trigger) + + self.assertEqual(cm.exception.args[0], 'Trigger %s did not contain a ref.' % trigger) + + def test_remove_trigger_missing_file_path(self): + sensor_service = mock.MagicMock() + fws = FileWatchSensor(sensor_service) + + trigger = { + 'parameters': { + 'file_path': 'trigger_file_path', + }, + 'ref': 'ref_file_path', + } + + fws_logger = mock.MagicMock() + fws_tail = mock.MagicMock() + + fws._logger = fws_logger + fws._tail = fws_tail + fws.add_trigger(trigger) + + self.assertEqual(fws_logger.info.call_args[0][0], + 'Added file "trigger_file_path"') + + trigger['parameters']['file_path'] = None + + fws.remove_trigger(trigger) + + self.assertEqual(fws_logger.error.call_args[0][0], + 'Received trigger type without "file_path" field.') + + def test_remove_trigger(self): + sensor_service = mock.MagicMock() + fws = FileWatchSensor(sensor_service) + + trigger = { + 'parameters': { + 'file_path': 'trigger_file_path', + }, + 'ref': 'ref_file_path', + } + + fws_logger = mock.MagicMock() + fws_tail = mock.MagicMock() + + fws._logger = fws_logger + fws._tail = fws_tail + fws.add_trigger(trigger) + + self.assertEqual(fws_logger.info.call_args[0][0], + 'Added file "trigger_file_path"') + + fws.remove_trigger(trigger) + + self.assertEqual(fws_logger.info.call_args[0][0], + 'Removed file "trigger_file_path"') + + def test_triggering(self): + temp_file = tempfile.mktemp() + + sensor_service = mock.MagicMock() + fws = FileWatchSensor(sensor_service) + fws.setup() + + trigger = { + 'parameters': { + 'file_path': temp_file, + }, + 'ref': 'ref_file_path', + 'seek_to_end': False, + } + + with open(temp_file, 'w') as f: + f.write("Line one\nLine t") + + fws_logger = mock.MagicMock() + + fws._logger = fws_logger + fws.add_trigger(trigger) + self.assertEqual(fws_logger.info.call_args[0][0], + 'Added file "%s"' % temp_file) + fws.run() + time.sleep(2) + + with open(temp_file, 'a') as f: + f.write("wo\nLine three\nLine four") + + time.sleep(2) + + fws.cleanup() + fws.remove_trigger(trigger) + self.assertEqual(fws_logger.info.call_args[0][0], + 'Removed file "%s"' % temp_file) + + self.assertEqual(sensor_service.dispatch.call_count, 4) + self.assertEqual(sensor_service.dispatch.call_args_list[0][1]['payload']['line'], + 'Line one') + self.assertEqual(sensor_service.dispatch.call_args_list[1][1]['payload']['line'], + 'Line two') + self.assertEqual(sensor_service.dispatch.call_args_list[2][1]['payload']['line'], + 'Line three') + self.assertEqual(sensor_service.dispatch.call_args_list[3][1]['payload']['line'], + 'Line four') + + def test_triggering_empty_tail_buffer(self): + temp_file = tempfile.mktemp() + + sensor_service = mock.MagicMock() + fws = FileWatchSensor(sensor_service) + fws.setup() + + trigger = { + 'parameters': { + 'file_path': temp_file, + }, + 'ref': 'ref_file_path', + } + + with open(temp_file, 'w') as f: + f.write("Line one\nLine t") + + fws_logger = mock.MagicMock() + + fws._logger = fws_logger + fws.add_trigger(trigger) + self.assertEqual(fws_logger.info.call_args[0][0], + 'Added file "%s"' % temp_file) + fws.run() + time.sleep(2) + + with open(temp_file, 'a') as f: + f.write("wo\nLine three\nLine four\n") + + time.sleep(2) + + fws.cleanup() + fws.remove_trigger(trigger) + self.assertEqual(fws_logger.info.call_args[0][0], + 'Removed file "%s"' % temp_file) + + self.assertEqual(sensor_service.dispatch.call_count, 3) + self.assertEqual(sensor_service.dispatch.call_args_list[0][1]['payload']['line'], + 'wo') + self.assertEqual(sensor_service.dispatch.call_args_list[1][1]['payload']['line'], + 'Line three') + self.assertEqual(sensor_service.dispatch.call_args_list[2][1]['payload']['line'], + 'Line four') + + +if __name__ == '__main__': + unittest2.main() diff --git a/contrib/linux/sensors/tail/test_tail.py b/contrib/linux/sensors/test_tail.py similarity index 55% rename from contrib/linux/sensors/tail/test_tail.py rename to contrib/linux/sensors/test_tail.py index f08587233ae..60411906262 100644 --- a/contrib/linux/sensors/tail/test_tail.py +++ b/contrib/linux/sensors/test_tail.py @@ -18,57 +18,30 @@ import shutil import sys import tempfile -import unittest +import time +import unittest2 try: from StringIO import StringIO # Python 2 except ImportError: from io import StringIO # Python 3 -import eventlet -import inotify - -from .tail import Tail +from tail import Tail LOG = logging.getLogger(__name__) -# https://stackoverflow.com/a/30716207 -class ReplaceStdErr(object): - """Context manager that replaces stderr with a StringIO object""" - def __init__(self): - self.original_stderr = sys.stderr - - def __enter__(self): - sys.stderr = StringIO() - - def __exit__(self, type, value, traceback): - sys.stderr = self.original_stderr - - -class TailTestCase(unittest.TestCase): +class TailTestCase(unittest2.TestCase): def test_nonexisting_file(self): messages = [] - def message_handler(file_path, line): - LOG.debug('event generated file_path: %s, line: %s', file_path, line) - messages.append({'file_path': file_path, 'message': line}) - path = os.path.join('/', 'tmp', 'tail_test_case', 'this_file_should_never_exist') self.assertFalse(os.path.exists(path)) - tail = Tail(path) - tail.set_handler(message_handler) - thread = tail.start() - - with self.assertRaises(inotify.calls.InotifyError) as e: - with ReplaceStdErr(): - LOG.debug("Waiting for thread to finish") - thread.wait() # give thread a chance to error out - - self.assertEqual(e.exception.message, "Call failed (should not be -1): (-1) ERRNO=(0)") + with self.assertRaises(OSError) as e: + tail = Tail(filenames=path) def test_preexisting_file(self): messages = [] @@ -81,17 +54,18 @@ def message_handler(file_path, line): f.write(b"test123\n") f.flush() - tail = Tail(f.name) - tail.set_handler(message_handler) + tail = Tail(handler=message_handler, filenames=[]) + + tail.add_file(f.name) + tail.start() - eventlet.sleep(0.01) # give thread a chance to open the file f.write(b"second line\n") f.flush() - eventlet.sleep(0.01) # give thread a chance to read the line + + time.sleep(1) tail.stop() - eventlet.sleep(0.01) # give thread a chance to close the line self.assertEqual(len(messages), 1) self.assertEqual(messages[0]["file_path"], f.name) @@ -110,23 +84,21 @@ def message_handler(file_path, line): self.assertEqual(len(messages), 0) - tail = Tail(f.name) + tail = Tail(filenames=f.name) tail.set_handler(message_handler) tail.start() - eventlet.sleep(0.01) # give thread a chance to open the file f.write(b"second line") f.flush() - eventlet.sleep(0.01) # give thread a chance to read the line self.assertEqual(len(messages), 0) f.write(b"\n") f.flush() - eventlet.sleep(0.01) # give thread a chance to read the line + + time.sleep(1) tail.stop() - eventlet.sleep(0.01) # give thread a chance to close the line self.assertEqual(len(messages), 1) self.assertEqual(messages[0]["file_path"], f.name) @@ -145,14 +117,14 @@ def message_handler(file_path, line): self.assertEqual(len(messages), 0) - tail = Tail(f.name) + tail = Tail(filenames=f.name) tail.set_handler(message_handler) tail.start() - eventlet.sleep(0.01) # give thread a chance to open the file f.write(b"second line\nthird ") f.flush() - eventlet.sleep(0.01) # give thread a chance to read the line + + time.sleep(1) self.assertEqual(len(messages), 1) self.assertEqual(messages[0]["file_path"], f.name) @@ -160,10 +132,10 @@ def message_handler(file_path, line): f.write(b"line\n") f.flush() - eventlet.sleep(0.01) # give thread a chance to read the line + + time.sleep(1) tail.stop() - eventlet.sleep(0.01) # give thread a chance to close the line self.assertEqual(len(messages), 2) self.assertEqual(messages[0]["file_path"], f.name) @@ -184,21 +156,22 @@ def message_handler(file_path, line): self.assertEqual(len(messages), 0) - tail = Tail(f.name) + tail = Tail(filenames=f.name) tail.set_handler(message_handler) tail.start() - eventlet.sleep(0.01) # give thread a chance to open the file f.write(b"second line\nthird") f.flush() - eventlet.sleep(0.01) # give thread a chance to read the line + + time.sleep(1) self.assertEqual(len(messages), 1) self.assertEqual(messages[0]["file_path"], f.name) self.assertEqual(messages[0]["message"], "second line") tail.stop() - eventlet.sleep(0.01) # give thread a chance to close the line + + time.sleep(1) self.assertEqual(len(messages), 2) self.assertEqual(messages[0]["file_path"], f.name) @@ -206,41 +179,6 @@ def message_handler(file_path, line): self.assertEqual(messages[1]["file_path"], f.name) self.assertEqual(messages[1]["message"], "third") - def test_wildcard(self): - messages = [] - - def message_handler(file_path, line): - LOG.debug('event generated file_path: %s, line: %s', file_path, line) - messages.append({'file_path': file_path, 'message': line}) - - try: - path = tempfile.mkdtemp() - tail = Tail(os.path.join(path, "*.log")) - tail.set_handler(message_handler) - tail.start() - eventlet.sleep(0.01) # give thread a chance to open the file - - LOG.debug("about to write line 1") - with open(os.path.join(path, "test.log"), 'w') as f: - f.write("line 1\n") - - eventlet.sleep(0.01) # give thread a chance to read the line - self.assertEqual(len(messages), 1) - self.assertEqual(messages[0]["message"], "line 1") - - LOG.debug("about to write line 2") - with open(os.path.join(path, "test.log"), 'a') as f: - f.write("line 2\n") - - eventlet.sleep(0.01) # give thread a chance to read the line - self.assertEqual(len(messages), 2) - self.assertEqual(messages[0]["file_path"], os.path.join(path, "test.log")) - self.assertEqual(messages[0]["message"], "line 1") - self.assertEqual(messages[1]["file_path"], os.path.join(path, "test.log")) - self.assertEqual(messages[1]["message"], "line 2") - - tail.stop() - eventlet.sleep(0.1) # give thread a chance to close the line - finally: - shutil.rmtree(path) +if __name__ == '__main__': + unittest2.main() diff --git a/fixed-requirements.txt b/fixed-requirements.txt index dda2d16a501..9f2cbcdd29b 100644 --- a/fixed-requirements.txt +++ b/fixed-requirements.txt @@ -27,7 +27,7 @@ passlib==1.7.1 lockfile==0.12.2 python-gnupg==0.4.2 jsonpath-rw==1.4.0 -inotify==0.2.9 +git+https://github.com/blag/watchdog3.git#python2.7-support semver==2.8.0 pytz==2018.4 stevedore==1.28.0 diff --git a/requirements.txt b/requirements.txt index 4431bb36825..7cfe2b598e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,6 @@ git+https://github.com/StackStorm/st2-auth-backend-flat-file.git@master#egg=st2- gitpython==2.1.10 greenlet==0.4.13 gunicorn==19.8.1 -inotify==0.2.9 ipaddr jinja2 jsonpath-rw==1.4.0 @@ -52,6 +51,8 @@ sseclient==0.0.19 stevedore==1.28.0 tooz==1.62.0 unittest2 +# TODO: Move this to into the StackStorm organization +git+https://github.com/blag/watchdog3.git#python2.7-support webob==1.7.4 webtest zake==0.2.2