diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 6c0ec5d09..e1067d80c 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -481,9 +481,10 @@ class NewTermsRuleData(QueryRuleData): def validate(self, meta: RuleMeta) -> None: """Validates terms in new_terms_fields are valid ECS schema.""" - super(NewTermsRuleData, self).validate_query(meta) - feature_min_stack = '8.4.0' - feature_min_stack_extended_fields = '8.6.0' + kql_validator = KQLValidator(self.query) + kql_validator.validate(self, meta) + feature_min_stack = Version('8.4.0') + feature_min_stack_extended_fields = Version('8.6.0') # validate history window start field exists and is correct assert self.new_terms.history_window_start, \ @@ -498,20 +499,25 @@ class NewTermsRuleData(QueryRuleData): # ecs validation min_stack_version = meta.get("min_stack_version") if min_stack_version is None: - min_stack_version = str(Version(Version(load_current_package_version()) + (0,))) + min_stack_version = Version(Version(load_current_package_version()) + (0,)) + else: + min_stack_version = Version(min_stack_version) - stack_version = Version(feature_min_stack) - assert stack_version >= Version(feature_min_stack), \ + assert min_stack_version >= feature_min_stack, \ f"New Terms rule types only compatible with {feature_min_stack}+" - ecs_version = get_stack_schemas()[str(stack_version)]['ecs'] - ecs_schema = ecs.get_schema(ecs_version) + ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs'] + beats_version = get_stack_schemas()[str(min_stack_version)]['beats'] + + # checks if new terms field(s) are in ecs, beats or non-ecs schemas + _, _, schema = kql_validator.get_beats_schema(self.index or [], beats_version, ecs_version) + for new_terms_field in self.new_terms.value: - assert new_terms_field in ecs_schema.keys(), \ - f"{new_terms_field} not found in ECS schema (version {ecs_version})" + assert new_terms_field in schema.keys(), \ + f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas" # validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862 - if stack_version >= Version(feature_min_stack) and \ - stack_version < Version(feature_min_stack_extended_fields): + if min_stack_version >= feature_min_stack and \ + min_stack_version < feature_min_stack_extended_fields: assert len(self.new_terms.value) == 1, \ f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}"