This repository has been archived by the owner. It is now read-only.

Commit

Merge pull request #46 from HubbleStack/develop
Merge to master, pending v2016.10.1
basepi authored Oct 12, 2016
2 parents ed37e85 + 5cd16c2 commit fd9f940
Showing 7 changed files with 238 additions and 178 deletions.
2 changes: 1 addition & 1 deletion FORMULA
@@ -1,7 +1,7 @@
name: hubblestack_pulsar
os: RedHat, CentOS, Debian, Ubuntu
os_family: RedHat, Debian
version: 2016.7.1
version: 2016.9.4
release: 1
summary: HubbleStack Pulsar
description: HubbleStack Pulsar
13 changes: 8 additions & 5 deletions README.rst
@@ -95,8 +95,8 @@ repo for updates and bugfixes!)

.. code-block:: shell
wget https://spm.hubblestack.io/2016.7.1/hubblestack_pulsar-2016.7.1-1.spm
spm local install hubblestack_pulsar-2016.7.1-1.spm
wget http://spm.hubblestack.io/pulsar/hubblestack_pulsar-2016.9.4-1.spm
spm local install hubblestack_pulsar-2016.9.4-1.spm
You should now be able to sync the new modules to your minion(s) using the
``sync_modules`` Salt utility:
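
(The README's own command block for this step is collapsed in this diff view. As a hedged illustration only, the same sync can be driven from the master through Salt's Python API; the ``'*'`` target and the choice of ``sync_beacons``/``sync_returners`` are assumptions, made because Pulsar ships beacon and returner modules.)

.. code-block:: python

    # Hedged sketch: push Pulsar's modules out to minions via Salt's Python
    # client. Run on the salt-master; '*' is only an example target.
    import salt.client

    client = salt.client.LocalClient()
    print(client.cmd('*', 'saltutil.sync_beacons'))    # pulsar beacon
    print(client.cmd('*', 'saltutil.sync_returners'))  # pulsar returners, if used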
@@ -172,23 +172,26 @@ is meant to act as a template. It works in tandem with the
different needs and requirements, and we understand that, so we've designed
Pulsar to be flexible.

**pillar.example**

.. code-block:: yaml
# pillar.example
beacons:
pulsar:
paths:
- /var/cache/salt/minion/files/base/hubblestack_pulsar/hubblestack_pulsar_config.yaml
schedule:
cache_nebula:
cache_pulsar:
function: cp.cache_file
seconds: 86400
args:
- salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
return_job: False
**hubblestack_pulsar_config**

.. code-block:: yaml
# hubblestack_pulsar_config.yaml
/etc: { recurse: True, auto_add: True }
/bin: { recurse: True, auto_add: True }
/sbin: { recurse: True, auto_add: True }
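
Each top-level path in this file becomes an inotify watch, with ``recurse``, ``auto_add``, ``mask`` and ``exclude`` passed through to pyinotify (see the ``_beacons/pulsar.py`` hunks below). As a rough, self-contained sketch of that mapping (not the module itself), with the module's ``DEFAULT_MASK`` and ``_get_mask`` helper approximated by a direct ``getattr`` lookup, so mask names are assumed to be raw pyinotify constants such as ``IN_CREATE``:

.. code-block:: python

    import pyinotify

    # Assumption: the real DEFAULT_MASK lives in _beacons/pulsar.py and may differ.
    DEFAULT_MASK = (pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY |
                    pyinotify.IN_MOVED_FROM | pyinotify.IN_MOVED_TO)

    def add_path_watch(wm, path, opts):
        mask = opts.get('mask', DEFAULT_MASK)
        if isinstance(mask, list):
            # A list of event names is OR'd into a single bitmask.
            r_mask = 0
            for sub in mask:
                r_mask |= getattr(pyinotify, sub, 0)
            mask = r_mask
        excl = None
        if isinstance(opts.get('exclude'), list):
            # Exclusions become a pyinotify ExcludeFilter of path patterns.
            excl = pyinotify.ExcludeFilter(opts['exclude'])
        wm.add_watch(path, mask,
                     rec=opts.get('recurse', False),
                     auto_add=opts.get('auto_add', False),
                     exclude_filter=excl)

    wm = pyinotify.WatchManager()
    add_path_watch(wm, '/etc', {'recurse': True, 'auto_add': True})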
130 changes: 74 additions & 56 deletions _beacons/pulsar.py
@@ -15,6 +15,7 @@
from __future__ import absolute_import
import collections
import fnmatch
import multiprocessing
import os
import re
import yaml
@@ -46,6 +47,8 @@


def __virtual__():
if salt.utils.is_windows():
return False, 'This module only works on Linux'
if HAS_PYINOTIFY:
return __virtualname__
return False
@@ -88,7 +91,7 @@ def beacon(config):
pulsar:
paths:
- /var/cache/salt/minion/files/base/hubblestack_pulsar/hubblestack_pulsar_config.yaml
refresh_frequency: 60
refresh_interval: 300
verbose: False
Example yaml config on fileserver (targeted by pillar)
@@ -161,16 +164,17 @@ def beacon(config):
ret = []
notifier = _get_notifier()
wm = notifier._watch_manager
update_watches = False

# Get config(s) from salt fileserver if we don't have them already
if CONFIG and CONFIG_STALENESS < config.get('refresh_frequency', 60):
if CONFIG and CONFIG_STALENESS < config.get('refresh_interval', 300):
CONFIG_STALENESS += 1
CONFIG.update(config)
CONFIG['verbose'] = config.get('verbose')
config = CONFIG
else:
if config.get('verbose'):
log.debug('No cached config found for pulsar, fetching new.')
log.debug('No cached config found for pulsar, retrieving fresh from disk.')
new_config = config
if isinstance(config.get('paths'), list):
for path in config['paths']:
@@ -196,6 +200,7 @@ def beacon(config):
config = new_config
CONFIG_STALENESS = 0
CONFIG = config
update_watches = True

if config.get('verbose'):
log.debug('Pulsar beacon config (compiled from config list):\n{0}'.format(config))
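
One note on the caching logic in this hunk: ``CONFIG_STALENESS`` increments once per beacon invocation, so ``refresh_interval`` is effectively measured in beacon runs rather than wall-clock seconds. A stripped-down, standalone sketch of the gate (not the module's code; ``fresh_loader`` stands in for re-reading the cached yaml):

.. code-block:: python

    # Cached config is reused until it has been served refresh_interval times;
    # then it is rebuilt and update_watches is set so the inotify watches get
    # reconciled on the same pass.
    CONFIG = None
    CONFIG_STALENESS = 0

    def get_config(fresh_loader, refresh_interval=300):
        global CONFIG, CONFIG_STALENESS
        update_watches = False
        if CONFIG is not None and CONFIG_STALENESS < refresh_interval:
            CONFIG_STALENESS += 1
        else:
            CONFIG = fresh_loader()
            CONFIG_STALENESS = 0
            update_watches = True
        return CONFIG, update_watches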
@@ -205,6 +210,8 @@ def beacon(config):
notifier.read_events()
notifier.process_events()
queue = __context__['pulsar.queue']
if config.get('verbose'):
log.debug('Pulsar found {0} inotify events.'.format(len(queue)))
while queue:
event = queue.popleft()
if event.maskname == 'IN_Q_OVERFLOW':
@@ -264,59 +271,60 @@ def beacon(config):
else:
log.info('Excluding {0} from event for {1}'.format(event.pathname, path))

# Get paths currently being watched
current = set()
for wd in wm.watches:
current.add(wm.watches[wd].path)

# Update existing watches and add new ones
# TODO: make the config handle more options
for path in config:
if path == 'return' or path == 'checksum' or path == 'stats' \
or path == 'batch' or path == 'verbose' or path == 'paths' \
or path == 'refresh_frequency':
continue
if isinstance(config[path], dict):
mask = config[path].get('mask', DEFAULT_MASK)
excludes = config[path].get('exclude', None)
if isinstance(mask, list):
r_mask = 0
for sub in mask:
r_mask |= _get_mask(sub)
elif isinstance(mask, salt.ext.six.binary_type):
r_mask = _get_mask(mask)
if update_watches:
# Get paths currently being watched
current = set()
for wd in wm.watches:
current.add(wm.watches[wd].path)

# Update existing watches and add new ones
# TODO: make the config handle more options
for path in config:
if path == 'return' or path == 'checksum' or path == 'stats' \
or path == 'batch' or path == 'verbose' or path == 'paths' \
or path == 'refresh_interval':
continue
if isinstance(config[path], dict):
mask = config[path].get('mask', DEFAULT_MASK)
excludes = config[path].get('exclude', None)
if isinstance(mask, list):
r_mask = 0
for sub in mask:
r_mask |= _get_mask(sub)
elif isinstance(mask, salt.ext.six.binary_type):
r_mask = _get_mask(mask)
else:
r_mask = mask
mask = r_mask
rec = config[path].get('recurse', False)
auto_add = config[path].get('auto_add', False)
else:
r_mask = mask
mask = r_mask
rec = config[path].get('recurse', False)
auto_add = config[path].get('auto_add', False)
else:
mask = DEFAULT_MASK
rec = False
auto_add = False

if path in current:
for wd in wm.watches:
if path == wm.watches[wd].path:
update = False
if wm.watches[wd].mask != mask:
update = True
if wm.watches[wd].auto_add != auto_add:
update = True
if update:
wm.update_watch(wd, mask=mask, rec=rec, auto_add=auto_add)
elif os.path.exists(path):
excl = None
if isinstance(excludes, list):
excl = []
for exclude in excludes:
if isinstance(exclude, dict):
excl.append(exclude.keys()[0])
else:
excl.append(exclude)
excl = pyinotify.ExcludeFilter(excl)
mask = DEFAULT_MASK
rec = False
auto_add = False

if path in current:
for wd in wm.watches:
if path == wm.watches[wd].path:
update = False
if wm.watches[wd].mask != mask:
update = True
if wm.watches[wd].auto_add != auto_add:
update = True
if update:
wm.update_watch(wd, mask=mask, rec=rec, auto_add=auto_add)
elif os.path.exists(path):
excl = None
if isinstance(excludes, list):
excl = []
for exclude in excludes:
if isinstance(exclude, dict):
excl.append(exclude.keys()[0])
else:
excl.append(exclude)
excl = pyinotify.ExcludeFilter(excl)

wm.add_watch(path, mask, rec=rec, auto_add=auto_add, exclude_filter=excl)
wm.add_watch(path, mask, rec=rec, auto_add=auto_add, exclude_filter=excl)

if __salt__['config.get']('hubblestack:pulsar:maintenance', False):
# We're in maintenance mode, throw away findings
@@ -344,10 +352,20 @@ def beacon(config):
transformed = []
for item in ret:
transformed.append({'return': item})
__returners__[returner](transformed)
if config.get('multiprocessing_return', True):
p = multiprocessing.Process(target=__returners__[returner], args=(transformed,))
p.daemon = True
p.start()
else:
__returners__[returner](transformed)
else:
for item in ret:
__returners__[returner]({'return': item})
if config.get('multiprocessing_return', True):
p = multiprocessing.Process(target=__returners__[returner], args=({'return': item},))
p.daemon = True
p.start()
else:
__returners__[returner]({'return': item})
return []
else:
# Return event data
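
The main functional change in this last hunk is that returner calls can now be pushed into short-lived daemon processes (``multiprocessing_return``, default ``True``) so a slow returner does not block the beacon loop. A condensed, standalone sketch of that dispatch pattern, with ``dummy_returner`` standing in for the configured ``__returners__`` entry:

.. code-block:: python

    import multiprocessing

    def dummy_returner(data):
        # Stand-in for the real HubbleStack returner configured for pulsar.
        print('would return:', data)

    def dispatch(returner, payload, multiprocessing_return=True):
        if multiprocessing_return:
            # Fire-and-forget: daemonized so it cannot outlive the beacon process.
            p = multiprocessing.Process(target=returner, args=(payload,))
            p.daemon = True
            p.start()
        else:
            returner(payload)

    if __name__ == '__main__':
        events = [{'tag': '/etc/passwd', 'change': 'IN_MODIFY'}]
        # batch mode: one call with a list of {'return': item} dicts
        dispatch(dummy_returner, [{'return': e} for e in events])
        # non-batch mode: one call per event
        for e in events:
            dispatch(dummy_returner, {'return': e})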