From 35ee5eaa2aee44ef2adf7756c7c637d9c39810bf Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Mon, 6 Mar 2017 15:36:57 -0700 Subject: [PATCH] Consolidate pillar and allow for multiple splunk endpoints https://github.com/hubblestack/quasar/pull/51 --- _returners/splunk_nebula_return.py | 203 ++++++++------- _returners/splunk_nova_return.py | 343 +++++++++++++------------- _returners/splunk_pulsar_return.py | 379 +++++++++++++++-------------- 3 files changed, 487 insertions(+), 438 deletions(-) diff --git a/_returners/splunk_nebula_return.py b/_returners/splunk_nebula_return.py index 43bb652..2855e7f 100644 --- a/_returners/splunk_nebula_return.py +++ b/_returners/splunk_nebula_return.py @@ -3,7 +3,6 @@ HubbleStack Nebula-to-Splunk returner :maintainer: HubbleStack -:maturity: 2016.10.4 :platform: All :requires: SaltStack @@ -13,13 +12,12 @@ .. code-block:: yaml hubblestack: - nebula: - returner: - splunk: - token: - indexer: - sourcetype: - index: + returner: + splunk: + - token: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX + indexer: splunk-indexer.domain.tld + index: hubble + sourcetype_nebula: hubble_osquery You can also add an `custom_fields` argument which is a list of keys to add to events with using the results of config.get(). These new keys will be prefixed @@ -30,13 +28,12 @@ .. code-block:: yaml hubblestack: - nebula: - returner: - splunk: - token: - indexer: - sourcetype: - index: + returner: + splunk: + - token: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX + indexer: splunk-indexer.domain.tld + index: hubble + sourcetype_nebula: hubble_osquery custom_fields: - site - product_group @@ -62,16 +59,7 @@ def returner(ret): - # Customized to split up the queries and extract the correct sourcetype - - opts = _get_options() - logging.info('Options: %s' % json.dumps(opts)) - http_event_collector_key = opts['token'] - http_event_collector_host = opts['indexer'] - hec_ssl = opts['http_event_server_ssl'] - proxy = opts['proxy'] - timeout = opts['timeout'] - custom_fields = opts['custom_fields'] + opts_list = _get_options() # Gather amazon information if present aws_ami_id = None @@ -88,79 +76,106 @@ def returner(ret): # Not on an AWS box pass - # Set up the collector - hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout) - - # st = 'salt:hubble:nova' - data = ret['return'] - minion_id = ret['id'] - jid = ret['jid'] - master = __grains__['master'] - fqdn = __grains__['fqdn'] - # Sometimes fqdn is blank. If it is, replace it with minion_id - fqdn = fqdn if fqdn else minion_id - try: - fqdn_ip4 = __grains__['fqdn_ip4'][0] - except IndexError: - fqdn_ip4 = __grains__['ipv4'][0] - - if not data: - return - else: - for query in data: - for query_name, query_results in query.iteritems(): - for query_result in query_results['data']: - event = {} - payload = {} - event.update(query_result) - event.update({'query': query_name}) - event.update({'job_id': jid}) - event.update({'master': master}) - event.update({'minion_id': minion_id}) - event.update({'dest_host': fqdn}) - event.update({'dest_ip': fqdn_ip4}) - - if aws_instance_id is not None: - event.update({'aws_ami_id': aws_ami_id}) - event.update({'aws_instance_id': aws_instance_id}) - event.update({'aws_account_id': aws_account_id}) - - for custom_field in custom_fields: - custom_field_name = 'custom_' + custom_field - custom_field_value = __salt__['config.get'](custom_field, '') - if isinstance(custom_field_value, str): - event.update({custom_field_name: custom_field_value}) - elif isinstance(custom_field_value, list): - custom_field_value = ','.join(custom_field_value) - event.update({custom_field_name: custom_field_value}) - - payload.update({'host': fqdn}) - payload.update({'index': opts['index']}) - payload.update({'sourcetype': opts['sourcetype']}) - payload.update({'event': event}) - hec.batchEvent(payload) - - hec.flushBatch() + for opts in opts_list: + logging.info('Options: %s' % json.dumps(opts)) + http_event_collector_key = opts['token'] + http_event_collector_host = opts['indexer'] + hec_ssl = opts['http_event_server_ssl'] + proxy = opts['proxy'] + timeout = opts['timeout'] + custom_fields = opts['custom_fields'] + + # Set up the collector + hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout) + + # st = 'salt:hubble:nova' + data = ret['return'] + minion_id = ret['id'] + jid = ret['jid'] + master = __grains__['master'] + fqdn = __grains__['fqdn'] + # Sometimes fqdn is blank. If it is, replace it with minion_id + fqdn = fqdn if fqdn else minion_id + try: + fqdn_ip4 = __grains__['fqdn_ip4'][0] + except IndexError: + fqdn_ip4 = __grains__['ipv4'][0] + + if not data: + return + else: + for query in data: + for query_name, query_results in query.iteritems(): + for query_result in query_results['data']: + event = {} + payload = {} + event.update(query_result) + event.update({'query': query_name}) + event.update({'job_id': jid}) + event.update({'master': master}) + event.update({'minion_id': minion_id}) + event.update({'dest_host': fqdn}) + event.update({'dest_ip': fqdn_ip4}) + + if aws_instance_id is not None: + event.update({'aws_ami_id': aws_ami_id}) + event.update({'aws_instance_id': aws_instance_id}) + event.update({'aws_account_id': aws_account_id}) + + for custom_field in custom_fields: + custom_field_name = 'custom_' + custom_field + custom_field_value = __salt__['config.get'](custom_field, '') + if isinstance(custom_field_value, str): + event.update({custom_field_name: custom_field_value}) + elif isinstance(custom_field_value, list): + custom_field_value = ','.join(custom_field_value) + event.update({custom_field_name: custom_field_value}) + + payload.update({'host': fqdn}) + payload.update({'index': opts['index']}) + payload.update({'sourcetype': opts['sourcetype']}) + payload.update({'event': event}) + hec.batchEvent(payload) + + hec.flushBatch() return def _get_options(): - try: - token = __salt__['config.get']('hubblestack:nebula:returner:splunk:token').strip() - indexer = __salt__['config.get']('hubblestack:nebula:returner:splunk:indexer') - sourcetype = __salt__['config.get']('hubblestack:nebula:returner:splunk:sourcetype') - index = __salt__['config.get']('hubblestack:nebula:returner:splunk:index') - custom_fields = __salt__['config.get']('hubblestack:nebula:returner:splunk:custom_fields', []) - except: - return None - splunk_opts = {'token': token, 'indexer': indexer, 'sourcetype': sourcetype, 'index': index, 'custom_fields': custom_fields} - - hec_ssl = __salt__['config.get']('hubblestack:nebula:returner:splunk:hec_ssl', True) - splunk_opts['http_event_server_ssl'] = hec_ssl - splunk_opts['proxy'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:proxy', {}) - splunk_opts['timeout'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:timeout', 9.05) - - return splunk_opts + if __salt__['config.get']('hubblestack:returner:splunk'): + splunk_opts = [] + returner_opts = __salt__['config.get']('hubblestack:returner:splunk') + if not isinstance(returner_opts, list): + returner_opts = [returner_opts] + for opt in returner_opts: + processed = {} + processed['token'] = opt.get('token') + processed['indexer'] = opt.get('indexer') + processed['index'] = opt.get('index') + processed['custom_fields'] = opt.get('custom_fields', []) + processed['sourcetype'] = opt.get('sourcetype_nebula', 'hubble_osquery') + processed['http_event_server_ssl'] = opt.get('hec_ssl', True) + processed['proxy'] = opt.get('proxy', {}) + processed['timeout'] = opt.get('timeout', 9.05) + splunk_opts.append(processed) + return splunk_opts + else: + try: + token = __salt__['config.get']('hubblestack:nebula:returner:splunk:token').strip() + indexer = __salt__['config.get']('hubblestack:nebula:returner:splunk:indexer') + sourcetype = __salt__['config.get']('hubblestack:nebula:returner:splunk:sourcetype') + index = __salt__['config.get']('hubblestack:nebula:returner:splunk:index') + custom_fields = __salt__['config.get']('hubblestack:nebula:returner:splunk:custom_fields', []) + except: + return None + splunk_opts = {'token': token, 'indexer': indexer, 'sourcetype': sourcetype, 'index': index, 'custom_fields': custom_fields} + + hec_ssl = __salt__['config.get']('hubblestack:nebula:returner:splunk:hec_ssl', True) + splunk_opts['http_event_server_ssl'] = hec_ssl + splunk_opts['proxy'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:proxy', {}) + splunk_opts['timeout'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:timeout', 9.05) + + return [splunk_opts] def send_splunk(event, index_override=None, sourcetype_override=None): diff --git a/_returners/splunk_nova_return.py b/_returners/splunk_nova_return.py index 5ce98c0..efed5fa 100644 --- a/_returners/splunk_nova_return.py +++ b/_returners/splunk_nova_return.py @@ -3,7 +3,6 @@ HubbleStack Nova-to-Splunk returner :maintainer: HubbleStack -:maturity: 2016.10.4 :platform: All :requires: SaltStack @@ -13,13 +12,12 @@ .. code-block:: yaml hubblestack: - nova: - returner: - splunk: - token: - indexer: - sourcetype: - index: + returner: + splunk: + - token: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX + indexer: splunk-indexer.domain.tld + index: hubble + sourcetype_nova: hubble_audit You can also add an `custom_fields` argument which is a list of keys to add to events with using the results of config.get(). These new keys will be prefixed @@ -30,13 +28,12 @@ .. code-block:: yaml hubblestack: - nova: - returner: - splunk: - token: - indexer: - sourcetype: - index: + returner: + splunk: + - token: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX + indexer: splunk-indexer.domain.tld + index: hubble + sourcetype_nova: hubble_audit custom_fields: - site - product_group @@ -62,15 +59,7 @@ def returner(ret): - # Customized to split up the queries and extract the correct sourcetype - opts = _get_options() - logging.info('Options: %s' % json.dumps(opts)) - http_event_collector_key = opts['token'] - http_event_collector_host = opts['indexer'] - hec_ssl = opts['http_event_server_ssl'] - proxy = opts['proxy'] - timeout = opts['timeout'] - custom_fields = opts['custom_fields'] + opts_list = _get_options() # Gather amazon information if present aws_ami_id = None @@ -87,131 +76,141 @@ def returner(ret): # Not on an AWS box pass - # Set up the collector - hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout) - # st = 'salt:hubble:nova' - data = ret['return'] - minion_id = ret['id'] - jid = ret['jid'] - fqdn = __grains__['fqdn'] - # Sometimes fqdn is blank. If it is, replace it with minion_id - fqdn = fqdn if fqdn else minion_id - master = __grains__['master'] - try: - fqdn_ip4 = __grains__['fqdn_ip4'][0] - except IndexError: - fqdn_ip4 = __grains__['ipv4'][0] - - if __grains__['master']: + for opts in opts_list: + logging.info('Options: %s' % json.dumps(opts)) + http_event_collector_key = opts['token'] + http_event_collector_host = opts['indexer'] + hec_ssl = opts['http_event_server_ssl'] + proxy = opts['proxy'] + timeout = opts['timeout'] + custom_fields = opts['custom_fields'] + + + # Set up the collector + hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout) + # st = 'salt:hubble:nova' + data = ret['return'] + minion_id = ret['id'] + jid = ret['jid'] + fqdn = __grains__['fqdn'] + # Sometimes fqdn is blank. If it is, replace it with minion_id + fqdn = fqdn if fqdn else minion_id master = __grains__['master'] - else: - master = socket.gethostname() # We *are* the master, so use our hostname - - for fai in data.get('Failure', []): - check_id = fai.keys()[0] - payload = {} - event = {} - event.update({'check_result': 'Failure'}) - event.update({'check_id': check_id}) - event.update({'job_id': jid}) - if not isinstance(fai[check_id], dict): - event.update({'description': fai[check_id]}) - elif 'description' in fai[check_id]: - for key, value in fai[check_id].iteritems(): - if key not in ['tag']: - event[key] = value - event.update({'master': master}) - event.update({'minion_id': minion_id}) - event.update({'dest_host': fqdn}) - event.update({'dest_ip': fqdn_ip4}) - - if aws_instance_id is not None: - event.update({'aws_ami_id': aws_ami_id}) - event.update({'aws_instance_id': aws_instance_id}) - - for custom_field in custom_fields: - custom_field_name = 'custom_' + custom_field - custom_field_value = __salt__['config.get'](custom_field, '') - if isinstance(custom_field_value, str): - event.update({custom_field_name: custom_field_value}) - elif isinstance(custom_field_value, list): - custom_field_value = ','.join(custom_field_value) - event.update({custom_field_name: custom_field_value}) - - payload.update({'host': fqdn}) - payload.update({'index': opts['index']}) - payload.update({'sourcetype': opts['sourcetype']}) - payload.update({'event': event}) - hec.batchEvent(payload) - - for suc in data.get('Success', []): - check_id = suc.keys()[0] - payload = {} - event = {} - event.update({'check_result': 'Success'}) - event.update({'check_id': check_id}) - event.update({'job_id': jid}) - if not isinstance(suc[check_id], dict): - event.update({'description': suc[check_id]}) - elif 'description' in suc[check_id]: - for key, value in suc[check_id].iteritems(): - if key not in ['tag']: - event[key] = value - event.update({'master': master}) - event.update({'minion_id': minion_id}) - event.update({'dest_host': fqdn}) - event.update({'dest_ip': fqdn_ip4}) - - if aws_instance_id is not None: - event.update({'aws_ami_id': aws_ami_id}) - event.update({'aws_instance_id': aws_instance_id}) - event.update({'aws_account_id': aws_account_id}) - - for custom_field in custom_fields: - custom_field_name = 'custom_' + custom_field - custom_field_value = __salt__['config.get'](custom_field, '') - if isinstance(custom_field_value, str): - event.update({custom_field_name: custom_field_value}) - elif isinstance(custom_field_value, list): - custom_field_value = ','.join(custom_field_value) - event.update({custom_field_name: custom_field_value}) - - payload.update({'host': fqdn}) - payload.update({'sourcetype': opts['sourcetype']}) - payload.update({'index': opts['index']}) - payload.update({'event': event}) - hec.batchEvent(payload) - - if data.get('Compliance', None): - payload = {} - event = {} - event.update({'job_id': jid}) - event.update({'compliance_percentage': data['Compliance']}) - event.update({'master': master}) - event.update({'minion_id': minion_id}) - event.update({'dest_host': fqdn}) - event.update({'dest_ip': fqdn_ip4}) - - if aws_instance_id is not None: - event.update({'aws_ami_id': aws_ami_id}) - event.update({'aws_instance_id': aws_instance_id}) - - for custom_field in custom_fields: - custom_field_name = 'custom_' + custom_field - custom_field_value = __salt__['config.get'](custom_field, '') - if isinstance(custom_field_value, str): - event.update({custom_field_name: custom_field_value}) - elif isinstance(custom_field_value, list): - custom_field_value = ','.join(custom_field_value) - event.update({custom_field_name: custom_field_value}) - - payload.update({'host': fqdn}) - payload.update({'sourcetype': opts['sourcetype']}) - payload.update({'index': opts['index']}) - payload.update({'event': event}) - hec.batchEvent(payload) + try: + fqdn_ip4 = __grains__['fqdn_ip4'][0] + except IndexError: + fqdn_ip4 = __grains__['ipv4'][0] - hec.flushBatch() + if __grains__['master']: + master = __grains__['master'] + else: + master = socket.gethostname() # We *are* the master, so use our hostname + + for fai in data.get('Failure', []): + check_id = fai.keys()[0] + payload = {} + event = {} + event.update({'check_result': 'Failure'}) + event.update({'check_id': check_id}) + event.update({'job_id': jid}) + if not isinstance(fai[check_id], dict): + event.update({'description': fai[check_id]}) + elif 'description' in fai[check_id]: + for key, value in fai[check_id].iteritems(): + if key not in ['tag']: + event[key] = value + event.update({'master': master}) + event.update({'minion_id': minion_id}) + event.update({'dest_host': fqdn}) + event.update({'dest_ip': fqdn_ip4}) + + if aws_instance_id is not None: + event.update({'aws_ami_id': aws_ami_id}) + event.update({'aws_instance_id': aws_instance_id}) + + for custom_field in custom_fields: + custom_field_name = 'custom_' + custom_field + custom_field_value = __salt__['config.get'](custom_field, '') + if isinstance(custom_field_value, str): + event.update({custom_field_name: custom_field_value}) + elif isinstance(custom_field_value, list): + custom_field_value = ','.join(custom_field_value) + event.update({custom_field_name: custom_field_value}) + + payload.update({'host': fqdn}) + payload.update({'index': opts['index']}) + payload.update({'sourcetype': opts['sourcetype']}) + payload.update({'event': event}) + hec.batchEvent(payload) + + for suc in data.get('Success', []): + check_id = suc.keys()[0] + payload = {} + event = {} + event.update({'check_result': 'Success'}) + event.update({'check_id': check_id}) + event.update({'job_id': jid}) + if not isinstance(suc[check_id], dict): + event.update({'description': suc[check_id]}) + elif 'description' in suc[check_id]: + for key, value in suc[check_id].iteritems(): + if key not in ['tag']: + event[key] = value + event.update({'master': master}) + event.update({'minion_id': minion_id}) + event.update({'dest_host': fqdn}) + event.update({'dest_ip': fqdn_ip4}) + + if aws_instance_id is not None: + event.update({'aws_ami_id': aws_ami_id}) + event.update({'aws_instance_id': aws_instance_id}) + event.update({'aws_account_id': aws_account_id}) + + for custom_field in custom_fields: + custom_field_name = 'custom_' + custom_field + custom_field_value = __salt__['config.get'](custom_field, '') + if isinstance(custom_field_value, str): + event.update({custom_field_name: custom_field_value}) + elif isinstance(custom_field_value, list): + custom_field_value = ','.join(custom_field_value) + event.update({custom_field_name: custom_field_value}) + + payload.update({'host': fqdn}) + payload.update({'sourcetype': opts['sourcetype']}) + payload.update({'index': opts['index']}) + payload.update({'event': event}) + hec.batchEvent(payload) + + if data.get('Compliance', None): + payload = {} + event = {} + event.update({'job_id': jid}) + event.update({'compliance_percentage': data['Compliance']}) + event.update({'master': master}) + event.update({'minion_id': minion_id}) + event.update({'dest_host': fqdn}) + event.update({'dest_ip': fqdn_ip4}) + + if aws_instance_id is not None: + event.update({'aws_ami_id': aws_ami_id}) + event.update({'aws_instance_id': aws_instance_id}) + + for custom_field in custom_fields: + custom_field_name = 'custom_' + custom_field + custom_field_value = __salt__['config.get'](custom_field, '') + if isinstance(custom_field_value, str): + event.update({custom_field_name: custom_field_value}) + elif isinstance(custom_field_value, list): + custom_field_value = ','.join(custom_field_value) + event.update({custom_field_name: custom_field_value}) + + payload.update({'host': fqdn}) + payload.update({'sourcetype': opts['sourcetype']}) + payload.update({'index': opts['index']}) + payload.update({'event': event}) + hec.batchEvent(payload) + + hec.flushBatch() return @@ -235,22 +234,40 @@ def event_return(event): def _get_options(): - try: - token = __salt__['config.get']('hubblestack:nova:returner:splunk:token').strip() - indexer = __salt__['config.get']('hubblestack:nova:returner:splunk:indexer') - sourcetype = __salt__['config.get']('hubblestack:nova:returner:splunk:sourcetype') - index = __salt__['config.get']('hubblestack:nova:returner:splunk:index') - custom_fields = __salt__['config.get']('hubblestack:nebula:returner:splunk:custom_fields', []) - except: - return None - splunk_opts = {'token': token, 'indexer': indexer, 'sourcetype': sourcetype, 'index': index, 'custom_fields': custom_fields} - - hec_ssl = __salt__['config.get']('hubblestack:nova:returner:splunk:hec_ssl', True) - splunk_opts['http_event_server_ssl'] = hec_ssl - splunk_opts['proxy'] = __salt__['config.get']('hubblestack:nova:returner:splunk:proxy', {}) - splunk_opts['timeout'] = __salt__['config.get']('hubblestack:nova:returner:splunk:timeout', 9.05) - - return splunk_opts + if __salt__['config.get']('hubblestack:returner:splunk'): + splunk_opts = [] + returner_opts = __salt__['config.get']('hubblestack:returner:splunk') + if not isinstance(returner_opts, list): + returner_opts = [returner_opts] + for opt in returner_opts: + processed = {} + processed['token'] = opt.get('token') + processed['indexer'] = opt.get('indexer') + processed['index'] = opt.get('index') + processed['custom_fields'] = opt.get('custom_fields', []) + processed['sourcetype'] = opt.get('sourcetype_nova', 'hubble_audit') + processed['http_event_server_ssl'] = opt.get('hec_ssl', True) + processed['proxy'] = opt.get('proxy', {}) + processed['timeout'] = opt.get('timeout', 9.05) + splunk_opts.append(processed) + return splunk_opts + else: + try: + token = __salt__['config.get']('hubblestack:nova:returner:splunk:token').strip() + indexer = __salt__['config.get']('hubblestack:nova:returner:splunk:indexer') + sourcetype = __salt__['config.get']('hubblestack:nova:returner:splunk:sourcetype') + index = __salt__['config.get']('hubblestack:nova:returner:splunk:index') + custom_fields = __salt__['config.get']('hubblestack:nebula:returner:splunk:custom_fields', []) + except: + return None + splunk_opts = {'token': token, 'indexer': indexer, 'sourcetype': sourcetype, 'index': index, 'custom_fields': custom_fields} + + hec_ssl = __salt__['config.get']('hubblestack:nova:returner:splunk:hec_ssl', True) + splunk_opts['http_event_server_ssl'] = hec_ssl + splunk_opts['proxy'] = __salt__['config.get']('hubblestack:nova:returner:splunk:proxy', {}) + splunk_opts['timeout'] = __salt__['config.get']('hubblestack:nova:returner:splunk:timeout', 9.05) + + return [splunk_opts] def send_splunk(event, index_override=None, sourcetype_override=None): diff --git a/_returners/splunk_pulsar_return.py b/_returners/splunk_pulsar_return.py index b4ef95b..194fbbb 100644 --- a/_returners/splunk_pulsar_return.py +++ b/_returners/splunk_pulsar_return.py @@ -3,7 +3,6 @@ HubbleStack Pulsar-to-Splunk returner :maintainer: HubbleStack -:maturity: 2016.10.4 :platform: All :requires: SaltStack @@ -13,13 +12,12 @@ .. code-block:: yaml hubblestack: - pulsar: - returner: - splunk: - token: - indexer: - sourcetype: - index: + returner: + splunk: + - token: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX + indexer: splunk-indexer.domain.tld + index: hubble + sourcetype_pulsar: hubble_fim You can also add an `custom_fields` argument which is a list of keys to add to events with using the results of config.get(). These new keys will be prefixed @@ -30,13 +28,12 @@ .. code-block:: yaml hubblestack: - pulsar: - returner: - splunk: - token: - indexer: - sourcetype: - index: + returner: + splunk: + - token: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX + indexer: splunk-indexer.domain.tld + index: hubble + sourcetype_pulsar: hubble_fim custom_fields: - site - product_group @@ -68,15 +65,8 @@ def returner(ret): if isinstance(ret, dict) and not ret.get('return'): # Empty single return, let's not do any setup or anything return - # Customized to split up the change events and send to Splunk. - opts = _get_options() - logging.info('Options: %s' % json.dumps(opts)) - http_event_collector_key = opts['token'] - http_event_collector_host = opts['indexer'] - hec_ssl = opts['http_event_server_ssl'] - proxy = opts['proxy'] - timeout = opts['timeout'] - custom_fields = opts['custom_fields'] + + opts_list = _get_options() # Gather amazon information if present aws_ami_id = None @@ -93,147 +83,156 @@ def returner(ret): # Not on an AWS box pass - # Set up the collector - hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout) - # Check whether or not data is batched: - if isinstance(ret, dict): # Batching is disabled - data = [ret] - else: - data = ret - # Sometimes there are duplicate events in the list. Dedup them: - data = _dedupList(data) - minion_id = __opts__['id'] - fqdn = __grains__['fqdn'] - # Sometimes fqdn is blank. If it is, replace it with minion_id - fqdn = fqdn if fqdn else minion_id - master = __grains__['master'] - try: - fqdn_ip4 = __grains__['fqdn_ip4'][0] - except IndexError: - fqdn_ip4 = __grains__['ipv4'][0] - - alerts = [] - for item in data: - events = item['return'] - if not isinstance(events, list): - events = [events] - alerts.extend(events) - - for alert in alerts: - event = {} - payload = {} - if('change' in alert): # Linux, normal pulsar - # The second half of the change will be '|IN_ISDIR' for directories - change = alert['change'].split('|')[0] - # Skip the IN_IGNORED events - if change == 'IN_IGNORED': - continue - if len(alert['change'].split('|')) == 2: - object_type = 'directory' - else: - object_type = 'file' - - actions = defaultdict(lambda: 'unknown') - actions['IN_ACCESS'] = 'read' - actions['IN_ATTRIB'] = 'acl_modified' - actions['IN_CLOSE_NOWRITE'] = 'read' - actions['IN_CLOSE_WRITE'] = 'read' - actions['IN_CREATE'] = 'created' - actions['IN_DELETE'] = 'deleted' - actions['IN_DELETE_SELF'] = 'deleted' - actions['IN_MODIFY'] = 'modified' - actions['IN_MOVE_SELF'] = 'modified' - actions['IN_MOVED_FROM'] = 'modified' - actions['IN_MOVED_TO'] = 'modified' - actions['IN_OPEN'] = 'read' - actions['IN_MOVE'] = 'modified' - actions['IN_CLOSE'] = 'read' - - event['action'] = actions[change] - event['change_type'] = 'filesystem' - event['object_category'] = object_type - event['object_path'] = alert['path'] - event['file_name'] = alert['name'] - event['file_path'] = alert['tag'] - - if alert['stats']: # Gather more data if the change wasn't a delete - stats = alert['stats'] - event['object_id'] = stats['inode'] - event['file_acl'] = stats['mode'] - event['file_create_time'] = stats['ctime'] - event['file_modify_time'] = stats['mtime'] - event['file_size'] = stats['size'] / 1024.0 # Convert bytes to kilobytes - event['user'] = stats['user'] - event['group'] = stats['group'] - if object_type == 'file': - event['file_hash'] = alert['checksum'] - event['file_hash_type'] = alert['checksum_type'] - - else: # Windows, win_pulsar - change = alert['Accesses'] - if alert['Hash'] == 'Item is a directory': - object_type = 'directory' - else: - object_type = 'file' - - actions = defaultdict(lambda: 'unknown') - actions['Delete'] = 'deleted' - actions['Read Control'] = 'read' - actions['Write DAC'] = 'acl_modified' - actions['Write Owner'] = 'modified' - actions['Synchronize'] = 'modified' - actions['Access Sys Sec'] = 'read' - actions['Read Data'] = 'read' - actions['Write Data'] = 'modified' - actions['Append Data'] = 'modified' - actions['Read EA'] = 'read' - actions['Write EA'] = 'modified' - actions['Execute/Traverse'] = 'read' - actions['Read Attributes'] = 'read' - actions['Write Attributes'] = 'acl_modified' - actions['Query Key Value'] = 'read' - actions['Set Key Value'] = 'modified' - actions['Create Sub Key'] = 'created' - actions['Enumerate Sub-Keys'] = 'read' - actions['Notify About Changes to Keys'] = 'read' - actions['Create Link'] = 'created' - actions['Print'] = 'read' - - event['action'] = actions[change] - event['change_type'] = 'filesystem' - event['object_category'] = object_type - event['object_path'] = alert['Object Name'] - event['file_name'] = os.path.basename(alert['Object Name']) - event['file_path'] = os.path.dirname(alert['Object Name']) - # TODO: Should we be reporting 'EntryType' or 'TimeGenerated? - # EntryType reports whether attempt to change was successful. - - event.update({'master': master}) - event.update({'minion_id': minion_id}) - event.update({'dest_host': fqdn}) - event.update({'dest_ip': fqdn_ip4}) - - if aws_instance_id is not None: - event.update({'aws_ami_id': aws_ami_id}) - event.update({'aws_instance_id': aws_instance_id}) - event.update({'aws_account_id': aws_account_id}) - - for custom_field in custom_fields: - custom_field_name = 'custom_' + custom_field - custom_field_value = __salt__['config.get'](custom_field, '') - if isinstance(custom_field_value, str): - event.update({custom_field_name: custom_field_value}) - elif isinstance(custom_field_value, list): - custom_field_value = ','.join(custom_field_value) - event.update({custom_field_name: custom_field_value}) - - payload.update({'host': fqdn}) - payload.update({'index': opts['index']}) - payload.update({'sourcetype': opts['sourcetype']}) - payload.update({'event': event}) - hec.batchEvent(payload) - - hec.flushBatch() + for opts in opts_list: + logging.info('Options: %s' % json.dumps(opts)) + http_event_collector_key = opts['token'] + http_event_collector_host = opts['indexer'] + hec_ssl = opts['http_event_server_ssl'] + proxy = opts['proxy'] + timeout = opts['timeout'] + custom_fields = opts['custom_fields'] + + # Set up the collector + hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout) + # Check whether or not data is batched: + if isinstance(ret, dict): # Batching is disabled + data = [ret] + else: + data = ret + # Sometimes there are duplicate events in the list. Dedup them: + data = _dedupList(data) + minion_id = __opts__['id'] + fqdn = __grains__['fqdn'] + # Sometimes fqdn is blank. If it is, replace it with minion_id + fqdn = fqdn if fqdn else minion_id + master = __grains__['master'] + try: + fqdn_ip4 = __grains__['fqdn_ip4'][0] + except IndexError: + fqdn_ip4 = __grains__['ipv4'][0] + + alerts = [] + for item in data: + events = item['return'] + if not isinstance(events, list): + events = [events] + alerts.extend(events) + + for alert in alerts: + event = {} + payload = {} + if('change' in alert): # Linux, normal pulsar + # The second half of the change will be '|IN_ISDIR' for directories + change = alert['change'].split('|')[0] + # Skip the IN_IGNORED events + if change == 'IN_IGNORED': + continue + if len(alert['change'].split('|')) == 2: + object_type = 'directory' + else: + object_type = 'file' + + actions = defaultdict(lambda: 'unknown') + actions['IN_ACCESS'] = 'read' + actions['IN_ATTRIB'] = 'acl_modified' + actions['IN_CLOSE_NOWRITE'] = 'read' + actions['IN_CLOSE_WRITE'] = 'read' + actions['IN_CREATE'] = 'created' + actions['IN_DELETE'] = 'deleted' + actions['IN_DELETE_SELF'] = 'deleted' + actions['IN_MODIFY'] = 'modified' + actions['IN_MOVE_SELF'] = 'modified' + actions['IN_MOVED_FROM'] = 'modified' + actions['IN_MOVED_TO'] = 'modified' + actions['IN_OPEN'] = 'read' + actions['IN_MOVE'] = 'modified' + actions['IN_CLOSE'] = 'read' + + event['action'] = actions[change] + event['change_type'] = 'filesystem' + event['object_category'] = object_type + event['object_path'] = alert['path'] + event['file_name'] = alert['name'] + event['file_path'] = alert['tag'] + + if alert['stats']: # Gather more data if the change wasn't a delete + stats = alert['stats'] + event['object_id'] = stats['inode'] + event['file_acl'] = stats['mode'] + event['file_create_time'] = stats['ctime'] + event['file_modify_time'] = stats['mtime'] + event['file_size'] = stats['size'] / 1024.0 # Convert bytes to kilobytes + event['user'] = stats['user'] + event['group'] = stats['group'] + if object_type == 'file': + event['file_hash'] = alert['checksum'] + event['file_hash_type'] = alert['checksum_type'] + + else: # Windows, win_pulsar + change = alert['Accesses'] + if alert['Hash'] == 'Item is a directory': + object_type = 'directory' + else: + object_type = 'file' + + actions = defaultdict(lambda: 'unknown') + actions['Delete'] = 'deleted' + actions['Read Control'] = 'read' + actions['Write DAC'] = 'acl_modified' + actions['Write Owner'] = 'modified' + actions['Synchronize'] = 'modified' + actions['Access Sys Sec'] = 'read' + actions['Read Data'] = 'read' + actions['Write Data'] = 'modified' + actions['Append Data'] = 'modified' + actions['Read EA'] = 'read' + actions['Write EA'] = 'modified' + actions['Execute/Traverse'] = 'read' + actions['Read Attributes'] = 'read' + actions['Write Attributes'] = 'acl_modified' + actions['Query Key Value'] = 'read' + actions['Set Key Value'] = 'modified' + actions['Create Sub Key'] = 'created' + actions['Enumerate Sub-Keys'] = 'read' + actions['Notify About Changes to Keys'] = 'read' + actions['Create Link'] = 'created' + actions['Print'] = 'read' + + event['action'] = actions[change] + event['change_type'] = 'filesystem' + event['object_category'] = object_type + event['object_path'] = alert['Object Name'] + event['file_name'] = os.path.basename(alert['Object Name']) + event['file_path'] = os.path.dirname(alert['Object Name']) + # TODO: Should we be reporting 'EntryType' or 'TimeGenerated? + # EntryType reports whether attempt to change was successful. + + event.update({'master': master}) + event.update({'minion_id': minion_id}) + event.update({'dest_host': fqdn}) + event.update({'dest_ip': fqdn_ip4}) + + if aws_instance_id is not None: + event.update({'aws_ami_id': aws_ami_id}) + event.update({'aws_instance_id': aws_instance_id}) + event.update({'aws_account_id': aws_account_id}) + + for custom_field in custom_fields: + custom_field_name = 'custom_' + custom_field + custom_field_value = __salt__['config.get'](custom_field, '') + if isinstance(custom_field_value, str): + event.update({custom_field_name: custom_field_value}) + elif isinstance(custom_field_value, list): + custom_field_value = ','.join(custom_field_value) + event.update({custom_field_name: custom_field_value}) + + payload.update({'host': fqdn}) + payload.update({'index': opts['index']}) + payload.update({'sourcetype': opts['sourcetype']}) + payload.update({'event': event}) + hec.batchEvent(payload) + + hec.flushBatch() return @@ -246,22 +245,40 @@ def _dedupList(l): def _get_options(): - try: - token = __salt__['config.get']('hubblestack:pulsar:returner:splunk:token').strip() - indexer = __salt__['config.get']('hubblestack:pulsar:returner:splunk:indexer') - sourcetype = __salt__['config.get']('hubblestack:pulsar:returner:splunk:sourcetype') - index = __salt__['config.get']('hubblestack:pulsar:returner:splunk:index') - custom_fields = __salt__['config.get']('hubblestack:nebula:returner:splunk:custom_fields', []) - except: - return None - splunk_opts = {'token': token, 'indexer': indexer, 'sourcetype': sourcetype, 'index': index, 'custom_fields': custom_fields} - - hec_ssl = __salt__['config.get']('hubblestack:pulsar:returner:splunk:hec_ssl', True) - splunk_opts['http_event_server_ssl'] = hec_ssl - splunk_opts['proxy'] = __salt__['config.get']('hubblestack:pulsar:returner:splunk:proxy', {}) - splunk_opts['timeout'] = __salt__['config.get']('hubblestack:pulsar:returner:splunk:timeout', 9.05) - - return splunk_opts + if __salt__['config.get']('hubblestack:returner:splunk'): + splunk_opts = [] + returner_opts = __salt__['config.get']('hubblestack:returner:splunk') + if not isinstance(returner_opts, list): + returner_opts = [returner_opts] + for opt in returner_opts: + processed = {} + processed['token'] = opt.get('token') + processed['indexer'] = opt.get('indexer') + processed['index'] = opt.get('index') + processed['custom_fields'] = opt.get('custom_fields', []) + processed['sourcetype'] = opt.get('sourcetype_pulsar', 'hubble_fim') + processed['http_event_server_ssl'] = opt.get('hec_ssl', True) + processed['proxy'] = opt.get('proxy', {}) + processed['timeout'] = opt.get('timeout', 9.05) + splunk_opts.append(processed) + return splunk_opts + else: + try: + token = __salt__['config.get']('hubblestack:pulsar:returner:splunk:token').strip() + indexer = __salt__['config.get']('hubblestack:pulsar:returner:splunk:indexer') + sourcetype = __salt__['config.get']('hubblestack:pulsar:returner:splunk:sourcetype') + index = __salt__['config.get']('hubblestack:pulsar:returner:splunk:index') + custom_fields = __salt__['config.get']('hubblestack:nebula:returner:splunk:custom_fields', []) + except: + return None + splunk_opts = {'token': token, 'indexer': indexer, 'sourcetype': sourcetype, 'index': index, 'custom_fields': custom_fields} + + hec_ssl = __salt__['config.get']('hubblestack:pulsar:returner:splunk:hec_ssl', True) + splunk_opts['http_event_server_ssl'] = hec_ssl + splunk_opts['proxy'] = __salt__['config.get']('hubblestack:pulsar:returner:splunk:proxy', {}) + splunk_opts['timeout'] = __salt__['config.get']('hubblestack:pulsar:returner:splunk:timeout', 9.05) + + return [splunk_opts] # Thanks to George Starcher for the http_event_collector class (https://github.com/georgestarcher/)