diff --git a/detection_rules/devtools.py b/detection_rules/devtools.py index e10ce9a9c..f6bc27378 100644 --- a/detection_rules/devtools.py +++ b/detection_rules/devtools.py @@ -29,6 +29,7 @@ from kibana.connector import Kibana from . import rule_loader, utils from .cli_utils import single_collection from .docs import IntegrationSecurityDocs +from .endgame import EndgameSchemaManager from .eswrap import CollectEvents, add_range_to_dsl from .ghwrap import GithubClient, update_gist from .main import root @@ -779,14 +780,6 @@ def deprecate_rule(ctx: click.Context, rule_file: Path): click.echo(f'Rule moved to {deprecated_path} - remember to git add this file') -@dev_group.command("update-schemas") -def update_schemas(): - classes = [BaseRuleData] + list(typing.get_args(AnyRuleData)) - - for cls in classes: - cls.save_schema() - - @dev_group.command('update-navigator-gists') @click.option('--directory', type=Path, default=CURRENT_RELEASE_PATH.joinpath('extras', 'navigator_layers'), help='Directory containing only navigator files.') @@ -1138,3 +1131,29 @@ def build_integration_manifests(overwrite: bool): integration_tags = list(set([r.contents.metadata.integration for r in rules if r.contents.metadata.integration])) click.echo(f"integration tags identified: {integration_tags}") build_integrations_manifest(overwrite, integration_tags) + + +@dev_group.group('schemas') +def schemas_group(): + """Commands for dev schema methods.""" + + +@schemas_group.command("update-rule-data") +def update_rule_data_schemas(): + classes = [BaseRuleData] + list(typing.get_args(AnyRuleData)) + + for cls in classes: + cls.save_schema() + + +@schemas_group.command("generate-endgame") +@click.option("--token", required=True, prompt=get_github_token() is None, default=get_github_token(), + help="GitHub token to use for the PR", hide_input=True) +@click.option("--endgame-version", "-e", required=True, help="Tagged version from TBD. 
e.g., 1.9.0") +@click.option("--overwrite", is_flag=True, help="Overwrite if versions exist") +def generate_endgame_schema(token: str, endgame_version: str, overwrite: bool): + """Download Endgame-ECS mapping.json and generate flattend schema.""" + github = GithubClient(token) + client = github.authenticated_client + schema_manager = EndgameSchemaManager(client, endgame_version) + schema_manager.save_schemas(overwrite=overwrite) diff --git a/detection_rules/endgame.py b/detection_rules/endgame.py new file mode 100644 index 000000000..1794861af --- /dev/null +++ b/detection_rules/endgame.py @@ -0,0 +1,102 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. + +"""Endgame Schemas management.""" +import json +import shutil +import sys +from pathlib import Path + +import eql + +from .utils import ETC_DIR, DateTimeEncoder, cached, gzip_compress, read_gzip + +ENDGAME_SCHEMA_DIR = Path(ETC_DIR) / "endgame_schemas" + + +class EndgameSchemaManager: + """Endgame Class to download, convert, and save endgame schemas from endgame-evecs.""" + + def __init__(self, github_client, endgame_version: str): + self.repo = github_client.get_repo("elastic/endgame-evecs") + self.endgame_version = endgame_version + self.endgame_schema = self.download_endgame_schema() + + def download_endgame_schema(self) -> dict: + """Download schema from endgame-evecs.""" + + # Use the static mapping.json file downloaded from the endgame-evecs repo. 
+ main_branch = self.repo.get_branch("master") + main_branch_sha = main_branch.commit.sha + schema_path = "pkg/mapper/ecs/schema.json" + contents = self.repo.get_contents(schema_path, ref=main_branch_sha) + endgame_mapping = json.loads(contents.decoded_content.decode()) + + return endgame_mapping + + def save_schemas(self, overwrite: bool = False): + """Save the endgame schemas to the etc/endgame_schemas directory.""" + + schemas_dir = ENDGAME_SCHEMA_DIR / self.endgame_version + if schemas_dir.exists() and not overwrite: + raise FileExistsError(f"{schemas_dir} exists, use overwrite to force") + else: + shutil.rmtree(str(schemas_dir.resolve()), ignore_errors=True) + schemas_dir.mkdir() + + # write the raw schema to disk + raw_os_schema = self.endgame_schema + os_schema_path = schemas_dir / "endgame_ecs_mapping.json.gz" + compressed = gzip_compress(json.dumps(raw_os_schema, sort_keys=True, cls=DateTimeEncoder)) + os_schema_path.write_bytes(compressed) + print(f"Endgame raw schema file saved: {os_schema_path}") + + +class EndgameSchema(eql.Schema): + """Endgame schema for query validation.""" + + type_mapping = { + "keyword": eql.types.TypeHint.String, + "ip": eql.types.TypeHint.String, + "float": eql.types.TypeHint.Numeric, + # "double": eql.types.TypeHint.Numeric, + # "long": eql.types.TypeHint.Numeric, + # "short": eql.types.TypeHint.Numeric, + "integer": eql.types.TypeHint.Numeric, + "boolean": eql.types.TypeHint.Boolean, + } + + def __init__(self, endgame_schema): + self.endgame_schema = endgame_schema + eql.Schema.__init__(self, {}, allow_any=True, allow_generic=False, allow_missing=False) + + def get_event_type_hint(self, event_type, path): + dotted = ".".join(str(p) for p in path) + elasticsearch_type = self.endgame_schema.get(dotted) + eql_hint = self.type_mapping.get(elasticsearch_type) + + if eql_hint is not None: + return eql_hint, None + + +@cached +def read_endgame_schema(endgame_version: str, warn=False) -> dict: + """Load Endgame json schema. 
The schemas + must be generated with the `download_endgame_schema()` method.""" + # expect versions to be in format of N.N.N or master/main + + endgame_schema_path = ENDGAME_SCHEMA_DIR / endgame_version / "endgame_ecs_mapping.json.gz" + + if not endgame_schema_path.exists(): + if warn: + relative_path = endgame_schema_path.relative_to(ENDGAME_SCHEMA_DIR) + print(f"Missing file to validate: {relative_path}, skipping", file=sys.stderr) + return + else: + raise FileNotFoundError(str(endgame_schema_path)) + + schema = json.loads(read_gzip(endgame_schema_path)) + + return schema diff --git a/detection_rules/etc/endgame_schemas/1.9.0/endgame_ecs_mapping.json.gz b/detection_rules/etc/endgame_schemas/1.9.0/endgame_ecs_mapping.json.gz new file mode 100644 index 000000000..533ab4341 Binary files /dev/null and b/detection_rules/etc/endgame_schemas/1.9.0/endgame_ecs_mapping.json.gz differ diff --git a/detection_rules/etc/stack-schema-map.yaml b/detection_rules/etc/stack-schema-map.yaml index 61930038f..2aea729a9 100644 --- a/detection_rules/etc/stack-schema-map.yaml +++ b/detection_rules/etc/stack-schema-map.yaml @@ -27,31 +27,39 @@ "7.16.0": beats: "7.16.2" ecs: "1.12.2" + endgame: "1.9.0" "8.0.0": beats: "8.0.1" ecs: "8.0.1" + endgame: "1.9.0" "8.1.0": beats: "8.1.2" ecs: "8.1.0" + endgame: "1.9.0" "8.2.0": beats: "8.2.1" ecs: "8.2.1" + endgame: "1.9.0" "8.3.0": beats: "main" ecs: "8.3.1" + endgame: "1.9.0" "8.4.0": beats: "main" ecs: "8.3.1" + endgame: "1.9.0" "8.5.0": beats: "main" ecs: "8.3.1" + endgame: "1.9.0" "8.6.0": beats: "main" ecs: "8.4.0" + endgame: "1.9.0" diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 5bf20653e..c1fe2fb64 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -22,7 +22,7 @@ from marko.block import Document as MarkoDocument from marko.ext.gfm import gfm from marshmallow import ValidationError, validates_schema -from . import beats, ecs, utils +from . 
import beats, ecs, endgame, utils from .integrations import (find_least_compatible_version, load_integrations_manifests) from .misc import load_current_package_version @@ -353,9 +353,11 @@ class QueryValidator: current_version = Version(Version(load_current_package_version()) + (0,)) ecs_version = get_stack_schemas()[str(current_version)]['ecs'] beats_version = get_stack_schemas()[str(current_version)]['beats'] + endgame_version = get_stack_schemas()[str(current_version)]['endgame'] ecs_schema = ecs.get_schema(ecs_version) beat_types, beat_schema, schema = self.get_beats_schema(index or [], beats_version, ecs_version) + endgame_schema = self.get_endgame_schema(index or [], endgame_version) required = [] unique_fields = self.unique_fields or [] @@ -364,8 +366,11 @@ class QueryValidator: field_type = ecs_schema.get(fld, {}).get('type') is_ecs = field_type is not None - if beat_schema and not is_ecs: - field_type = beat_schema.get(fld, {}).get('type') + if not is_ecs: + if beat_schema: + field_type = beat_schema.get(fld, {}).get('type') + elif endgame_schema: + field_type = endgame_schema.endgame_schema.get(fld, None) required.append(dict(name=fld, type=field_type or 'unknown', ecs=is_ecs)) @@ -379,6 +384,16 @@ class QueryValidator: schema = ecs.get_kql_schema(version=ecs_version, indexes=index, beat_schema=beat_schema) return beat_types, beat_schema, schema + @cached + def get_endgame_schema(self, index: list, endgame_version: str) -> Optional[endgame.EndgameSchema]: + """Get an assembled flat endgame schema.""" + + if "endgame-*" not in index: + return None + + endgame_schema = endgame.read_endgame_schema(endgame_version=endgame_version) + return endgame.EndgameSchema(endgame_schema) + @dataclass(frozen=True) class QueryRuleData(BaseRuleData): diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index dca70b6df..332955eb1 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -8,10 +8,10 @@ from 
functools import cached_property from typing import List, Optional, Union import eql - import kql -from . import ecs -from .rule import QueryValidator, QueryRuleData, RuleMeta + +from . import ecs, endgame +from .rule import QueryRuleData, QueryValidator, RuleMeta class KQLValidator(QueryValidator): @@ -63,16 +63,40 @@ class EQLValidator(QueryValidator): with eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: return eql.parse_query(self.query) - def text_fields(self, eql_schema: ecs.KqlSchema2Eql) -> List[str]: + def text_fields(self, eql_schema: Union[ecs.KqlSchema2Eql, endgame.EndgameSchema]) -> List[str]: """Return a list of fields of type text.""" from kql.parser import elasticsearch_type_family + schema = eql_schema.kql_schema if isinstance(eql_schema, ecs.KqlSchema2Eql) else eql_schema.endgame_schema - return [f for f in self.unique_fields if elasticsearch_type_family(eql_schema.kql_schema.get(f)) == 'text'] + return [f for f in self.unique_fields if elasticsearch_type_family(schema.get(f)) == 'text'] @cached_property def unique_fields(self) -> List[str]: return list(set(str(f) for f in self.ast if isinstance(f, eql.ast.Field))) + def validate_query_with_schema(self, schema: Union[ecs.KqlSchema2Eql, endgame.EndgameSchema], + err_trailer: str, beat_types: list = None) -> None: + """Validate the query against the schema.""" + try: + with schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: + eql.parse_query(self.query) + except eql.EqlParseError as exc: + message = exc.error_msg + trailer = err_trailer + if "Unknown field" in message and beat_types: + trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" + elif "Field not recognized" in message: + text_fields = self.text_fields(schema) + if text_fields: + fields_str = ', '.join(text_fields) + trailer = f"\neql does not support text fields: {fields_str}\n\n{trailer}" + + raise exc.__class__(exc.error_msg, exc.line, exc.column, 
exc.source, + len(exc.caret.lstrip()), trailer=trailer) from None + except Exception: + print(err_trailer) + raise + def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: """Validate an EQL query while checking TOMLRule.""" if meta.query_schema_validation is False or meta.maturity == "deprecated": @@ -82,31 +106,20 @@ class EQLValidator(QueryValidator): for stack_version, mapping in meta.get_validation_stack_versions().items(): beats_version = mapping['beats'] ecs_version = mapping['ecs'] - err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}' + endgame_version = mapping['endgame'] + err_trailer = f'stack: {stack_version}, beats: {beats_version}, ' \ + f'ecs: {ecs_version}, endgame: {endgame_version}' beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], beats_version, ecs_version) + endgame_schema = self.get_endgame_schema(data.index or [], endgame_version) eql_schema = ecs.KqlSchema2Eql(schema) - try: - # TODO: switch to custom cidrmatch that allows ipv6 - with eql_schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: - eql.parse_query(self.query) - except eql.EqlParseError as exc: - message = exc.error_msg - trailer = err_trailer - if "Unknown field" in message and beat_types: - trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" - elif "Field not recognized" in message: - text_fields = self.text_fields(eql_schema) - if text_fields: - fields_str = ', '.join(text_fields) - trailer = f"\neql does not support text fields: {fields_str}\n\n{trailer}" + # validate query against the beats and eql schema + self.validate_query_with_schema(schema=eql_schema, err_trailer=err_trailer, beat_types=beat_types) - raise exc.__class__(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None - except Exception: - print(err_trailer) - raise + if endgame_schema: + # validate query against the endgame schema + 
self.validate_query_with_schema(schema=endgame_schema, err_trailer=err_trailer) def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]: