Skip to content

Commit

Permalink
RuleContainsFieldCondition/contains_field
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Sep 13, 2024
1 parent d86fc47 commit 1e034c9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
46 changes: 38 additions & 8 deletions sigma/processing/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,8 @@ def match(


@dataclass
class RuleContainsDetectionItemCondition(RuleProcessingCondition):
"""Returns True if rule contains a detection item that matches the given field name and value."""

field: Optional[str]
value: Union[str, int, float, bool]

def __post_init__(self):
self.sigma_value = sigma_type(self.value)
class RuleDetectionItemCondition(RuleProcessingCondition, ABC):
"""Base class for rule conditions that search for a detection item with certain properties."""

def match(
self,
Expand All @@ -204,6 +198,41 @@ def match(
elif isinstance(rule, SigmaCorrelationRule):
return False

@abstractmethod
def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool:
pass


@dataclass
class RuleContainsFieldCondition(RuleDetectionItemCondition):
"""Returns True if rule contains a field that matches the given field name."""

field: Optional[str]

def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool:
if isinstance(detection, SigmaDetection):
for detection_item in detection.detection_items:
if self.find_detection_item(detection_item):
return True
elif isinstance(detection, SigmaDetectionItem):
if detection.field is not None and detection.field == self.field:
return True
else:
raise TypeError("Parameter of type SigmaDetection or SigmaDetectionItem expected.")

return False


@dataclass
class RuleContainsDetectionItemCondition(RuleDetectionItemCondition):
"""Returns True if rule contains a detection item that matches the given field name and value."""

field: Optional[str]
value: Union[str, int, float, bool]

def __post_init__(self):
self.sigma_value = sigma_type(self.value)

def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetection]) -> bool:
if isinstance(detection, SigmaDetection):
for detection_item in detection.detection_items:
Expand Down Expand Up @@ -625,6 +654,7 @@ def match_detection_item(
rule_conditions: Dict[str, RuleProcessingCondition] = {
"logsource": LogsourceCondition,
"contains_detection_item": RuleContainsDetectionItemCondition,
"contains_field": RuleContainsFieldCondition,
"processing_item_applied": RuleProcessingItemAppliedCondition,
"processing_state": RuleProcessingStateCondition,
"is_sigma_rule": IsSigmaRuleCondition,
Expand Down
17 changes: 17 additions & 0 deletions tests/test_processing_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
MatchStringCondition,
ProcessingCondition,
RuleContainsDetectionItemCondition,
RuleContainsFieldCondition,
RuleProcessingCondition,
RuleProcessingItemAppliedCondition,
RuleAttributeCondition,
Expand Down Expand Up @@ -195,6 +196,22 @@ def test_rule_contains_detection_item_correlation_rule(
)


def test_rule_contains_field_match(dummy_processing_pipeline, sigma_rule):
assert RuleContainsFieldCondition("fieldA").match(dummy_processing_pipeline, sigma_rule)


def test_rule_contains_field_nomatch(dummy_processing_pipeline, sigma_rule):
assert not RuleContainsFieldCondition("non_existing").match(
dummy_processing_pipeline, sigma_rule
)


def test_rule_contains_field_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule):
assert not RuleContainsFieldCondition("fieldA").match(
dummy_processing_pipeline, sigma_correlation_rule
)


def test_is_sigma_rule_with_rule(dummy_processing_pipeline, sigma_rule):
assert IsSigmaRuleCondition().match(dummy_processing_pipeline, sigma_rule)

Expand Down

0 comments on commit 1e034c9

Please sign in to comment.