[Bug] Fix Tag Navigator Generation (#2875)
* bug fix for tag navigator generation * addressing flake errors * added unit test to ensure prefix exists * updated unit test case sensitivity * moved expected tags to definitions.py * removed expected prefixes * revert downloadable updates JSON file
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
|
||||
"""Create summary documents for a rule package."""
|
||||
|
||||
from functools import reduce
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field, fields
|
||||
from pathlib import Path
|
||||
@@ -17,6 +18,7 @@ from . import utils
|
||||
from .attack import CURRENT_ATTACK_VERSION
|
||||
from .mixins import MarshmallowDataclassMixin
|
||||
from .rule import TOMLRule
|
||||
from .schemas import definitions
|
||||
|
||||
|
||||
_DEFAULT_PLATFORMS = [
|
||||
@@ -186,6 +188,8 @@ class NavigatorBuilder:
|
||||
def _update_tags(self, rule: TOMLRule, tactic: str, technique_id: str):
|
||||
for tag in rule.contents.data.get('tags', []):
|
||||
value = rule.id
|
||||
expected_prefixes = set([tag.split(":")[0] + ":" for tag in definitions.EXPECTED_RULE_TAGS])
|
||||
tag = reduce(lambda s, substr: s.replace(substr, ''), expected_prefixes, tag).lstrip()
|
||||
layer_key = tag.replace(' ', '-').lower()
|
||||
self.add_rule_to_technique(rule, 'tags', tactic, technique_id, value, layer_key=layer_key)
|
||||
|
||||
|
||||
@@ -54,6 +54,66 @@ TIMELINE_TEMPLATES: Final[dict] = {
|
||||
'4434b91a-94ca-4a89-83cb-a37cdc0532b7': 'Alerts Involving a Single Host Timeline'
|
||||
}
|
||||
|
||||
EXPECTED_RULE_TAGS = [
|
||||
'Data Source: Active Directory',
|
||||
'Data Source: Amazon Web Services',
|
||||
'Data Source: AWS',
|
||||
'Data Source: APM',
|
||||
'Data Source: Azure',
|
||||
'Data Source: CyberArk PAS',
|
||||
'Data Source: Elastic Defend',
|
||||
'Data Source: Elastic Defend for Containers',
|
||||
'Data Source: Elastic Endgame',
|
||||
'Data Source: GCP',
|
||||
'Data Source: Google Cloud Platform',
|
||||
'Data Source: Google Workspace',
|
||||
'Data Source: Kubernetes',
|
||||
'Data Source: Microsoft 365',
|
||||
'Data Source: Okta',
|
||||
'Data Source: PowerShell Logs',
|
||||
'Data Source: Sysmon Only',
|
||||
'Data Source: Zoom',
|
||||
'Domain: Cloud',
|
||||
'Domain: Container',
|
||||
'Domain: Endpoint',
|
||||
'OS: Linux',
|
||||
'OS: macOS',
|
||||
'OS: Windows',
|
||||
'Resources: Investigation Guide',
|
||||
'Rule Type: Higher-Order Rule',
|
||||
'Rule Type: Machine Learning',
|
||||
'Rule Type: ML',
|
||||
'Tactic: Collection',
|
||||
'Tactic: Command and Control',
|
||||
'Tactic: Credential Access',
|
||||
'Tactic: Defense Evasion',
|
||||
'Tactic: Discovery',
|
||||
'Tactic: Execution',
|
||||
'Tactic: Exfiltration',
|
||||
'Tactic: Impact',
|
||||
'Tactic: Initial Access',
|
||||
'Tactic: Lateral Movement',
|
||||
'Tactic: Persistence',
|
||||
'Tactic: Privilege Escalation',
|
||||
'Tactic: Reconnaissance',
|
||||
'Tactic: Resource Development',
|
||||
'Threat: BPFDoor',
|
||||
'Threat: Cobalt Strike',
|
||||
'Threat: Lightning Framework',
|
||||
'Threat: Orbit',
|
||||
'Threat: Rootkit',
|
||||
'Threat: TripleCross',
|
||||
'Use Case: Active Directory Monitoring',
|
||||
'Use Case: Asset Visibility',
|
||||
'Use Case: Configuration Audit',
|
||||
'Use Case: Guided Onboarding',
|
||||
'Use Case: Identity and Access Audit',
|
||||
'Use Case: Log Auditing',
|
||||
'Use Case: Network Security Monitoring',
|
||||
'Use Case: Threat Detection',
|
||||
'Use Case: Vulnerability'
|
||||
]
|
||||
|
||||
|
||||
NonEmptyStr = NewType('NonEmptyStr', str, validate=validate.Length(min=1))
|
||||
TimeUnits = Literal['s', 'm', 'h']
|
||||
|
||||
@@ -44,7 +44,7 @@ references = ["https://www.elastic.co/guide/en/security/current/prebuilt-rules.h
|
||||
risk_score = 21
|
||||
rule_id = "a198fbbd-9413-45ec-a269-47ae4ccf59ce"
|
||||
severity = "low"
|
||||
tags = ["Use case: Guided Onboarding", "Data Source: APM", "OS: Windows", "Data Source: Elastic Endgame"]
|
||||
tags = ["Use Case: Guided Onboarding", "Data Source: APM", "OS: Windows", "Data Source: Elastic Endgame"]
|
||||
timestamp_override = "event.ingested"
|
||||
type = "threshold"
|
||||
|
||||
|
||||
+13
-61
@@ -242,67 +242,7 @@ class TestRuleTags(BaseRuleTest):
|
||||
def test_casing_and_spacing(self):
|
||||
"""Ensure consistent and expected casing for controlled tags."""
|
||||
|
||||
expected_tags = [
|
||||
'Data Source: Active Directory',
|
||||
'Data Source: Amazon Web Services',
|
||||
'Data Source: AWS',
|
||||
'Data Source: APM',
|
||||
'Data Source: Azure',
|
||||
'Data Source: CyberArk PAS',
|
||||
'Data Source: Elastic Defend',
|
||||
'Data Source: Elastic Defend for Containers',
|
||||
'Data Source: Elastic Endgame',
|
||||
'Data Source: GCP',
|
||||
'Data Source: Google Cloud Platform',
|
||||
'Data Source: Google Workspace',
|
||||
'Data Source: Kubernetes',
|
||||
'Data Source: Microsoft 365',
|
||||
'Data Source: Okta',
|
||||
'Data Source: PowerShell Logs',
|
||||
'Data Source: Sysmon Only',
|
||||
'Data Source: Zoom',
|
||||
'Domain: Cloud',
|
||||
'Domain: Container',
|
||||
'Domain: Endpoint',
|
||||
'OS: Linux',
|
||||
'OS: macOS',
|
||||
'OS: Windows',
|
||||
'Resources: Investigation Guide',
|
||||
'Rule Type: Higher-Order Rule',
|
||||
'Rule Type: Machine Learning',
|
||||
'Rule Type: ML',
|
||||
'Tactic: Collection',
|
||||
'Tactic: Command and Control',
|
||||
'Tactic: Credential Access',
|
||||
'Tactic: Defense Evasion',
|
||||
'Tactic: Discovery',
|
||||
'Tactic: Execution',
|
||||
'Tactic: Exfiltration',
|
||||
'Tactic: Impact',
|
||||
'Tactic: Initial Access',
|
||||
'Tactic: Lateral Movement',
|
||||
'Tactic: Persistence',
|
||||
'Tactic: Privilege Escalation',
|
||||
'Tactic: Reconnaissance',
|
||||
'Tactic: Resource Development',
|
||||
'Threat: BPFDoor',
|
||||
'Threat: Cobalt Strike',
|
||||
'Threat: Lightning Framework',
|
||||
'Threat: Orbit',
|
||||
'Threat: Rootkit',
|
||||
'Threat: TripleCross',
|
||||
'Use Case: Active Directory Monitoring',
|
||||
'Use Case: Asset Visibility',
|
||||
'Use Case: Configuration Audit',
|
||||
'Use case: Guided Onboarding',
|
||||
'Use Case: Identity and Access Audit',
|
||||
'Use Case: Log Auditing',
|
||||
'Use Case: Network Security Monitoring',
|
||||
'Use Case: Threat Detection',
|
||||
'Use Case: Vulnerability',
|
||||
]
|
||||
|
||||
expected_case = {t.casefold(): t for t in expected_tags}
|
||||
expected_case = {t.casefold(): t for t in definitions.EXPECTED_RULE_TAGS}
|
||||
|
||||
for rule in self.all_rules:
|
||||
rule_tags = rule.contents.data.tags
|
||||
@@ -468,6 +408,18 @@ class TestRuleTags(BaseRuleTest):
|
||||
err_msg = '\n'.join(invalid)
|
||||
self.fail(f'Rules with missing Investigation tag:\n{err_msg}')
|
||||
|
||||
def test_tag_prefix(self):
|
||||
"""Ensure all tags have a prefix from an expected list."""
|
||||
invalid = []
|
||||
|
||||
for rule in self.all_rules:
|
||||
rule_tags = rule.contents.data.tags
|
||||
expected_prefixes = set([tag.split(":")[0] + ":" for tag in definitions.EXPECTED_RULE_TAGS])
|
||||
[invalid.append(f"{self.rule_str(rule)}-{tag}") for tag in rule_tags
|
||||
if not any(prefix in tag for prefix in expected_prefixes)]
|
||||
if invalid:
|
||||
self.fail(f'Rules with invalid tags:\n{invalid}')
|
||||
|
||||
|
||||
class TestRuleTimelines(BaseRuleTest):
|
||||
"""Test timelines in rules are valid."""
|
||||
|
||||
Reference in New Issue
Block a user