Add min_stack_version to rule metadata (#1173)

* Add min_stack_version to metadata of rule structure
* validate all "stack versions" between defined and current package
* Use master schemas if min_stack_version > current_package

Co-authored-by: Ross Wolf <31489089+rw-access@users.noreply.github.com>
(cherry picked from commit 781953a0a0)
This commit is contained in:
Justin Ibarra
2021-06-30 13:26:27 -08:00
committed by github-actions[bot]
parent 4d54a87f3c
commit df8f4af3fc
10 changed files with 97 additions and 76 deletions
+7 -1
View File
@@ -6,7 +6,7 @@
"""ECS Schemas management."""
import os
import re
from typing import List
from typing import List, Optional
import kql
import eql
@@ -266,3 +266,9 @@ def get_schema_from_kql(tree: kql.ast.BaseNode, beats: list, version: str = None
datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))
return get_schema_from_datasets(beats, modules, datasets, version=version)
def parse_beats_from_index(index: Optional[list]) -> List[str]:
indexes = index or []
beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index]
return beat_types
-4
View File
@@ -215,8 +215,4 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
# rta_mappings.add_rule_to_mapping_file(rule)
# click.echo('Placeholder added to rule-mapping.yml')
click.echo('Rule will validate against the latest ECS schema available (and beats if necessary)')
click.echo(' - to have a rule validate against specific ECS schemas, add them to metadata->ecs_versions')
click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version')
return rule
+1 -1
View File
@@ -200,7 +200,7 @@ class KqlSchema2Eql(eql.Schema):
@cached
def get_kql_schema(version=None, indexes=None, beat_schema=None):
def get_kql_schema(version=None, indexes=None, beat_schema=None) -> dict:
"""Get schema for KQL."""
indexes = indexes or ()
converted = flatten_multi_fields(get_schema(version, name='ecs_flat'))
+1 -1
View File
@@ -63,7 +63,7 @@ def filter_rule(rule: TOMLRule, config_filter: dict, exclude_fields: Optional[di
@cached
def load_current_package_version():
def load_current_package_version() -> str:
"""Load the current package version from config file."""
return load_etc_dump('packages.yml')['package']['name']
+10 -5
View File
@@ -9,19 +9,20 @@ import typing
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from typing import Literal, Union, Optional, List, Any
from typing import Literal, Union, Optional, List, Any, Dict
from uuid import uuid4
from marshmallow import ValidationError, validates_schema
from . import utils
from .mixins import MarshmallowDataclassMixin
from .rule_formatter import toml_write, nested_normalize
from .schemas import definitions, SCHEMA_DIR
from .schemas import downgrade
from .schemas import SCHEMA_DIR, definitions, downgrade, get_stack_schemas
from .utils import cached
_META_SCHEMA_REQ_DEFAULTS = {}
MIN_FLEET_PACKAGE_VERSION = '7.13.0'
@dataclass(frozen=True)
@@ -32,10 +33,9 @@ class RuleMeta(MarshmallowDataclassMixin):
deprecation_date: Optional[definitions.Date]
# Optional fields
beats_version: Optional[definitions.BranchVer]
ecs_versions: Optional[List[definitions.BranchVer]]
comments: Optional[str]
maturity: Optional[definitions.Maturity]
min_stack_version: Optional[definitions.SemVer]
os_type_list: Optional[List[definitions.OSType]]
query_schema_validation: Optional[bool]
related_endpoint_rules: Optional[List[str]]
@@ -43,6 +43,11 @@ class RuleMeta(MarshmallowDataclassMixin):
# Extended information as an arbitrary dictionary
extended = Optional[dict]
def get_validation_stack_versions(self) -> Dict[str, dict]:
"""Get a dict of beats and ecs versions per stack release."""
stack_versions = get_stack_schemas(self.min_stack_version or MIN_FLEET_PACKAGE_VERSION)
return stack_versions
@dataclass(frozen=True)
class BaseThreatEntry:
+35 -43
View File
@@ -10,8 +10,8 @@ from typing import List
import eql
import kql
from detection_rules import beats, ecs
from detection_rules.rule import QueryValidator, QueryRuleData, RuleMeta
from . import ecs, beats
from .rule import QueryValidator, QueryRuleData, RuleMeta
class KQLValidator(QueryValidator):
@@ -36,35 +36,34 @@ class KQLValidator(QueryValidator):
# syntax only, which is done via self.ast
return
indexes = data.index or []
beats_version = meta.beats_version or beats.get_max_version()
ecs_versions = meta.ecs_versions or [ecs.get_max_version()]
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'
beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index]
beat_schema = beats.get_schema_from_kql(ast, beat_types, version=beats_version) if beat_types else None
beat_types = beats.parse_beats_from_index(data.index)
beat_schema = beats.get_schema_from_kql(ast, beat_types, version=beats_version) if beat_types else None
schema = ecs.get_kql_schema(version=ecs_version, indexes=data.index or [], beat_schema=beat_schema)
if not ecs_versions:
kql.parse(self.query, schema=ecs.get_kql_schema(indexes=indexes, beat_schema=beat_schema))
else:
for version in ecs_versions:
schema = ecs.get_kql_schema(version=version, indexes=indexes, beat_schema=beat_schema)
try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = None
if "Unknown field" in message and beat_types:
trailer = "\nTry adding event.module or event.dataset to specify beats module"
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
class EQLValidator(QueryValidator):
@cached_property
def ast(self) -> kql.ast.Expression:
def ast(self) -> eql.ast.Expression:
with eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions:
return eql.parse_query(self.query)
@@ -74,41 +73,34 @@ class EQLValidator(QueryValidator):
def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None:
"""Validate an EQL query while checking TOMLRule."""
_ = self.ast
ast = self.ast
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
indexes = data.index or []
beats_version = meta.beats_version or beats.get_max_version()
ecs_versions = meta.ecs_versions or [ecs.get_max_version()]
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'
# TODO: remove once py-eql supports ipv6 for cidrmatch
# Or, unregister the cidrMatch function and replace it with one that doesn't validate against strict IPv4
with eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions:
parsed = eql.parse_query(self.query)
beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index]
beat_schema = beats.get_schema_from_eql(parsed, beat_types, version=beats_version) if beat_types else None
for version in ecs_versions:
schema = ecs.get_kql_schema(indexes=indexes, beat_schema=beat_schema, version=version)
beat_types = beats.parse_beats_from_index(data.index)
beat_schema = beats.get_schema_from_kql(ast, beat_types, version=beats_version) if beat_types else None
schema = ecs.get_kql_schema(version=ecs_version, indexes=data.index or [], beat_schema=beat_schema)
eql_schema = ecs.KqlSchema2Eql(schema)
try:
# TODO: switch to custom cidrmatch that allows ipv6
with eql_schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions:
eql.parse_query(self.query)
except eql.EqlTypeMismatchError:
raise
except eql.EqlParseError as exc:
message = exc.error_msg
trailer = None
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = "\nTry adding event.module or event.dataset to specify beats module"
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
raise exc.__class__(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
+24 -2
View File
@@ -3,13 +3,13 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import json
from typing import List, Optional
from typing import Dict, List, Optional
import jsonschema
from .rta_schema import validate_rta_mapping
from ..semver import Version
from ..utils import cached, get_etc_path
from ..utils import cached, get_etc_path, load_etc_dump
from . import definitions
from pathlib import Path
@@ -18,6 +18,7 @@ __all__ = (
"SCHEMA_DIR",
"definitions",
"downgrade",
"get_stack_schemas",
"validate_rta_mapping",
"all_versions",
)
@@ -181,3 +182,24 @@ def downgrade(api_contents: dict, target_version: str, current_version: Optional
api_contents = migrations[version](version, api_contents)
return api_contents
@cached
def get_stack_schemas(stack_version: str) -> Dict[str, dict]:
"""Return all ECS + beats to stack versions for a every stack version >= specified stack version and <= package."""
from ..packaging import load_current_package_version
stack_version = Version(stack_version)
current_package = Version(load_current_package_version())
if len(current_package) == 2:
current_package = Version(current_package + (0,))
stack_map = load_etc_dump('stack-schema-map.yaml')
versions = {k: v for k, v in stack_map.items()
if (mapped_version := Version(k)) >= stack_version and mapped_version <= current_package and v}
if stack_version > current_package:
versions[stack_version] = {'beats': 'master', 'ecs': 'master'}
return versions
+12
View File
@@ -0,0 +1,12 @@
# alignment of stack with beats and ecs versions
# ECS versions do not align perfectly with stack releases (as of 7.13), so this will reflect MAX ecs version for a
# given release
"7.13.0":
# beats release about the same time as the stack, so we cannot update this until it is released
beats: "7.13.2"
ecs: "1.9.0"
"7.14.0":
beats: "master" # TODO: 7.14.x
ecs: "1.10.0"
+1 -18
View File
@@ -12,7 +12,7 @@ from pathlib import Path
import eql
import kql
from detection_rules import attack, beats, ecs
from detection_rules import attack
from detection_rules.packaging import load_versions
from detection_rules.rule import QueryRuleData
from detection_rules.rule_loader import FILE_PATTERN
@@ -356,23 +356,6 @@ class TestRuleFiles(BaseRuleTest):
class TestRuleMetadata(BaseRuleTest):
"""Test the metadata of rules."""
def test_ecs_and_beats_opt_in_not_latest_only(self):
"""Test that explicitly defined opt-in validation is not only the latest versions to avoid stale tests."""
for rule in self.all_rules:
beats_version = rule.contents.metadata.beats_version
ecs_versions = rule.contents.metadata.ecs_versions or []
latest_beats = str(beats.get_max_version())
latest_ecs = ecs.get_max_version()
error_msg = f'{self.rule_str(rule)} it is unnecessary to define the current latest beats version: ' \
f'{latest_beats}'
self.assertNotEqual(latest_beats, beats_version, error_msg)
if len(ecs_versions) == 1:
error_msg = f'{self.rule_str(rule)} it is unnecessary to define the current latest ecs version if ' \
f'only one version is specified: {latest_ecs}'
self.assertNotIn(latest_ecs, ecs_versions, error_msg)
def test_updated_date_newer_than_creation(self):
"""Test that the updated_date is newer than the creation date."""
invalid = []
+6 -1
View File
@@ -10,6 +10,7 @@ import uuid
import eql
from detection_rules.packaging import load_current_package_version
from detection_rules.rule import TOMLRuleContents
from detection_rules.schemas import downgrade
@@ -165,7 +166,11 @@ class TestSchemas(unittest.TestCase):
}
def build_rule(query):
metadata = {"creation_date": "1970/01/01", "updated_date": "1970/01/01"}
metadata = {
"creation_date": "1970/01/01",
"updated_date": "1970/01/01",
"min_stack_version": load_current_package_version()
}
data = base_fields.copy()
data["query"] = query
obj = {"metadata": metadata, "rule": data}