diff --git a/rules/macos/credential_access_kerberosdump_kcc.toml b/rules/macos/credential_access_kerberosdump_kcc.toml index 1e88b3843..67e263e1c 100644 --- a/rules/macos/credential_access_kerberosdump_kcc.toml +++ b/rules/macos/credential_access_kerberosdump_kcc.toml @@ -19,7 +19,7 @@ references = [ risk_score = 73 rule_id = "ad88231f-e2ab-491c-8fc6-64746da26cfe" severity = "high" -tags = ["Elastic", "MacOS"] +tags = ["Elastic", "macOS"] type = "query" query = ''' diff --git a/rules/macos/exfiltration_compress_credentials_keychains.toml b/rules/macos/exfiltration_compress_credentials_keychains.toml index 3d59f4139..1d68d9a0d 100644 --- a/rules/macos/exfiltration_compress_credentials_keychains.toml +++ b/rules/macos/exfiltration_compress_credentials_keychains.toml @@ -20,7 +20,7 @@ references = ["https://objective-see.com/blog/blog_0x25.html"] risk_score = 73 rule_id = "96e90768-c3b7-4df6-b5d9-6237f8bc36a8" severity = "high" -tags = ["Elastic", "MacOS"] +tags = ["Elastic", "macOS"] type = "query" query = ''' diff --git a/rules/macos/lateral_movement_remote_ssh_login_enabled.toml b/rules/macos/lateral_movement_remote_ssh_login_enabled.toml index 3bdf4f275..6d9be34c7 100644 --- a/rules/macos/lateral_movement_remote_ssh_login_enabled.toml +++ b/rules/macos/lateral_movement_remote_ssh_login_enabled.toml @@ -19,7 +19,7 @@ references = [ risk_score = 47 rule_id = "5ae4e6f8-d1bf-40fa-96ba-e29645e1e4dc" severity = "medium" -tags = ["Elastic", "MacOS"] +tags = ["Elastic", "macOS"] type = "query" query = ''' diff --git a/rules/network/discovery_post_exploitation_public_ip_reconnaissance.toml b/rules/network/discovery_post_exploitation_public_ip_reconnaissance.toml index bc32a8d73..fc35ac27c 100644 --- a/rules/network/discovery_post_exploitation_public_ip_reconnaissance.toml +++ b/rules/network/discovery_post_exploitation_public_ip_reconnaissance.toml @@ -29,7 +29,7 @@ references = [ risk_score = 21 rule_id = "1d72d014-e2ab-4707-b056-9b96abe7b511" severity = "low" -tags = ["Elastic", "Network", "Threat Detection, Preventing and Hunting", "Post-Execution"] +tags = ["Elastic", "Network"] type = "query" query = ''' diff --git a/rules/promotions/external_alerts.toml b/rules/promotions/external_alerts.toml index b05e3036d..355f4d2e8 100644 --- a/rules/promotions/external_alerts.toml +++ b/rules/promotions/external_alerts.toml @@ -19,7 +19,7 @@ risk_score = 47 rule_id = "eb079c62-4481-4d6e-9643-3ca499df7aaa" rule_name_override = "message" severity = "medium" -tags = ["Elastic"] +tags = ["Elastic", "Windows", "APM", "Network", "macOS", "Linux"] timestamp_override = "event.ingested" type = "query" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 9c6112724..02d469986 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -186,3 +186,85 @@ class TestThreatMappings(unittest.TestCase): mitre_url = 'https://attack.mitre.org/resources/updates/' warning_str = f'The following rules are using deprecated ATT&CK techniques ({mitre_url}):\n{deprecated_str}' warnings.warn(warning_str) + + +class TestRuleTags(unittest.TestCase): + """Test tags data for rules.""" + + def test_casing_and_spacing(self): + """Ensure consistent and expected casing for controlled tags.""" + rules = rule_loader.load_rules().values() + + def normalize(s): + return ''.join(s.lower().split()) + + expected_tags = [ + 'APM', 'AWS', 'Asset Visibility', 'Azure', 'Configuration Audit', 'Continuous Monitoring', + 'Data Protection', 'Elastic', 'Endpoint', 'GCP', 'Identity and Access', 'Linux', 'Logging', 'ML', 'macOS', + 'Monitoring', 'Network', 'Okta', 'Packetbeat', 'Post-Execution', 'SecOps', 'Windows' + ] + expected_case = {normalize(t): t for t in expected_tags} + + for rule in rules: + rule_tags = rule.contents.get('tags') + if rule_tags: + invalid_tags = {t: expected_case[normalize(t)] for t in rule_tags + if normalize(t) in list(expected_case) and t != expected_case[normalize(t)]} + + if invalid_tags: + error_msg = f'{rule.id} - {rule.name} -> Invalid casing for expected tags\n' + error_msg += f'Actual tags: {", ".join(invalid_tags)}\n' + error_msg += f'Expected tags: {", ".join(invalid_tags.values())}' + self.fail(error_msg) + + def test_required_tags(self): + """Test that expected tags are present within rules.""" + rules = rule_loader.load_rules().values() + + # indexes considered; only those with obvious relationships included + # 'apm-*-transaction*', 'auditbeat-*', 'endgame-*', 'filebeat-*', 'logs-*', 'logs-aws*', + # 'logs-endpoint.alerts-*', 'logs-endpoint.events.*', 'logs-okta*', 'packetbeat-*', 'winlogbeat-*' + + required_tags_map = { + 'apm-*-transaction*': {'all': ['APM']}, + 'auditbeat-*': {'any': ['Windows', 'macOS', 'Linux']}, + 'endgame-*': {'all': ['Endpoint']}, + 'logs-aws*': {'all': ['AWS']}, + 'logs-endpoint.alerts-*': {'all': ['Endpoint']}, + 'logs-endpoint.events.*': {'any': ['Windows', 'macOS', 'Linux']}, + 'logs-okta*': {'all': ['Okta']}, + 'packetbeat-*': {'all': ['Network']}, + 'winlogbeat-*': {'all': ['Windows']} + } + + for rule in rules: + rule_tags = rule.contents.get('tags', []) + indexes = rule.contents.get('index', []) + error_msg = f'{rule.id} - {rule.name} -> Missing tags:\nActual tags: {", ".join(rule_tags)}' + + consolidated_optional_tags = [] + is_missing_any_tags = False + missing_required_tags = set() + + if 'Elastic' not in rule_tags: + missing_required_tags.add('Elastic') + + for index in indexes: + expected_tags = required_tags_map.get(index, {}) + expected_all = expected_tags.get('all', []) + expected_any = expected_tags.get('any', []) + + existing_any_tags = [t for t in rule_tags if t in expected_any] + if expected_any: + # consolidate optional any tags which are not in use + consolidated_optional_tags.extend(t for t in expected_any if t not in existing_any_tags) + + missing_required_tags.update(set(expected_all).difference(set(rule_tags))) + is_missing_any_tags = expected_any and not set(expected_any) & set(existing_any_tags) + + consolidated_optional_tags = [t for t in consolidated_optional_tags if t not in missing_required_tags] + error_msg += f'\nMissing all of: {", ".join(missing_required_tags)}' if missing_required_tags else '' + error_msg += f'\nMissing any of: {", " .join(consolidated_optional_tags)}' if is_missing_any_tags else '' + + if missing_required_tags or is_missing_any_tags: + self.fail(error_msg)