[Bug] Add Non-ECS Checks to New Terms Rule Validation (#2435)

* initial commit with changes to new terms validation

* adjusted validation to call KQLValidator for flattened ECS variable

* changed call to KQLValidator instead of super; validate from same variable

* removed testing rules

* removed commented line

* Version() called on all string versions prior to comparison logic

* adjusted assert error punctuation
This commit is contained in:
Terrance DeJesus
2022-12-19 12:44:42 -05:00
committed by GitHub
parent 80548b97f4
commit e9169b4cfa
+18 -12
View File
@@ -481,9 +481,10 @@ class NewTermsRuleData(QueryRuleData):
def validate(self, meta: RuleMeta) -> None:
"""Validates terms in new_terms_fields are valid ECS schema."""
super(NewTermsRuleData, self).validate_query(meta)
feature_min_stack = '8.4.0'
feature_min_stack_extended_fields = '8.6.0'
kql_validator = KQLValidator(self.query)
kql_validator.validate(self, meta)
feature_min_stack = Version('8.4.0')
feature_min_stack_extended_fields = Version('8.6.0')
# validate history window start field exists and is correct
assert self.new_terms.history_window_start, \
@@ -498,20 +499,25 @@ class NewTermsRuleData(QueryRuleData):
# ecs validation
min_stack_version = meta.get("min_stack_version")
if min_stack_version is None:
min_stack_version = str(Version(Version(load_current_package_version()) + (0,)))
min_stack_version = Version(Version(load_current_package_version()) + (0,))
else:
min_stack_version = Version(min_stack_version)
stack_version = Version(feature_min_stack)
assert stack_version >= Version(feature_min_stack), \
assert min_stack_version >= feature_min_stack, \
f"New Terms rule types only compatible with {feature_min_stack}+"
ecs_version = get_stack_schemas()[str(stack_version)]['ecs']
ecs_schema = ecs.get_schema(ecs_version)
ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs']
beats_version = get_stack_schemas()[str(min_stack_version)]['beats']
# checks if new terms field(s) are in ecs, beats or non-ecs schemas
_, _, schema = kql_validator.get_beats_schema(self.index or [], beats_version, ecs_version)
for new_terms_field in self.new_terms.value:
assert new_terms_field in ecs_schema.keys(), \
f"{new_terms_field} not found in ECS schema (version {ecs_version})"
assert new_terms_field in schema.keys(), \
f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas"
# validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862
if stack_version >= Version(feature_min_stack) and \
stack_version < Version(feature_min_stack_extended_fields):
if min_stack_version >= feature_min_stack and \
min_stack_version < feature_min_stack_extended_fields:
assert len(self.new_terms.value) == 1, \
f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}"