diff --git a/detection_rules/beats.py b/detection_rules/beats.py index 464e60c8d..0ace3f755 100644 --- a/detection_rules/beats.py +++ b/detection_rules/beats.py @@ -6,7 +6,7 @@ """ECS Schemas management.""" import os import re -from typing import List +from typing import List, Optional import kql import eql @@ -266,3 +266,9 @@ def get_schema_from_kql(tree: kql.ast.BaseNode, beats: list, version: str = None datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String)) return get_schema_from_datasets(beats, modules, datasets, version=version) + + +def parse_beats_from_index(index: Optional[list]) -> List[str]: + indexes = index or [] + beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index] + return beat_types diff --git a/detection_rules/cli_utils.py b/detection_rules/cli_utils.py index 71702819e..500b8f15c 100644 --- a/detection_rules/cli_utils.py +++ b/detection_rules/cli_utils.py @@ -215,8 +215,4 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos # rta_mappings.add_rule_to_mapping_file(rule) # click.echo('Placeholder added to rule-mapping.yml') - click.echo('Rule will validate against the latest ECS schema available (and beats if necessary)') - click.echo(' - to have a rule validate against specific ECS schemas, add them to metadata->ecs_versions') - click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version') - return rule diff --git a/detection_rules/ecs.py b/detection_rules/ecs.py index fa6a4e7b2..a5af5956d 100644 --- a/detection_rules/ecs.py +++ b/detection_rules/ecs.py @@ -200,7 +200,7 @@ class KqlSchema2Eql(eql.Schema): @cached -def get_kql_schema(version=None, indexes=None, beat_schema=None): +def get_kql_schema(version=None, indexes=None, beat_schema=None) -> dict: """Get schema for KQL.""" indexes = indexes or () converted = flatten_multi_fields(get_schema(version, name='ecs_flat')) diff --git a/detection_rules/packaging.py 
b/detection_rules/packaging.py index 2339d3b4d..ad64a5c9e 100644 --- a/detection_rules/packaging.py +++ b/detection_rules/packaging.py @@ -63,7 +63,7 @@ def filter_rule(rule: TOMLRule, config_filter: dict, exclude_fields: Optional[di @cached -def load_current_package_version(): +def load_current_package_version() -> str: """Load the current package version from config file.""" return load_etc_dump('packages.yml')['package']['name'] diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 9bdfc5ede..2aad2b49a 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -9,19 +9,20 @@ import typing from dataclasses import dataclass, field from functools import cached_property from pathlib import Path -from typing import Literal, Union, Optional, List, Any +from typing import Literal, Union, Optional, List, Any, Dict from uuid import uuid4 from marshmallow import ValidationError, validates_schema + from . import utils from .mixins import MarshmallowDataclassMixin from .rule_formatter import toml_write, nested_normalize -from .schemas import definitions, SCHEMA_DIR -from .schemas import downgrade +from .schemas import SCHEMA_DIR, definitions, downgrade, get_stack_schemas from .utils import cached _META_SCHEMA_REQ_DEFAULTS = {} +MIN_FLEET_PACKAGE_VERSION = '7.13.0' @dataclass(frozen=True) @@ -32,10 +33,9 @@ class RuleMeta(MarshmallowDataclassMixin): deprecation_date: Optional[definitions.Date] # Optional fields - beats_version: Optional[definitions.BranchVer] - ecs_versions: Optional[List[definitions.BranchVer]] comments: Optional[str] maturity: Optional[definitions.Maturity] + min_stack_version: Optional[definitions.SemVer] os_type_list: Optional[List[definitions.OSType]] query_schema_validation: Optional[bool] related_endpoint_rules: Optional[List[str]] @@ -43,6 +43,11 @@ class RuleMeta(MarshmallowDataclassMixin): # Extended information as an arbitrary dictionary extended = Optional[dict] + def get_validation_stack_versions(self) -> Dict[str, 
dict]: + """Get a dict of beats and ecs versions per stack release.""" + stack_versions = get_stack_schemas(self.min_stack_version or MIN_FLEET_PACKAGE_VERSION) + return stack_versions + @dataclass(frozen=True) class BaseThreatEntry: diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 825038e2a..c1771c945 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -10,8 +10,8 @@ from typing import List import eql import kql -from detection_rules import beats, ecs -from detection_rules.rule import QueryValidator, QueryRuleData, RuleMeta +from . import ecs, beats +from .rule import QueryValidator, QueryRuleData, RuleMeta class KQLValidator(QueryValidator): @@ -36,35 +36,34 @@ class KQLValidator(QueryValidator): # syntax only, which is done via self.ast return - indexes = data.index or [] - beats_version = meta.beats_version or beats.get_max_version() - ecs_versions = meta.ecs_versions or [ecs.get_max_version()] + for stack_version, mapping in meta.get_validation_stack_versions().items(): + beats_version = mapping['beats'] + ecs_version = mapping['ecs'] + err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}' - beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index] - beat_schema = beats.get_schema_from_kql(ast, beat_types, version=beats_version) if beat_types else None + beat_types = beats.parse_beats_from_index(data.index) + beat_schema = beats.get_schema_from_kql(ast, beat_types, version=beats_version) if beat_types else None + schema = ecs.get_kql_schema(version=ecs_version, indexes=data.index or [], beat_schema=beat_schema) - if not ecs_versions: - kql.parse(self.query, schema=ecs.get_kql_schema(indexes=indexes, beat_schema=beat_schema)) - else: - for version in ecs_versions: - schema = ecs.get_kql_schema(version=version, indexes=indexes, beat_schema=beat_schema) + try: + kql.parse(self.query, schema=schema) + except kql.KqlParseError as exc: + 
message = exc.error_msg + trailer = err_trailer + if "Unknown field" in message and beat_types: + trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" - try: - kql.parse(self.query, schema=schema) - except kql.KqlParseError as exc: - message = exc.error_msg - trailer = None - if "Unknown field" in message and beat_types: - trailer = "\nTry adding event.module or event.dataset to specify beats module" - - raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None + raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, + len(exc.caret.lstrip()), trailer=trailer) from None + except Exception: + print(err_trailer) + raise class EQLValidator(QueryValidator): @cached_property - def ast(self) -> kql.ast.Expression: + def ast(self) -> eql.ast.Expression: with eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: return eql.parse_query(self.query) @@ -74,41 +73,34 @@ class EQLValidator(QueryValidator): def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: """Validate an EQL query while checking TOMLRule.""" - _ = self.ast + ast = self.ast if meta.query_schema_validation is False or meta.maturity == "deprecated": # syntax only, which is done via self.ast return - indexes = data.index or [] - beats_version = meta.beats_version or beats.get_max_version() - ecs_versions = meta.ecs_versions or [ecs.get_max_version()] + for stack_version, mapping in meta.get_validation_stack_versions().items(): + beats_version = mapping['beats'] + ecs_version = mapping['ecs'] + err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}' - # TODO: remove once py-eql supports ipv6 for cidrmatch - # Or, unregister the cidrMatch function and replace it with one that doesn't validate against strict IPv4 - with eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: - parsed = eql.parse_query(self.query) - - 
beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index] - beat_schema = beats.get_schema_from_eql(parsed, beat_types, version=beats_version) if beat_types else None - - for version in ecs_versions: - schema = ecs.get_kql_schema(indexes=indexes, beat_schema=beat_schema, version=version) + beat_types = beats.parse_beats_from_index(data.index) + beat_schema = beats.get_schema_from_eql(ast, beat_types, version=beats_version) if beat_types else None + schema = ecs.get_kql_schema(version=ecs_version, indexes=data.index or [], beat_schema=beat_schema) eql_schema = ecs.KqlSchema2Eql(schema) try: # TODO: switch to custom cidrmatch that allows ipv6 with eql_schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: eql.parse_query(self.query) - - except eql.EqlTypeMismatchError: - raise - except eql.EqlParseError as exc: message = exc.error_msg - trailer = None + trailer = err_trailer if "Unknown field" in message and beat_types: - trailer = "\nTry adding event.module or event.dataset to specify beats module" + trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" raise exc.__class__(exc.error_msg, exc.line, exc.column, exc.source, len(exc.caret.lstrip()), trailer=trailer) from None + except Exception: + print(err_trailer) + raise diff --git a/detection_rules/schemas/__init__.py b/detection_rules/schemas/__init__.py index 7f0ecdf6e..f20eb1b30 100644 --- a/detection_rules/schemas/__init__.py +++ b/detection_rules/schemas/__init__.py @@ -3,13 +3,13 @@ # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. import json -from typing import List, Optional +from typing import Dict, List, Optional import jsonschema from .rta_schema import validate_rta_mapping from ..semver import Version -from ..utils import cached, get_etc_path +from ..utils import cached, get_etc_path, load_etc_dump from . 
import definitions from pathlib import Path @@ -18,6 +18,7 @@ __all__ = ( "SCHEMA_DIR", "definitions", "downgrade", + "get_stack_schemas", "validate_rta_mapping", "all_versions", ) @@ -181,3 +182,24 @@ def downgrade(api_contents: dict, target_version: str, current_version: Optional api_contents = migrations[version](version, api_contents) return api_contents + + +@cached +def get_stack_schemas(stack_version: str) -> Dict[str, dict]: + """Return the ECS + beats versions mapped to every stack version >= specified stack version and <= package.""" + from ..packaging import load_current_package_version + + stack_version = Version(stack_version) + current_package = Version(load_current_package_version()) + + if len(current_package) == 2: + current_package = Version(current_package + (0,)) + + stack_map = load_etc_dump('stack-schema-map.yaml') + versions = {k: v for k, v in stack_map.items() + if (mapped_version := Version(k)) >= stack_version and mapped_version <= current_package and v} + + if stack_version > current_package: + versions[stack_version] = {'beats': 'master', 'ecs': 'master'} + + return versions diff --git a/etc/stack-schema-map.yaml b/etc/stack-schema-map.yaml new file mode 100644 index 000000000..e992786fa --- /dev/null +++ b/etc/stack-schema-map.yaml @@ -0,0 +1,12 @@ +# alignment of stack with beats and ecs versions +# ECS versions do not align perfectly with stack releases (as of 7.13), so this will reflect MAX ecs version for a +# given release + +"7.13.0": + # beats release about the same time as the stack, so we cannot update this until it is released + beats: "7.13.2" + ecs: "1.9.0" + +"7.14.0": + beats: "master" # TODO: 7.14.x + ecs: "1.10.0" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index f055327a7..382521779 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -12,7 +12,7 @@ from pathlib import Path import eql import kql -from detection_rules import attack, beats, ecs +from detection_rules import attack 
from detection_rules.packaging import load_versions from detection_rules.rule import QueryRuleData from detection_rules.rule_loader import FILE_PATTERN @@ -356,23 +356,6 @@ class TestRuleFiles(BaseRuleTest): class TestRuleMetadata(BaseRuleTest): """Test the metadata of rules.""" - def test_ecs_and_beats_opt_in_not_latest_only(self): - """Test that explicitly defined opt-in validation is not only the latest versions to avoid stale tests.""" - for rule in self.all_rules: - beats_version = rule.contents.metadata.beats_version - ecs_versions = rule.contents.metadata.ecs_versions or [] - latest_beats = str(beats.get_max_version()) - latest_ecs = ecs.get_max_version() - - error_msg = f'{self.rule_str(rule)} it is unnecessary to define the current latest beats version: ' \ - f'{latest_beats}' - self.assertNotEqual(latest_beats, beats_version, error_msg) - - if len(ecs_versions) == 1: - error_msg = f'{self.rule_str(rule)} it is unnecessary to define the current latest ecs version if ' \ - f'only one version is specified: {latest_ecs}' - self.assertNotIn(latest_ecs, ecs_versions, error_msg) - def test_updated_date_newer_than_creation(self): """Test that the updated_date is newer than the creation date.""" invalid = [] diff --git a/tests/test_schemas.py b/tests/test_schemas.py index aacf4c076..e78c83265 100644 --- a/tests/test_schemas.py +++ b/tests/test_schemas.py @@ -10,6 +10,7 @@ import uuid import eql +from detection_rules.packaging import load_current_package_version from detection_rules.rule import TOMLRuleContents from detection_rules.schemas import downgrade @@ -165,7 +166,11 @@ class TestSchemas(unittest.TestCase): } def build_rule(query): - metadata = {"creation_date": "1970/01/01", "updated_date": "1970/01/01"} + metadata = { + "creation_date": "1970/01/01", + "updated_date": "1970/01/01", + "min_stack_version": load_current_package_version() + } data = base_fields.copy() data["query"] = query obj = {"metadata": metadata, "rule": data}