diff --git a/detection_rules/beats.py b/detection_rules/beats.py index 00ffe518a..882bde7d6 100644 --- a/detection_rules/beats.py +++ b/detection_rules/beats.py @@ -181,17 +181,20 @@ def get_beats_sub_schema(schema: dict[str, Any], beat: str, module: str, *datase flattened: list[dict[str, Any]] = [] beat_dir = schema[beat] - module_dir = beat_dir.get("folders", {}).get("module", {}).get("folders", {}).get(module, {}) + # Normalize module name in case callers include quotes from rendered AST + normalized_module = module.strip("\"' ") + module_dir = beat_dir.get("folders", {}).get("module", {}).get("folders", {}).get(normalized_module, {}) # if we only have a module then we'll work with what we got all_datasets = datasets if datasets else [d for d in module_dir.get("folders", {}) if not d.startswith("_")] for _dataset in all_datasets: # replace aws.s3 -> s3 - dataset = _dataset[len(module) + 1 :] if _dataset.startswith(module + ".") else _dataset + ds = _dataset.strip("\"' ") + dataset = ds[len(normalized_module) + 1 :] if ds.startswith(normalized_module + ".") else ds dataset_dir = module_dir.get("folders", {}).get(dataset, {}) - flattened.extend(get_field_schema(dataset_dir, prefix=module + ".", include_common=True)) + flattened.extend(get_field_schema(dataset_dir, prefix=normalized_module + ".", include_common=True)) # we also need to capture (beta?) fields which are directly within the module _meta.files.fields flattened.extend(get_field_schema(module_dir, include_common=True)) @@ -268,11 +271,11 @@ def get_datasets_and_modules(tree: eql.ast.BaseNode | kql.ast.BaseNode) -> tuple and isinstance(node.right, eql.ast.String) ): if node.left == eql.ast.Field("event", ["module"]): - modules.add(node.right.render()) # type: ignore[reportUnknownMemberType] + modules.add(node.right.value) # type: ignore[reportUnknownMemberType] elif node.left == eql.ast.Field("event", ["dataset"]) or node.left == eql.ast.Field( "data_stream", ["dataset"] ): - datasets.add(node.right.render()) # type: ignore[reportUnknownMemberType] + datasets.add(node.right.value) # type: ignore[reportUnknownMemberType] elif isinstance(node, eql.ast.InSet): if node.expression == eql.ast.Field("event", ["module"]): modules.update(node.get_literals()) # type: ignore[reportUnknownMemberType] diff --git a/detection_rules/etc/non-ecs-schema.json b/detection_rules/etc/non-ecs-schema.json index 4e24ae9fa..36c96605d 100644 --- a/detection_rules/etc/non-ecs-schema.json +++ b/detection_rules/etc/non-ecs-schema.json @@ -1,4 +1,11 @@ { + "auditbeat-*": { + "auditd.data.addr": "keyword", + "auditd.data.grantors": "keyword", + "auditd.data.syscall": "keyword", + "auditd.data.terminal": "keyword", + "auditd.result": "keyword" + }, "endgame-*": { "endgame": { "metadata": { @@ -8,6 +15,9 @@ } }, "winlogbeat-*": { + "problemchild.prediction": "long", + "problemchild.prediction_probability": "long", + "blocklist_label": "long", "winlog": { "event_data": { "AccessList": "keyword", @@ -17,6 +27,7 @@ "AllowedToDelegateTo": "keyword", "AttributeLDAPDisplayName": "keyword", "AttributeValue": "keyword", + "AuditPolicyChangesDescription": "keyword", "CallerProcessName": "keyword", "CallTrace": "keyword", "ClientProcessId": "keyword", @@ -57,7 +68,9 @@ "Status": "keyword", "EnabledPrivilegeList": "keyword", "Operation": "keyword", - "OperationType": "keyword" + "OperationType": "keyword", + "NewUACList": "keyword", + "SubCategory": "keyword" } }, "winlog.logon.type": "keyword", @@ -199,6 +212,7 @@ "azure.platformlogs.properties.id": "keyword" }, "logs-o365.audit-*": { + "o365.audit.ExtendedProperties.RequestType": "keyword", "o365.audit.ExtendedProperties.ResultStatusDetail": "keyword", "o365.audit.OperationProperties.Name": "keyword", "o365.audit.OperationProperties.Value": "keyword", diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 766382eb6..e80cd6848 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -710,8 +710,8 @@ class QueryValidator: @cached def get_endgame_schema(self, indices: list[str], endgame_version: str) -> endgame.EndgameSchema | None: """Get an assembled flat endgame schema.""" - - if indices and "endgame-*" not in indices: + # Only include endgame when explicitly requested by TOML via indices + if not indices or "endgame-*" not in indices: return None endgame_schema = endgame.read_endgame_schema(endgame_version=endgame_version) diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 48e888e22..23d63f778 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -8,23 +8,29 @@ import re import typing from collections.abc import Callable +from dataclasses import dataclass from enum import Enum from functools import cached_property, wraps from typing import Any -import click import eql # type: ignore[reportMissingTypeStubs] import kql # type: ignore[reportMissingTypeStubs] from eql import ast # type: ignore[reportMissingTypeStubs] -from eql.parser import KvTree, LarkToEQL, NodeInfo, TypeHint # type: ignore[reportMissingTypeStubs] +from eql.parser import ( # type: ignore[reportMissingTypeStubs] + KvTree, + LarkToEQL, + NodeInfo, + TypeHint, +) from eql.parser import _parse as base_parse # type: ignore[reportMissingTypeStubs] from marshmallow import ValidationError from semver import Version from . import ecs, endgame +from .beats import get_datasets_and_modules, parse_beats_from_index from .config import CUSTOM_RULES_DIR, load_current_package_version, parse_rules_config from .custom_schemas import update_auto_generated_schema -from .integrations import get_integration_schema_data, load_integrations_manifests +from .integrations import get_integration_schema_data, load_integrations_manifests, parse_datasets from .rule import EQLRuleData, QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, set_eql_config from .schemas import get_stack_schemas @@ -41,6 +47,20 @@ KQL_ERROR_TYPES = kql.KqlCompileError | kql.KqlParseError RULES_CONFIG = parse_rules_config() +@dataclass(frozen=True) +class ValidationTarget: + """A single validation target for a query.""" + + query_text: str + schema: Any + err_trailer: str + min_stack_version: str + kind: str # "integration" or "stack" + # Optional context about schema selection + beat_types: list[str] | None = None + integration_types: list[str] | None = None + + class ExtendedTypeHint(Enum): IP = "ip" @@ -65,7 +85,7 @@ def custom_in_set(self: LarkToEQL, node: KvTree) -> NodeInfo: if not outer.validate_type(ExtendedTypeHint.primitives()): # can't compare non-primitives to sets - raise self._type_error(outer, ExtendedTypeHint.primit()) + raise self._type_error(outer, ExtendedTypeHint.primitives()) # Check that everything inside the container has the same type as outside error_message = "Unable to compare {expected_type} to {actual_type}" @@ -134,187 +154,186 @@ class KQLValidator(QueryValidator): def to_eql(self) -> eql.ast.Expression: return kql.to_eql(self.query) # type: ignore[reportUnknownVariableType] + def _prepare_integration_schema( + self, base_schema: dict[str, Any], stack_version: str, data: QueryRuleData + ) -> dict[str, Any]: + """Augment a base integration schema with index/custom/endpoint fields.""" + schema = dict(base_schema) + for index_name in data.index_or_dataview: + schema.update(**ecs.flatten(ecs.get_index_schema(index_name))) + if data.index and CUSTOM_RULES_DIR: + for index_name in data.index_or_dataview: + schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version))) + schema.update(**ecs.flatten(ecs.get_endpoint_schemas())) + return schema + + def build_validation_plan(self, data: QueryRuleData, meta: RuleMeta) -> list[ValidationTarget]: + """Return a unified list of validation targets for this query. + + Integration targets: union of integration schemas per stack version (if integrations are available) + Stack targets: ECS/beats/endgame schemas per supported stack version + """ + targets: list[ValidationTarget] = [] + + # Build integration-based targets if available + packages_manifest = load_integrations_manifests() + package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + + if package_integrations: + combined_by_stack: dict[str, dict[str, Any]] = {} + ecs_by_stack: dict[str, str] = {} + packages_by_stack: dict[str, set[str]] = {} + + for integ in get_integration_schema_data(data, meta, package_integrations): + stack_version = integ["stack_version"] + ecs_version = integ["ecs_version"] + package = integ["package"] + schema = self._prepare_integration_schema(integ["schema"], stack_version, data) + + _ = ecs_by_stack.setdefault(stack_version, ecs_version) + _ = packages_by_stack.setdefault(stack_version, set()).add(package) + combined_by_stack.setdefault(stack_version, {}).update(schema) + + for stack_version, schema_dict in combined_by_stack.items(): + ecs_version = ecs_by_stack.get(stack_version, "unknown") + pkgs_set = packages_by_stack.get(stack_version, set()) + pkgs = ", ".join(sorted(pkgs_set)) + err_trailer = ( + "Try adding event.module or event.dataset to specify integration module\n\n" + f"Checked against packages [{pkgs}]; stack: {stack_version}; ecs: {ecs_version}\n" + f"rule: {data.name} - {data.rule_id}" + ) + targets.append( + ValidationTarget( + query_text=self.query, + schema=schema_dict, + err_trailer=err_trailer, + min_stack_version=str(meta.min_stack_version or load_current_package_version()), + beat_types=None, + integration_types=sorted(pkgs_set), + kind="integration", + ) + ) + + # Build stack targets only when TOML indicates stack-based validation is needed + # - If no integration packages resolved, include stack targets as fallback + # - Or when beats or endgame indices are present + beat_types_present = parse_beats_from_index(data.index_or_dataview) if data.index_or_dataview else [] + endgame_present = bool(data.index_or_dataview and "endgame-*" in data.index_or_dataview) + should_add_stack_targets = (not package_integrations) or (bool(beat_types_present) or endgame_present) + if should_add_stack_targets: + for stack_version, mapping in meta.get_validation_stack_versions().items(): + beats_version = mapping["beats"] + ecs_version = mapping["ecs"] + beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version) + err_trailer = ( + f"stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}\n" + f"rule: {data.name} - {data.rule_id}" + ) + targets.append( + ValidationTarget( + query_text=self.query, + schema=schema, + err_trailer=err_trailer, + min_stack_version=str(meta.min_stack_version or load_current_package_version()), + beat_types=beat_types, + integration_types=None, + kind="stack", + ) + ) + + return targets + def validate(self, data: QueryRuleData, meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethod] - """Validate the query, called from the parent which contains [metadata] information.""" + """Validate the query using computed schema combinations, favoring integrations when present.""" if meta.query_schema_validation is False or meta.maturity == "deprecated": - # syntax only, which is done via self.ast return - if data.language != "lucene": - packages_manifest = load_integrations_manifests() - package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) - for _ in range(max_attempts): - validation_checks: dict[str, KQL_ERROR_TYPES | None] = {"stack": None, "integrations": None} - # validate the query against fields within beats - validation_checks["stack"] = self.validate_stack_combos(data, meta) + if data.language == "lucene": + return - if package_integrations: - # validate the query against related integration fields - validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations) + for _ in range(max_attempts): + all_targets = self.build_validation_plan(data, meta) + has_integration = any(t.kind == "integration" for t in all_targets) + # Order targets: integrations first (if any), then stack; otherwise just stack + ordered_targets = ( + [t for t in all_targets if t.kind == "integration"] + [t for t in all_targets if t.kind == "stack"] + if has_integration + else [t for t in all_targets if t.kind == "stack"] + ) + retry = False + for t in ordered_targets: + exc = self.validate_query_text_with_schema( + schema=t.schema, + err_trailer=t.err_trailer, + beat_types=t.beat_types, + integration_types=t.integration_types, + ) + if exc is None: + continue - if validation_checks["stack"] and not package_integrations: - # if auto add, try auto adding and then call stack_combo validation again - if validation_checks["stack"].error_msg == "Unknown field" and RULES_CONFIG.auto_gen_schema_file: # type: ignore[reportAttributeAccessIssue] - # auto add the field and re-validate - self.auto_add_field(validation_checks["stack"], data.index_or_dataview[0]) # type: ignore[reportArgumentType] - else: - raise validation_checks["stack"] - - if validation_checks["stack"] and validation_checks["integrations"]: - # if auto add, try auto adding and then call stack_combo validation again - if validation_checks["stack"].error_msg == "Unknown field" and RULES_CONFIG.auto_gen_schema_file: # type: ignore[reportAttributeAccessIssue] - # auto add the field and re-validate - self.auto_add_field(validation_checks["stack"], data.index_or_dataview[0]) # type: ignore[reportArgumentType] - else: - click.echo(f"Stack Error Trace: {validation_checks['stack']}") - click.echo(f"Integrations Error Trace: {validation_checks['integrations']}") - raise ValueError("Error in both stack and integrations checks") - - else: + # Attempt auto-add for missing fields when enabled + if ( + (exc.error_msg == "Unknown field" or "Field not recognized" in exc.error_msg) # type: ignore[reportAttributeAccessIssue] + and RULES_CONFIG.auto_gen_schema_file + and data.index_or_dataview + ): + self.auto_add_field(exc, data.index_or_dataview[0]) # type: ignore[reportArgumentType] + retry = True break - def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> KQL_ERROR_TYPES | None: - """Validate the query against ECS and beats schemas across stack combinations.""" - for stack_version, mapping in meta.get_validation_stack_versions().items(): - beats_version = mapping["beats"] - ecs_version = mapping["ecs"] - err_trailer = f"stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}" + # Raise enriched error from helper + raise exc + if not retry: + # All targets passed + return - beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version) + raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}") - try: - kql.parse(self.query, schema=schema, normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords) # type: ignore[reportUnknownMemberType] - except kql.KqlParseError as exc: - message = exc.error_msg - trailer = err_trailer - if "Unknown field" in message and beat_types: - trailer = f"\nTry adding event.module or data_stream.dataset to specify beats module\n\n{trailer}" - - return kql.KqlParseError( - exc.error_msg, # type: ignore[reportUnknownArgumentType] - exc.line, # type: ignore[reportUnknownArgumentType] - exc.column, # type: ignore[reportUnknownArgumentType] - exc.source, # type: ignore[reportUnknownArgumentType] - len(exc.caret.lstrip()), - trailer=trailer, - ) - return None - - def validate_integration( # noqa: PLR0912 + def validate_query_text_with_schema( self, - data: QueryRuleData, - meta: RuleMeta, - package_integrations: list[dict[str, Any]], + *, + schema: dict[str, Any], + err_trailer: str, + beat_types: list[str] | None, + integration_types: list[str] | None, ) -> KQL_ERROR_TYPES | None: - """Validate the query, called from the parent which contains [metadata] information.""" - if meta.query_schema_validation is False or meta.maturity == "deprecated": - return None - - error_fields = {} - package_schemas = {} - - # Initialize package_schemas with a nested structure - for integration_data in package_integrations: - package = integration_data["package"] - integration = integration_data["integration"] - if integration: - package_schemas.setdefault(package, {}).setdefault(integration, {}) # type: ignore[reportUnknownMemberType] - else: - package_schemas.setdefault(package, {}) # type: ignore[reportUnknownMemberType] - - # Process each integration schema - for integration_schema_data in get_integration_schema_data(data, meta, package_integrations): - package, integration = ( - integration_schema_data["package"], - integration_schema_data["integration"], + """Validate the KQL query text against a given schema and return an enriched error if it fails.""" + try: + kql.parse( # type: ignore[reportUnknownMemberType] + self.query, + schema=schema, + normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords, ) - integration_schema = integration_schema_data["schema"] - stack_version = integration_schema_data["stack_version"] - - # Add non-ecs-schema fields - for index_name in data.index_or_dataview: - integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name))) - - # Add custom schema fields for appropriate stack version - if data.index and CUSTOM_RULES_DIR: - for index_name in data.index_or_dataview: - integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version))) - - # Add endpoint schema fields for multi-line fields - integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas())) - if integration: - package_schemas[package][integration] = integration_schema - else: - package_schemas[package] = integration_schema - - # Validate the query against the schema - try: - kql.parse( # type: ignore[reportUnknownMemberType] - self.query, - schema=integration_schema, - normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords, + except kql.KqlParseError as exc: + # Compose an informative trailer + trailer_parts: list[str] = [] + if exc.error_msg == "Unknown field" and beat_types: + trailer_parts.insert( + 0, + "Try adding event.module or data_stream.dataset to specify beats module", ) - except kql.KqlParseError as exc: - if exc.error_msg == "Unknown field": - field = extract_error_field(self.query, exc) - trailer = ( - f"\n\tTry adding event.module or data_stream.dataset to specify integration module\n\t" - f"Will check against integrations {meta.integration} combined.\n\t" - f"{package=}, {integration=}, {integration_schema_data['package_version']=}, " - f"{integration_schema_data['stack_version']=}, " - f"{integration_schema_data['ecs_version']=}" - ) - error_fields[field] = { - "error": exc, - "trailer": trailer, - "package": package, - "integration": integration, - } - if data.get("notify", False): - print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") - else: - return kql.KqlParseError( - exc.error_msg, # type: ignore[reportUnknownArgumentType] - exc.line, # type: ignore[reportUnknownArgumentType] - exc.column, # type: ignore[reportUnknownArgumentType] - exc.source, # type: ignore[reportUnknownArgumentType] - len(exc.caret.lstrip()), - exc.trailer, # type: ignore[reportUnknownArgumentType] - ) + if integration_types: + pkgs = ", ".join(integration_types) + trailer_parts.append(f"integration_types: [{pkgs}]") + if beat_types: + trailer_parts.append(f"beat_types: [{', '.join(beat_types)}]") - # Check error fields against schemas of different packages or different integrations - for field, error_data in list(error_fields.items()): # type: ignore[reportUnknownArgumentType] - error_package, error_integration = ( # type: ignore[reportUnknownVariableType] - error_data["package"], - error_data["integration"], - ) - for package, integrations_or_schema in package_schemas.items(): # type: ignore[reportUnknownVariableType] - if error_integration is None: - # Compare against the schema directly if there's no integration - if error_package != package and field in integrations_or_schema: - del error_fields[field] - break - else: - # Compare against integration schemas - for integration, schema in integrations_or_schema.items(): # type: ignore[reportUnknownMemberType] - check_alt_schema = error_package != package or ( # type: ignore[reportUnknownVariableType] - error_package == package and error_integration != integration - ) - if check_alt_schema and field in schema: - del error_fields[field] + if err_trailer: + trailer_parts.append(err_trailer) + + trailer = "\n\n".join(tp for tp in trailer_parts if tp) - # Raise the first error - if error_fields: - _, error_data = next(iter(error_fields.items())) # type: ignore[reportUnknownVariableType] return kql.KqlParseError( - error_data["error"].error_msg, # type: ignore[reportUnknownArgumentType] - error_data["error"].line, # type: ignore[reportUnknownArgumentType] - error_data["error"].column, # type: ignore[reportUnknownArgumentType] - error_data["error"].source, # type: ignore[reportUnknownArgumentType] - len(error_data["error"].caret.lstrip()), # type: ignore[reportUnknownArgumentType] - error_data["trailer"], # type: ignore[reportUnknownArgumentType] + exc.error_msg, # type: ignore[reportUnknownArgumentType] + exc.line, # type: ignore[reportUnknownArgumentType] + exc.column, # type: ignore[reportUnknownArgumentType] + exc.source, # type: ignore[reportUnknownArgumentType] + len(exc.caret.lstrip()), + trailer=trailer or None, # type: ignore[reportUnknownArgumentType] ) - return None + else: + return None class EQLValidator(QueryValidator): @@ -339,7 +358,7 @@ class EQLValidator(QueryValidator): def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethodOverride] return list({str(f) for f in self.ast if isinstance(f, eql.ast.Field)}) # type: ignore[reportUnknownVariableType] - def auto_add_field(self, validation_checks_error: eql.errors.EqlParseError, index_or_dataview: str) -> None: + def auto_add_field(self, validation_checks_error: eql.EqlParseError, index_or_dataview: str) -> None: """Auto add a missing field to the schema.""" field_name = extract_error_field(self.query, validation_checks_error) if not field_name: @@ -347,237 +366,312 @@ class EQLValidator(QueryValidator): field_type = ecs.get_all_flattened_schema().get(field_name) update_auto_generated_schema(index_or_dataview, field_name, field_type) - def validate(self, data: "QueryRuleData", meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethodOverride] # noqa: PLR0912 - """Validate an EQL query while checking TOMLRule.""" + def _build_synthetic_sequence_from_subquery(self, subquery: "ast.SubqueryBy") -> str: + """Build a minimal synthetic sequence containing the subquery for validation.""" + subquery_text = str(subquery) + join_fields = ", ".join(map(str, getattr(subquery, "join_values", []) or [])) + dummy_by = f" by {join_fields}" if join_fields else "" + return f"sequence\n {subquery_text}\n [any where true]{dummy_by}" + + def build_validation_plan(self, data: "QueryRuleData", meta: RuleMeta) -> list[ValidationTarget]: # noqa: PLR0912 PLR0915 + """Return a unified list of validation targets for EQL validation. + + Non-sequence: accumulate integration schemas per stack, optionally add stack schemas. + Sequence: build per-subquery integration targets using synthetic sequences; for datasetless + subqueries without metadata integrations, add per-subquery stack targets; optionally add + whole-query stack schemas when indicated by TOML (indices present) or no integrations. + """ + targets: list[ValidationTarget] = [] + + is_sequence = getattr(data, "is_sequence", False) + min_stack_str = str(meta.min_stack_version or load_current_package_version()) + # Sequence planning below may add per-subquery stack targets when needed + + packages_manifest = load_integrations_manifests() + packaged_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + beat_types_present = parse_beats_from_index(data.index_or_dataview) if data.index_or_dataview else [] + endgame_present = bool(data.index_or_dataview and "endgame-*" in data.index_or_dataview) + + # Helper for union-by-stack integration targets + def add_accumulated_integration_targets(query_text: str, packaged: list[dict[str, Any]], context: str) -> None: + combined_by_stack: dict[str, dict[str, Any]] = {} + ecs_by_stack: dict[str, str] = {} + packages_by_stack: dict[str, set[str]] = {} + for integ in get_integration_schema_data(data, meta, packaged): + stack_version = integ["stack_version"] + ecs_version = integ["ecs_version"] + package = integ["package"] + schema = integ["schema"] + # prepare with index/custom/endpoint fields + if data.index_or_dataview: + for index_name in data.index_or_dataview: # type: ignore[reportArgumentType] + schema.update(**ecs.flatten(ecs.get_index_schema(index_name))) + if data.index and CUSTOM_RULES_DIR: + for index_name in data.index_or_dataview: + schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version))) + schema.update(**ecs.flatten(ecs.get_endpoint_schemas())) + + # Do not merge Beats into integration schemas; validate independently via stack targets + + _ = ecs_by_stack.setdefault(stack_version, ecs_version) + packages_by_stack.setdefault(stack_version, set()).add(package) + combined_by_stack.setdefault(stack_version, {}).update(schema) + + for stack_version, schema_dict in combined_by_stack.items(): + ecs_version = ecs_by_stack.get(stack_version, "unknown") + pkgs_set = packages_by_stack.get(stack_version, set()) + pkgs = ", ".join(sorted(pkgs_set)) + err_trailer = ( + f"{context}\nChecked against packages [{pkgs}]; stack: {stack_version}; ecs: {ecs_version}\n" + f"rule: {data.name} - {data.rule_id}" + ) + targets.append( + ValidationTarget( + query_text=query_text, + schema=ecs.KqlSchema2Eql(schema_dict), + err_trailer=err_trailer, + min_stack_version=min_stack_str, + beat_types=None, + integration_types=sorted(pkgs_set), + kind="integration", + ) + ) + + # Helper to add Beats/ECS (and optionally Endgame) stack targets for a given query text + def add_stack_targets(query_text: str, include_endgame: bool) -> None: + for stack_version, mapping in meta.get_validation_stack_versions().items(): + beats_version = mapping["beats"] + ecs_version = mapping["ecs"] + endgame_version = mapping["endgame"] + + beat_types, _, kql_schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version) + err_trailer = ( + f"stack: {stack_version}, beats: {beats_version},ecs: {ecs_version}, endgame: {endgame_version}\n" + f"rule: {data.name} - {data.rule_id}" + ) + # ECS (+beats if present) + targets.append( + ValidationTarget( + query_text=query_text, + schema=ecs.KqlSchema2Eql(kql_schema), + err_trailer=err_trailer, + min_stack_version=min_stack_str, + beat_types=beat_types, + integration_types=None, + kind="stack", + ) + ) + # Optionally add Endgame + if include_endgame: + endgame_schema = self.get_endgame_schema(data.index_or_dataview, endgame_version) + if endgame_schema: + targets.append( + ValidationTarget( + query_text=query_text, + schema=endgame_schema, + err_trailer=err_trailer, + min_stack_version=min_stack_str, + beat_types=None, + integration_types=None, + kind="stack", + ) + ) + + # Sequence queries: per-subquery validation + if is_sequence: + sequence: ast.Sequence = self.ast.first # type: ignore[reportAttributeAccessIssue] + for subquery in sequence.queries: # type: ignore[reportUnknownVariableType] + subquery_datasets, _ = get_datasets_and_modules(subquery) # type: ignore[reportUnknownVariableType] + synthetic_sequence = self._build_synthetic_sequence_from_subquery(subquery) # type: ignore[reportArgumentType] + + if subquery_datasets: + subquery_pkg_ints = parse_datasets(list(subquery_datasets), packages_manifest) + # Per-subquery: validate each integration combination individually (no accumulation) + for integ in get_integration_schema_data(data, meta, subquery_pkg_ints): + package = integ["package"] + package_version = integ["package_version"] + stack_version = integ["stack_version"] + ecs_version = integ["ecs_version"] + schema_dict = integ["schema"] + + # prepare schema + if data.index_or_dataview: + for index_name in data.index_or_dataview: # type: ignore[reportArgumentType] + schema_dict.update(**ecs.flatten(ecs.get_index_schema(index_name))) + if data.index and CUSTOM_RULES_DIR: + for index_name in data.index_or_dataview: + schema_dict.update( + **ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)) + ) + schema_dict.update(**ecs.flatten(ecs.get_endpoint_schemas())) + + err_trailer = ( + "Subquery schema mismatch. " + f"package: {package}, package_version: {package_version}, " + f"stack: {stack_version}, ecs: {ecs_version}\n" + f"rule: {data.name} - {data.rule_id}" + ) + targets.append( + ValidationTarget( + query_text=synthetic_sequence, + schema=ecs.KqlSchema2Eql(schema_dict), + err_trailer=err_trailer, + min_stack_version=min_stack_str, + beat_types=None, + integration_types=[package], + kind="integration", + ) + ) + # Additionally validate this subquery against Beats/ECS if beats indices are present + if beat_types_present: + add_stack_targets(synthetic_sequence, include_endgame=False) + else: + # Datasetless subquery: try metadata integrations first, else add per-subquery stack targets + meta_integrations = meta.integration + if isinstance(meta_integrations, str): + meta_integrations = [meta_integrations] + elif meta_integrations is None: + meta_integrations = [] + + if meta_integrations: + meta_pkg_ints = [ + {"package": pkg, "integration": None} + for pkg in meta_integrations + if pkg in packages_manifest + ] + add_accumulated_integration_targets( + synthetic_sequence, + meta_pkg_ints, + "Datasetless subquery validation against metadata integrations", + ) + # Also validate datasetless subquery against Beats/ECS if beats indices are present + if beat_types_present: + add_stack_targets(synthetic_sequence, include_endgame=False) + else: + # Add stack targets for this datasetless subquery + add_stack_targets(synthetic_sequence, include_endgame=True) + + elif packaged_integrations: + # Non-sequence queries: accumulate integrations per stack if available + add_accumulated_integration_targets( + self.query, + packaged_integrations, + "Try adding event.module or event.dataset to specify integration module", + ) + + # Stack targets for whole query: + # - always when no integrations are resolved; OR + # - for non-sequence queries when beats or endgame indices are present + need_stack_targets = (not packaged_integrations) or ( + (not is_sequence) and (beat_types_present or endgame_present) + ) + if need_stack_targets: + add_stack_targets(self.query, include_endgame=True) + + return targets + + def validate(self, data: "QueryRuleData", meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethodOverride] + """Validate an EQL query using a unified plan of schema combinations.""" if meta.query_schema_validation is False or meta.maturity == "deprecated": - # syntax only, which is done via self.ast return - if data.language != "lucene": - packages_manifest = load_integrations_manifests() - package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + if data.language == "lucene": + return - for _ in range(max_attempts): - validation_checks = {"stack": None, "integrations": None} - # validate the query against fields within beats - validation_checks["stack"] = self.validate_stack_combos(data, meta) # type: ignore[reportArgumentType] + # Validate rule type configuration fields against ECS schema + set_fields, has_invalid = self.validate_rule_type_configurations(data, meta) # type: ignore[reportArgumentType] + if has_invalid and set_fields: + raise ValueError(f"Rule type configuration fields not in ECS schema: {', '.join(set_fields)}") - stack_check = validation_checks["stack"] - - if package_integrations: - # validate the query against related integration fields - validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations) # type: ignore[reportArgumentType] - - if stack_check and not package_integrations: - # if auto add, try auto adding and then validate again - if ( - "Field not recognized" in str(stack_check) # type: ignore[reportUnknownMemberType] - and RULES_CONFIG.auto_gen_schema_file - ): - # auto add the field and re-validate - self.auto_add_field(stack_check, data.index_or_dataview[0]) # type: ignore[reportArgumentType] - else: - raise stack_check - - elif stack_check and validation_checks["integrations"]: - # if auto add, try auto adding and then validate again - if ( - "Field not recognized" in stack_check.error_msg # type: ignore[reportUnknownMemberType] - and RULES_CONFIG.auto_gen_schema_file - ): - # auto add the field and re-validate - self.auto_add_field(stack_check, data.index_or_dataview[0]) # type: ignore[reportArgumentType] - else: - click.echo(f"Stack Error Trace: {stack_check}") - click.echo(f"Integrations Error Trace: {validation_checks['integrations']}") - raise ValueError("Error in both stack and integrations checks") - - else: + for _ in range(max_attempts): + all_targets = self.build_validation_plan(data, meta) + has_integration = any(t.kind == "integration" for t in all_targets) + # Order targets: integrations first (if any), then stack; otherwise just stack + ordered_targets = ( + [t for t in all_targets if t.kind == "integration"] + [t for t in all_targets if t.kind == "stack"] + if has_integration + else [t for t in all_targets if t.kind == "stack"] + ) + first_error: EQL_ERROR_TYPES | ValueError | None = None + for t in ordered_targets: + exc = self.validate_query_text_with_schema( + t.query_text, + t.schema, + err_trailer=t.err_trailer, + min_stack_version=t.min_stack_version, + beat_types=t.beat_types, + integration_types=t.integration_types, + ) + if exc is not None: + first_error = exc break - else: - raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}") + if first_error is None: + # All targets passed + return - rule_type_config_fields, rule_type_config_validation_failed = self.validate_rule_type_configurations( - data, # type: ignore[reportArgumentType] - meta, - ) - if rule_type_config_validation_failed: - raise ValueError( - f"""Rule type config values are not ECS compliant, check these values: - {rule_type_config_fields}""" - ) + # Attempt auto-add only when unknown field and enabled; then retry + if ( + isinstance(first_error, eql.EqlParseError) + and "Field not recognized" in str(first_error) + and RULES_CONFIG.auto_gen_schema_file + and data.index_or_dataview + ): + self.auto_add_field(first_error, data.index_or_dataview[0]) # type: ignore[reportArgumentType] + continue - def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> EQL_ERROR_TYPES | None | ValueError: - """Validate the query against ECS and beats schemas across stack combinations.""" - for stack_version, mapping in meta.get_validation_stack_versions().items(): - beats_version = mapping["beats"] - ecs_version = mapping["ecs"] - endgame_version = mapping["endgame"] - err_trailer = ( - f"stack: {stack_version}, beats: {beats_version},ecs: {ecs_version}, endgame: {endgame_version}" - ) + # Raise the enriched parse error (includes target trailer + metadata) + raise first_error - beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version) - endgame_schema = self.get_endgame_schema(data.index_or_dataview, endgame_version) - eql_schema = ecs.KqlSchema2Eql(schema) + raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}") - # validate query against the beats and eql schema - exc = self.validate_query_with_schema( # type: ignore[reportUnknownVariableType] - data=data, - schema=eql_schema, - err_trailer=err_trailer, - beat_types=beat_types, - min_stack_version=meta.min_stack_version, # type: ignore[reportArgumentType] - ) - if exc: - return exc - - if endgame_schema: - # validate query against the endgame schema - exc = self.validate_query_with_schema( - data=data, - schema=endgame_schema, - err_trailer=err_trailer, - min_stack_version=meta.min_stack_version, # type: ignore[reportArgumentType] - ) - if exc: - raise exc - return None - - def validate_integration( # noqa: PLR0912 + def validate_query_text_with_schema( # noqa: PLR0913 self, - data: QueryRuleData, - meta: RuleMeta, - package_integrations: list[dict[str, Any]], - ) -> EQL_ERROR_TYPES | None | ValueError: - """Validate an EQL query while checking TOMLRule against integration schemas.""" - if meta.query_schema_validation is False or meta.maturity == "deprecated": - # syntax only, which is done via self.ast - return None - - error_fields = {} - package_schemas: dict[str, Any] = {} - - # Initialize package_schemas with a nested structure - for integration_data in package_integrations: - package = integration_data["package"] - integration = integration_data["integration"] - if integration: - package_schemas.setdefault(package, {}).setdefault(integration, {}) - else: - package_schemas.setdefault(package, {}) - - # Process each integration schema - for integration_schema_data in get_integration_schema_data(data, meta, package_integrations): - ecs_version = integration_schema_data["ecs_version"] - package, integration = ( - integration_schema_data["package"], - integration_schema_data["integration"], - ) - package_version = integration_schema_data["package_version"] - integration_schema = integration_schema_data["schema"] - stack_version = integration_schema_data["stack_version"] - - # add non-ecs-schema fields for edge cases not added to the integration - if data.index_or_dataview: - for index_name in data.index_or_dataview: - integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name))) - - # Add custom schema fields for appropriate stack version - if data.index_or_dataview and CUSTOM_RULES_DIR: - for index_name in data.index_or_dataview: - integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version))) - - # add endpoint schema fields for multi-line fields - integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas())) - package_schemas[package].update(**integration_schema) - - eql_schema = ecs.KqlSchema2Eql(integration_schema) - err_trailer = ( - f"stack: {stack_version}, integration: {integration}," - f"ecs: {ecs_version}, package: {package}, package_version: {package_version}" - ) - - # Validate the query against the schema - exc = self.validate_query_with_schema( - data=data, - schema=eql_schema, - err_trailer=err_trailer, - min_stack_version=meta.min_stack_version, # type: ignore[reportArgumentType] - ) - - if isinstance(exc, eql.EqlParseError): - message = exc.error_msg # type: ignore[reportUnknownVariableType] - if message == "Unknown field" or "Field not recognized" in message: - field = extract_error_field(self.query, exc) - trailer = ( - f"\n\tTry adding event.module or data_stream.dataset to specify integration module\n\t" - f"Will check against integrations {meta.integration} combined.\n\t" - f"{package=}, {integration=}, {package_version=}, " - f"{stack_version=}, {ecs_version=}" - ) - error_fields[field] = { - "error": exc, - "trailer": trailer, - "package": package, - "integration": integration, - } - if data.get("notify", False): - print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") - else: - return exc - - # Check error fields against schemas of different packages or different integrations - for field, error_data in list(error_fields.items()): # type: ignore[reportUnknownArgumentType] - error_package, error_integration = ( # type: ignore[reportUnknownVariableType] - error_data["package"], - error_data["integration"], - ) - for package, integrations_or_schema in package_schemas.items(): - if error_integration is None: - # Compare against the schema directly if there's no integration - if error_package != package and field in integrations_or_schema: - del error_fields[field] - else: - # Compare against integration schemas - for integration, schema in integrations_or_schema.items(): - check_alt_schema = ( # type: ignore[reportUnknownVariableType] - error_package != package or (error_package == package and error_integration != integration) - ) - if check_alt_schema and field in schema: - del error_fields[field] - - # raise the first error - if error_fields: - _, data = next(iter(error_fields.items())) # type: ignore[reportUnknownArgumentType] - return data["error"] # type: ignore[reportIndexIssue] - return None - - def validate_query_with_schema( - self, - data: "QueryRuleData", # noqa: ARG002 + query_text: str, schema: ecs.KqlSchema2Eql | endgame.EndgameSchema, err_trailer: str, min_stack_version: str, beat_types: list[str] | None = None, + integration_types: list[str] | None = None, ) -> EQL_ERROR_TYPES | ValueError | None: - """Validate the query against the schema.""" + """Validate the provided EQL query text against the schema (variant of validate_query_with_schema).""" try: config = set_eql_config(min_stack_version) with config, schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: - _ = eql.parse_query(self.query) # type: ignore[reportUnknownMemberType] + _ = eql.parse_query(query_text) # type: ignore[reportUnknownMemberType] except eql.EqlParseError as exc: message = exc.error_msg - trailer = err_trailer + trailer_parts: list[str] = [] + # If the error is an unknown field and the field was referenced as optional (prefixed with '?'), + # treat this target as non-fatal to honor EQL optional semantics. + + field = extract_error_field(query_text, exc) + if ( + field + and ("Unknown field" in message or "Field not recognized" in message) + and f"?{field}" in self.query + ): + return None if "Unknown field" in message and beat_types: - trailer = f"\nTry adding event.module or data_stream.dataset to specify beats module\n\n{trailer}" - elif "Field not recognized" in message: + trailer_parts.insert(0, "Try adding event.module or event.dataset to specify beats module") + elif "Field not recognized" in message and isinstance(schema, ecs.KqlSchema2Eql): text_fields = self.text_fields(schema) if text_fields: fields_str = ", ".join(text_fields) - trailer = f"\neql does not support text fields: {fields_str}\n\n{trailer}" + trailer_parts.insert(0, f"eql does not support text fields: {fields_str}") + # Surface integration packages if available + if integration_types: + pkgs = ", ".join(integration_types) + trailer_parts.append(f"integration_types: [{pkgs}]") + # Surface beat types if available (stack plan) + if beat_types: + trailer_parts.append(f"beat_types: [{', '.join(beat_types)}]") + + if err_trailer: + trailer_parts.append(err_trailer) + + trailer = "\n\n".join(tp for tp in trailer_parts if tp) return exc.__class__( exc.error_msg, # type: ignore[reportUnknownArgumentType] exc.line, # type: ignore[reportUnknownArgumentType] @@ -586,30 +680,34 @@ class EQLValidator(QueryValidator): len(exc.caret.lstrip()), trailer=trailer, ) - except Exception as exc: # noqa: BLE001 print(err_trailer) return exc # type: ignore[reportReturnType] - def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> tuple[list[str | None], bool]: - """Validate EQL rule type configurations.""" - if data.timestamp_field or data.event_category_override or data.tiebreaker_field: - # get a list of rule type configuration fields - # Get a list of rule type configuration fields - fields = ["timestamp_field", "event_category_override", "tiebreaker_field"] - set_fields = list(filter(None, (data.get(field) for field in fields))) # type: ignore[reportUnknownVariableType] + def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> tuple[list[str], bool]: + """Validate EQL rule type configurations (timestamp_field, event_category_override, tiebreaker_field). - # get stack_version and ECS schema - min_stack_version = meta.get("min_stack_version") - if min_stack_version is None: - min_stack_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) - ecs_version = get_stack_schemas()[str(min_stack_version)]["ecs"] - schema = ecs.get_schema(ecs_version) + Returns a tuple of the list of configured field names (non-empty) and a boolean indicating whether + any are not present in the ECS schema for the rule's minimum stack version (or current package version). + """ + configured: list[str] = [] + if data.timestamp_field: + configured.append(data.timestamp_field) + if data.event_category_override: + configured.append(data.event_category_override) + if data.tiebreaker_field: + configured.append(data.tiebreaker_field) - # return a list of rule type config field values and whether any are not in the schema - return (set_fields, any(f not in schema for f in set_fields)) # type: ignore[reportUnknownVariableType] - # if rule type fields are not set, return an empty list and False - return [], False + if not configured: + return [], False + + stack_version = str(meta.min_stack_version or load_current_package_version()) + min_stack_version = str(Version.parse(stack_version, optional_minor_and_patch=True)) + stack_map = get_stack_schemas(stack_version) + ecs_version = stack_map[min_stack_version]["ecs"] + schema = ecs.get_schema(ecs_version) + + return configured, any(f not in schema for f in configured) class ESQLValidator(QueryValidator): diff --git a/pyproject.toml b/pyproject.toml index f3f25759c..85c1850f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.3.33" +version = "1.4.0" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index d692dc3e7..58af2b798 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -34,7 +34,6 @@ from detection_rules.rule import ( TOMLRuleContents, ) from detection_rules.rule_loader import FILE_PATTERN, RULES_CONFIG -from detection_rules.rule_validators import EQLValidator, KQLValidator from detection_rules.schemas import definitions, get_min_supported_stack_version, get_stack_schemas from detection_rules.utils import INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump, make_git from detection_rules.version_lock import loaded_version_lock @@ -997,7 +996,7 @@ class TestRuleMetadata(BaseRuleTest): build_rule(query, "eql") for query in invalid_integration_queries_eql: - with self.assertRaises(ValueError): + with self.assertRaises(eql.EqlSchemaError): build_rule(query, "eql") # kql for query in valid_queries_kql: @@ -1008,38 +1007,9 @@ class TestRuleMetadata(BaseRuleTest): build_rule(query, "kuery") for query in invalid_integration_queries_kql: - with self.assertRaises(ValueError): + with self.assertRaises(kql.KqlParseError): build_rule(query, "kuery") - def test_event_dataset(self): - for rule in self.all_rules: - if isinstance(rule.contents.data, QueryRuleData): - # Need to pick validator based on language - if rule.contents.data.language == "kuery": - test_validator = KQLValidator(rule.contents.data.query) - elif rule.contents.data.language == "eql": - test_validator = EQLValidator(rule.contents.data.query) - else: - continue - data = rule.contents.data - meta = rule.contents.metadata - if (meta.query_schema_validation is not False or meta.maturity != "deprecated") and ( - isinstance(data, QueryRuleData) and data.language != "lucene" - ): - packages_manifest = load_integrations_manifests() - pkg_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) - - validation_integrations_check = None - - if pkg_integrations: - # validate the query against related integration fields - validation_integrations_check = test_validator.validate_integration( - data, meta, pkg_integrations - ) - - if validation_integrations_check and "event.dataset" in rule.contents.data.query: - raise validation_integrations_check - def test_min_stack_version_supported(self): """Test that rules have a min_stack_version that is supported in stack-schema-map.yaml.""" failures = [] diff --git a/tests/test_python_library.py b/tests/test_python_library.py index 7a02f8900..b62a9e3b5 100644 --- a/tests/test_python_library.py +++ b/tests/test_python_library.py @@ -3,64 +3,283 @@ # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. +import eql + from detection_rules.rule_loader import RuleCollection from .base import BaseRuleTest -class TestEQLInSet(BaseRuleTest): - """Test EQL rule query in set override.""" +def mk_metadata(integrations: list[str], comments: str = "Test metadata") -> dict: + """Create rule metadata dictionary.""" + return { + "creation_date": "2020/12/15", + "integration": integrations, + "maturity": "production", + "min_stack_comments": comments, + "min_stack_version": "8.3.0", + "updated_date": "2024/08/30", + } - def test_eql_in_set(self): - """Test that the query validation is working correctly.""" + +def mk_rule( + *, + name: str, + rule_id: str, + description: str, + risk_score: int, + query: str, +) -> dict: + """Create rule dictionary.""" + return { + "author": ["Elastic"], + "description": description, + "language": "eql", + "name": name, + "risk_score": risk_score, + "rule_id": rule_id, + "severity": "low", + "type": "eql", + "query": query, + } + + +class TestEQLInSet(BaseRuleTest): + """Test EQL rule query in_set override (separate failing and passing cases).""" + + def test_eql_in_set_invalid_ip(self) -> None: rc = RuleCollection() - eql_rule = { - "metadata": { - "creation_date": "2020/12/15", - "integration": ["endpoint", "windows"], - "maturity": "production", - "min_stack_comments": "New fields added: required_fields, related_integrations, setup", - "min_stack_version": "8.3.0", - "updated_date": "2024/03/26", - }, - "rule": { - "author": ["Elastic"], - "description": """ - Test Rule. - """, - "false_positives": ["Fake."], - "from": "now-9m", - "index": ["winlogbeat-*", "logs-endpoint.events.*", "logs-windows.sysmon_operational-*"], - "language": "eql", - "license": "Elastic License v2", - "name": "Fake Test Rule", - "references": [ - "https://example.com", - ], - "risk_score": 47, - "rule_id": "4fffae5d-8b7d-4e48-88b1-979ed42fd9a3", - "severity": "medium", - "tags": [ - "Domain: Endpoint", - "OS: Windows", - "Use Case: Threat Detection", - "Tactic: Execution", - "Data Source: Elastic Defend", - "Data Source: Sysmon", - ], - "type": "eql", - "query": """ - sequence by host.id, process.entity_id with maxspan = 5s - [network where destination.ip in ("127.0.0.1", "::1")] - """, - }, + query = """ + sequence by host.id, process.entity_id with maxspan = 5s + [network where destination.ip in ("127.0.0.1", "::1")] + """ + rule_dict = { + "metadata": mk_metadata( + ["endpoint", "windows"], comments="New fields added: required_fields, related_integrations, setup" + ), + "rule": mk_rule( + name="Fake Test Rule", + rule_id="4fffae5d-8b7d-4e48-88b1-979ed42fd9a3", + description="Test Rule.", + risk_score=47, + query=query, + ), } - expected_error_message = r"Error in both stack and integrations checks" - with self.assertRaisesRegex(ValueError, expected_error_message): - rc.load_dict(eql_rule) - # Change to appropriate destination.address field - eql_rule["rule"]["query"] = """ + with self.assertRaisesRegex(eql.EqlTypeMismatchError, r"Unable to compare ip to string"): + rc.load_dict(rule_dict) + + def test_eql_in_set_valid_address(self) -> None: + rc = RuleCollection() + query = """ sequence by host.id, process.entity_id with maxspan = 10s [network where destination.address in ("192.168.1.1", "::1")] """ - rc.load_dict(eql_rule) + rule_dict = { + "metadata": mk_metadata( + ["endpoint", "windows"], comments="New fields added: required_fields, related_integrations, setup" + ), + "rule": mk_rule( + name="Fake Test Rule", + rule_id="4fffae5d-8b7d-4e48-88b1-979ed42fd9a3", + description="Test Rule.", + risk_score=47, + query=query, + ), + } + rc.load_dict(rule_dict) + + +class TestEQLSequencePerIntegration(BaseRuleTest): + """Tests for per-subquery EQL validation against the correct integration.package schema.""" + + def test_sequence_valid_per_package(self) -> None: + """Test that a sequence with subquerys from different packages validates correctly.""" + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where event.dataset == "azure.identity_protection"] by azure.identityprotection.properties.user_principal_name + [any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + """ + rule = { + "metadata": mk_metadata(["azure"], comments="Per-subquery integration validation"), + "rule": mk_rule( + name="EQL sequence per integration test", + rule_id="1b6e2f77-8e1f-4f8d-9f72-1d8e5f3e5f11", + description="Validate per-subquery integration.package schemas.", + risk_score=40, + query=query, + ), + } + # Should load without error because each subquery validates against its own package schema + rc.load_dict(rule) + + def test_sequence_invalid_join_field_wrong_package(self) -> None: + """Test that a sequence with a join field from a different package fails validation.""" + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where event.dataset == "azure.identity_protection"] by azure.identityprotection.properties.user_principal_name + [any where event.dataset == "azure.identity_protection"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + """ + bad_rule = { + "metadata": mk_metadata(["azure"], comments="Per-subquery integration validation"), + "rule": mk_rule( + name="EQL sequence per integration test", + rule_id="1b6e2f77-8e1f-4f8d-9f72-1d8e5f3e5f11", + description="Validate per-subquery integration.package schemas.", + risk_score=40, + query=query, + ), + } + # Expect failure: join field belongs to a different package than the subquery dataset + with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"): + rc.load_dict(bad_rule) + + def test_sequence_top_level_by_and_runs_across_integrations_valid(self) -> None: + """Sequence-level by and per-subquery runs; subqueries use different integrations and validate correctly.""" + rc = RuleCollection() + query = """ + sequence by host.id, user.id with maxspan=1s + [any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName with runs=5 + [authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by okta.actor.id + """ + rule = { + "metadata": mk_metadata(["azure", "okta"], comments="Top-level sequence by and runs"), + "rule": mk_rule( + name="EQL sequence with top-level by and runs", + rule_id="4e5f6a99-4567-4f8d-9f72-1d8e5f3e5f15", + description="Validate top-level sequence by and per-subquery runs across integrations.", + risk_score=42, + query=query, + ), + } + rc.load_dict(rule) + + def test_sequence_top_level_by_and_runs_across_integrations_invalid_join(self) -> None: + """Sequence-level by with runs; okta subquery incorrectly uses an azure join field causing validation failure.""" + rc = RuleCollection() + query = """ + sequence by host.id, user.id with maxspan=1s + [any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName with runs=5 + [authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + """ + bad_rule = { + "metadata": mk_metadata(["azure", "okta"], comments="Top-level sequence by and runs invalid join"), + "rule": mk_rule( + name="EQL sequence with top-level by and runs invalid", + rule_id="4e5f6a99-4567-4f8d-9f72-1d8e5f3e5f16", + description="Invalid: okta subquery uses azure join field.", + risk_score=42, + query=query, + ), + } + with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"): + rc.load_dict(bad_rule) + + def test_sequence_okta_missing_in_metadata_but_present_in_dataset(self) -> None: + """Okta dataset appears in a subquery but is not listed in metadata; dataset should drive schema selection.""" + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where event.dataset == "azure.identity_protection"] by azure.identityprotection.properties.user_principal_name + [any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + [authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by okta.actor.id + """ + rule = { + # Intentionally do not include "okta" in metadata.integrations + "metadata": mk_metadata(["azure"], comments="Okta present via dataset only"), + "rule": mk_rule( + name="EQL sequence with okta dataset only", + rule_id="3c4d5e77-2345-4f8d-9f72-1d8e5f3e5f13", + description="Validate that dataset usage includes okta schema even if not in metadata.", + risk_score=50, + query=query, + ), + } + # Should load without error because get_packaged_integrations includes packages parsed from datasets + rc.load_dict(rule) + + def test_sequence_across_integrations_valid(self) -> None: + """Sequence uses azure and crowdstrike datasets; each subquery validates against its own integration.""" + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + [any where event.dataset == "crowdstrike.fdr"] by process.executable + """ + rule = { + "metadata": mk_metadata(["azure", "crowdstrike"], comments="Cross-integration per-subquery validation"), + "rule": mk_rule( + name="EQL sequence across integrations valid", + rule_id="2a3b4c55-1234-4f8d-9f72-1d8e5f3e5f11", + description="Validate sequence subquerys across azure and crowdstrike integrations.", + risk_score=35, + query=query, + ), + } + rc.load_dict(rule) + + def test_sequence_across_integrations_invalid_crowdstrike_subquery_azure_field(self) -> None: + """CrowdStrike subquery incorrectly uses an azure join field, which should fail validation.""" + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + [any where event.dataset == "crowdstrike.fdr"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + """ + bad_rule = { + "metadata": mk_metadata(["azure", "crowdstrike"], comments="Cross-integration per-subquery validation"), + "rule": mk_rule( + name="EQL sequence across integrations invalid", + rule_id="2a3b4c55-1234-4f8d-9f72-1d8e5f3e5f12", + description="CrowdStrike subquery incorrectly uses an azure join field.", + risk_score=35, + query=query, + ), + } + with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"): + rc.load_dict(bad_rule) + + def test_sequence_mixed_dataset_and_datasetless_subquery_invalid_field(self) -> None: + """First subquery has dataset; second is datasetless with an invalid vendor field; with no metadata integration + for the datasetless subquery, integration validation so overall validation should fail. + """ + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + [any where foo.invalid_field == "badfield"] by host.id + """ + bad_rule = { + # No integrations in metadata: datasetless subquery should not be validated against any integration + "metadata": mk_metadata([], comments="Mixed dataset and datasetless invalid field"), + "rule": mk_rule( + name="EQL sequence mixed dataset and datasetless invalid", + rule_id="5f6071aa-5678-4f8d-9f72-1d8e5f3e5f17", + description="Second datasetless subquery contains an invalid field; expect failure.", + risk_score=33, + query=query, + ), + } + with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"): + rc.load_dict(bad_rule) + + def test_sequence_datasetless_subquery_with_metadata_integration_valid(self) -> None: + """Datasetless azure subquery uses azure.* fields with metadata including azure; should validate and pass.""" + rc = RuleCollection() + query = """ + sequence with maxspan=30m + [any where azure.identityprotection.properties.user_principal_name != null] by azure.identityprotection.properties.user_principal_name + [any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName + """ + rule = { + "metadata": mk_metadata(["azure"], comments="Datasetless subquery with azure fields"), + "rule": mk_rule( + name="EQL sequence datasetless azure subquery", + rule_id="3d4e5f88-3456-4f8d-9f72-1d8e5f3e5f14", + description="Datasetless azure subquery relies on metadata/field inference for package schema.", + risk_score=30, + query=query, + ), + } + rc.load_dict(rule)