Skip to content

Commit

Permalink
[client] Handle command line for Sentinel collector when detected as …
Browse files Browse the repository at this point in the history
…a file (EICAR) (#1685)
  • Loading branch information
isselparra authored Nov 7, 2024
1 parent abed7b4 commit 56e962c
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions pyobas/helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import base64
import json
import os
import re
import sched
import ssl
import tempfile
Expand Down Expand Up @@ -455,6 +457,11 @@ def match_alert_element_fuzzy(self, signature_value, alert_values, fuzzy_scoring
return False

def match_alert_elements(self, signatures, alert_data):
return self._match_alert_elements_original(
signatures, alert_data
) or self._match_alert_elements_for_command_line(signatures, alert_data)

def _match_alert_elements_original(self, signatures, alert_data):
# Example for alert_data
# {"process_name": {"list": ["xx", "yy"], "fuzzy": 90}}
relevant_signatures = [
Expand Down Expand Up @@ -484,3 +491,44 @@ def match_alert_elements(self, signatures, alert_data):
if signatures_number == matching_number:
return True
return False

def _match_alert_elements_for_command_line(self, signatures, alert_data):
command_line_signatures = [
signature
for signature in signatures
if signature.get("type") == "command_line"
]
if len(command_line_signatures) == 0:
return False
key_types = ["command_line", "process_name", "file_name"]
alert_datas = [alert_data.get(key) for key in key_types if key in alert_data]
for signature in command_line_signatures:
signature_result = False
signature_value = self._decode_value(signature["value"]).strip().lower()
for alert_data in alert_datas:
trimmed_lowered_datas = [s.strip().lower() for s in alert_data["data"]]
signature_result = any(
data in signature_value for data in trimmed_lowered_datas
)
if signature_result:
return True
return False

def _decode_value(self, signature_value):
if _is_base64_encoded(signature_value):
try:
decoded_bytes = base64.b64decode(signature_value)
decoded_str = decoded_bytes.decode("utf-8")
return decoded_str
except Exception as e:
self.logger.error(str(e))
else:
return signature_value


def _is_base64_encoded(str_maybe_base64):
# Check if the length is a multiple of 4 and matches the Base64 character set
base64_pattern = re.compile(r"^[A-Za-z0-9+/]*={0,2}$")
return len(str_maybe_base64) % 4 == 0 and bool(
base64_pattern.match(str_maybe_base64)
)

0 comments on commit 56e962c

Please sign in to comment.