Files
Mika Ayenson, PhD cc66323d1d [Bug] Omit ES|QL engine columns from required_fields (#6027)
* Omit Esql.* columns from ES|QL rule required_fields

Kibana treats required_fields as index mappings. ES|QL stats and
similar commands expose Esql.* and Esql_priv.* result columns that
are not mapped on source indices, which produced noisy validation
warnings for shipped rules.

Filter those names when building required_fields. Add a check in
test_esql_endpoint_alerts_index when remote ES|QL validation runs.

Fixes #6026.

* Move required_fields check to its own remote test

* Iterate production rules in required_fields test

* Use direct get_required_fields call in remote test

Skip to_api_format() and call data.get_required_fields(index) directly,
gated on ESQLRuleData. Mirrors the ESQLValidator scope of the fix and
avoids the unrelated packaging steps that to_api_format runs per rule.

* Bump version to 1.6.30

* Centralize ES|QL dynamic field prefix tuple

Define ESQL_DYNAMIC_FIELD_PREFIXES = ("Esql.", "Esql_priv.") in
schemas/definitions.py and reuse it in QueryValidator.get_required_fields,
ESQLValidator.validate_columns_index_mapping, and the remote test.
Single source of truth and consistent ordering across the codebase.
2026-05-01 17:37:31 -05:00

952 lines
45 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Validation logic for rules containing queries."""
import re
import typing
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from functools import cached_property, wraps
from typing import Any
import eql # type: ignore[reportMissingTypeStubs]
import kql # type: ignore[reportMissingTypeStubs]
from elastic_transport import ObjectApiResponse
from elasticsearch import Elasticsearch # type: ignore[reportMissingTypeStubs]
from eql import ast # type: ignore[reportMissingTypeStubs]
from eql.parser import ( # type: ignore[reportMissingTypeStubs]
KvTree,
LarkToEQL,
NodeInfo,
TypeHint,
)
from eql.parser import _parse as base_parse # type: ignore[reportMissingTypeStubs]
from kibana import Kibana # type: ignore[reportMissingTypeStubs]
from semver import Version
from . import ecs, endgame, misc, utils
from .beats import get_datasets_and_modules, parse_beats_from_index
from .config import CUSTOM_RULES_DIR, load_current_package_version, parse_rules_config
from .custom_schemas import update_auto_generated_schema
from .esql import get_esql_query_event_dataset_integrations
from .esql_errors import EsqlTypeMismatchError
from .index_mappings import (
create_remote_indices,
execute_query_against_indices,
get_rule_integrations,
prepare_mappings,
)
from .integrations import (
get_integration_schema_data,
load_integrations_manifests,
parse_datasets,
)
from .rule import EQLRuleData, QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, set_eql_config
from .schemas import get_latest_stack_version, get_stack_schemas, get_stack_versions
from .schemas.definitions import ESQL_DYNAMIC_FIELD_PREFIXES, FROM_SOURCES_REGEX
EQL_ERROR_TYPES = (
eql.EqlCompileError
| eql.EqlError
| eql.EqlParseError
| eql.EqlSchemaError
| eql.EqlSemanticError
| eql.EqlSyntaxError
| eql.EqlTypeMismatchError
)
KQL_ERROR_TYPES = kql.KqlCompileError | kql.KqlParseError
RULES_CONFIG = parse_rules_config()
@dataclass(frozen=True)
class ValidationTarget:
"""A single validation target for a query."""
query_text: str
schema: Any
err_trailer: str
min_stack_version: str
kind: str # "integration" or "stack"
# Optional context about schema selection
beat_types: list[str] | None = None
integration_types: list[str] | None = None
class ExtendedTypeHint(Enum):
IP = "ip"
@classmethod
def primitives(cls): # noqa: ANN206
"""Get all primitive types."""
return TypeHint.Boolean, TypeHint.Numeric, TypeHint.Null, TypeHint.String, ExtendedTypeHint.IP
def is_primitive(self) -> bool:
"""Check if a type is a primitive."""
return self in self.primitives()
@typing.no_type_check
def custom_in_set(self: LarkToEQL, node: KvTree) -> NodeInfo:
"""Override and address the limitations of the eql in_set method."""
response = self.visit(node.child_trees)
if not response:
raise ValueError("Child trees are not provided")
outer, container = response
if not outer.validate_type(ExtendedTypeHint.primitives()):
# can't compare non-primitives to sets
raise self._type_error(outer, ExtendedTypeHint.primitives())
# Check that everything inside the container has the same type as outside
error_message = "Unable to compare {expected_type} to {actual_type}"
for inner in container:
if not inner.validate_type(outer):
raise self._type_error(inner, outer, error_message)
if self._elasticsearch_syntax and hasattr(outer, "type_info"):
# Check edge case of in_set and ip/string comparison
outer_type = outer.type_info
if isinstance(self._schema, ecs.KqlSchema2Eql):
type_hint = self._schema.kql_schema.get(str(outer.node), "unknown")
if hasattr(self._schema, "type_mapping") and type_hint == "ip":
outer.type_info = ExtendedTypeHint.IP
for inner in container:
if not inner.validate_type(outer):
raise self._type_error(inner, outer, error_message)
# reset the type
outer.type_info = outer_type
# This will always evaluate to true/false, so it should be a boolean
term = ast.InSet(outer.node, [c.node for c in container])
nullable = outer.nullable or any(c.nullable for c in container)
return NodeInfo(term, TypeHint.Boolean, nullable=nullable, source=node)
def custom_base_parse_decorator(func: Callable[..., Any]) -> Callable[..., Any]:
"""Override and address the limitations of the eql in_set method."""
@wraps(func)
def wrapper(query: str, start: str | None = None, **kwargs: dict[str, Any]) -> Any:
original_in_set = LarkToEQL.in_set # type: ignore[reportUnknownMemberType]
LarkToEQL.in_set = custom_in_set
try:
result = func(query, start=start, **kwargs)
finally: # Using finally to ensure that the original method is restored
LarkToEQL.in_set = original_in_set
return result
return wrapper
eql.parser._parse = custom_base_parse_decorator(base_parse) # type: ignore[reportPrivateUsage] # noqa: SLF001
class KQLValidator(QueryValidator):
"""Specific fields for KQL query event types."""
@cached_property
def ast(self) -> kql.ast.Expression: # type: ignore[reportIncompatibleMethod]
return kql.parse(self.query, normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords) # type: ignore[reportUnknownMemberType]
@cached_property
def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethod]
return list({str(f) for f in self.ast if isinstance(f, kql.ast.Field)}) # type: ignore[reportUnknownVariableType]
def auto_add_field(self, validation_checks_error: kql.errors.KqlParseError, index_or_dataview: str) -> None:
"""Auto add a missing field to the schema."""
field_name = extract_error_field(self.query, validation_checks_error)
if not field_name:
raise ValueError("No fied name found for the error")
field_type = ecs.get_all_flattened_schema().get(field_name)
update_auto_generated_schema(index_or_dataview, field_name, field_type)
def to_eql(self) -> eql.ast.Expression:
return kql.to_eql(self.query) # type: ignore[reportUnknownVariableType]
def _prepare_integration_schema(
self, base_schema: dict[str, Any], stack_version: str, data: QueryRuleData
) -> dict[str, Any]:
"""Augment a base integration schema with index/custom/endpoint fields."""
schema = dict(base_schema)
for index_name in data.index_or_dataview:
schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
return schema
def build_validation_plan(self, data: QueryRuleData, meta: RuleMeta) -> list[ValidationTarget]:
"""Return a unified list of validation targets for this query.
Integration targets: union of integration schemas per stack version (if integrations are available)
Stack targets: ECS/beats/endgame schemas per supported stack version
"""
targets: list[ValidationTarget] = []
# Build integration-based targets if available
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
if package_integrations:
combined_by_stack: dict[str, dict[str, Any]] = {}
ecs_by_stack: dict[str, str] = {}
packages_by_stack: dict[str, set[str]] = {}
for integ in get_integration_schema_data(data, meta, package_integrations):
stack_version = integ["stack_version"]
ecs_version = integ["ecs_version"]
package = integ["package"]
schema = self._prepare_integration_schema(integ["schema"], stack_version, data)
_ = ecs_by_stack.setdefault(stack_version, ecs_version)
_ = packages_by_stack.setdefault(stack_version, set()).add(package)
combined_by_stack.setdefault(stack_version, {}).update(schema)
for stack_version, schema_dict in combined_by_stack.items():
ecs_version = ecs_by_stack.get(stack_version, "unknown")
pkgs_set = packages_by_stack.get(stack_version, set())
pkgs = ", ".join(sorted(pkgs_set))
err_trailer = (
"Try adding event.module or event.dataset to specify integration module\n\n"
f"Checked against packages [{pkgs}]; stack: {stack_version}; ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=self.query,
schema=schema_dict,
err_trailer=err_trailer,
min_stack_version=str(meta.min_stack_version or load_current_package_version()),
beat_types=None,
integration_types=sorted(pkgs_set),
kind="integration",
)
)
# Build stack targets only when TOML indicates stack-based validation is needed
# - If no integration packages resolved, include stack targets as fallback
# - Or when beats or endgame indices are present
beat_types_present = parse_beats_from_index(data.index_or_dataview) if data.index_or_dataview else []
endgame_present = bool(data.index_or_dataview and "endgame-*" in data.index_or_dataview)
should_add_stack_targets = (not package_integrations) or (bool(beat_types_present) or endgame_present)
if should_add_stack_targets:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping["beats"]
ecs_version = mapping["ecs"]
beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version)
err_trailer = (
f"stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=self.query,
schema=schema,
err_trailer=err_trailer,
min_stack_version=str(meta.min_stack_version or load_current_package_version()),
beat_types=beat_types,
integration_types=None,
kind="stack",
)
)
return targets
def validate(self, data: QueryRuleData, meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethod]
"""Validate the query using computed schema combinations, favoring integrations when present."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
return
if data.language == "lucene":
return
for _ in range(max_attempts):
all_targets = self.build_validation_plan(data, meta)
has_integration = any(t.kind == "integration" for t in all_targets)
# Order targets: integrations first (if any), then stack; otherwise just stack
ordered_targets = (
[t for t in all_targets if t.kind == "integration"] + [t for t in all_targets if t.kind == "stack"]
if has_integration
else [t for t in all_targets if t.kind == "stack"]
)
retry = False
for t in ordered_targets:
exc = self.validate_query_text_with_schema(
schema=t.schema,
err_trailer=t.err_trailer,
beat_types=t.beat_types,
integration_types=t.integration_types,
)
if exc is None:
continue
# Attempt auto-add for missing fields when enabled
if (
(exc.error_msg == "Unknown field" or "Field not recognized" in exc.error_msg) # type: ignore[reportAttributeAccessIssue]
and RULES_CONFIG.auto_gen_schema_file
and data.index_or_dataview
):
self.auto_add_field(exc, data.index_or_dataview[0]) # type: ignore[reportArgumentType]
retry = True
break
# Raise enriched error from helper
raise exc
if not retry:
# All targets passed
return
raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}")
def validate_query_text_with_schema(
self,
*,
schema: dict[str, Any],
err_trailer: str,
beat_types: list[str] | None,
integration_types: list[str] | None,
) -> KQL_ERROR_TYPES | None:
"""Validate the KQL query text against a given schema and return an enriched error if it fails."""
try:
kql.parse( # type: ignore[reportUnknownMemberType]
self.query,
schema=schema,
normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords,
)
except kql.KqlParseError as exc:
# Compose an informative trailer
trailer_parts: list[str] = []
if exc.error_msg == "Unknown field" and beat_types:
trailer_parts.insert(
0,
"Try adding event.module or data_stream.dataset to specify beats module",
)
if integration_types:
pkgs = ", ".join(integration_types)
trailer_parts.append(f"integration_types: [{pkgs}]")
if beat_types:
trailer_parts.append(f"beat_types: [{', '.join(beat_types)}]")
if err_trailer:
trailer_parts.append(err_trailer)
trailer = "\n\n".join(tp for tp in trailer_parts if tp)
return kql.KqlParseError(
exc.error_msg, # type: ignore[reportUnknownArgumentType]
exc.line, # type: ignore[reportUnknownArgumentType]
exc.column, # type: ignore[reportUnknownArgumentType]
exc.source, # type: ignore[reportUnknownArgumentType]
len(exc.caret.lstrip()),
trailer=trailer or None, # type: ignore[reportUnknownArgumentType]
)
else:
return None
class EQLValidator(QueryValidator):
"""Specific fields for EQL query event types."""
@cached_property
def ast(self) -> eql.ast.Expression: # type: ignore[reportIncompatibleMethodOverrichemas]
latest_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
cfg = set_eql_config(str(latest_version))
with eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions, eql.parser.skip_optimizations, cfg:
return eql.parse_query(self.query) # type: ignore[reportUnknownVariableType]
def text_fields(self, eql_schema: ecs.KqlSchema2Eql | endgame.EndgameSchema) -> list[str]:
"""Return a list of fields of type text."""
from kql.parser import elasticsearch_type_family # type: ignore[reportMissingTypeStubs]
schema = eql_schema.kql_schema if isinstance(eql_schema, ecs.KqlSchema2Eql) else eql_schema.endgame_schema
return [f for f in self.unique_fields if elasticsearch_type_family(schema.get(f)) == "text"] # type: ignore[reportArgumentType]
@cached_property
def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethodOverride]
return list({str(f) for f in self.ast if isinstance(f, eql.ast.Field)}) # type: ignore[reportUnknownVariableType]
def auto_add_field(
self, validation_checks_error: eql.EqlParseError, index_or_dataview: str, field: str | None = None
) -> None:
"""Auto add a missing field to the schema."""
field_name = field
if not field:
field_name = extract_error_field(self.query, validation_checks_error)
if not field_name:
raise ValueError("No field name found")
field_type = ecs.get_all_flattened_schema().get(field_name)
update_auto_generated_schema(index_or_dataview, field_name, field_type)
def _build_synthetic_sequence_from_subquery(self, subquery: "ast.SubqueryBy") -> str:
"""Build a minimal synthetic sequence containing the subquery for validation."""
subquery_text = str(subquery)
join_fields = ", ".join(map(str, getattr(subquery, "join_values", []) or []))
dummy_by = f" by {join_fields}" if join_fields else ""
return f"sequence\n {subquery_text}\n [any where true]{dummy_by}"
def build_validation_plan(self, data: "QueryRuleData", meta: RuleMeta) -> list[ValidationTarget]: # noqa: PLR0912 PLR0915
"""Return a unified list of validation targets for EQL validation.
Non-sequence: accumulate integration schemas per stack, optionally add stack schemas.
Sequence: build per-subquery integration targets using synthetic sequences; for datasetless
subqueries without metadata integrations, add per-subquery stack targets; optionally add
whole-query stack schemas when indicated by TOML (indices present) or no integrations.
"""
targets: list[ValidationTarget] = []
is_sequence = getattr(data, "is_sequence", False)
min_stack_str = str(meta.min_stack_version or load_current_package_version())
# Sequence planning below may add per-subquery stack targets when needed
packages_manifest = load_integrations_manifests()
packaged_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
beat_types_present = parse_beats_from_index(data.index_or_dataview) if data.index_or_dataview else []
endgame_present = bool(data.index_or_dataview and "endgame-*" in data.index_or_dataview)
# Helper for union-by-stack integration targets
def add_accumulated_integration_targets(query_text: str, packaged: list[dict[str, Any]], context: str) -> None:
"""Add integration-based validation targets by accumulating schemas per stack version."""
combined_by_stack: dict[str, dict[str, Any]] = {}
ecs_by_stack: dict[str, str] = {}
packages_by_stack: dict[str, set[str]] = {}
for integ in get_integration_schema_data(data, meta, packaged):
stack_version = integ["stack_version"]
ecs_version = integ["ecs_version"]
package = integ["package"]
schema = integ["schema"]
# prepare with index/custom/endpoint fields
if data.index_or_dataview:
for index_name in data.index_or_dataview: # type: ignore[reportArgumentType]
schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
# Do not merge Beats into integration schemas; validate independently via stack targets
_ = ecs_by_stack.setdefault(stack_version, ecs_version)
packages_by_stack.setdefault(stack_version, set()).add(package)
combined_by_stack.setdefault(stack_version, {}).update(schema)
for stack_version, schema_dict in combined_by_stack.items():
ecs_version = ecs_by_stack.get(stack_version, "unknown")
pkgs_set = packages_by_stack.get(stack_version, set())
pkgs = ", ".join(sorted(pkgs_set))
err_trailer = (
f"{context}\nChecked against packages [{pkgs}]; stack: {stack_version}; ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=query_text,
schema=ecs.KqlSchema2Eql(schema_dict),
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=None,
integration_types=sorted(pkgs_set),
kind="integration",
)
)
# Helper to add Beats/ECS (and optionally Endgame) stack targets for a given query text
def add_stack_targets(query_text: str, include_endgame: bool) -> None:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping["beats"]
ecs_version = mapping["ecs"]
endgame_version = mapping["endgame"]
beat_types, _, kql_schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version)
err_trailer = (
f"stack: {stack_version}, beats: {beats_version},ecs: {ecs_version}, endgame: {endgame_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
# ECS (+beats if present)
targets.append(
ValidationTarget(
query_text=query_text,
schema=ecs.KqlSchema2Eql(kql_schema),
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=beat_types,
integration_types=None,
kind="stack",
)
)
# Optionally add Endgame
if include_endgame:
endgame_schema = self.get_endgame_schema(data.index_or_dataview, endgame_version)
if endgame_schema:
targets.append(
ValidationTarget(
query_text=query_text,
schema=endgame_schema,
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=None,
integration_types=None,
kind="stack",
)
)
# Sequence queries: per-subquery validation
if is_sequence:
sequence: ast.Sequence = self.ast.first # type: ignore[reportAttributeAccessIssue]
for subquery in sequence.queries: # type: ignore[reportUnknownVariableType]
subquery_datasets, _ = get_datasets_and_modules(subquery) # type: ignore[reportUnknownVariableType]
synthetic_sequence = self._build_synthetic_sequence_from_subquery(subquery) # type: ignore[reportArgumentType]
if subquery_datasets:
subquery_pkg_ints = parse_datasets(list(subquery_datasets), packages_manifest)
# Per-subquery: validate each integration combination individually (no accumulation)
for integ in get_integration_schema_data(data, meta, subquery_pkg_ints):
package = integ["package"]
package_version = integ["package_version"]
stack_version = integ["stack_version"]
ecs_version = integ["ecs_version"]
schema_dict = integ["schema"]
# prepare schema
if data.index_or_dataview:
for index_name in data.index_or_dataview: # type: ignore[reportArgumentType]
schema_dict.update(**ecs.flatten(ecs.get_index_schema(index_name)))
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
schema_dict.update(
**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version))
)
schema_dict.update(**ecs.flatten(ecs.get_endpoint_schemas()))
err_trailer = (
"Subquery schema mismatch. "
f"package: {package}, package_version: {package_version}, "
f"stack: {stack_version}, ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=synthetic_sequence,
schema=ecs.KqlSchema2Eql(schema_dict),
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=None,
integration_types=[package],
kind="integration",
)
)
# Additionally validate this subquery against Beats/ECS if beats indices are present
if beat_types_present:
add_stack_targets(synthetic_sequence, include_endgame=False)
else:
# Datasetless subquery: try metadata integrations first, else add per-subquery stack targets
meta_integrations = get_rule_integrations(meta)
if meta_integrations:
meta_pkg_ints = [
{"package": pkg, "integration": None}
for pkg in meta_integrations
if pkg in packages_manifest
]
add_accumulated_integration_targets(
synthetic_sequence,
meta_pkg_ints,
"Datasetless subquery validation against metadata integrations",
)
# Also validate datasetless subquery against Beats/ECS if beats indices are present
if beat_types_present:
add_stack_targets(synthetic_sequence, include_endgame=False)
else:
# Add stack targets for this datasetless subquery
add_stack_targets(synthetic_sequence, include_endgame=True)
elif packaged_integrations:
# Non-sequence queries: accumulate integrations per stack if available
add_accumulated_integration_targets(
self.query,
packaged_integrations,
"Try adding event.module or event.dataset to specify integration module",
)
# Stack targets for whole query:
# - always when no integrations are resolved; OR
# - for non-sequence queries when beats or endgame indices are present
need_stack_targets = (not packaged_integrations) or (
(not is_sequence) and (beat_types_present or endgame_present)
)
if need_stack_targets:
add_stack_targets(self.query, include_endgame=True)
return targets
def validate(self, data: "QueryRuleData", meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethodOverride]
"""Validate an EQL query using a unified plan of schema combinations."""
# base field declaration
field = None
if meta.query_schema_validation is False or meta.maturity == "deprecated":
return
if data.language == "lucene":
return
# Validate rule type configuration fields against ECS schema
set_fields, has_invalid = self.validate_rule_type_configurations(data, meta) # type: ignore[reportArgumentType]
if has_invalid and set_fields:
raise ValueError(f"Rule type configuration fields not in ECS schema: {', '.join(set_fields)}")
for _ in range(max_attempts):
all_targets = self.build_validation_plan(data, meta)
has_integration = any(t.kind == "integration" for t in all_targets)
# Order targets: integrations first (if any), then stack; otherwise just stack
ordered_targets = (
[t for t in all_targets if t.kind == "integration"] + [t for t in all_targets if t.kind == "stack"]
if has_integration
else [t for t in all_targets if t.kind == "stack"]
)
first_error: EQL_ERROR_TYPES | ValueError | None = None
for t in ordered_targets:
exc, field = self.validate_query_text_with_schema(
t.query_text,
t.schema,
err_trailer=t.err_trailer,
min_stack_version=t.min_stack_version,
beat_types=t.beat_types,
integration_types=t.integration_types,
)
if exc is not None:
first_error = exc
break
if first_error is None:
# All targets passed
return
# Attempt auto-add only when unknown field and enabled; then retry
if (
isinstance(first_error, eql.EqlParseError)
and "Field not recognized" in str(first_error)
and RULES_CONFIG.auto_gen_schema_file
and data.index_or_dataview
):
self.auto_add_field(first_error, data.index_or_dataview[0], field=field) # type: ignore[reportArgumentType]
continue
# Raise the enriched parse error (includes target trailer + metadata)
raise first_error
raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}")
def validate_query_text_with_schema( # noqa: PLR0913
self,
query_text: str,
schema: ecs.KqlSchema2Eql | endgame.EndgameSchema,
err_trailer: str,
min_stack_version: str,
beat_types: list[str] | None = None,
integration_types: list[str] | None = None,
) -> tuple[EQL_ERROR_TYPES | ValueError | None, str | None]:
"""Validate the provided EQL query text against the schema (variant of validate_query_with_schema)."""
try:
config = set_eql_config(min_stack_version)
with config, schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions:
_ = eql.parse_query(query_text) # type: ignore[reportUnknownMemberType]
except eql.EqlParseError as exc:
message = exc.error_msg
trailer_parts: list[str] = []
# If the error is an unknown field and the field was referenced as optional (prefixed with '?'),
# treat this target as non-fatal to honor EQL optional semantics.
# To support EQL sequence and sub query validation we need to return this field to overwrite
# what would have been parsed via auto_add_field as the error message and query may be out of sync
# depending on how the method is called.
field = extract_error_field(query_text, exc)
if (
field
and ("Unknown field" in message or "Field not recognized" in message)
and f"?{field}" in self.query
):
return None, field
if "Unknown field" in message and beat_types:
trailer_parts.insert(0, "Try adding event.module or event.dataset to specify beats module")
elif "Field not recognized" in message and isinstance(schema, ecs.KqlSchema2Eql):
text_fields = self.text_fields(schema)
if text_fields:
fields_str = ", ".join(text_fields)
trailer_parts.insert(0, f"eql does not support text fields: {fields_str}")
# Surface integration packages if available
if integration_types:
pkgs = ", ".join(integration_types)
trailer_parts.append(f"integration_types: [{pkgs}]")
# Surface beat types if available (stack plan)
if beat_types:
trailer_parts.append(f"beat_types: [{', '.join(beat_types)}]")
if err_trailer:
trailer_parts.append(err_trailer)
trailer = "\n\n".join(tp for tp in trailer_parts if tp)
return exc.__class__(
exc.error_msg, # type: ignore[reportUnknownArgumentType]
exc.line, # type: ignore[reportUnknownArgumentType]
exc.column, # type: ignore[reportUnknownArgumentType]
exc.source, # type: ignore[reportUnknownArgumentType]
len(exc.caret.lstrip()),
trailer=trailer,
), field
except Exception as exc: # noqa: BLE001
print(err_trailer)
return exc, None # type: ignore[reportReturnType]
return None, None
def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> tuple[list[str], bool]:
"""Validate EQL rule type configurations (timestamp_field, event_category_override, tiebreaker_field).
Returns a tuple of the list of configured field names (non-empty) and a boolean indicating whether
any are not present in the ECS schema for the rule's minimum stack version (or current package version).
"""
configured: list[str] = []
if data.timestamp_field:
configured.append(data.timestamp_field)
if data.event_category_override:
configured.append(data.event_category_override)
if data.tiebreaker_field:
configured.append(data.tiebreaker_field)
if not configured:
return [], False
stack_version = str(meta.min_stack_version or load_current_package_version())
min_stack_version = str(Version.parse(stack_version, optional_minor_and_patch=True))
stack_map = get_stack_schemas(stack_version)
ecs_version = stack_map[min_stack_version]["ecs"]
schema = ecs.get_schema(ecs_version)
return configured, any(f not in schema for f in configured)
class ESQLValidator(QueryValidator):
"""Validate specific fields for ESQL query event types."""
kibana_client: Kibana
elastic_client: Elasticsearch
metadata: RuleMeta
rule_id: str
verbosity: int
esql_unique_fields: list[dict[str, str]]
def log(self, val: str) -> None:
"""Log if verbosity is 1 or greater (1 corresponds to `-v` in pytest)"""
unit_test_verbose_level = 1
if self.verbosity >= unit_test_verbose_level:
print(f"{self.rule_id}:", val)
@property
def ast(self) -> Any:
"""Return the AST of the ESQL query. Dependant on an ESQL parser, which is not implemented"""
# Needs to return none to prevent not implemented error
return None
@cached_property
def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethodOverride]
"""Return a list of unique fields in the query. Requires remote validation to have occurred."""
esql_unique_fields = getattr(self, "esql_unique_fields", None)
if esql_unique_fields:
return [field["name"] for field in self.esql_unique_fields]
return []
def get_esql_query_indices(self, query: str) -> tuple[str, list[str]]:
"""Extract indices from an ES|QL query."""
match = FROM_SOURCES_REGEX.search(query)
if not match:
return "", []
sources_str = match.group("sources")
# Truncate cross cluster search indices to local indices
sources_list: list[str] = [
source.split(":", 1)[-1].strip() if ":" in source else source.strip() for source in sources_str.split(",")
]
return sources_str, sources_list
def get_unique_field_type(self, field_name: str) -> str | None: # type: ignore[reportIncompatibleMethodOverride]
"""Get the type of the unique field. Requires remote validation to have occurred."""
esql_unique_fields = getattr(self, "esql_unique_fields", [])
for field in esql_unique_fields:
if field["name"] == field_name:
return field["type"]
return None
def validate_columns_index_mapping(
self, query_columns: list[dict[str, str]], combined_mappings: dict[str, Any], version: str = "", query: str = ""
) -> bool:
"""Validate that the columns in the ESQL query match the provided mappings."""
mismatched_columns: list[str] = []
for column in query_columns:
column_name = column["name"]
# Skip Dynamic fields
if column_name.startswith(ESQL_DYNAMIC_FIELD_PREFIXES):
continue
# Skip internal fields
if column_name in ("_id", "_version", "_index"):
continue
# Skip implicit fields
if column_name not in query:
continue
column_type = column["type"]
# Check if the column exists in combined_mappings or a valid field generated from a function or operator
keys = column_name.split(".")
schema_type = utils.get_column_from_index_mapping_schema(keys, combined_mappings)
schema_type = kql.parser.elasticsearch_type_family(schema_type) if schema_type else None
# If it is in the schema, but Kibana returns unsupported
if schema_type and column_type == "unsupported":
continue
# Validate the type
if not schema_type or column_type != schema_type:
# Attempt reverse mapping as for our purposes they are equivalent.
# We are generally concerned about the operators for the types not the values themselves.
reverse_col_type = kql.parser.elasticsearch_type_family(column_type) if column_type else None
if reverse_col_type is not None and schema_type is not None and reverse_col_type == schema_type:
continue
if reverse_col_type is not None and reverse_col_type == column_type:
continue
mismatched_columns.append(
f"Dynamic field `{column_name}` is not correctly mapped. "
f"If not dynamic: expected from schema: `{schema_type}`, got from Kibana: `{column_type}`."
)
if mismatched_columns:
raise EsqlTypeMismatchError(
f"Column validation errors in Stack Version {version}:\n" + "\n".join(mismatched_columns)
)
return True
def validate(self, data: "QueryRuleData", rule_meta: RuleMeta, force_remote_validation: bool = False) -> None: # type: ignore[reportIncompatibleMethodOverride]
"""Validate an ESQL query while checking TOMLRule."""
if misc.getdefault("remote_esql_validation")() or force_remote_validation:
resolved_kibana_options = {
str(option.name): option.default() if callable(option.default) else option.default
for option in misc.kibana_options
if option.name is not None
}
resolved_elastic_options = {
option.name: option.default() if callable(option.default) else option.default
for option in misc.elasticsearch_options
if option.name is not None
}
with (
misc.get_kibana_client(**resolved_kibana_options) as kibana_client, # type: ignore[reportUnknownVariableType]
misc.get_elasticsearch_client(**resolved_elastic_options) as elastic_client, # type: ignore[reportUnknownVariableType]
):
_ = self.remote_validate_rule(
kibana_client,
elastic_client,
data.query,
rule_meta,
data.rule_id,
)
def remote_validate_rule_contents(
self, kibana_client: Kibana, elastic_client: Elasticsearch, contents: TOMLRuleContents, verbosity: int = 0
) -> ObjectApiResponse[Any]:
"""Remote validate a rule's ES|QL query using an Elastic Stack."""
return self.remote_validate_rule(
kibana_client=kibana_client,
elastic_client=elastic_client,
query=contents.data.query, # type: ignore[reportUnknownVariableType]
metadata=contents.metadata,
rule_id=contents.data.rule_id,
verbosity=verbosity,
)
def remote_validate_rule( # noqa: PLR0913
self,
kibana_client: Kibana,
elastic_client: Elasticsearch,
query: str,
metadata: RuleMeta,
rule_id: str = "",
verbosity: int = 0,
) -> ObjectApiResponse[Any]:
"""Uses remote validation from an Elastic Stack to validate ES|QL a given rule"""
self.rule_id = rule_id
self.verbosity = verbosity
# Validate that all fields (columns) are either dynamic fields or correctly mapped
# against the combined mapping of all the indices
kibana_details: dict[str, Any] = kibana_client.get("/api/status", {}) # type: ignore[reportUnknownVariableType]
if "version" not in kibana_details:
raise ValueError("Failed to retrieve Kibana details.")
stack_version = get_latest_stack_version()
self.log(f"Validating against {stack_version} stack")
indices_str, indices = self.get_esql_query_indices(query) # type: ignore[reportUnknownVariableType]
self.log(f"Extracted indices from query: {', '.join(indices)}")
event_dataset_integrations = get_esql_query_event_dataset_integrations(query)
self.log(
"Extracted Event Dataset integrations from query: "
f"{', '.join(str(integration) for integration in event_dataset_integrations)}"
)
# Get mappings for all matching existing index templates
existing_mappings, index_lookup, combined_mappings = prepare_mappings(
elastic_client, indices, event_dataset_integrations, metadata, stack_version, self.log
)
self.log(f"Collected mappings: {len(existing_mappings)}")
self.log(f"Combined mappings prepared: {len(combined_mappings)}")
# Create remote indices
full_index_str = create_remote_indices(elastic_client, existing_mappings, index_lookup, self.log)
# Replace all sources with the test indices
query = query.replace(indices_str, full_index_str) # type: ignore[reportUnknownVariableType]
query_columns, response = execute_query_against_indices(elastic_client, query, full_index_str, self.log) # type: ignore[reportUnknownVariableType]
self.esql_unique_fields = query_columns
# Build a mapping lookup for all stack versions to validate against.
# We only need to check against the schemas locally for the type
# mismatch error, as the EsqlSchemaError and EsqlSyntaxError errors from the stack
# will not be impacted by the difference in schema type mapping.
mappings_lookup: dict[str, dict[str, Any]] = {stack_version: combined_mappings}
versions = get_stack_versions()
for version in versions:
if version in mappings_lookup:
continue
_, _, combined_mappings = prepare_mappings(
elastic_client, indices, event_dataset_integrations, metadata, version, self.log
)
mappings_lookup[version] = combined_mappings
for version, mapping in mappings_lookup.items():
self.log(f"Validating {rule_id} against {version} stack")
if not self.validate_columns_index_mapping(query_columns, mapping, version=version, query=query):
self.log("Dynamic column(s) have improper formatting.")
return response
def extract_error_field(source: str, exc: eql.EqlParseError | kql.KqlParseError) -> str | None:
"""Extract the field name from an EQL or KQL parse error."""
lines = source.splitlines()
mod = -1 if exc.line == len(lines) else 0 # type: ignore[reportUnknownMemberType]
line = lines[exc.line + mod] # type: ignore[reportUnknownMemberType]
start = exc.column # type: ignore[reportUnknownMemberType]
stop = start + len(exc.caret.strip()) # type: ignore[reportUnknownVariableType]
return re.sub(r"^\W+|\W+$", "", line[start:stop]) # type: ignore[reportUnknownArgumentType]