diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 6a8beae0d..f02254934 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -647,52 +647,6 @@ class NewTermsRuleData(QueryRuleData): type: Literal["new_terms"] new_terms: NewTermsMapping - def validate(self, meta: RuleMeta) -> None: - """Validates terms in new_terms_fields are valid ECS schema.""" - - kql_validator = KQLValidator(self.query) - kql_validator.validate(self, meta) - feature_min_stack = Version.parse('8.4.0') - feature_min_stack_extended_fields = Version.parse('8.6.0') - current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) - - # validate history window start field exists and is correct - assert self.new_terms.history_window_start, \ - "new terms field found with no history_window_start field defined" - assert self.new_terms.history_window_start[0].field == "history_window_start", \ - f"{self.new_terms.history_window_start} should be 'history_window_start'" - - # validate new terms and history window start fields is correct - assert self.new_terms.field == "new_terms_fields", \ - f"{self.new_terms.field} should be 'new_terms_fields' for new_terms rule type" - - # ecs validation - min_stack_version = Version.parse(meta.get("min_stack_version")) if meta.get("min_stack_version") else None - min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ - current_package_version else min_stack_version - - assert min_stack_version >= feature_min_stack, \ - f"New Terms rule types only compatible with {feature_min_stack}+" - ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs'] - beats_version = get_stack_schemas()[str(min_stack_version)]['beats'] - - # checks if new terms field(s) are in ecs, beats or non-ecs schemas - _, _, schema = kql_validator.get_beats_schema(self.index or [], beats_version, ecs_version) - - for new_terms_field in self.new_terms.value: - assert new_terms_field in schema.keys(), \ - f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas" - - # validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862 - if min_stack_version >= feature_min_stack and \ - min_stack_version < feature_min_stack_extended_fields: - assert len(self.new_terms.value) == 1, \ - f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}" - - # validate fields are unique - assert len(set(self.new_terms.value)) == len(self.new_terms.value), \ - f"new terms fields values are not unique - {self.new_terms.value}" - def transform(self, obj: dict) -> dict: """Transforms new terms data to API format for Kibana.""" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 7a13f4cb8..d6f28d68e 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -1319,3 +1319,101 @@ class TestAlertSuppression(BaseRuleTest): if fld not in schema.keys(): self.fail(f"{self.rule_str(rule)} alert suppression field {fld} not \ found in ECS, Beats, or non-ecs schemas") + + +class TestNewTerms(BaseRuleTest): + """Test new term rules.""" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_history_window_start(self): + """Test new terms history window start field.""" + + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + + # validate history window start field exists and is correct + assert rule.contents.data.new_terms.history_window_start, \ + "new terms field found with no history_window_start field defined" + assert rule.contents.data.new_terms.history_window_start[0].field == "history_window_start", \ + f"{rule.contents.data.new_terms.history_window_start} should be 'history_window_start'" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_field_exists(self): + # validate new terms and history window start fields are correct + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + assert rule.contents.data.new_terms.field == "new_terms_fields", \ + f"{rule.contents.data.new_terms.field} should be 'new_terms_fields' for new_terms rule type" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_fields(self): + """Test new terms fields are schema validated.""" + # ecs validation + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + meta = rule.contents.metadata + feature_min_stack = Version.parse('8.4.0') + current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) + min_stack_version = Version.parse(meta.get("min_stack_version")) if \ + meta.get("min_stack_version") else None + min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ + current_package_version else min_stack_version + + assert min_stack_version >= feature_min_stack, \ + f"New Terms rule types only compatible with {feature_min_stack}+" + ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs'] + beats_version = get_stack_schemas()[str(min_stack_version)]['beats'] + + # checks if new terms field(s) are in ecs, beats non-ecs or integration schemas + queryvalidator = QueryValidator(rule.contents.data.query) + _, _, schema = queryvalidator.get_beats_schema([], beats_version, ecs_version) + integration_manifests = load_integrations_manifests() + integration_schemas = load_integrations_schemas() + integration_tags = meta.get("integration") + if integration_tags: + for tag in integration_tags: + latest_tag_compat_ver, _ = find_latest_compatible_version( + package=tag, + integration="", + rule_stack_version=min_stack_version, + packages_manifest=integration_manifests) + if latest_tag_compat_ver: + integration_schema = integration_schemas[tag][latest_tag_compat_ver] + for policy_template in integration_schema.keys(): + schema.update(**integration_schemas[tag][latest_tag_compat_ver][policy_template]) + for new_terms_field in rule.contents.data.new_terms.value: + assert new_terms_field in schema.keys(), \ + f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_max_limit(self): + """Test new terms max limit.""" + # validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862 + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + meta = rule.contents.metadata + feature_min_stack = Version.parse('8.4.0') + feature_min_stack_extended_fields = Version.parse('8.6.0') + current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) + min_stack_version = Version.parse(meta.get("min_stack_version")) if \ + meta.get("min_stack_version") else None + min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ + current_package_version else min_stack_version + if min_stack_version >= feature_min_stack and \ + min_stack_version < feature_min_stack_extended_fields: + assert len(rule.contents.data.new_terms.value) == 1, \ + f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_fields_unique(self): + """Test new terms fields are unique.""" + # validate fields are unique + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + assert len(set(rule.contents.data.new_terms.value)) == len(rule.contents.data.new_terms.value), \ + f"new terms fields values are not unique - {rule.contents.data.new_terms.value}"