diff --git a/CHANGELOG.md b/CHANGELOG.md index c7cb6887bb..99c43214ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ ### Core - `intelmq.lib.utils.drop_privileges`: When IntelMQ is called as `root` and dropping the privileges to user `intelmq`, also set the non-primary groups associated with the `intelmq` user. Makes the behaviour of running intelmqctl as `root` closer to the behaviour of `sudo -u intelmq ...` (PR#2507 by Mikk Margus Möll). - `intelmq.lib.utils.unzip`: Ignore directories themselves when extracting data to prevent the extraction of empty data for a directory entries (PR#2512 by Kamil Mankowski). +- `intelmq.lib.mixins.cache.CacheMixin` was extended to support temporarily storing messages in a cache queue + (PR#2509 by Kamil Mankowski). ### Development @@ -43,7 +45,13 @@ - Treat value `false` for parameter `filter_regex` as false (PR#2499 by Sebastian Wagner). #### Outputs -- `intelmq.bots.outputs.misp.output_feed`: Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). +- `intelmq.bots.outputs.misp.output_feed`: + - Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). + - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski). + - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski). + - Add `event_separator` parameter to allow keeping IntelMQ events in separate MISP Events based on a given field (PR#2509 by Kamil Mankowski). + - Add `tagging` parameter to allow adding tags to MISP events (PR#2509 by Kamil Mankowski). + - Add `additional_info` parameter to extend the default description of MISP Events (PR#2509 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). 
### Documentation diff --git a/docs/dev/bot-development.md b/docs/dev/bot-development.md index 39253c8cf8..89d6414f65 100644 --- a/docs/dev/bot-development.md +++ b/docs/dev/bot-development.md @@ -197,7 +197,7 @@ The `CacheMixin` provides methods to cache values for bots in a Redis database. - `redis_cache_ttl: int = 15` - `redis_cache_password: Optional[str] = None` -and provides the methods: +and provides the methods to cache key-value pairs: - `cache_exists` - `cache_get` @@ -205,6 +205,18 @@ and provides the methods: - `cache_flush` - `cache_get_redis_instance` +and the following methods to cache objects in a queue: + +- `cache_put` +- `cache_pop` +- `cache_length`. + +Caching key-value pairs and queue caching are two separate mechanisms. The first is designed + for arbitrary values, the second one is focused on temporarily storing messages (but can handle other + data). You won't see caches from one in the other. For example, if adding a key-value pair using + `cache_set`, it does not change the value from `cache_length`, and if adding an element using + `cache_put` you cannot use `cache_exists` to look for it. + ### Pipeline Interactions We can call three methods related to the pipeline: diff --git a/docs/user/bots.md b/docs/user/bots.md index 2c8ec6e9d9..c39b322376 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4608,6 +4608,12 @@ Create a directory layout in the MISP Feed format. The PyMISP library >= 2.4.119.1 is required, see [REQUIREMENTS.txt](https://github.com/certtools/intelmq/blob/master/intelmq/bots/outputs/misp/REQUIREMENTS.txt). +Note: please test the produced feed before using it in production. This bot allows you to do an +extensive customisation of the MISP feed, including creating multiple events and tags, but it can +be tricky to configure properly. Misconfiguration can prevent the bot from starting or have bad +consequences for your MISP Instance (e.g. spamming with events). 
Use the `intelmqctl check` command +to validate your configuration against common mistakes. + **Module:** `intelmq.bots.outputs.misp.output_feed` **Parameters:** @@ -4632,6 +4638,103 @@ The PyMISP library >= 2.4.119.1 is required, see () The output bot creates one event per each interval, all data in this time frame is part of this event. Default "1 hour", string. +**`bulk_save_count`** + +(optional, int) If set to a non-0 value, the bot won't refresh the MISP feed immediately, but will cache +incoming messages until the given number of them is reached. Use it if your bot processes a high number of messages +and constant saving to the disk is a problem. Reloading or restarting the bot as well as generating +a new MISP event based on `interval_event` triggers regenerating the MISP feed regardless of the cache size. + +**`attribute_mapping`** + +(optional, dict) If set, allows selecting which IntelMQ event fields are mapped to MISP attributes +as well as attribute parameters (like e.g. a comment). The expected format is a *dictionary of dictionaries*: +first-level key represents an IntelMQ field that will be directly translated to a MISP attribute; nested +dictionary represents additional parameters PyMISP can take when creating an attribute. They can use +names of other IntelMQ fields (then the value of such field will be used), or static values. If not needed, +leave empty dict. + +For available attribute parameters, refer to the +[PyMISP documentation](https://pymisp.readthedocs.io/en/latest/_modules/pymisp/mispevent.html#MISPObjectAttribute) +for the `MISPObjectAttribute`. + +For example: + +```yaml +attribute_mapping: + source.ip: + feed.name: + comment: event_description.text + destination.ip: + to_ids: False +``` + +would create a MISP object with three attributes `source.ip`, `feed.name` and `destination.ip` +and set their values as in the IntelMQ event. 
In addition, the `feed.name` would have a comment +as given in the `event_description.text` from the IntelMQ event, and `destination.ip` would be set +as not usable for IDS. + +**`event_separator`** + +(optional, string): If set to a field name from the IntelMQ event, the bot will work in parallel on a few +events instead of saving all incoming messages to one. Each unique value from the field will +use its own MISP Event. This is useful if your feed provides data about multiple entities you would +like to group, for example IPs of C2 servers from different botnets. For a given value, the bot will +use the same MISP Event as long as it's allowed by the `interval_event`. + +**`additional_info`** + +(optional, string): If set, the generated MISP Event will use it in the `info` field of the event, +in addition to the standard IntelMQ description with the time frame (you cannot remove it as the bot +depends on datetimes saved there). If you use `event_separator`, you may want to use `{separator}` +placeholder which will then be replaced with the value of the separator. + +For example, the following configuration can be used to create MISP Feed with IPs of C2 servers +of different botnets, having each botnet in a separate MISP Event with an appropriate description. +Each MISP Event will contain objects with the `source.ip` field only, and the events' info will look +like *C2 Servers for botnet-1. IntelMQ event 2024-07-09T14:51:10.825123 - 2024-07-10T14:51:10.825123* + +```yaml +event_separator: malware.name +additional_info: C2 Servers for {separator}. +attribute_mapping: + source.ip: +``` + +**`tagging`** + +(optional, dict): Allows setting MISP tags to MISP events. The structure is a *dict of list of dicts*. +The keys refer to which MISP events you want to tag. If you want to tag all of them, use `__all__`. +If you use `event_separator` and want to add additional tags to some events, use the expected values +of the separation field. 
The *list of dicts* defines MISP tags as parameters to create `MISPTag` +objects from. Each dictionary has to have at least `name`. For all available parameters refer to the +[PyMISP documentation](https://pymisp.readthedocs.io/en/latest/_modules/pymisp/abstract.html#MISPTag) +for `MISPTag`. + +Note: setting `name` is enough for MISP to match a correct tag from the global collection. You may +see it lacking the colour in the MISP Feed view, but it will be retrieved after importing to your +instance. + +Example 1 - set two tags for every MISP event: + +```yaml +tagging: + __all__: + - name: tlp:red + - name: source:intelmq +``` + +Example 2 - create separate events based on `malware.name` and set an additional family tag: + +```yaml +event_separator: malware.name +tagging: + __all__: + - name: tlp:red + njrat: + - name: njrat +``` + **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server. diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index cbeeec09ea..beb0ded5fb 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -1,40 +1,78 @@ -# SPDX-FileCopyrightText: 2019 Sebastian Wagner +# SPDX-FileCopyrightText: 2019 Sebastian Wagner, 2024 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- import datetime import json +import re from pathlib import Path from uuid import uuid4 -import re + from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError +from intelmq.lib.message import Event, Message, MessageFactory +from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: - from pymisp import MISPEvent, MISPOrganisation, NewAttributeError + from pymisp import ( + MISPEvent, + MISPObject, + MISPOrganisation, + MISPTag, + MISPObjectAttribute, + NewAttributeError, + ) from pymisp.tools import 
feed_meta_generator except ImportError: - # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None - import_fail_reason = 'import' -except SyntaxError: - # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 - MISPEvent = None - import_fail_reason = 'syntax' - -# NOTE: This module is compatible with Python 3.6+ +DEFAULT_KEY = "default" -class MISPFeedOutputBot(OutputBot): +class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" + interval_event: str = "1 hour" misp_org_name = None misp_org_uuid = None - output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path + output_dir: str = ( + "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path + ) + # Enables regenerating the MISP feed after collecting given number of messages + bulk_save_count: int = None + + # Additional information to be added to the MISP event description + additional_info: str = None + + # An optional field used to create multiple MISP events from incoming messages + event_separator: str = None + + # Optional non-standard mapping of message fields to MISP object attributes + # The structure is like: + # {: {}} + # For example: + # {"source.ip": {"comment": "This is the source of the event"}} + # will include only the "source.ip" field in the MISP object attributes, + # and set the comment + attribute_mapping: dict = None + + # Optional definition to add tags to the MISP event. It should be a dict where keys are + # '__all__' (to add tags for every event) or, if the event_separator is used, the separator + # values. For each key, there should be a list of dicts defining parameters for the MISPTag + # object, but only the "name" is required to set. 
+ # For example: + # {"__all__": [{"name": "tag1"}, {"name": "tag2"}]} + # will add two tags to every event + # {"infostealer": [{"name": "type:infostealer"}], "__all__": [{"name": "tag1"}]} + # will add two tags to every event separated by "infostealer", and + # one tag to every other event + tagging: dict = None + + # Delaying reloading would delay saving eventually long-awaiting messages + _sighup_delay = False _is_multithreadable: bool = False @staticmethod @@ -45,15 +83,11 @@ def check_output_dir(dirname): return True def init(self): - if MISPEvent is None and import_fail_reason == 'syntax': - raise MissingDependencyError("pymisp", - version='>=2.4.117.3', - additional_text="Python versions below 3.6 are " - "only supported by pymisp <= 2.4.119.1.") - elif MISPEvent is None: - raise MissingDependencyError('pymisp', version='>=2.4.117.3') + if MISPEvent is None: + raise MissingDependencyError("pymisp", version=">=2.4.117.3") - self.current_event = None + self.current_events = {} + self.current_files = {} self.misp_org = MISPOrganisation() self.misp_org.name = self.misp_org_name @@ -65,80 +99,364 @@ def init(self): if self.interval_event is None: self.timedelta = datetime.timedelta(hours=1) else: - self.timedelta = datetime.timedelta(minutes=parse_relative(self.interval_event)) + self.timedelta = datetime.timedelta( + minutes=parse_relative(self.interval_event) + ) + + self.min_time_current = datetime.datetime.max + self.max_time_current = datetime.datetime.min - if (self.output_dir / '.current').exists(): + if (self.output_dir / ".current").exists(): try: - with (self.output_dir / '.current').open() as f: - self.current_file = Path(f.read()) - self.current_event = MISPEvent() - self.current_event.load_file(self.current_file) - - last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0] - last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f') - last_max_time = 
datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f') - if last_max_time < datetime.datetime.now(): - self.min_time_current = datetime.datetime.now() - self.max_time_current = self.min_time_current + self.timedelta - self.current_event = None + with (self.output_dir / ".current").open() as f: + current = f.read() + + if not self.event_separator: + self.current_files[DEFAULT_KEY] = Path(current) else: - self.min_time_current = last_min_time - self.max_time_current = last_max_time - except: - self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event) - self.current_event = None - else: + self.current_files = { + k: Path(v) for k, v in json.loads(current).items() + } + + for key, path in self.current_files.items(): + self._load_event(path, key) + except Exception: + self.logger.exception( + "Loading current events %s failed. Skipping it.", self.current_files + ) + self.current_events = {} + + if not self.current_files or self.max_time_current < datetime.datetime.now(): self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta + self.current_events = {} - def process(self): + self._tagging_objects = {} + if self.tagging: + for key, tag_list in self.tagging.items(): + self._tagging_objects[key] = list() + for kw in tag_list: + # For some reason, PyMISP do not uses classmethod, and from_dict requires + # unpacking. So this is really the way to initialize tag objects. + tag = MISPTag() + tag.from_dict(**kw) + self._tagging_objects[key].append(tag) + + # Ensure we do generate feed on reload / restart, so awaiting messages won't wait forever + if self.cache_length() and not getattr(self, "testing", False): + self.logger.debug( + "Found %s awaiting messages. 
Generating feed.", self.cache_length() + ) + self._generate_misp_feed() - if not self.current_event or datetime.datetime.now() > self.max_time_current: + def _load_event(self, file_path: Path, key: str): + if file_path.exists(): + self.current_events[key] = MISPEvent() + self.current_events[key].load_file(file_path) + + last_min_time, last_max_time = re.findall( + "IntelMQ event (.*) - (.*)", self.current_events[key].info + )[0] + last_min_time = datetime.datetime.strptime( + last_min_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + last_max_time = datetime.datetime.strptime( + last_max_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + + self.min_time_current = min(last_min_time, self.min_time_current) + self.max_time_current = max(last_max_time, self.max_time_current) + + def process(self): + if datetime.datetime.now() > self.max_time_current: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta - self.current_event = MISPEvent() - self.current_event.info = ('IntelMQ event {begin} - {end}' - ''.format(begin=self.min_time_current.isoformat(), - end=self.max_time_current.isoformat())) - self.current_event.set_date(datetime.date.today()) - self.current_event.Orgc = self.misp_org - self.current_event.uuid = str(uuid4()) - self.current_file = self.output_dir / f'{self.current_event.uuid}.json' - with (self.output_dir / '.current').open('w') as f: - f.write(str(self.current_file)) + + self._generate_misp_feed() event = self.receive_message().to_dict(jsondict_as_string=True) - obj = self.current_event.add_object(name='intelmq_event') - for object_relation, value in event.items(): + cache_size = None + if self.bulk_save_count: + cache_size = self.cache_put(event) + + if cache_size is None: + self._generate_misp_feed(event) + elif not self.current_events: + # Always create the first event so we can keep track of the interval. 
+ self._generate_misp_feed() + elif cache_size >= self.bulk_save_count: + self._generate_misp_feed() + + self.acknowledge_message() + + def _generate_new_misp_event(self, key): + self.current_events[key] = MISPEvent() + + tags: list[MISPTag] = [] + if "__all__" in self._tagging_objects: + tags.extend(self._tagging_objects["__all__"]) + if key in self._tagging_objects: + tags.extend(self._tagging_objects[key]) + self.current_events[key].tags = tags + + info = "IntelMQ event {begin} - {end}" "".format( + begin=self.min_time_current.isoformat(), + end=self.max_time_current.isoformat(), + ) + if self.additional_info: + info = f"{self.additional_info.format(separator=key)} {info}" + + self.current_events[key].info = info + self.current_events[key].set_date(datetime.date.today()) + self.current_events[key].Orgc = self.misp_org + self.current_events[key].uuid = str(uuid4()) + self.current_files[key] = ( + self.output_dir / f"{self.current_events[key].uuid}.json" + ) + with (self.output_dir / ".current").open("w") as f: + if not self.event_separator: + f.write(str(self.current_files[key])) + else: + json.dump({k: str(v) for k, v in self.current_files.items()}, f) + return self.current_events[key] + + def _add_message_to_misp_event(self, message: dict): + # For proper handling of nested fields, we need the object + message_obj = MessageFactory.from_dict( + message, harmonization=self.harmonization, default_type="Event" + ) + if not self.event_separator: + key = DEFAULT_KEY + else: + key = message_obj.get(self.event_separator) or DEFAULT_KEY + + if key in self.current_events: + event = self.current_events[key] + else: + event = self._generate_new_misp_event(key) + + obj = event.add_object(name="intelmq_event") + # For caching and default mapping, the serialized version is the right format to work on. + # However, for any custom mapping the Message object is more sufficient as it handles + # subfields. 
+ if not self.attribute_mapping: + self._default_mapping(obj, message) + else: + self._custom_mapping(obj, message_obj) + + def _default_mapping(self, obj: "MISPObject", message: dict): + for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) except NewAttributeError: # This entry isn't listed in the harmonization file, ignoring. - pass + if object_relation != "__type": + self.logger.warning( + "Object relation %s not exists in MISP definition, ignoring", + object_relation, + ) + + def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dict: + """Creates the a dict with arguments to create a MISPObjectAttribute.""" + result = {} + for parameter, value in definition.items(): + # Check if the value is a harmonization key or a static value + if isinstance(value, str) and ( + value in self.harmonization["event"] or + value.split(".", 1)[0] in self.harmonization["event"] + ): + result[parameter] = message.get(value) + else: + result[parameter] = value + return result + + def _custom_mapping(self, obj: "MISPObject", message: Message): + """Map the IntelMQ event to the MISP Object using the custom mapping definition.""" + for object_relation, definition in self.attribute_mapping.items(): + if object_relation in message: + obj.add_attribute( + object_relation, + value=message[object_relation], + **self._extract_misp_attribute_kwargs(message, definition), + ) + # In case of custom mapping, we want to fail if it produces incorrect values - feed_output = self.current_event.to_feed(with_meta=False) + def _generate_misp_feed(self, message: dict = None): + if message: + self._add_message_to_misp_event(message) - with self.current_file.open('w') as f: - json.dump(feed_output, f) + message = self.cache_pop() + while message: + self._add_message_to_misp_event(message) + message = self.cache_pop() + + for key, event in self.current_events.items(): + feed_output = event.to_feed(with_meta=False) + with 
self.current_files[key].open("w") as f: + json.dump(feed_output, f) feed_meta_generator(self.output_dir) - self.acknowledge_message() @staticmethod def check(parameters): - if 'output_dir' not in parameters: - return [["error", "Parameter 'output_dir' not given."]] - try: - created = MISPFeedOutputBot.check_output_dir(parameters['output_dir']) - except OSError: - return [["error", - "Directory %r of parameter 'output_dir' does not exist and could not be created." % parameters['output_dir']]] + results = [] + if "output_dir" not in parameters: + results.append(["error", "Parameter 'output_dir' not given."]) else: - if created: - return [["info", - "Directory %r of parameter 'output_dir' did not exist, but has now been created." - "" % parameters['output_dir']]] + try: + created = MISPFeedOutputBot.check_output_dir(parameters["output_dir"]) + except OSError: + results.append( + [ + "error", + "Directory %r of parameter 'output_dir' does not exist and could not be created." + % parameters["output_dir"], + ] + ) + else: + if created: + results.append( + [ + "info", + "Directory %r of parameter 'output_dir' did not exist, but has now been created." + "" % parameters["output_dir"], + ] + ) + + bulk_save_count = parameters.get("bulk_save_count") + if bulk_save_count and not isinstance(bulk_save_count, int): + results.append( + ["error", "Parameter 'bulk_save_count' has to be int if set."] + ) + + sanity_event = Event({}) + event_separator = parameters.get("event_separator") + if ( + event_separator and not + sanity_event._Message__is_valid_key(event_separator)[0] + ): + results.append( + [ + "error", + f"Value {event_separator} in 'event_separator' is not a valid event key.", + ] + ) + + not_feed_field_warning = ( + "Parameter '{parameter}' of {context} looks like not being a field exportable to" + " MISP Feed. It may be a valid PyMISP parameter, but won't be exported to the feed." 
+ " Please ensure it's intended and consult PyMISP documentation at https://pymisp.readthedocs.io/" + " for valid parameters for the {object}." + ) + attribute_mapping = parameters.get("attribute_mapping") + if attribute_mapping: + if not isinstance(attribute_mapping, dict): + results.append( + ["error", "Parameter 'attribute_mapping has to be a dictionary."] + ) + else: + for key, value in attribute_mapping.items(): + if not sanity_event._Message__is_valid_key(key)[0]: + results.append( + [ + "error", + f"The key '{key}' in attribute_mapping is not a valid IDF field.", + ] + ) + if not isinstance(value, dict): + results.append( + [ + "error", + f"The config attribute_mapping['{key}'] should be a " + "dict with parameters for MISPObjectAttribute.", + ] + ) + else: + for parameter in value.keys(): + if parameter not in MISPObjectAttribute._fields_for_feed: + results.append( + [ + "warning", + not_feed_field_warning.format( + parameter=parameter, + context=f"attribute_mapping['{key}']", + object="MISPObjectAttribute", + ), + ] + ) + + tagging = parameters.get("tagging") + if tagging: + tagging_error = ( + "should be a list of dictionaries with parameters for the MISPTag object." + " Please consult PyMISP documentation at https://pymisp.readthedocs.io/" + " to find valid fields." + ) + if not isinstance(tagging, dict): + results.append( + [ + "error", + ( + "Parameter 'tagging' has to be a dictionary with keys as '__all__' " + "or possible 'event_separator' values. Each dictionary value " + + tagging_error, + ), + ] + ) + else: + if not event_separator and ( + "__all__" not in tagging or len(tagging.keys()) > 1 + ): + results.append( + [ + "error", + ( + "Tagging configuration expects custom values, but the 'event_separator'" + " parameter is not set. If you want to just tag all events, use only" + " the '__all__' key." 
+ ), + ] + ) + for key, value in tagging.items(): + if not isinstance(value, list): + results.append( + [ + "error", + f"The config tagging['{key}'] {tagging_error}", + ] + ) + else: + for tag in value: + if not isinstance(tag, dict): + results.append( + [ + "error", + f"The config tagging['{key}'] {tagging_error}", + ] + ) + else: + if "name" not in tag: + results.append( + [ + "error", + f"The config tagging['{key}'] contains a tag without 'name'.", + ] + ) + for parameter in tag.keys(): + if parameter not in MISPTag._fields_for_feed: + results.append( + [ + "warning", + not_feed_field_warning.format( + parameter=parameter, + context=f"tagging['{key}']", + object="MISPTag", + ), + ] + ) + + return results or None BOT = MISPFeedOutputBot diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 4325ecbb96..e4afe5268e 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -279,6 +279,10 @@ def catch_shutdown(): def harmonization(self): return self._harmonization + @property + def bot_id(self): + return self.__bot_id_full + def __handle_sigterm_signal(self, signum: int, stack: Optional[object]): """ Calls when a SIGTERM is received. Stops the bot. diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index bdbf921d26..71186d2592 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -48,6 +48,7 @@ def from_dict(message: dict, harmonization=None, MessageFactory.unserialize MessageFactory.serialize """ + if default_type and "__type" not in message: message["__type"] = default_type try: diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 3cf5365023..01465ae3df 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -1,4 +1,4 @@ -""" CacheMixin for IntelMQ +"""CacheMixin for IntelMQ SPDX-FileCopyrightText: 2021 Sebastian Waldbauer SPDX-License-Identifier: AGPL-3.0-or-later @@ -6,12 +6,26 @@ CacheMixin is used for caching/storing data in redis. 
""" +import json from typing import Any, Optional import redis import intelmq.lib.utils as utils class CacheMixin: + """Provides caching possibilities for bots, see also https://docs.intelmq.org/latest/dev/bot-development/#mixins + + For key-value cache, use methods: + cache_exists + cache_get + cache_set + + To store dict elements in a cache queue named after bot id, use methods: + cache_put + cache_pop + cache_length + """ + __redis: redis.Redis = None redis_cache_host: str = "127.0.0.1" redis_cache_port: int = 6379 @@ -31,7 +45,9 @@ def __init__(self, **kwargs): "socket_timeout": 5, } - self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs) + self.__redis = redis.Redis( + db=self.redis_cache_db, password=self.redis_cache_password, **kwargs + ) super().__init__() def cache_exists(self, key: str): @@ -51,6 +67,20 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None): if self.redis_cache_ttl: self.__redis.expire(key, self.redis_cache_ttl) + def cache_put(self, value: dict) -> int: + # Returns the length of the list after pushing + size = self.__redis.lpush(self.bot_id, json.dumps(value)) + return size + + def cache_length(self) -> int: + return self.__redis.llen(self.bot_id) + + def cache_pop(self) -> dict: + data = self.__redis.rpop(self.bot_id) + if data is None: + return None + return json.loads(data) + def cache_flush(self): """ Flushes the currently opened database by calling FLUSHDB. 
# SPDX-License-Identifier: AGPL-3.0-or-later
# -*- coding: utf-8 -*-
"""Tests for the MISP Feed output bot (intelmq.bots.outputs.misp.output_feed).

The bot writes a MISP feed directory layout; the file ``.current`` in the
output directory points to the current MISP event file (or, with
``event_separator``, contains a JSON mapping of separator values to files).
"""
import json
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory

import intelmq.lib.test as test
from intelmq.bots.outputs.misp.output_feed import MISPFeedOutputBot
from intelmq.lib.message import MessageFactory

EXAMPLE_EVENT = {
    "classification.type": "infected-system",
    "destination.port": 9796,
    "feed.accuracy": 100.0,
    "destination.ip": "52.18.196.169",
    "malware.name": "salityp2p",
    "event_description.text": "Sinkhole attempted connection",
    "time.source": "2016-04-19T23:16:08+00:00",
    "source.ip": "152.166.119.2",
    "feed.url": "http://alerts.bitsighttech.com:8080/stream?",
    "source.geolocation.country": "Dominican Republic",
    "time.observation": "2016-04-19T23:16:08+00:00",
    "source.port": 65118,
    "feed.name": "BitSight",
    "extra.non_ascii": "ççãããã\x80\ua000 \164 \x80\x80 abcd \165\166",
    "raw": "eyJ0cm9qYW5mYW1pbHkiOiJTYWxpdHlwMnAiLCJlbnYiOnsic"
    "mVtb3RlX2FkZHIiOiIxNTIuMTY2LjExOS4yIiwicmVtb3RlX3"
    "BvcnQiOiI2NTExOCIsInNlcnZlcl9hZGRyIjoiNTIuMTguMTk"
    "2LjE2OSIsInNlcnZlcl9wb3J0IjoiOTc5NiJ9LCJfdHMiOjE0"
    "NjExMDc3NjgsIl9nZW9fZW52X3JlbW90ZV9hZGRyIjp7ImNvd"
    "W50cnlfbmFtZSI6IkRvbWluaWNhbiBSZXB1YmxpYyJ9fQ==",
    # Bug fix: "__type" was listed twice in this literal; Python keeps only the
    # last occurrence, so the duplicate was dead text. A single entry is kept.
    "__type": "Event",
}


@test.skip_exotic()
class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase):
    @classmethod
    def set_bot(cls):
        cls.use_cache = True
        cls.bot_reference = MISPFeedOutputBot
        cls.default_input_message = EXAMPLE_EVENT
        cls.sysconfig = {
            "misp_org_name": "IntelMQTestOrg",
            "misp_org_uuid": "b89da4c2-0f74-11ea-96a1-6fa873a0eb4d",
            "interval_event": "1 hour",
        }

    def setUp(self) -> None:
        # A fresh output directory per test so feeds do not leak between tests.
        super().setUp()
        self.directory = TemporaryDirectory()
        self.sysconfig["output_dir"] = self.directory.name

    def _read_current(self) -> str:
        """Return the content of the ``.current`` pointer file.

        Uses ``Path.read_text`` so the file handle is closed immediately
        (the previous ``open(...).read()`` pattern leaked handles).
        """
        return Path(self.directory.name, ".current").read_text()

    def test_event(self):
        """A single processed message produces one MISP object in the feed."""
        self.run_bot()

        current_event = self._read_current()
        with open(current_event) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 1

    def test_additional_info(self):
        """`additional_info` is prepended to the default MISP event info."""
        self.run_bot(parameters={"additional_info": "This is my custom info."})

        current_event = self._read_current()
        with open(current_event) as f:
            info: str = json.load(f).get("Event", {}).get("info", "")
        assert info.startswith("This is my custom info. IntelMQ event ")

    def test_additional_info_with_separator(self):
        """`{separator}` placeholder in `additional_info` is filled per event."""
        self.run_bot(
            parameters={
                "additional_info": "Event related to {separator}.",
                "event_separator": "malware.name",
            }
        )

        current_events = json.loads(self._read_current())
        with open(current_events["salityp2p"]) as f:
            info: str = json.load(f).get("Event", {}).get("info", "")
        assert info.startswith("Event related to salityp2p. IntelMQ event ")

    def test_accumulating_events(self):
        """Ensures bot first collects events and then saves them in bulks to MISP feed,
        and also respects the event interval to create a new event periodically.
        """
        self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT]
        self.run_bot(iterations=2, parameters={"bulk_save_count": 3})

        current_event = self._read_current()

        # The first event is always immediately dumped to the MISP feed,
        # but the second waits until the bulk saving size is achieved.
        with open(current_event) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 1

        self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT]
        self.run_bot(iterations=2, parameters={"bulk_save_count": 3})

        # When enough events were collected, save them.
        with open(current_event) as f:
            objects = json.load(f)["Event"]["Object"]
        assert len(objects) == 4

        self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT]
        self.run_bot(iterations=3, parameters={"bulk_save_count": 3})

        # We continue saving to the same file until the interval timeout.
        with open(current_event) as f:
            objects = json.load(f)["Event"]["Object"]
        assert len(objects) == 7

        # Simulate leftovers in the queue when it's time to generate a new event.
        Path(self.directory.name, ".current").unlink()
        self.bot.cache_put(
            MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True)
        )
        self.run_bot(parameters={"bulk_save_count": 3})

        new_event = self._read_current()
        with open(new_event) as f:
            objects = json.load(f)["Event"]["Object"]
        assert len(objects) == 2

    def test_attribute_mapping(self):
        """Tests custom attribute mapping that selects just a subset of fields to export
        and allows including custom parameters for MISPObjectAttribute, like comments."""
        self.run_bot(
            parameters={
                "attribute_mapping": {
                    "source.ip": {},
                    "feed.name": {"comment": "event_description.text"},
                    "destination.ip": {"to_ids": False},
                    "malware.name": {"comment": "extra.non_ascii"},
                }
            }
        )

        current_event = self._read_current()
        with open(current_event) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])

        assert len(objects) == 1
        attributes = objects[0].get("Attribute")
        assert len(attributes) == 4

        source_ip = next(
            attr for attr in attributes if attr.get("object_relation") == "source.ip"
        )
        assert source_ip["value"] == "152.166.119.2"
        assert source_ip["comment"] == ""

        feed_name = next(
            attr for attr in attributes if attr.get("object_relation") == "feed.name"
        )
        assert feed_name["value"] == EXAMPLE_EVENT["feed.name"]
        assert feed_name["comment"] == EXAMPLE_EVENT["event_description.text"]

        destination_ip = next(
            attr
            for attr in attributes
            if attr.get("object_relation") == "destination.ip"
        )
        assert destination_ip["value"] == EXAMPLE_EVENT["destination.ip"]
        assert destination_ip["to_ids"] is False

        malware_name = next(
            attr for attr in attributes if attr.get("object_relation") == "malware.name"
        )
        assert malware_name["value"] == EXAMPLE_EVENT["malware.name"]
        assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"]

    def test_attribute_mapping_omitted_when_field_is_empty(self):
        """Mapped fields absent from the message are silently skipped."""
        self.run_bot(
            parameters={
                "attribute_mapping": {
                    "source.ip": {},
                    "source.fqdn": {},  # not exists in the message
                }
            }
        )

        current_event = self._read_current()
        with open(current_event) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])

        assert len(objects) == 1
        attributes = objects[0].get("Attribute")
        assert len(attributes) == 1
        source_ip = next(
            attr for attr in attributes if attr.get("object_relation") == "source.ip"
        )
        assert source_ip["value"] == "152.166.119.2"

    def test_event_separation(self):
        """Tests that based on the value of the given field, incoming messages are put in separated
        MISP events."""
        self.input_message = [
            EXAMPLE_EVENT,
            {**EXAMPLE_EVENT, "malware.name": "another_malware"},
            EXAMPLE_EVENT,
        ]
        self.run_bot(iterations=3, parameters={"event_separator": "malware.name"})

        current_events = json.loads(self._read_current())
        assert len(current_events) == 2

        with open(current_events["salityp2p"]) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 2
        malware_name = next(
            attr["value"]
            for attr in objects[0]["Attribute"]
            if attr.get("object_relation") == "malware.name"
        )
        assert malware_name == "salityp2p"

        with open(current_events["another_malware"]) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 1
        malware_name = next(
            attr["value"]
            for attr in objects[0]["Attribute"]
            if attr.get("object_relation") == "malware.name"
        )
        assert malware_name == "another_malware"

    def test_event_separation_with_extra_and_bulk_save(self):
        """Event separation on an `extra.*` field combined with bulk saving."""
        self.input_message = [
            {**EXAMPLE_EVENT, "extra.some_key": "another_malware"},
            {**EXAMPLE_EVENT, "extra.some_key": "first_malware"},
            {**EXAMPLE_EVENT, "extra.some_key": "another_malware"},
        ]
        self.run_bot(
            iterations=3,
            parameters={"event_separator": "extra.some_key", "bulk_save_count": 3},
        )

        # Only the initial event is saved, the rest is cached.
        current_events = json.loads(self._read_current())
        assert len(current_events) == 1
        with open(current_events["another_malware"]) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 1

        self.input_message = {**EXAMPLE_EVENT, "extra.some_key": "first_malware"}
        self.run_bot(
            parameters={"event_separator": "extra.some_key", "bulk_save_count": 3},
        )

        # Now everything is saved.
        current_events = json.loads(self._read_current())
        assert len(current_events) == 2
        with open(current_events["another_malware"]) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 2

        with open(current_events["first_malware"]) as f:
            objects = json.load(f).get("Event", {}).get("Object", [])
        assert len(objects) == 2

    def test_tagging(self):
        """Ensures MISP events get correct MISP tags"""
        self.run_bot(
            parameters={
                "tagging": {
                    "__all__": [
                        {"name": "tlp:unclear", "colour": "#7e7eae"},
                        {"name": "source:intelmq"},
                    ]
                }
            }
        )

        current_event = self._read_current()
        with open(current_event) as f:
            tags = json.load(f).get("Event", {}).get("Tag", [])
        assert len(tags) == 2

        tlp = next(t for t in tags if t["name"] == "tlp:unclear")
        assert tlp["colour"] == "#7e7eae"

    def test_tagging_and_event_separation(self):
        """When separating events, it is possible to add different MISP tags to specific MISP
        events."""
        self.input_message = [
            EXAMPLE_EVENT,
            {**EXAMPLE_EVENT, "malware.name": "another_malware"},
        ]
        self.run_bot(
            iterations=2,
            parameters={
                "event_separator": "malware.name",
                "tagging": {
                    "__all__": [{"name": "source:intelmq"}],
                    "salityp2p": [{"name": "family:salityp2p"}],
                    "another_malware": [{"name": "family:malware_2"}],
                },
            },
        )

        current_events = json.loads(self._read_current())
        assert len(current_events) == 2

        with open(current_events["salityp2p"]) as f:
            tags = json.load(f).get("Event", {}).get("Tag", [])
        assert len(tags) == 2
        assert next(t for t in tags if t["name"] == "source:intelmq")
        assert next(t for t in tags if t["name"] == "family:salityp2p")

        with open(current_events["another_malware"]) as f:
            tags = json.load(f).get("Event", {}).get("Tag", [])
        assert len(tags) == 2
        assert next(t for t in tags if t["name"] == "source:intelmq")
        assert next(t for t in tags if t["name"] == "family:malware_2")

    def test_parameter_check_correct(self):
        """A fully valid parameter combination passes the static check."""
        result = self.bot_reference.check(
            {
                **self.sysconfig,
                "attribute_mapping": {
                    "source.ip": {},
                    "feed.name": {"comment": "event_description.text"},
                    "destination.ip": {"to_ids": False, "comment": "Possible FP"},
                    "malware.name": {"comment": "extra.non_ascii"},
                },
                "event_separator": "extra.botnet",
                "bulk_save_count": 10,
                "tagging": {
                    "__all__": [{"name": "source:feed", "colour": "#000000"}],
                    "abotnet": [{"name": "type:botnet"}],
                },
            }
        )
        assert result is None

    def test_parameter_check_errors(self):
        """Each invalid parameter combination produces exactly one error."""
        cases = [
            {"bulk_save_count": "not-a-number"},
            {"event_separator": "not-a-field"},
            {"attribute_mapping": "not-a-dict"},
            {"attribute_mapping": {"not-a-field": {}}},
            {"attribute_mapping": {"source.ip": "not-a-dict"}},
            {
                "tagging": {"not-all": []}
            },  # without event_separator, only __all__ is allowed
            {"tagging": {"__all__": [], "other": []}},
            {"event_separator": "malware.name", "tagging": ["not", "a", "dict"]},
            {
                "event_separator": "malware.name",
                "tagging": {"case": "must-be-list-of-dicts"},
            },
            {
                "event_separator": "malware.name",
                "tagging": {"case": ["must-be-list-of-dicts"]},
            },
            {
                "event_separator": "malware.name",
                "tagging": {"case": [{"must": "have a name"}]},
            },
        ]
        for case in cases:
            with self.subTest():
                result = self.bot_reference.check({**self.sysconfig, **case})
                assert len(list(r for r in result if r[0] == "error")) == 1

    def test_parameter_check_warnings(self):
        """Unknown extra arguments are reported as warnings, not errors."""
        cases = [
            {"attribute_mapping": {"source.ip": {"not-a-feed-arg": "any"}}},
            {"tagging": {"case": [{"name": "", "not-a-feed-arg": "any"}]}},
        ]
        for case in cases:
            with self.subTest():
                result = self.bot_reference.check({**self.sysconfig, **case})
                assert len(list(r for r in result if r[0] == "warning")) == 1

    def tearDown(self):
        # Clear the bot's Redis cache queue and remove the per-test directory.
        self.cache.delete(self.bot_id)
        self.directory.cleanup()
        super().tearDown()


if __name__ == "__main__":  # pragma: no cover
    unittest.main()