From 1e034c9dfccc4691d03f0cae427638facfe0ff3d Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Sat, 14 Sep 2024 00:11:18 +0200 Subject: [PATCH] RuleContainsFieldCondition/contains_field --- sigma/processing/conditions.py | 46 ++++++++++++++++++++++++----- tests/test_processing_conditions.py | 17 +++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/sigma/processing/conditions.py b/sigma/processing/conditions.py index 91da6647..d193f100 100644 --- a/sigma/processing/conditions.py +++ b/sigma/processing/conditions.py @@ -182,14 +182,8 @@ def match( @dataclass -class RuleContainsDetectionItemCondition(RuleProcessingCondition): - """Returns True if rule contains a detection item that matches the given field name and value.""" - - field: Optional[str] - value: Union[str, int, float, bool] - - def __post_init__(self): - self.sigma_value = sigma_type(self.value) +class RuleDetectionItemCondition(RuleProcessingCondition, ABC): + """Base class for rule conditions that search for a detection item with certain properties.""" def match( self, @@ -204,6 +198,41 @@ def match( elif isinstance(rule, SigmaCorrelationRule): return False + @abstractmethod + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: + pass + + +@dataclass +class RuleContainsFieldCondition(RuleDetectionItemCondition): + """Returns True if rule contains a field that matches the given field name.""" + + field: Optional[str] + + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: + if isinstance(detection, SigmaDetection): + for detection_item in detection.detection_items: + if self.find_detection_item(detection_item): + return True + elif isinstance(detection, SigmaDetectionItem): + if detection.field is not None and detection.field == self.field: + return True + else: + raise TypeError("Parameter of type SigmaDetection or SigmaDetectionItem expected.") + + return False + + +@dataclass +class RuleContainsDetectionItemCondition(RuleDetectionItemCondition): + """Returns True if rule contains a detection item that matches the given field name and value.""" + + field: Optional[str] + value: Union[str, int, float, bool] + + def __post_init__(self): + self.sigma_value = sigma_type(self.value) + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: if isinstance(detection, SigmaDetection): for detection_item in detection.detection_items: @@ -625,6 +654,7 @@ def match_detection_item( rule_conditions: Dict[str, RuleProcessingCondition] = { "logsource": LogsourceCondition, "contains_detection_item": RuleContainsDetectionItemCondition, + "contains_field": RuleContainsFieldCondition, "processing_item_applied": RuleProcessingItemAppliedCondition, "processing_state": RuleProcessingStateCondition, "is_sigma_rule": IsSigmaRuleCondition, diff --git a/tests/test_processing_conditions.py b/tests/test_processing_conditions.py index ba34260c..58d58bda 100644 --- a/tests/test_processing_conditions.py +++ b/tests/test_processing_conditions.py @@ -21,6 +21,7 @@ MatchStringCondition, ProcessingCondition, RuleContainsDetectionItemCondition, + RuleContainsFieldCondition, RuleProcessingCondition, RuleProcessingItemAppliedCondition, RuleAttributeCondition, @@ -195,6 +196,22 @@ def test_rule_contains_detection_item_correlation_rule( ) +def test_rule_contains_field_match(dummy_processing_pipeline, sigma_rule): + assert RuleContainsFieldCondition("fieldA").match(dummy_processing_pipeline, sigma_rule) + + +def test_rule_contains_field_nomatch(dummy_processing_pipeline, sigma_rule): + assert not RuleContainsFieldCondition("non_existing").match( + dummy_processing_pipeline, sigma_rule + ) + + +def test_rule_contains_field_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule): + assert not RuleContainsFieldCondition("fieldA").match( + dummy_processing_pipeline, sigma_correlation_rule + ) + + def test_is_sigma_rule_with_rule(dummy_processing_pipeline, sigma_rule): assert IsSigmaRuleCondition().match(dummy_processing_pipeline, sigma_rule)