Add ATT&CK subtechniques to the schema (#337)
* Add ATT&CK subtechniques to the schema * Switch subtechniques to the 7.11 schema * Make technique still required * Lint fixes * Cleanup EQL constant * Trim more cruft * Restore EQL for 710 Co-authored-by: Justin Ibarra <brokensound77@users.noreply.github.com>
This commit is contained in:
@@ -10,6 +10,7 @@ from ..semver import Version
|
||||
from .v7_8 import ApiSchema78
|
||||
from .v7_9 import ApiSchema79
|
||||
from .v7_10 import ApiSchema710
|
||||
from .v7_11 import ApiSchema711
|
||||
|
||||
__all__ = (
|
||||
"all_schemas",
|
||||
@@ -23,6 +24,7 @@ all_schemas = [
|
||||
ApiSchema78,
|
||||
ApiSchema79,
|
||||
ApiSchema710,
|
||||
ApiSchema711,
|
||||
]
|
||||
|
||||
CurrentSchema = all_schemas[-1]
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
# or more contributor license agreements. Licensed under the Elastic License;
|
||||
# you may not use this file except in compliance with the Elastic License.
|
||||
|
||||
"""Definitions for rule metadata and schemas."""
|
||||
|
||||
import jsl
|
||||
from .v7_8 import Threat as Threat78, MITRE_URL_PATTERN
|
||||
from .v7_10 import ApiSchema710
|
||||
|
||||
|
||||
class Threat711(Threat78):
|
||||
"""Threat framework mapping such as MITRE ATT&CK."""
|
||||
|
||||
class ThreatTechnique(Threat78.ThreatTechnique):
|
||||
"""Patched threat.technique to add threat.technique.subtechnique."""
|
||||
|
||||
class ThreatSubTechnique(jsl.Document):
|
||||
id = jsl.StringField(required=True)
|
||||
name = jsl.StringField(required=True)
|
||||
reference = jsl.StringField(MITRE_URL_PATTERN.format(type='techniques') + r"[0-9]+/")
|
||||
|
||||
subtechnique = jsl.ArrayField(jsl.DocumentField(ThreatSubTechnique), required=False)
|
||||
|
||||
# override the `technique` field definition
|
||||
technique = jsl.ArrayField(jsl.DocumentField(ThreatTechnique), required=True)
|
||||
|
||||
|
||||
class ApiSchema711(ApiSchema710):
|
||||
"""Schema for siem rule in API format."""
|
||||
|
||||
STACK_VERSION = "7.11"
|
||||
|
||||
threat = jsl.ArrayField(jsl.DocumentField(Threat711))
|
||||
|
||||
@classmethod
|
||||
def downgrade(cls, target_cls, document, role=None):
|
||||
"""Remove 7.11 additions from the rule."""
|
||||
# ignore when this method is inherited by subclasses
|
||||
if cls == ApiSchema711 and "threat" in document:
|
||||
threat_field = list(document["threat"])
|
||||
for threat in threat_field:
|
||||
if "technique" in threat:
|
||||
threat["technique"] = [t.copy() for t in threat["technique"]]
|
||||
|
||||
for technique in threat["technique"]:
|
||||
technique.pop("subtechnique", None)
|
||||
|
||||
document = document.copy()
|
||||
document["threat"] = threat_field
|
||||
|
||||
# now strip any any unrecognized properties
|
||||
return target_cls.strip_additional_properties(document, role)
|
||||
@@ -65,13 +65,13 @@ class Threat(jsl.Document):
|
||||
"""Threat framework mapping such as MITRE ATT&CK."""
|
||||
|
||||
class ThreatTactic(jsl.Document):
|
||||
id = jsl.StringField(enum=tactics_map.values())
|
||||
name = jsl.StringField(enum=tactics)
|
||||
id = jsl.StringField(enum=tactics_map.values(), required=True)
|
||||
name = jsl.StringField(enum=tactics, required=True)
|
||||
reference = jsl.StringField(MITRE_URL_PATTERN.format(type='tactics'))
|
||||
|
||||
class ThreatTechnique(jsl.Document):
|
||||
id = jsl.StringField(enum=list(technique_lookup))
|
||||
name = jsl.StringField()
|
||||
id = jsl.StringField(enum=list(technique_lookup), required=True)
|
||||
name = jsl.StringField(required=True)
|
||||
reference = jsl.StringField(MITRE_URL_PATTERN.format(type='techniques'))
|
||||
|
||||
framework = jsl.StringField(default='MITRE ATT&CK', required=True)
|
||||
|
||||
+51
-39
@@ -6,6 +6,7 @@
|
||||
import unittest
|
||||
import uuid
|
||||
import eql
|
||||
import copy
|
||||
|
||||
from detection_rules.rule import Rule
|
||||
from detection_rules.schemas import downgrade, CurrentSchema
|
||||
@@ -16,21 +17,46 @@ class TestSchemas(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.compatible_rule = Rule("test.toml", {
|
||||
"author": ["Elastic"],
|
||||
# expected contents for a downgraded rule
|
||||
cls.v78_kql = {
|
||||
"description": "test description",
|
||||
"index": ["filebeat-*"],
|
||||
"language": "kuery",
|
||||
"license": "Elastic License",
|
||||
"name": "test rule",
|
||||
"query": "process.name:test.query",
|
||||
"risk_score": 21,
|
||||
"rule_id": str(uuid.uuid4()),
|
||||
"severity": "low",
|
||||
"type": "query"
|
||||
})
|
||||
cls.versioned_rule = cls.compatible_rule.copy()
|
||||
"type": "query",
|
||||
"threat": [
|
||||
{
|
||||
"framework": "MITRE ATT&CK",
|
||||
"tactic": {
|
||||
"id": "TA0001",
|
||||
"name": "Execution",
|
||||
"reference": "https://attack.mitre.org/tactics/TA0001/"
|
||||
},
|
||||
"technique": [
|
||||
{
|
||||
"id": "T1059",
|
||||
"name": "Command and Scripting Interpreter",
|
||||
"reference": "https://attack.mitre.org/techniques/T1059/",
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
cls.v79_kql = dict(cls.v78_kql, author=["Elastic"], license="Elastic License")
|
||||
cls.v711_kql = copy.deepcopy(cls.v79_kql)
|
||||
cls.v711_kql["threat"][0]["technique"][0]["subtechnique"] = [{
|
||||
"id": "T1059.001",
|
||||
"name": "PowerShell",
|
||||
"reference": "https://attack.mitre.org/techniques/T1059/001/"
|
||||
}]
|
||||
|
||||
cls.versioned_rule = Rule("test.toml", copy.deepcopy(cls.v79_kql))
|
||||
cls.versioned_rule.contents["version"] = 10
|
||||
|
||||
cls.threshold_rule = Rule("test.toml", {
|
||||
"author": ["Elastic"],
|
||||
"description": "test description",
|
||||
@@ -50,47 +76,33 @@ class TestSchemas(unittest.TestCase):
|
||||
|
||||
def test_query_downgrade(self):
|
||||
"""Downgrade a standard KQL rule."""
|
||||
api_contents = self.compatible_rule.contents
|
||||
self.assertDictEqual(downgrade(api_contents, CurrentSchema.STACK_VERSION), api_contents)
|
||||
self.assertDictEqual(downgrade(api_contents, "7.9"), api_contents)
|
||||
self.assertDictEqual(downgrade(api_contents, "7.9.2"), api_contents)
|
||||
self.assertDictEqual(downgrade(api_contents, "7.8"), {
|
||||
# "author": ["Elastic"],
|
||||
"description": "test description",
|
||||
"index": ["filebeat-*"],
|
||||
"language": "kuery",
|
||||
# "license": "Elastic License",
|
||||
"name": "test rule",
|
||||
"query": "process.name:test.query",
|
||||
"risk_score": 21,
|
||||
"rule_id": self.compatible_rule.id,
|
||||
"severity": "low",
|
||||
"type": "query"
|
||||
})
|
||||
self.assertDictEqual(downgrade(self.v711_kql, "7.11"), self.v711_kql)
|
||||
self.assertDictEqual(downgrade(self.v711_kql, "7.9"), self.v79_kql)
|
||||
self.assertDictEqual(downgrade(self.v711_kql, "7.9.2"), self.v79_kql)
|
||||
self.assertDictEqual(downgrade(self.v711_kql, "7.8.1"), self.v78_kql)
|
||||
self.assertDictEqual(downgrade(self.v79_kql, "7.8"), self.v78_kql)
|
||||
self.assertDictEqual(downgrade(self.v79_kql, "7.8"), self.v78_kql)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
downgrade(api_contents, "7.7")
|
||||
downgrade(self.v711_kql, "7.7")
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
downgrade(self.v79_kql, "7.7")
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
downgrade(self.v78_kql, "7.7")
|
||||
|
||||
def test_versioned_downgrade(self):
|
||||
"""Downgrade a KQL rule with version information"""
|
||||
api_contents = self.versioned_rule.contents
|
||||
self.assertDictEqual(downgrade(api_contents, CurrentSchema.STACK_VERSION), api_contents)
|
||||
self.assertDictEqual(downgrade(api_contents, "7.9"), api_contents)
|
||||
self.assertDictEqual(downgrade(api_contents, "7.9.2"), api_contents)
|
||||
self.assertDictEqual(downgrade(api_contents, "7.8"), {
|
||||
# "author": ["Elastic"],
|
||||
"description": "test description",
|
||||
"index": ["filebeat-*"],
|
||||
"language": "kuery",
|
||||
# "license": "Elastic License",
|
||||
"name": "test rule",
|
||||
"query": "process.name:test.query",
|
||||
"risk_score": 21,
|
||||
"rule_id": self.versioned_rule.id,
|
||||
"severity": "low",
|
||||
"type": "query",
|
||||
"version": 10,
|
||||
})
|
||||
|
||||
api_contents78 = api_contents.copy()
|
||||
api_contents78.pop("author")
|
||||
api_contents78.pop("license")
|
||||
|
||||
self.assertDictEqual(downgrade(api_contents, "7.8"), api_contents78)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
downgrade(api_contents, "7.7")
|
||||
|
||||
Reference in New Issue
Block a user