[Bug] Add Integration Schema Validation to NewTermsRuleData.validate Method (#3227)
* adjusted validation method to include integration schema checks * fixed linting errors * re-factored NewTermsRuleData and added unit testing
This commit is contained in:
@@ -647,52 +647,6 @@ class NewTermsRuleData(QueryRuleData):
|
||||
type: Literal["new_terms"]
|
||||
new_terms: NewTermsMapping
|
||||
|
||||
def validate(self, meta: RuleMeta) -> None:
|
||||
"""Validates terms in new_terms_fields are valid ECS schema."""
|
||||
|
||||
kql_validator = KQLValidator(self.query)
|
||||
kql_validator.validate(self, meta)
|
||||
feature_min_stack = Version.parse('8.4.0')
|
||||
feature_min_stack_extended_fields = Version.parse('8.6.0')
|
||||
current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
|
||||
|
||||
# validate history window start field exists and is correct
|
||||
assert self.new_terms.history_window_start, \
|
||||
"new terms field found with no history_window_start field defined"
|
||||
assert self.new_terms.history_window_start[0].field == "history_window_start", \
|
||||
f"{self.new_terms.history_window_start} should be 'history_window_start'"
|
||||
|
||||
# validate new terms and history window start fields is correct
|
||||
assert self.new_terms.field == "new_terms_fields", \
|
||||
f"{self.new_terms.field} should be 'new_terms_fields' for new_terms rule type"
|
||||
|
||||
# ecs validation
|
||||
min_stack_version = Version.parse(meta.get("min_stack_version")) if meta.get("min_stack_version") else None
|
||||
min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \
|
||||
current_package_version else min_stack_version
|
||||
|
||||
assert min_stack_version >= feature_min_stack, \
|
||||
f"New Terms rule types only compatible with {feature_min_stack}+"
|
||||
ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs']
|
||||
beats_version = get_stack_schemas()[str(min_stack_version)]['beats']
|
||||
|
||||
# checks if new terms field(s) are in ecs, beats or non-ecs schemas
|
||||
_, _, schema = kql_validator.get_beats_schema(self.index or [], beats_version, ecs_version)
|
||||
|
||||
for new_terms_field in self.new_terms.value:
|
||||
assert new_terms_field in schema.keys(), \
|
||||
f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas"
|
||||
|
||||
# validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862
|
||||
if min_stack_version >= feature_min_stack and \
|
||||
min_stack_version < feature_min_stack_extended_fields:
|
||||
assert len(self.new_terms.value) == 1, \
|
||||
f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}"
|
||||
|
||||
# validate fields are unique
|
||||
assert len(set(self.new_terms.value)) == len(self.new_terms.value), \
|
||||
f"new terms fields values are not unique - {self.new_terms.value}"
|
||||
|
||||
def transform(self, obj: dict) -> dict:
|
||||
"""Transforms new terms data to API format for Kibana."""
|
||||
|
||||
|
||||
@@ -1319,3 +1319,101 @@ class TestAlertSuppression(BaseRuleTest):
|
||||
if fld not in schema.keys():
|
||||
self.fail(f"{self.rule_str(rule)} alert suppression field {fld} not \
|
||||
found in ECS, Beats, or non-ecs schemas")
|
||||
|
||||
|
||||
class TestNewTerms(BaseRuleTest):
|
||||
"""Test new term rules."""
|
||||
|
||||
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"),
|
||||
"Test only applicable to 8.4+ stacks for new terms feature.")
|
||||
def test_history_window_start(self):
|
||||
"""Test new terms history window start field."""
|
||||
|
||||
for rule in self.production_rules:
|
||||
if rule.contents.data.type == "new_terms":
|
||||
|
||||
# validate history window start field exists and is correct
|
||||
assert rule.contents.data.new_terms.history_window_start, \
|
||||
"new terms field found with no history_window_start field defined"
|
||||
assert rule.contents.data.new_terms.history_window_start[0].field == "history_window_start", \
|
||||
f"{rule.contents.data.new_terms.history_window_start} should be 'history_window_start'"
|
||||
|
||||
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"),
|
||||
"Test only applicable to 8.4+ stacks for new terms feature.")
|
||||
def test_new_terms_field_exists(self):
|
||||
# validate new terms and history window start fields are correct
|
||||
for rule in self.production_rules:
|
||||
if rule.contents.data.type == "new_terms":
|
||||
assert rule.contents.data.new_terms.field == "new_terms_fields", \
|
||||
f"{rule.contents.data.new_terms.field} should be 'new_terms_fields' for new_terms rule type"
|
||||
|
||||
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"),
|
||||
"Test only applicable to 8.4+ stacks for new terms feature.")
|
||||
def test_new_terms_fields(self):
|
||||
"""Test new terms fields are schema validated."""
|
||||
# ecs validation
|
||||
for rule in self.production_rules:
|
||||
if rule.contents.data.type == "new_terms":
|
||||
meta = rule.contents.metadata
|
||||
feature_min_stack = Version.parse('8.4.0')
|
||||
current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
|
||||
min_stack_version = Version.parse(meta.get("min_stack_version")) if \
|
||||
meta.get("min_stack_version") else None
|
||||
min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \
|
||||
current_package_version else min_stack_version
|
||||
|
||||
assert min_stack_version >= feature_min_stack, \
|
||||
f"New Terms rule types only compatible with {feature_min_stack}+"
|
||||
ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs']
|
||||
beats_version = get_stack_schemas()[str(min_stack_version)]['beats']
|
||||
|
||||
# checks if new terms field(s) are in ecs, beats non-ecs or integration schemas
|
||||
queryvalidator = QueryValidator(rule.contents.data.query)
|
||||
_, _, schema = queryvalidator.get_beats_schema([], beats_version, ecs_version)
|
||||
integration_manifests = load_integrations_manifests()
|
||||
integration_schemas = load_integrations_schemas()
|
||||
integration_tags = meta.get("integration")
|
||||
if integration_tags:
|
||||
for tag in integration_tags:
|
||||
latest_tag_compat_ver, _ = find_latest_compatible_version(
|
||||
package=tag,
|
||||
integration="",
|
||||
rule_stack_version=min_stack_version,
|
||||
packages_manifest=integration_manifests)
|
||||
if latest_tag_compat_ver:
|
||||
integration_schema = integration_schemas[tag][latest_tag_compat_ver]
|
||||
for policy_template in integration_schema.keys():
|
||||
schema.update(**integration_schemas[tag][latest_tag_compat_ver][policy_template])
|
||||
for new_terms_field in rule.contents.data.new_terms.value:
|
||||
assert new_terms_field in schema.keys(), \
|
||||
f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas"
|
||||
|
||||
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"),
|
||||
"Test only applicable to 8.4+ stacks for new terms feature.")
|
||||
def test_new_terms_max_limit(self):
|
||||
"""Test new terms max limit."""
|
||||
# validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862
|
||||
for rule in self.production_rules:
|
||||
if rule.contents.data.type == "new_terms":
|
||||
meta = rule.contents.metadata
|
||||
feature_min_stack = Version.parse('8.4.0')
|
||||
feature_min_stack_extended_fields = Version.parse('8.6.0')
|
||||
current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
|
||||
min_stack_version = Version.parse(meta.get("min_stack_version")) if \
|
||||
meta.get("min_stack_version") else None
|
||||
min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \
|
||||
current_package_version else min_stack_version
|
||||
if min_stack_version >= feature_min_stack and \
|
||||
min_stack_version < feature_min_stack_extended_fields:
|
||||
assert len(rule.contents.data.new_terms.value) == 1, \
|
||||
f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}"
|
||||
|
||||
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"),
|
||||
"Test only applicable to 8.4+ stacks for new terms feature.")
|
||||
def test_new_terms_fields_unique(self):
|
||||
"""Test new terms fields are unique."""
|
||||
# validate fields are unique
|
||||
for rule in self.production_rules:
|
||||
if rule.contents.data.type == "new_terms":
|
||||
assert len(set(rule.contents.data.new_terms.value)) == len(rule.contents.data.new_terms.value), \
|
||||
f"new terms fields values are not unique - {rule.contents.data.new_terms.value}"
|
||||
|
||||
Reference in New Issue
Block a user