Add ATT&CK sub-technique support to CLI (#614)
* Add Mitre sub-technique support to CLI * Add subtechnique enum to schema * Add test to prevent duplicative tactics in mapping
This commit is contained in:
@@ -66,6 +66,8 @@ for tactic in matrix:
|
||||
|
||||
technique_lookup = OrderedDict(sorted(technique_lookup.items()))
|
||||
techniques = sorted({v['name'] for k, v in technique_lookup.items()})
|
||||
technique_id_list = [t for t in technique_lookup if '.' not in t]
|
||||
sub_technique_id_list = [t for t in technique_lookup if '.' in t]
|
||||
|
||||
|
||||
def refresh_attack_data(save=True):
|
||||
@@ -111,15 +113,28 @@ def build_threat_map_entry(tactic: str, *technique_ids: str) -> dict:
|
||||
"""Build rule threat map from technique IDs."""
|
||||
url_base = 'https://attack.mitre.org/{type}/{id}/'
|
||||
tactic_id = tactics_map[tactic]
|
||||
tech_entries = {}
|
||||
|
||||
def make_entry(_id):
|
||||
e = {
|
||||
'id': _id,
|
||||
'name': technique_lookup[_id]['name'],
|
||||
'reference': url_base.format(type='techniques', id=_id.replace('.', '/'))
|
||||
}
|
||||
return e
|
||||
|
||||
for tid in technique_ids:
|
||||
# sub-techniques
|
||||
if '.' in tid:
|
||||
parent_technique, _ = tid.split('.', 1)
|
||||
tech_entries.setdefault(parent_technique, make_entry(parent_technique))
|
||||
tech_entries[parent_technique].setdefault('subtechnique', []).append(make_entry(tid))
|
||||
else:
|
||||
tech_entries.setdefault(tid, make_entry(tid))
|
||||
|
||||
entry = {
|
||||
'framework': 'MITRE ATT&CK',
|
||||
'technique': [
|
||||
{
|
||||
'id': tid,
|
||||
'name': technique_lookup[tid]['name'],
|
||||
'reference': url_base.format(type='techniques', id=tid)
|
||||
} for tid in technique_ids
|
||||
],
|
||||
'technique': sorted(tech_entries.values(), key=lambda x: x['id']),
|
||||
'tactic': {
|
||||
'id': tactic_id,
|
||||
'name': tactic,
|
||||
|
||||
@@ -133,8 +133,8 @@ class PackageDocument(xlsxwriter.Workbook):
|
||||
)
|
||||
|
||||
for row, rule in enumerate(rules, 1):
|
||||
tactic_names, _, _, technique_ids = rule.get_flat_mitre()
|
||||
rule_contents = {'tactics': tactic_names, 'techniques': technique_ids}
|
||||
flat_mitre = rule.get_flat_mitre()
|
||||
rule_contents = {'tactics': flat_mitre['tactic_names'], 'techniques': flat_mitre['technique_ids']}
|
||||
rule_contents.update(rule.contents.copy())
|
||||
|
||||
for column, field in enumerate(metadata_fields):
|
||||
|
||||
+22
-5
@@ -112,13 +112,30 @@ class Rule(object):
|
||||
tactic_ids = []
|
||||
technique_ids = set()
|
||||
technique_names = set()
|
||||
sub_technique_ids = set()
|
||||
sub_technique_names = set()
|
||||
|
||||
for entry in self.contents.get('threat', []):
|
||||
tactic_names.append(entry['tactic']['name'])
|
||||
tactic_ids.append(entry['tactic']['id'])
|
||||
technique_names.update([t['name'] for t in entry['technique']])
|
||||
technique_ids.update([t['id'] for t in entry['technique']])
|
||||
|
||||
return sorted(tactic_names), sorted(tactic_ids), sorted(technique_names), sorted(technique_ids)
|
||||
for technique in entry['technique']:
|
||||
technique_names.add(technique['name'])
|
||||
technique_ids.add(technique['id'])
|
||||
sub_technique = technique.get('subtechnique', [])
|
||||
|
||||
sub_technique_ids.update(st['id'] for st in sub_technique)
|
||||
sub_technique_names.update(st['name'] for st in sub_technique)
|
||||
|
||||
flat = {
|
||||
'tactic_names': sorted(tactic_names),
|
||||
'tactic_ids': sorted(tactic_ids),
|
||||
'technique_names': sorted(technique_names),
|
||||
'technique_ids': sorted(technique_ids),
|
||||
'sub_technique_names': sorted(sub_technique_names),
|
||||
'sub_technique_ids': sorted(sub_technique_ids)
|
||||
}
|
||||
return flat
|
||||
|
||||
@classmethod
|
||||
def get_unique_query_fields(cls, rule_contents):
|
||||
@@ -344,8 +361,8 @@ class Rule(object):
|
||||
|
||||
while click.confirm('add mitre tactic?'):
|
||||
tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, required=True)
|
||||
technique_ids = schema_prompt(f'technique IDs for {tactic}', type='array', required=True,
|
||||
enum=list(technique_lookup))
|
||||
technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
|
||||
required=True, enum=list(technique_lookup))
|
||||
|
||||
try:
|
||||
threat_map.append(build_threat_map_entry(tactic, *technique_ids))
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
import jsl
|
||||
from .v7_8 import Threat as Threat78, MITRE_URL_PATTERN
|
||||
from .v7_10 import ApiSchema710
|
||||
from ..attack import sub_technique_id_list
|
||||
|
||||
|
||||
class Threat711(Threat78):
|
||||
@@ -16,7 +17,7 @@ class Threat711(Threat78):
|
||||
"""Patched threat.technique to add threat.technique.subtechnique."""
|
||||
|
||||
class ThreatSubTechnique(jsl.Document):
|
||||
id = jsl.StringField(required=True)
|
||||
id = jsl.StringField(enum=sub_technique_id_list, required=True)
|
||||
name = jsl.StringField(required=True)
|
||||
reference = jsl.StringField(MITRE_URL_PATTERN.format(type='techniques') + r"[0-9]+/")
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
import jsl
|
||||
|
||||
from .base import BaseApiSchema, MarkdownField
|
||||
from ..attack import tactics, tactics_map, technique_lookup
|
||||
from ..attack import tactics, tactics_map, technique_id_list
|
||||
|
||||
|
||||
INTERVAL_PATTERN = r'\d+[mshd]'
|
||||
@@ -70,7 +70,7 @@ class Threat(jsl.Document):
|
||||
reference = jsl.StringField(MITRE_URL_PATTERN.format(type='tactics'))
|
||||
|
||||
class ThreatTechnique(jsl.Document):
|
||||
id = jsl.StringField(enum=list(technique_lookup), required=True)
|
||||
id = jsl.StringField(enum=technique_id_list, required=True)
|
||||
name = jsl.StringField(required=True)
|
||||
reference = jsl.StringField(MITRE_URL_PATTERN.format(type='techniques'))
|
||||
|
||||
|
||||
@@ -43,19 +43,11 @@ id = "T1064"
|
||||
name = "Scripting"
|
||||
reference = "https://attack.mitre.org/techniques/T1064/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
reference = "https://attack.mitre.org/tactics/TA0002/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1086"
|
||||
name = "PowerShell"
|
||||
reference = "https://attack.mitre.org/techniques/T1086/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
|
||||
@@ -36,19 +36,11 @@ id = "T1140"
|
||||
name = "Deobfuscate/Decode Files or Information"
|
||||
reference = "https://attack.mitre.org/techniques/T1140/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
reference = "https://attack.mitre.org/tactics/TA0005/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1027"
|
||||
name = "Obfuscated Files or Information"
|
||||
reference = "https://attack.mitre.org/techniques/T1027/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
|
||||
@@ -36,19 +36,11 @@ id = "T1140"
|
||||
name = "Deobfuscate/Decode Files or Information"
|
||||
reference = "https://attack.mitre.org/techniques/T1140/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
reference = "https://attack.mitre.org/tactics/TA0005/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1027"
|
||||
name = "Obfuscated Files or Information"
|
||||
reference = "https://attack.mitre.org/techniques/T1027/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
|
||||
@@ -35,19 +35,11 @@ id = "T1140"
|
||||
name = "Deobfuscate/Decode Files or Information"
|
||||
reference = "https://attack.mitre.org/techniques/T1140/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
reference = "https://attack.mitre.org/tactics/TA0005/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1027"
|
||||
name = "Obfuscated Files or Information"
|
||||
reference = "https://attack.mitre.org/techniques/T1027/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
|
||||
@@ -30,44 +30,39 @@ type = "query"
|
||||
|
||||
query = '''
|
||||
event.category:(network or network_traffic) and network.transport:tcp and destination.port:(9001 or 9030) and
|
||||
source.ip:(
|
||||
10.0.0.0/8 or
|
||||
172.16.0.0/12 or
|
||||
192.168.0.0/16
|
||||
) and
|
||||
not destination.ip:(
|
||||
10.0.0.0/8 or
|
||||
127.0.0.0/8 or
|
||||
169.254.0.0/16 or
|
||||
172.16.0.0/12 or
|
||||
192.168.0.0/16 or
|
||||
224.0.0.0/4 or
|
||||
"::1" or
|
||||
"FE80::/10" or
|
||||
"FF00::/8"
|
||||
)
|
||||
source.ip:(10.0.0.0/8 or
|
||||
172.16.0.0/12 or
|
||||
192.168.0.0/16) and
|
||||
|
||||
not destination.ip:(10.0.0.0/8 or
|
||||
127.0.0.0/8 or
|
||||
169.254.0.0/16 or
|
||||
172.16.0.0/12 or
|
||||
192.168.0.0/16 or
|
||||
224.0.0.0/4 or
|
||||
"::1" or
|
||||
"FE80::/10" or
|
||||
"FF00::/8")
|
||||
'''
|
||||
|
||||
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
|
||||
[[rule.threat.technique]]
|
||||
id = "T1043"
|
||||
name = "Commonly Used Port"
|
||||
reference = "https://attack.mitre.org/techniques/T1043/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0011"
|
||||
name = "Command and Control"
|
||||
reference = "https://attack.mitre.org/tactics/TA0011/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1188"
|
||||
name = "Multi-hop Proxy"
|
||||
reference = "https://attack.mitre.org/techniques/T1188/"
|
||||
id = "T1090"
|
||||
name = "Proxy"
|
||||
reference = "https://attack.mitre.org/techniques/T1090/"
|
||||
|
||||
[[rule.threat.technique.subtechnique]]
|
||||
id = "T1090.003"
|
||||
name = "Multi-hop Proxy"
|
||||
reference = "https://attack.mitre.org/techniques/T1090/003/"
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0011"
|
||||
|
||||
@@ -34,20 +34,13 @@ id = "T1036"
|
||||
name = "Masquerading"
|
||||
reference = "https://attack.mitre.org/techniques/T1036/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
reference = "https://attack.mitre.org/tactics/TA0005/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1055"
|
||||
name = "Process Injection"
|
||||
reference = "https://attack.mitre.org/techniques/T1055/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0005"
|
||||
name = "Defense Evasion"
|
||||
reference = "https://attack.mitre.org/tactics/TA0005/"
|
||||
|
||||
|
||||
@@ -30,19 +30,11 @@ id = "T1059"
|
||||
name = "Command and Scripting Interpreter"
|
||||
reference = "https://attack.mitre.org/techniques/T1059/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
reference = "https://attack.mitre.org/tactics/TA0002/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1086"
|
||||
name = "PowerShell"
|
||||
reference = "https://attack.mitre.org/techniques/T1086/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
|
||||
@@ -33,20 +33,13 @@ id = "T1059"
|
||||
name = "Command and Scripting Interpreter"
|
||||
reference = "https://attack.mitre.org/techniques/T1059/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
reference = "https://attack.mitre.org/tactics/TA0002/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1086"
|
||||
name = "PowerShell"
|
||||
reference = "https://attack.mitre.org/techniques/T1086/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
reference = "https://attack.mitre.org/tactics/TA0002/"
|
||||
|
||||
|
||||
@@ -58,20 +58,13 @@ id = "T1193"
|
||||
name = "Spearphishing Attachment"
|
||||
reference = "https://attack.mitre.org/techniques/T1193/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
reference = "https://attack.mitre.org/tactics/TA0002/"
|
||||
[[rule.threat]]
|
||||
framework = "MITRE ATT&CK"
|
||||
[[rule.threat.technique]]
|
||||
id = "T1047"
|
||||
name = "Windows Management Instrumentation"
|
||||
reference = "https://attack.mitre.org/techniques/T1047/"
|
||||
|
||||
|
||||
[rule.threat.tactic]
|
||||
id = "TA0002"
|
||||
name = "Execution"
|
||||
reference = "https://attack.mitre.org/tactics/TA0002/"
|
||||
|
||||
|
||||
@@ -140,6 +140,8 @@ class TestThreatMappings(unittest.TestCase):
|
||||
for entry in threat_mapping:
|
||||
tactic = entry.get('tactic')
|
||||
techniques = entry.get('technique', [])
|
||||
|
||||
# tactic
|
||||
expected_tactic = attack.tactics_map[tactic['name']]
|
||||
self.assertEqual(expected_tactic, tactic['id'],
|
||||
f'ATT&CK tactic mapping error for rule: {rule.id} - {rule.name} ->\n'
|
||||
@@ -152,6 +154,7 @@ class TestThreatMappings(unittest.TestCase):
|
||||
f'tactic ID {tactic["id"]} does not match the reference URL ID '
|
||||
f'{tactic["reference"]}')
|
||||
|
||||
# techniques
|
||||
for technique in techniques:
|
||||
expected_technique = attack.technique_lookup[technique['id']]['name']
|
||||
self.assertEqual(expected_technique, technique['name'],
|
||||
@@ -165,6 +168,37 @@ class TestThreatMappings(unittest.TestCase):
|
||||
f'technique ID {technique["id"]} does not match the reference URL ID '
|
||||
f'{technique["reference"]}')
|
||||
|
||||
# sub-techniques
|
||||
sub_techniques = technique.get('subtechnique')
|
||||
if sub_techniques:
|
||||
for sub_technique in sub_techniques:
|
||||
expected_sub_technique = attack.technique_lookup[sub_technique['id']]['name']
|
||||
self.assertEqual(expected_sub_technique, sub_technique['name'],
|
||||
f'ATT&CK sub-technique mapping error for rule: {rule.id} - {rule.name} ->\n' # noqa: E501
|
||||
f'expected: {expected_sub_technique} for {sub_technique["id"]}\n'
|
||||
f'actual: {sub_technique["name"]}')
|
||||
|
||||
sub_technique_reference_id = '.'.join(
|
||||
sub_technique['reference'].rstrip('/').split('/')[-2:])
|
||||
self.assertEqual(sub_technique['id'], sub_technique_reference_id,
|
||||
f'ATT&CK sub-technique mapping error for rule: {rule.id} - {rule.name} ->\n' # noqa: E501
|
||||
f'sub-technique ID {sub_technique["id"]} does not match the reference URL ID ' # noqa: E501
|
||||
f'{sub_technique["reference"]}')
|
||||
|
||||
def test_duplicated_tactics(self):
|
||||
"""Check that a tactic is only defined once."""
|
||||
rules = rule_loader.load_rules().values()
|
||||
|
||||
for rule in rules:
|
||||
rule_info = f'{rule.id} - {rule.name}'
|
||||
threat_mapping = rule.contents.get('threat', [])
|
||||
tactics = [t['tactic']['name'] for t in threat_mapping]
|
||||
duplicates = sorted(set(t for t in tactics if tactics.count(t) > 1))
|
||||
|
||||
if duplicates:
|
||||
self.fail(f'{rule_info} -> duplicate tactics defined for {duplicates}. '
|
||||
f'Flatten to a single entry per tactic')
|
||||
|
||||
def test_technique_deprecations(self):
|
||||
"""Check and warn for use of any ATT&CK techniques that have been deprecated."""
|
||||
deprecated = {} # {technique: rules_using_them
|
||||
|
||||
Reference in New Issue
Block a user