diff --git a/sigma/processing/conditions.py b/sigma/processing/conditions.py index a3f0a958..40167e6b 100644 --- a/sigma/processing/conditions.py +++ b/sigma/processing/conditions.py @@ -188,14 +188,8 @@ def match( @dataclass -class RuleContainsDetectionItemCondition(RuleProcessingCondition): - """Returns True if rule contains a detection item that matches the given field name and value.""" - - field: Optional[str] - value: Union[str, int, float, bool] - - def __post_init__(self): - self.sigma_value = sigma_type(self.value) +class RuleDetectionItemCondition(RuleProcessingCondition, ABC): + """Base class for rule conditions that search for a detection item with certain properties.""" def match( self, @@ -210,6 +204,41 @@ def match( elif isinstance(rule, SigmaCorrelationRule): return False + @abstractmethod + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: + pass + + +@dataclass +class RuleContainsFieldCondition(RuleDetectionItemCondition): + """Returns True if rule contains a field that matches the given field name.""" + + field: Optional[str] + + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: + if isinstance(detection, SigmaDetection): + for detection_item in detection.detection_items: + if self.find_detection_item(detection_item): + return True + elif isinstance(detection, SigmaDetectionItem): + if detection.field is not None and detection.field == self.field: + return True + else: + raise TypeError("Parameter of type SigmaDetection or SigmaDetectionItem expected.") + + return False + + +@dataclass +class RuleContainsDetectionItemCondition(RuleDetectionItemCondition): + """Returns True if rule contains a detection item that matches the given field name and value.""" + + field: Optional[str] + value: Union[str, int, float, bool] + + def __post_init__(self): + self.sigma_value = sigma_type(self.value) + def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool: if isinstance(detection, SigmaDetection): for detection_item in detection.detection_items: @@ -631,6 +660,7 @@ def match_detection_item( rule_conditions: Dict[str, RuleProcessingCondition] = { "logsource": LogsourceCondition, "contains_detection_item": RuleContainsDetectionItemCondition, + "contains_field": RuleContainsFieldCondition, "processing_item_applied": RuleProcessingItemAppliedCondition, "processing_state": RuleProcessingStateCondition, "is_sigma_rule": IsSigmaRuleCondition, diff --git a/tests/test_processing_conditions.py b/tests/test_processing_conditions.py index 4b1d75ff..9483f365 100644 --- a/tests/test_processing_conditions.py +++ b/tests/test_processing_conditions.py @@ -24,6 +24,7 @@ MatchStringCondition, ProcessingCondition, RuleContainsDetectionItemCondition, + RuleContainsFieldCondition, RuleProcessingCondition, RuleProcessingItemAppliedCondition, RuleAttributeCondition, @@ -232,6 +233,22 @@ def test_rule_contains_detection_item_correlation_rule( ) +def test_rule_contains_field_match(dummy_processing_pipeline, sigma_rule): + assert RuleContainsFieldCondition("fieldA").match(dummy_processing_pipeline, sigma_rule) + + +def test_rule_contains_field_nomatch(dummy_processing_pipeline, sigma_rule): + assert not RuleContainsFieldCondition("non_existing").match( + dummy_processing_pipeline, sigma_rule + ) + + +def test_rule_contains_field_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule): + assert not RuleContainsFieldCondition("fieldA").match( + dummy_processing_pipeline, sigma_correlation_rule + ) + + def test_is_sigma_rule_with_rule(dummy_processing_pipeline, sigma_rule): assert IsSigmaRuleCondition().match(dummy_processing_pipeline, sigma_rule)