Files
sigma-rules/tests/test_mappings.py
T
Eric Forte 47d7a3acaa [DaC] Beta Release (#3889)
Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>
Co-authored-by: brokensound77 <brokensound77@users.noreply.github.com>
Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
Co-authored-by: Mika Ayenson <mika.ayenson@elastic.co>
2024-08-06 18:07:12 -04:00

91 lines
4.0 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Test that all rules appropriately match against expected data sets."""
import copy
import unittest
import warnings
from . import get_data_files, get_fp_data_files
from detection_rules.config import parse_rules_config
from detection_rules.utils import combine_sources, evaluate, load_etc_dump
from rta import get_available_tests
from .base import BaseRuleTest
# Rules configuration, parsed once when this module is imported. TestMappings.evaluate
# reads its normalize_kql_keywords attribute and passes it to the evaluate helper.
RULES_CONFIG = parse_rules_config()
class TestMappings(BaseRuleTest):
    """Test that all rules appropriately match against expected data sets.

    Only rules whose type is ``query`` and whose language is ``kuery`` are
    evaluated; every other rule is skipped by both tests.
    """

    # False-positive data sets, loaded once when the class body executes and reused by the tests.
    FP_FILES = get_fp_data_files()

    @staticmethod
    def _is_kql_query_rule(rule):
        """Return True when the rule is a ``query`` rule written in ``kuery``."""
        data = rule.contents.data
        return data.type == "query" and data.language == "kuery"

    def evaluate(self, documents, rule, expected, msg):
        """Evaluate a rule with the KQL engine and assert on the number of matches.

        Args:
            documents: Source documents handed to the ``evaluate`` helper.
            rule: Rule to evaluate against the documents.
            expected: Number of documents expected to match.
            msg: Failure message used when the match count differs.

        Returns:
            The filtered result produced by the ``evaluate`` helper.
        """
        filtered = evaluate(rule, documents, RULES_CONFIG.normalize_kql_keywords)
        self.assertEqual(expected, len(filtered), msg)
        return filtered

    def test_true_positives(self):
        """Test that expected results return against true positives.

        Rules without an entry in ``rule-mapping.yaml`` are skipped. A mismatch
        between the rule's ECS versions and the versions found in the matched
        documents is reported as a warning, not as a failure.
        """
        mismatched_ecs = []
        mappings = load_etc_dump('rule-mapping.yaml')

        for rule in self.all_rules:
            if not self._is_kql_query_rule(rule) or rule.id not in mappings:
                continue

            mapping = mappings[rule.id]
            expected = mapping['count']
            sources = mapping.get('sources')
            rta_file = mapping['rta_name']

            # ensure sources is defined and not empty; schema allows it to not be set since 'pending' bypasses
            self.assertTrue(sources, 'No sources defined for: {} - {} '.format(rule.id, rule.name))
            msg = 'Expected TP results did not match for: {} - {}'.format(rule.id, rule.name)

            # Look the RTA data files up once instead of once per source.
            rta_data = get_data_files('true_positives', rta_file)
            data_files = [rta_data.get(s) for s in sources]
            data_file = combine_sources(*data_files)
            results = self.evaluate(data_file, rule, expected, msg)

            # Documents without an ecs.version yield None; drop it so building the
            # warning text below cannot fail on a non-string element.
            ecs_versions = {r.get('ecs', {}).get('version') for r in results} - {None}
            # A rule without ecs_version metadata is treated as declaring no versions.
            rule_ecs = set(rule.metadata.get('ecs_version') or ())

            if not ecs_versions & rule_ecs:
                # Sorted so the warning text is stable between runs.
                msg = '{} - {} ecs_versions ({}) not in source data versions ({})'.format(
                    rule.id, rule.name,
                    ', '.join(sorted(map(str, rule_ecs))),
                    ', '.join(sorted(map(str, ecs_versions))))
                mismatched_ecs.append(msg)

        if mismatched_ecs:
            msg = 'Rules detected with source data from ecs versions not listed within the rule: \n{}'.format(
                '\n'.join(mismatched_ecs))
            warnings.warn(msg)

    def test_false_positives(self):
        """Test that expected results return against false positives.

        Every KQL query rule must match zero documents in each false-positive
        data set.
        """
        for rule in self.all_rules:
            if not self._is_kql_query_rule(rule):
                continue

            # Reuse the class-level data sets rather than reloading them for every rule.
            for fp_name, merged_data in self.FP_FILES.items():
                msg = 'Unexpected FP match for: {} - {}, against: {}'.format(rule.id, rule.name, fp_name)
                # A deep copy is passed so the shared FP_FILES data is never handed out directly.
                self.evaluate(copy.deepcopy(merged_data), rule, 0, msg)
class TestRTAs(unittest.TestCase):
    """Validate the metadata declared by each available RTA."""

    def test_rtas_with_triggered_rules_have_uuid(self):
        """Check that every RTA has a uuid and that each of its siem entries names its rule."""
        required_keys = ("rule_id", "rule_name")
        available = get_available_tests()
        # Walk the RTAs in name order.
        ordered = sorted(available.values(), key=lambda entry: entry['name'])

        for rta_test in ordered:
            self.assertIsNotNone(rta_test.get("uuid"), f'RTA {rta_test.get("name")} missing uuid')

            # A missing or empty "siem" value means there are no rule entries to check.
            siem_rules = rta_test.get("siem") or []
            for rule_info in siem_rules:
                for rule_key in required_keys:
                    value = rule_info.get(rule_key)
                    failure = f'RTA {rta_test.get("name")} - {rta_test.get("uuid")} missing {rule_key}'
                    self.assertIsNotNone(value, failure)