Add unit tests for rule tags (#359)

This commit is contained in:
Justin Ibarra
2020-10-08 05:29:19 +02:00
committed by GitHub
parent f34c96f4dc
commit 758e4a2c5b
6 changed files with 87 additions and 5 deletions
@@ -19,7 +19,7 @@ references = [
risk_score = 73
rule_id = "ad88231f-e2ab-491c-8fc6-64746da26cfe"
severity = "high"
tags = ["Elastic", "MacOS"]
tags = ["Elastic", "macOS"]
type = "query"
query = '''
@@ -20,7 +20,7 @@ references = ["https://objective-see.com/blog/blog_0x25.html"]
risk_score = 73
rule_id = "96e90768-c3b7-4df6-b5d9-6237f8bc36a8"
severity = "high"
tags = ["Elastic", "MacOS"]
tags = ["Elastic", "macOS"]
type = "query"
query = '''
@@ -19,7 +19,7 @@ references = [
risk_score = 47
rule_id = "5ae4e6f8-d1bf-40fa-96ba-e29645e1e4dc"
severity = "medium"
tags = ["Elastic", "MacOS"]
tags = ["Elastic", "macOS"]
type = "query"
query = '''
@@ -29,7 +29,7 @@ references = [
risk_score = 21
rule_id = "1d72d014-e2ab-4707-b056-9b96abe7b511"
severity = "low"
tags = ["Elastic", "Network", "Threat Detection, Preventing and Hunting", "Post-Execution"]
tags = ["Elastic", "Network"]
type = "query"
query = '''
+1 -1
View File
@@ -19,7 +19,7 @@ risk_score = 47
rule_id = "eb079c62-4481-4d6e-9643-3ca499df7aaa"
rule_name_override = "message"
severity = "medium"
tags = ["Elastic"]
tags = ["Elastic", "Windows", "APM", "Network", "macOS", "Linux"]
timestamp_override = "event.ingested"
type = "query"
+82
View File
@@ -186,3 +186,85 @@ class TestThreatMappings(unittest.TestCase):
mitre_url = 'https://attack.mitre.org/resources/updates/'
warning_str = f'The following rules are using deprecated ATT&CK techniques ({mitre_url}):\n{deprecated_str}'
warnings.warn(warning_str)
class TestRuleTags(unittest.TestCase):
"""Test tags data for rules."""
def test_casing_and_spacing(self):
"""Ensure consistent and expected casing for controlled tags."""
rules = rule_loader.load_rules().values()
def normalize(s):
return ''.join(s.lower().split())
expected_tags = [
'APM', 'AWS', 'Asset Visibility', 'Azure', 'Configuration Audit', 'Continuous Monitoring',
'Data Protection', 'Elastic', 'Endpoint', 'GCP', 'Identity and Access', 'Linux', 'Logging', 'ML', 'macOS',
'Monitoring', 'Network', 'Okta', 'Packetbeat', 'Post-Execution', 'SecOps', 'Windows'
]
expected_case = {normalize(t): t for t in expected_tags}
for rule in rules:
rule_tags = rule.contents.get('tags')
if rule_tags:
invalid_tags = {t: expected_case[normalize(t)] for t in rule_tags
if normalize(t) in list(expected_case) and t != expected_case[normalize(t)]}
if invalid_tags:
error_msg = f'{rule.id} - {rule.name} -> Invalid casing for expected tags\n'
error_msg += f'Actual tags: {", ".join(invalid_tags)}\n'
error_msg += f'Expected tags: {", ".join(invalid_tags.values())}'
self.fail(error_msg)
def test_required_tags(self):
"""Test that expected tags are present within rules."""
rules = rule_loader.load_rules().values()
# indexes considered; only those with obvious relationships included
# 'apm-*-transaction*', 'auditbeat-*', 'endgame-*', 'filebeat-*', 'logs-*', 'logs-aws*',
# 'logs-endpoint.alerts-*', 'logs-endpoint.events.*', 'logs-okta*', 'packetbeat-*', 'winlogbeat-*'
required_tags_map = {
'apm-*-transaction*': {'all': ['APM']},
'auditbeat-*': {'any': ['Windows', 'macOS', 'Linux']},
'endgame-*': {'all': ['Endpoint']},
'logs-aws*': {'all': ['AWS']},
'logs-endpoint.alerts-*': {'all': ['Endpoint']},
'logs-endpoint.events.*': {'any': ['Windows', 'macOS', 'Linux']},
'logs-okta*': {'all': ['Okta']},
'packetbeat-*': {'all': ['Network']},
'winlogbeat-*': {'all': ['Windows']}
}
for rule in rules:
rule_tags = rule.contents.get('tags', [])
indexes = rule.contents.get('index', [])
error_msg = f'{rule.id} - {rule.name} -> Missing tags:\nActual tags: {", ".join(rule_tags)}'
consolidated_optional_tags = []
is_missing_any_tags = False
missing_required_tags = set()
if 'Elastic' not in rule_tags:
missing_required_tags.add('Elastic')
for index in indexes:
expected_tags = required_tags_map.get(index, {})
expected_all = expected_tags.get('all', [])
expected_any = expected_tags.get('any', [])
existing_any_tags = [t for t in rule_tags if t in expected_any]
if expected_any:
# consolidate optional any tags which are not in use
consolidated_optional_tags.extend(t for t in expected_any if t not in existing_any_tags)
missing_required_tags.update(set(expected_all).difference(set(rule_tags)))
is_missing_any_tags = expected_any and not set(expected_any) & set(existing_any_tags)
consolidated_optional_tags = [t for t in consolidated_optional_tags if t not in missing_required_tags]
error_msg += f'\nMissing all of: {", ".join(missing_required_tags)}' if missing_required_tags else ''
error_msg += f'\nMissing any of: {", " .join(consolidated_optional_tags)}' if is_missing_any_tags else ''
if missing_required_tags or is_missing_any_tags:
self.fail(error_msg)