Skip to content

Commit

Permalink
Merge pull request #274 from kelnage/logsource-conditions-correlation
Browse files Browse the repository at this point in the history
Implement Correlation rules log source condition
  • Loading branch information
thomaspatzke authored Sep 9, 2024
2 parents d86fc47 + db19c1a commit 571c201
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 2 deletions.
8 changes: 7 additions & 1 deletion sigma/processing/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ def match_value(
@dataclass
class LogsourceCondition(RuleProcessingCondition):
"""
Matches log source on rule. Not specified log source fields are ignored.
Matches log source on rule. Not specified log source fields are ignored. For Correlation rules,
the condition returns true if any of the associated rules have the required log source fields.
"""

category: Optional[str] = field(default=None)
Expand All @@ -178,6 +179,11 @@ def match(
if isinstance(rule, SigmaRule):
return rule.logsource in self.logsource
elif isinstance(rule, SigmaCorrelationRule):
# Will only return true if the rules have been resolved in advance
for ref in rule.rules:
if hasattr(ref, "rule") and isinstance(ref.rule, (SigmaRule, SigmaCorrelationRule)):
if self.match(pipeline, ref.rule):
return True
return False


Expand Down
118 changes: 117 additions & 1 deletion tests/test_processing_conditions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import inspect
from typing import cast
from sigma.collection import SigmaCollection
from sigma.correlations import SigmaCorrelationRule
from sigma.types import SigmaNull, SigmaNumber, SigmaString
from sigma import processing
from sigma.exceptions import SigmaConfigurationError, SigmaRegularExpressionError
Expand Down Expand Up @@ -100,7 +103,41 @@ def test_logsource_no_match(dummy_processing_pipeline, sigma_rule):
)


def test_logsource_match_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule):
def test_logsource_match_correlation_rule_cat(dummy_processing_pipeline, sigma_correlated_rules):
sigma_correlated_rules.resolve_rule_references()
assert LogsourceCondition(category="test_category").match(
dummy_processing_pipeline,
cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]),
)


def test_logsource_match_correlation_rule_prod(dummy_processing_pipeline, sigma_correlated_rules):
sigma_correlated_rules.resolve_rule_references()
assert LogsourceCondition(product="test_product").match(
dummy_processing_pipeline,
cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]),
)


def test_logsource_no_match_correlation_rule_both(
dummy_processing_pipeline, sigma_correlated_rules
):
sigma_correlated_rules.resolve_rule_references()
assert not LogsourceCondition(category="test_category", product="test_product").match(
dummy_processing_pipeline,
cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]),
)


def test_logsource_no_match_correlation_rule(dummy_processing_pipeline, sigma_correlated_rules):
sigma_correlated_rules.resolve_rule_references()
assert not LogsourceCondition(service="test_service").match(
dummy_processing_pipeline,
cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]),
)


def test_logsource_no_rule_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule):
assert not LogsourceCondition(category="test_category", product="other_product").match(
dummy_processing_pipeline,
sigma_correlation_rule,
Expand Down Expand Up @@ -553,3 +590,82 @@ def condition_class_filter(c):
raise AssertionError(
f"Class {name} is not a rule, detection item or field name condition"
)


@pytest.fixture
def sigma_correlated_rules():
return SigmaCollection.from_dicts(
[
{
"title": "Test 1",
"name": "testrule_1",
"logsource": {"category": "test_category"},
"detection": {
"test": [
{
"field1": "value1",
"field2": "value2",
"field3": "value3",
}
],
"condition": "test",
},
"fields": [
"otherfield1",
"field1",
"field2",
"field3",
"otherfield2",
],
},
{
"title": "Test 2",
"name": "testrule_2",
"logsource": {"product": "test_product"},
"detection": {
"test": [
{
"field1": "value1",
"field2": "value2",
"field3": "value3",
}
],
"condition": "test",
},
"fields": [
"otherfield1",
"field1",
"field2",
"field3",
"otherfield2",
],
},
{
"title": "Test",
"status": "test",
"correlation": {
"type": "value_count",
"rules": [
"testrule_1",
"testrule_2",
],
"timespan": "5m",
"group-by": [
"testalias",
"field2",
"field3",
],
"condition": {
"gte": 10,
"field": "field1",
},
"aliases": {
"testalias": {
"testrule_1": "field1",
"testrule_2": "field2",
},
},
},
},
]
)

0 comments on commit 571c201

Please sign in to comment.