Update schema for threshold rule type for 7.12 (#976)

* Update schema for threshold rule type for 7.12
* add downgrade function to drop new fields
* update existing threshold rules
This commit is contained in:
Justin Ibarra
2021-03-05 14:35:50 -09:00
committed by GitHub
parent 0ef7d87b34
commit 0e0b2ea1a4
13 changed files with 86 additions and 18 deletions
+47
View File
@@ -5,6 +5,9 @@
"""Definitions for rule metadata and schemas."""
import jsl
from .v7_9 import ThresholdMapping
from .v7_11 import ApiSchema711
@@ -12,3 +15,47 @@ class ApiSchema712(ApiSchema711):
"""Schema for siem rule in API format."""
STACK_VERSION = "7.12"
# there might be a bug in jsl that requires us to redefine these here
query_scope = ApiSchema711.query_scope
saved_id_scope = ApiSchema711.saved_id_scope
ml_scope = ApiSchema711.ml_scope
eql_scope = ApiSchema711.eql_scope
class ThresholdMappingV12(ThresholdMapping):
"""7.12 schema for threshold mapping."""
class ThresholdCardinality(jsl.Document):
"""Threshold cardinality field."""
field = jsl.StringField(required=True)
value = jsl.IntField(minimum=1, required=True)
field = jsl.ArrayField(jsl.StringField(required=True, default=""))
cardinality = jsl.DocumentField(ThresholdCardinality, required=False)
threshold_scope = ApiSchema711.threshold_scope
threshold_scope.threshold = jsl.DocumentField(ThresholdMappingV12, required=True)
@classmethod
def downgrade(cls, target_cls, document, role=None):
"""Remove 7.12 additions from the rule."""
# ignore when this method is inherited by subclasses
if cls in (ApiSchema712, ApiSchema712.versioned()) and 'threshold' in document:
threshold = document['threshold']
threshold_field = threshold['field']
# attempt to convert threshold field to a string
if len(threshold_field) > 1:
raise ValueError('Cannot downgrade a threshold rule that has multiple threshold fields defined')
if threshold.get('cardinality', {}).get('field') or threshold.get('cardinality', {}).get('value'):
raise ValueError('Cannot downgrade a threshold rule that has a defined cardinality')
document = document.copy()
document["threshold"] = document["threshold"].copy()
# if cardinality was defined with no field or value
document['threshold'].pop('cardinality', None)
document["threshold"]["field"] = document["threshold"]["field"][0]
# now strip any any unrecognized properties
return target_cls.strip_additional_properties(document, role)
@@ -47,6 +47,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = ""
field = [""]
value = 25
@@ -48,6 +48,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = "cloud.account.id"
field = ["cloud.account.id"]
value = 10
@@ -54,6 +54,6 @@ name = "Privilege Escalation"
reference = "https://attack.mitre.org/tactics/TA0004/"
[rule.threshold]
field = "host.hostname"
field = ["host.hostname"]
value = 100
@@ -40,6 +40,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = "host.id"
field = ["host.id"]
value = 20
@@ -46,6 +46,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = "user.id"
field = ["user.id"]
value = 10
@@ -47,6 +47,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = "source.ip"
field = ["source.ip"]
value = 25
@@ -45,6 +45,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = "okta.actor.id"
field = ["okta.actor.id"]
value = 3
@@ -50,6 +50,6 @@ name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = "source.ip"
field = ["source.ip"]
value = 25
@@ -80,6 +80,6 @@ name = "Initial Access"
reference = "https://attack.mitre.org/tactics/TA0001/"
[rule.threshold]
field = "okta.actor.id"
field = ["okta.actor.id"]
value = 5
@@ -40,6 +40,6 @@ name = "Command and Control"
reference = "https://attack.mitre.org/tactics/TA0011/"
[rule.threshold]
field = "host.id"
field = ["host.id"]
value = 15
@@ -45,6 +45,6 @@ name = "Defense Evasion"
reference = "https://attack.mitre.org/tactics/TA0005/"
[rule.threshold]
field = "host.id"
field = ["host.id"]
value = 10
+28 -7
View File
@@ -66,7 +66,7 @@ class TestSchemas(unittest.TestCase):
cls.versioned_rule = Rule("test.toml", copy.deepcopy(cls.v79_kql))
cls.versioned_rule.contents["version"] = 10
cls.threshold_rule = Rule("test.toml", {
cls.v79_threshold_contents = {
"author": ["Elastic"],
"description": "test description",
"language": "kuery",
@@ -81,7 +81,15 @@ class TestSchemas(unittest.TestCase):
"value": 75,
},
"type": "threshold",
})
}
cls.v712_threshold_rule = Rule('test.toml', dict(copy.deepcopy(cls.v79_threshold_contents), threshold={
'field': ['destination.bytes', 'process.args'],
'value': 75,
'cardinality': {
'field': 'user.name',
'value': 2
}
}))
def test_query_downgrade(self):
"""Downgrade a standard KQL rule."""
@@ -118,16 +126,29 @@ class TestSchemas(unittest.TestCase):
def test_threshold_downgrade(self):
"""Downgrade a threshold rule that was first introduced in 7.9."""
api_contents = self.threshold_rule.contents
api_contents = self.v712_threshold_rule.contents
self.assertDictEqual(downgrade(api_contents, CurrentSchema.STACK_VERSION), api_contents)
self.assertDictEqual(downgrade(api_contents, "7.9"), api_contents)
self.assertDictEqual(downgrade(api_contents, "7.9.2"), api_contents)
self.assertDictEqual(downgrade(api_contents, CurrentSchema.STACK_VERSION + '.1'), api_contents)
exc_msg = 'Cannot downgrade a threshold rule that has multiple threshold fields defined'
with self.assertRaisesRegex(ValueError, exc_msg):
downgrade(api_contents, '7.9')
v712_threshold_contents_single_field = copy.deepcopy(api_contents)
v712_threshold_contents_single_field['threshold']['field'].pop()
with self.assertRaisesRegex(ValueError, "Cannot downgrade a threshold rule that has a defined cardinality"):
downgrade(v712_threshold_contents_single_field, "7.9")
v712_no_cardinality = copy.deepcopy(v712_threshold_contents_single_field)
v712_no_cardinality['threshold'].pop('cardinality')
self.assertEqual(downgrade(v712_no_cardinality, "7.9"), self.v79_threshold_contents)
with self.assertRaises(ValueError):
downgrade(api_contents, "7.7")
downgrade(v712_no_cardinality, "7.7")
with self.assertRaisesRegex(ValueError, "Unsupported rule type"):
downgrade(api_contents, "7.8")
downgrade(v712_no_cardinality, "7.8")
def test_eql_validation(self):
base_fields = {