diff --git a/detection_rules/navigator.py b/detection_rules/navigator.py index c209bc106..1cba01c2e 100644 --- a/detection_rules/navigator.py +++ b/detection_rules/navigator.py @@ -5,6 +5,7 @@ """Create summary documents for a rule package.""" +from functools import reduce from collections import defaultdict from dataclasses import dataclass, field, fields from pathlib import Path @@ -17,6 +18,7 @@ from . import utils from .attack import CURRENT_ATTACK_VERSION from .mixins import MarshmallowDataclassMixin from .rule import TOMLRule +from .schemas import definitions _DEFAULT_PLATFORMS = [ @@ -186,6 +188,8 @@ class NavigatorBuilder: def _update_tags(self, rule: TOMLRule, tactic: str, technique_id: str): for tag in rule.contents.data.get('tags', []): value = rule.id + expected_prefixes = set([tag.split(":")[0] + ":" for tag in definitions.EXPECTED_RULE_TAGS]) + tag = reduce(lambda s, substr: s.replace(substr, ''), expected_prefixes, tag).lstrip() layer_key = tag.replace(' ', '-').lower() self.add_rule_to_technique(rule, 'tags', tactic, technique_id, value, layer_key=layer_key) diff --git a/detection_rules/schemas/definitions.py b/detection_rules/schemas/definitions.py index 8b0d2dbce..5fafb31ed 100644 --- a/detection_rules/schemas/definitions.py +++ b/detection_rules/schemas/definitions.py @@ -54,6 +54,66 @@ TIMELINE_TEMPLATES: Final[dict] = { '4434b91a-94ca-4a89-83cb-a37cdc0532b7': 'Alerts Involving a Single Host Timeline' } +EXPECTED_RULE_TAGS = [ + 'Data Source: Active Directory', + 'Data Source: Amazon Web Services', + 'Data Source: AWS', + 'Data Source: APM', + 'Data Source: Azure', + 'Data Source: CyberArk PAS', + 'Data Source: Elastic Defend', + 'Data Source: Elastic Defend for Containers', + 'Data Source: Elastic Endgame', + 'Data Source: GCP', + 'Data Source: Google Cloud Platform', + 'Data Source: Google Workspace', + 'Data Source: Kubernetes', + 'Data Source: Microsoft 365', + 'Data Source: Okta', + 'Data Source: PowerShell Logs', + 'Data Source: Sysmon Only', + 'Data Source: Zoom', + 'Domain: Cloud', + 'Domain: Container', + 'Domain: Endpoint', + 'OS: Linux', + 'OS: macOS', + 'OS: Windows', + 'Resources: Investigation Guide', + 'Rule Type: Higher-Order Rule', + 'Rule Type: Machine Learning', + 'Rule Type: ML', + 'Tactic: Collection', + 'Tactic: Command and Control', + 'Tactic: Credential Access', + 'Tactic: Defense Evasion', + 'Tactic: Discovery', + 'Tactic: Execution', + 'Tactic: Exfiltration', + 'Tactic: Impact', + 'Tactic: Initial Access', + 'Tactic: Lateral Movement', + 'Tactic: Persistence', + 'Tactic: Privilege Escalation', + 'Tactic: Reconnaissance', + 'Tactic: Resource Development', + 'Threat: BPFDoor', + 'Threat: Cobalt Strike', + 'Threat: Lightning Framework', + 'Threat: Orbit', + 'Threat: Rootkit', + 'Threat: TripleCross', + 'Use Case: Active Directory Monitoring', + 'Use Case: Asset Visibility', + 'Use Case: Configuration Audit', + 'Use Case: Guided Onboarding', + 'Use Case: Identity and Access Audit', + 'Use Case: Log Auditing', + 'Use Case: Network Security Monitoring', + 'Use Case: Threat Detection', + 'Use Case: Vulnerability' +] + NonEmptyStr = NewType('NonEmptyStr', str, validate=validate.Length(min=1)) TimeUnits = Literal['s', 'm', 'h'] diff --git a/rules/cross-platform/guided_onboarding_sample_rule.toml b/rules/cross-platform/guided_onboarding_sample_rule.toml index ec4752264..fd1903a8e 100644 --- a/rules/cross-platform/guided_onboarding_sample_rule.toml +++ b/rules/cross-platform/guided_onboarding_sample_rule.toml @@ -44,7 +44,7 @@ references = ["https://www.elastic.co/guide/en/security/current/prebuilt-rules.h risk_score = 21 rule_id = "a198fbbd-9413-45ec-a269-47ae4ccf59ce" severity = "low" -tags = ["Use case: Guided Onboarding", "Data Source: APM", "OS: Windows", "Data Source: Elastic Endgame"] +tags = ["Use Case: Guided Onboarding", "Data Source: APM", "OS: Windows", "Data Source: Elastic Endgame"] timestamp_override = "event.ingested" type = "threshold" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index f13325bd7..a7875c395 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -242,67 +242,7 @@ class TestRuleTags(BaseRuleTest): def test_casing_and_spacing(self): """Ensure consistent and expected casing for controlled tags.""" - expected_tags = [ - 'Data Source: Active Directory', - 'Data Source: Amazon Web Services', - 'Data Source: AWS', - 'Data Source: APM', - 'Data Source: Azure', - 'Data Source: CyberArk PAS', - 'Data Source: Elastic Defend', - 'Data Source: Elastic Defend for Containers', - 'Data Source: Elastic Endgame', - 'Data Source: GCP', - 'Data Source: Google Cloud Platform', - 'Data Source: Google Workspace', - 'Data Source: Kubernetes', - 'Data Source: Microsoft 365', - 'Data Source: Okta', - 'Data Source: PowerShell Logs', - 'Data Source: Sysmon Only', - 'Data Source: Zoom', - 'Domain: Cloud', - 'Domain: Container', - 'Domain: Endpoint', - 'OS: Linux', - 'OS: macOS', - 'OS: Windows', - 'Resources: Investigation Guide', - 'Rule Type: Higher-Order Rule', - 'Rule Type: Machine Learning', - 'Rule Type: ML', - 'Tactic: Collection', - 'Tactic: Command and Control', - 'Tactic: Credential Access', - 'Tactic: Defense Evasion', - 'Tactic: Discovery', - 'Tactic: Execution', - 'Tactic: Exfiltration', - 'Tactic: Impact', - 'Tactic: Initial Access', - 'Tactic: Lateral Movement', - 'Tactic: Persistence', - 'Tactic: Privilege Escalation', - 'Tactic: Reconnaissance', - 'Tactic: Resource Development', - 'Threat: BPFDoor', - 'Threat: Cobalt Strike', - 'Threat: Lightning Framework', - 'Threat: Orbit', - 'Threat: Rootkit', - 'Threat: TripleCross', - 'Use Case: Active Directory Monitoring', - 'Use Case: Asset Visibility', - 'Use Case: Configuration Audit', - 'Use case: Guided Onboarding', - 'Use Case: Identity and Access Audit', - 'Use Case: Log Auditing', - 'Use Case: Network Security Monitoring', - 'Use Case: Threat Detection', - 'Use Case: Vulnerability', - ] - - expected_case = {t.casefold(): t for t in expected_tags} + expected_case = {t.casefold(): t for t in definitions.EXPECTED_RULE_TAGS} for rule in self.all_rules: rule_tags = rule.contents.data.tags @@ -468,6 +408,18 @@ class TestRuleTags(BaseRuleTest): err_msg = '\n'.join(invalid) self.fail(f'Rules with missing Investigation tag:\n{err_msg}') + def test_tag_prefix(self): + """Ensure all tags have a prefix from an expected list.""" + invalid = [] + + for rule in self.all_rules: + rule_tags = rule.contents.data.tags + expected_prefixes = set([tag.split(":")[0] + ":" for tag in definitions.EXPECTED_RULE_TAGS]) + [invalid.append(f"{self.rule_str(rule)}-{tag}") for tag in rule_tags + if not any(prefix in tag for prefix in expected_prefixes)] + if invalid: + self.fail(f'Rules with invalid tags:\n{invalid}') + class TestRuleTimelines(BaseRuleTest): """Test timelines in rules are valid."""