[FR] Refactor Schema Validation & Support Multi-Dataset Sequence Validation (#5059)

Mika Ayenson, PhD
2025-09-10 13:11:04 -05:00
committed by GitHub
parent 25539fd6c6
commit f0f7d217c0
7 changed files with 785 additions and 481 deletions
+8 -5
@@ -181,17 +181,20 @@ def get_beats_sub_schema(schema: dict[str, Any], beat: str, module: str, *datase
flattened: list[dict[str, Any]] = []
beat_dir = schema[beat]
module_dir = beat_dir.get("folders", {}).get("module", {}).get("folders", {}).get(module, {})
# Normalize module name in case callers include quotes from rendered AST
normalized_module = module.strip("\"' ")
module_dir = beat_dir.get("folders", {}).get("module", {}).get("folders", {}).get(normalized_module, {})
# if we only have a module then we'll work with what we got
all_datasets = datasets if datasets else [d for d in module_dir.get("folders", {}) if not d.startswith("_")]
for _dataset in all_datasets:
# replace aws.s3 -> s3
dataset = _dataset[len(module) + 1 :] if _dataset.startswith(module + ".") else _dataset
ds = _dataset.strip("\"' ")
dataset = ds[len(normalized_module) + 1 :] if ds.startswith(normalized_module + ".") else ds
dataset_dir = module_dir.get("folders", {}).get(dataset, {})
flattened.extend(get_field_schema(dataset_dir, prefix=module + ".", include_common=True))
flattened.extend(get_field_schema(dataset_dir, prefix=normalized_module + ".", include_common=True))
# we also need to capture (beta?) fields which are directly within the module _meta.files.fields
flattened.extend(get_field_schema(module_dir, include_common=True))
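# Illustrative sketch (hypothetical helper, for orientation only): the
# normalization added above, in isolation. Quotes that leak in from a rendered
# AST are stripped, and the "module." prefix is removed so a dataset like
# "aws.s3" resolves to the "s3" schema folder.
def _normalize(module: str, dataset: str) -> tuple[str, str]:
    normalized_module = module.strip("\"' ")
    ds = dataset.strip("\"' ")
    if ds.startswith(normalized_module + "."):
        ds = ds[len(normalized_module) + 1 :]
    return normalized_module, ds

assert _normalize('"aws"', '"aws.s3"') == ("aws", "s3")
assert _normalize("aws", "s3") == ("aws", "s3")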
@@ -268,11 +271,11 @@ def get_datasets_and_modules(tree: eql.ast.BaseNode | kql.ast.BaseNode) -> tuple
and isinstance(node.right, eql.ast.String)
):
if node.left == eql.ast.Field("event", ["module"]):
modules.add(node.right.render()) # type: ignore[reportUnknownMemberType]
modules.add(node.right.value) # type: ignore[reportUnknownMemberType]
elif node.left == eql.ast.Field("event", ["dataset"]) or node.left == eql.ast.Field(
"data_stream", ["dataset"]
):
datasets.add(node.right.render()) # type: ignore[reportUnknownMemberType]
datasets.add(node.right.value) # type: ignore[reportUnknownMemberType]
elif isinstance(node, eql.ast.InSet):
if node.expression == eql.ast.Field("event", ["module"]):
modules.update(node.get_literals()) # type: ignore[reportUnknownMemberType]
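# Illustrative sketch (not part of this diff): why .value replaces .render()
# above. Assuming the usual eql AST behavior, render() emits the quoted EQL
# source form, while .value is the bare string that dataset/module matching
# needs -- the quote-stripping added in the beats hunk guards the same edge case.
import eql  # type: ignore[reportMissingTypeStubs]

node = eql.ast.String("aws.s3")
print(node.render())  # '"aws.s3"' - quoted source text, breaks schema lookups
print(node.value)     # 'aws.s3'   - bare value suitable for matching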
+15 -1
@@ -1,4 +1,11 @@
{
"auditbeat-*": {
"auditd.data.addr": "keyword",
"auditd.data.grantors": "keyword",
"auditd.data.syscall": "keyword",
"auditd.data.terminal": "keyword",
"auditd.result": "keyword"
},
"endgame-*": {
"endgame": {
"metadata": {
@@ -8,6 +15,9 @@
}
},
"winlogbeat-*": {
"problemchild.prediction": "long",
"problemchild.prediction_probability": "long",
"blocklist_label": "long",
"winlog": {
"event_data": {
"AccessList": "keyword",
@@ -17,6 +27,7 @@
"AllowedToDelegateTo": "keyword",
"AttributeLDAPDisplayName": "keyword",
"AttributeValue": "keyword",
"AuditPolicyChangesDescription": "keyword",
"CallerProcessName": "keyword",
"CallTrace": "keyword",
"ClientProcessId": "keyword",
@@ -57,7 +68,9 @@
"Status": "keyword",
"EnabledPrivilegeList": "keyword",
"Operation": "keyword",
"OperationType": "keyword"
"OperationType": "keyword",
"NewUACList": "keyword",
"SubCategory": "keyword"
}
},
"winlog.logon.type": "keyword",
@@ -199,6 +212,7 @@
"azure.platformlogs.properties.id": "keyword"
},
"logs-o365.audit-*": {
"o365.audit.ExtendedProperties.RequestType": "keyword",
"o365.audit.ExtendedProperties.ResultStatusDetail": "keyword",
"o365.audit.OperationProperties.Name": "keyword",
"o365.audit.OperationProperties.Value": "keyword",
+2 -2
@@ -710,8 +710,8 @@ class QueryValidator:
@cached
def get_endgame_schema(self, indices: list[str], endgame_version: str) -> endgame.EndgameSchema | None:
"""Get an assembled flat endgame schema."""
if indices and "endgame-*" not in indices:
# Only include endgame when explicitly requested by TOML via indices
if not indices or "endgame-*" not in indices:
return None
endgame_schema = endgame.read_endgame_schema(endgame_version=endgame_version)
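# Illustrative sketch (hypothetical helper): truth table for the tightened
# guard above. Endgame schemas are only assembled when "endgame-*" is
# explicitly listed in the rule's indices; an empty index list now also skips.
def _skips_endgame(indices: list[str]) -> bool:
    return not indices or "endgame-*" not in indices

assert _skips_endgame([]) is True                # new: no indices -> skip
assert _skips_endgame(["winlogbeat-*"]) is True  # endgame not requested
assert _skips_endgame(["endgame-*"]) is False    # explicitly requested via TOML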
+488 -390
@@ -8,23 +8,29 @@
import re
import typing
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from functools import cached_property, wraps
from typing import Any
import click
import eql # type: ignore[reportMissingTypeStubs]
import kql # type: ignore[reportMissingTypeStubs]
from eql import ast # type: ignore[reportMissingTypeStubs]
from eql.parser import KvTree, LarkToEQL, NodeInfo, TypeHint # type: ignore[reportMissingTypeStubs]
from eql.parser import ( # type: ignore[reportMissingTypeStubs]
KvTree,
LarkToEQL,
NodeInfo,
TypeHint,
)
from eql.parser import _parse as base_parse # type: ignore[reportMissingTypeStubs]
from marshmallow import ValidationError
from semver import Version
from . import ecs, endgame
from .beats import get_datasets_and_modules, parse_beats_from_index
from .config import CUSTOM_RULES_DIR, load_current_package_version, parse_rules_config
from .custom_schemas import update_auto_generated_schema
from .integrations import get_integration_schema_data, load_integrations_manifests
from .integrations import get_integration_schema_data, load_integrations_manifests, parse_datasets
from .rule import EQLRuleData, QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, set_eql_config
from .schemas import get_stack_schemas
@@ -41,6 +47,20 @@ KQL_ERROR_TYPES = kql.KqlCompileError | kql.KqlParseError
RULES_CONFIG = parse_rules_config()
@dataclass(frozen=True)
class ValidationTarget:
"""A single validation target for a query."""
query_text: str
schema: Any
err_trailer: str
min_stack_version: str
kind: str # "integration" or "stack"
# Optional context about schema selection
beat_types: list[str] | None = None
integration_types: list[str] | None = None
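# Illustrative sketch (hypothetical values, for orientation only): a target as
# build_validation_plan would emit for one integration/stack combination.
example_target = ValidationTarget(
    query_text='event.dataset:"aws.s3" and source.ip:*',
    schema={"event.dataset": "keyword", "source.ip": "ip"},
    err_trailer="Checked against packages [aws]; stack: 8.14.0; ecs: 8.11.0",
    min_stack_version="8.3.0",
    kind="integration",
    integration_types=["aws"],
)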
class ExtendedTypeHint(Enum):
IP = "ip"
@@ -65,7 +85,7 @@ def custom_in_set(self: LarkToEQL, node: KvTree) -> NodeInfo:
if not outer.validate_type(ExtendedTypeHint.primitives()):
# can't compare non-primitives to sets
raise self._type_error(outer, ExtendedTypeHint.primit())
raise self._type_error(outer, ExtendedTypeHint.primitives())
# Check that everything inside the container has the same type as outside
error_message = "Unable to compare {expected_type} to {actual_type}"
@@ -134,187 +154,186 @@ class KQLValidator(QueryValidator):
def to_eql(self) -> eql.ast.Expression:
return kql.to_eql(self.query) # type: ignore[reportUnknownVariableType]
def _prepare_integration_schema(
self, base_schema: dict[str, Any], stack_version: str, data: QueryRuleData
) -> dict[str, Any]:
"""Augment a base integration schema with index/custom/endpoint fields."""
schema = dict(base_schema)
for index_name in data.index_or_dataview:
schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
return schema
def build_validation_plan(self, data: QueryRuleData, meta: RuleMeta) -> list[ValidationTarget]:
"""Return a unified list of validation targets for this query.
Integration targets: union of integration schemas per stack version (if integrations are available)
Stack targets: ECS/beats/endgame schemas per supported stack version
"""
targets: list[ValidationTarget] = []
# Build integration-based targets if available
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
if package_integrations:
combined_by_stack: dict[str, dict[str, Any]] = {}
ecs_by_stack: dict[str, str] = {}
packages_by_stack: dict[str, set[str]] = {}
for integ in get_integration_schema_data(data, meta, package_integrations):
stack_version = integ["stack_version"]
ecs_version = integ["ecs_version"]
package = integ["package"]
schema = self._prepare_integration_schema(integ["schema"], stack_version, data)
_ = ecs_by_stack.setdefault(stack_version, ecs_version)
_ = packages_by_stack.setdefault(stack_version, set()).add(package)
combined_by_stack.setdefault(stack_version, {}).update(schema)
for stack_version, schema_dict in combined_by_stack.items():
ecs_version = ecs_by_stack.get(stack_version, "unknown")
pkgs_set = packages_by_stack.get(stack_version, set())
pkgs = ", ".join(sorted(pkgs_set))
err_trailer = (
"Try adding event.module or event.dataset to specify integration module\n\n"
f"Checked against packages [{pkgs}]; stack: {stack_version}; ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=self.query,
schema=schema_dict,
err_trailer=err_trailer,
min_stack_version=str(meta.min_stack_version or load_current_package_version()),
beat_types=None,
integration_types=sorted(pkgs_set),
kind="integration",
)
)
# Build stack targets only when TOML indicates stack-based validation is needed
# - If no integration packages resolved, include stack targets as fallback
# - Or when beats or endgame indices are present
beat_types_present = parse_beats_from_index(data.index_or_dataview) if data.index_or_dataview else []
endgame_present = bool(data.index_or_dataview and "endgame-*" in data.index_or_dataview)
should_add_stack_targets = (not package_integrations) or (bool(beat_types_present) or endgame_present)
if should_add_stack_targets:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping["beats"]
ecs_version = mapping["ecs"]
beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version)
err_trailer = (
f"stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=self.query,
schema=schema,
err_trailer=err_trailer,
min_stack_version=str(meta.min_stack_version or load_current_package_version()),
beat_types=beat_types,
integration_types=None,
kind="stack",
)
)
return targets
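# Illustrative sketch (hypothetical package schemas): the union-by-stack
# accumulation used above. Fields from every resolved package are merged per
# stack version, so a single integration target per stack version covers all
# packages at once.
combined_by_stack: dict[str, dict[str, str]] = {}
for stack_version, schema in [
    ("8.14.0", {"aws.s3.bucket.name": "keyword"}),
    ("8.14.0", {"okta.actor.id": "keyword"}),
]:
    combined_by_stack.setdefault(stack_version, {}).update(schema)
assert set(combined_by_stack["8.14.0"]) == {"aws.s3.bucket.name", "okta.actor.id"}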
def validate(self, data: QueryRuleData, meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethod]
"""Validate the query, called from the parent which contains [metadata] information."""
"""Validate the query using computed schema combinations, favoring integrations when present."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
if data.language != "lucene":
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
for _ in range(max_attempts):
validation_checks: dict[str, KQL_ERROR_TYPES | None] = {"stack": None, "integrations": None}
# validate the query against fields within beats
validation_checks["stack"] = self.validate_stack_combos(data, meta)
if data.language == "lucene":
return
if package_integrations:
# validate the query against related integration fields
validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations)
for _ in range(max_attempts):
all_targets = self.build_validation_plan(data, meta)
has_integration = any(t.kind == "integration" for t in all_targets)
# Order targets: integrations first (if any), then stack; otherwise just stack
ordered_targets = (
[t for t in all_targets if t.kind == "integration"] + [t for t in all_targets if t.kind == "stack"]
if has_integration
else [t for t in all_targets if t.kind == "stack"]
)
retry = False
for t in ordered_targets:
exc = self.validate_query_text_with_schema(
schema=t.schema,
err_trailer=t.err_trailer,
beat_types=t.beat_types,
integration_types=t.integration_types,
)
if exc is None:
continue
if validation_checks["stack"] and not package_integrations:
# if auto add, try auto adding and then call stack_combo validation again
if validation_checks["stack"].error_msg == "Unknown field" and RULES_CONFIG.auto_gen_schema_file: # type: ignore[reportAttributeAccessIssue]
# auto add the field and re-validate
self.auto_add_field(validation_checks["stack"], data.index_or_dataview[0]) # type: ignore[reportArgumentType]
else:
raise validation_checks["stack"]
if validation_checks["stack"] and validation_checks["integrations"]:
# if auto add, try auto adding and then call stack_combo validation again
if validation_checks["stack"].error_msg == "Unknown field" and RULES_CONFIG.auto_gen_schema_file: # type: ignore[reportAttributeAccessIssue]
# auto add the field and re-validate
self.auto_add_field(validation_checks["stack"], data.index_or_dataview[0]) # type: ignore[reportArgumentType]
else:
click.echo(f"Stack Error Trace: {validation_checks['stack']}")
click.echo(f"Integrations Error Trace: {validation_checks['integrations']}")
raise ValueError("Error in both stack and integrations checks")
else:
# Attempt auto-add for missing fields when enabled
if (
(exc.error_msg == "Unknown field" or "Field not recognized" in exc.error_msg) # type: ignore[reportAttributeAccessIssue]
and RULES_CONFIG.auto_gen_schema_file
and data.index_or_dataview
):
self.auto_add_field(exc, data.index_or_dataview[0]) # type: ignore[reportArgumentType]
retry = True
break
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> KQL_ERROR_TYPES | None:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping["beats"]
ecs_version = mapping["ecs"]
err_trailer = f"stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}"
# Raise enriched error from helper
raise exc
if not retry:
# All targets passed
return
beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version)
raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}")
try:
kql.parse(self.query, schema=schema, normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords) # type: ignore[reportUnknownMemberType]
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or data_stream.dataset to specify beats module\n\n{trailer}"
return kql.KqlParseError(
exc.error_msg, # type: ignore[reportUnknownArgumentType]
exc.line, # type: ignore[reportUnknownArgumentType]
exc.column, # type: ignore[reportUnknownArgumentType]
exc.source, # type: ignore[reportUnknownArgumentType]
len(exc.caret.lstrip()),
trailer=trailer,
)
return None
def validate_integration( # noqa: PLR0912
def validate_query_text_with_schema(
self,
data: QueryRuleData,
meta: RuleMeta,
package_integrations: list[dict[str, Any]],
*,
schema: dict[str, Any],
err_trailer: str,
beat_types: list[str] | None,
integration_types: list[str] | None,
) -> KQL_ERROR_TYPES | None:
"""Validate the query, called from the parent which contains [metadata] information."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
return None
error_fields = {}
package_schemas = {}
# Initialize package_schemas with a nested structure
for integration_data in package_integrations:
package = integration_data["package"]
integration = integration_data["integration"]
if integration:
package_schemas.setdefault(package, {}).setdefault(integration, {}) # type: ignore[reportUnknownMemberType]
else:
package_schemas.setdefault(package, {}) # type: ignore[reportUnknownMemberType]
# Process each integration schema
for integration_schema_data in get_integration_schema_data(data, meta, package_integrations):
package, integration = (
integration_schema_data["package"],
integration_schema_data["integration"],
"""Validate the KQL query text against a given schema and return an enriched error if it fails."""
try:
kql.parse( # type: ignore[reportUnknownMemberType]
self.query,
schema=schema,
normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords,
)
integration_schema = integration_schema_data["schema"]
stack_version = integration_schema_data["stack_version"]
# Add non-ecs-schema fields
for index_name in data.index_or_dataview:
integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
# Add custom schema fields for appropriate stack version
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
# Add endpoint schema fields for multi-line fields
integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
if integration:
package_schemas[package][integration] = integration_schema
else:
package_schemas[package] = integration_schema
# Validate the query against the schema
try:
kql.parse( # type: ignore[reportUnknownMemberType]
self.query,
schema=integration_schema,
normalize_kql_keywords=RULES_CONFIG.normalize_kql_keywords,
except kql.KqlParseError as exc:
# Compose an informative trailer
trailer_parts: list[str] = []
if exc.error_msg == "Unknown field" and beat_types:
trailer_parts.insert(
0,
"Try adding event.module or data_stream.dataset to specify beats module",
)
except kql.KqlParseError as exc:
if exc.error_msg == "Unknown field":
field = extract_error_field(self.query, exc)
trailer = (
f"\n\tTry adding event.module or data_stream.dataset to specify integration module\n\t"
f"Will check against integrations {meta.integration} combined.\n\t"
f"{package=}, {integration=}, {integration_schema_data['package_version']=}, "
f"{integration_schema_data['stack_version']=}, "
f"{integration_schema_data['ecs_version']=}"
)
error_fields[field] = {
"error": exc,
"trailer": trailer,
"package": package,
"integration": integration,
}
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
return kql.KqlParseError(
exc.error_msg, # type: ignore[reportUnknownArgumentType]
exc.line, # type: ignore[reportUnknownArgumentType]
exc.column, # type: ignore[reportUnknownArgumentType]
exc.source, # type: ignore[reportUnknownArgumentType]
len(exc.caret.lstrip()),
exc.trailer, # type: ignore[reportUnknownArgumentType]
)
if integration_types:
pkgs = ", ".join(integration_types)
trailer_parts.append(f"integration_types: [{pkgs}]")
if beat_types:
trailer_parts.append(f"beat_types: [{', '.join(beat_types)}]")
# Check error fields against schemas of different packages or different integrations
for field, error_data in list(error_fields.items()): # type: ignore[reportUnknownArgumentType]
error_package, error_integration = ( # type: ignore[reportUnknownVariableType]
error_data["package"],
error_data["integration"],
)
for package, integrations_or_schema in package_schemas.items(): # type: ignore[reportUnknownVariableType]
if error_integration is None:
# Compare against the schema directly if there's no integration
if error_package != package and field in integrations_or_schema:
del error_fields[field]
break
else:
# Compare against integration schemas
for integration, schema in integrations_or_schema.items(): # type: ignore[reportUnknownMemberType]
check_alt_schema = error_package != package or ( # type: ignore[reportUnknownVariableType]
error_package == package and error_integration != integration
)
if check_alt_schema and field in schema:
del error_fields[field]
if err_trailer:
trailer_parts.append(err_trailer)
trailer = "\n\n".join(tp for tp in trailer_parts if tp)
# Raise the first error
if error_fields:
_, error_data = next(iter(error_fields.items())) # type: ignore[reportUnknownVariableType]
return kql.KqlParseError(
error_data["error"].error_msg, # type: ignore[reportUnknownArgumentType]
error_data["error"].line, # type: ignore[reportUnknownArgumentType]
error_data["error"].column, # type: ignore[reportUnknownArgumentType]
error_data["error"].source, # type: ignore[reportUnknownArgumentType]
len(error_data["error"].caret.lstrip()), # type: ignore[reportUnknownArgumentType]
error_data["trailer"], # type: ignore[reportUnknownArgumentType]
exc.error_msg, # type: ignore[reportUnknownArgumentType]
exc.line, # type: ignore[reportUnknownArgumentType]
exc.column, # type: ignore[reportUnknownArgumentType]
exc.source, # type: ignore[reportUnknownArgumentType]
len(exc.caret.lstrip()),
trailer=trailer or None, # type: ignore[reportUnknownArgumentType]
)
return None
else:
return None
class EQLValidator(QueryValidator):
@@ -339,7 +358,7 @@ class EQLValidator(QueryValidator):
def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethodOverride]
return list({str(f) for f in self.ast if isinstance(f, eql.ast.Field)}) # type: ignore[reportUnknownVariableType]
def auto_add_field(self, validation_checks_error: eql.errors.EqlParseError, index_or_dataview: str) -> None:
def auto_add_field(self, validation_checks_error: eql.EqlParseError, index_or_dataview: str) -> None:
"""Auto add a missing field to the schema."""
field_name = extract_error_field(self.query, validation_checks_error)
if not field_name:
@@ -347,237 +366,312 @@ class EQLValidator(QueryValidator):
field_type = ecs.get_all_flattened_schema().get(field_name)
update_auto_generated_schema(index_or_dataview, field_name, field_type)
def validate(self, data: "QueryRuleData", meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethodOverride] # noqa: PLR0912
"""Validate an EQL query while checking TOMLRule."""
def _build_synthetic_sequence_from_subquery(self, subquery: "ast.SubqueryBy") -> str:
"""Build a minimal synthetic sequence containing the subquery for validation."""
subquery_text = str(subquery)
join_fields = ", ".join(map(str, getattr(subquery, "join_values", []) or []))
dummy_by = f" by {join_fields}" if join_fields else ""
return f"sequence\n {subquery_text}\n [any where true]{dummy_by}"
def build_validation_plan(self, data: "QueryRuleData", meta: RuleMeta) -> list[ValidationTarget]: # noqa: PLR0912 PLR0915
"""Return a unified list of validation targets for EQL validation.
Non-sequence: accumulate integration schemas per stack, optionally add stack schemas.
Sequence: build per-subquery integration targets using synthetic sequences; for datasetless
subqueries without metadata integrations, add per-subquery stack targets; optionally add
whole-query stack schemas when indicated by TOML (indices present) or no integrations.
"""
targets: list[ValidationTarget] = []
is_sequence = getattr(data, "is_sequence", False)
min_stack_str = str(meta.min_stack_version or load_current_package_version())
# Sequence planning below may add per-subquery stack targets when needed
packages_manifest = load_integrations_manifests()
packaged_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
beat_types_present = parse_beats_from_index(data.index_or_dataview) if data.index_or_dataview else []
endgame_present = bool(data.index_or_dataview and "endgame-*" in data.index_or_dataview)
# Helper for union-by-stack integration targets
def add_accumulated_integration_targets(query_text: str, packaged: list[dict[str, Any]], context: str) -> None:
combined_by_stack: dict[str, dict[str, Any]] = {}
ecs_by_stack: dict[str, str] = {}
packages_by_stack: dict[str, set[str]] = {}
for integ in get_integration_schema_data(data, meta, packaged):
stack_version = integ["stack_version"]
ecs_version = integ["ecs_version"]
package = integ["package"]
schema = integ["schema"]
# prepare with index/custom/endpoint fields
if data.index_or_dataview:
for index_name in data.index_or_dataview: # type: ignore[reportArgumentType]
schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
# Do not merge Beats into integration schemas; validate independently via stack targets
_ = ecs_by_stack.setdefault(stack_version, ecs_version)
packages_by_stack.setdefault(stack_version, set()).add(package)
combined_by_stack.setdefault(stack_version, {}).update(schema)
for stack_version, schema_dict in combined_by_stack.items():
ecs_version = ecs_by_stack.get(stack_version, "unknown")
pkgs_set = packages_by_stack.get(stack_version, set())
pkgs = ", ".join(sorted(pkgs_set))
err_trailer = (
f"{context}\nChecked against packages [{pkgs}]; stack: {stack_version}; ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=query_text,
schema=ecs.KqlSchema2Eql(schema_dict),
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=None,
integration_types=sorted(pkgs_set),
kind="integration",
)
)
# Helper to add Beats/ECS (and optionally Endgame) stack targets for a given query text
def add_stack_targets(query_text: str, include_endgame: bool) -> None:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping["beats"]
ecs_version = mapping["ecs"]
endgame_version = mapping["endgame"]
beat_types, _, kql_schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version)
err_trailer = (
f"stack: {stack_version}, beats: {beats_version},ecs: {ecs_version}, endgame: {endgame_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
# ECS (+beats if present)
targets.append(
ValidationTarget(
query_text=query_text,
schema=ecs.KqlSchema2Eql(kql_schema),
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=beat_types,
integration_types=None,
kind="stack",
)
)
# Optionally add Endgame
if include_endgame:
endgame_schema = self.get_endgame_schema(data.index_or_dataview, endgame_version)
if endgame_schema:
targets.append(
ValidationTarget(
query_text=query_text,
schema=endgame_schema,
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=None,
integration_types=None,
kind="stack",
)
)
# Sequence queries: per-subquery validation
if is_sequence:
sequence: ast.Sequence = self.ast.first # type: ignore[reportAttributeAccessIssue]
for subquery in sequence.queries: # type: ignore[reportUnknownVariableType]
subquery_datasets, _ = get_datasets_and_modules(subquery) # type: ignore[reportUnknownVariableType]
synthetic_sequence = self._build_synthetic_sequence_from_subquery(subquery) # type: ignore[reportArgumentType]
if subquery_datasets:
subquery_pkg_ints = parse_datasets(list(subquery_datasets), packages_manifest)
# Per-subquery: validate each integration combination individually (no accumulation)
for integ in get_integration_schema_data(data, meta, subquery_pkg_ints):
package = integ["package"]
package_version = integ["package_version"]
stack_version = integ["stack_version"]
ecs_version = integ["ecs_version"]
schema_dict = integ["schema"]
# prepare schema
if data.index_or_dataview:
for index_name in data.index_or_dataview: # type: ignore[reportArgumentType]
schema_dict.update(**ecs.flatten(ecs.get_index_schema(index_name)))
if data.index and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
schema_dict.update(
**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version))
)
schema_dict.update(**ecs.flatten(ecs.get_endpoint_schemas()))
err_trailer = (
"Subquery schema mismatch. "
f"package: {package}, package_version: {package_version}, "
f"stack: {stack_version}, ecs: {ecs_version}\n"
f"rule: {data.name} - {data.rule_id}"
)
targets.append(
ValidationTarget(
query_text=synthetic_sequence,
schema=ecs.KqlSchema2Eql(schema_dict),
err_trailer=err_trailer,
min_stack_version=min_stack_str,
beat_types=None,
integration_types=[package],
kind="integration",
)
)
# Additionally validate this subquery against Beats/ECS if beats indices are present
if beat_types_present:
add_stack_targets(synthetic_sequence, include_endgame=False)
else:
# Datasetless subquery: try metadata integrations first, else add per-subquery stack targets
meta_integrations = meta.integration
if isinstance(meta_integrations, str):
meta_integrations = [meta_integrations]
elif meta_integrations is None:
meta_integrations = []
if meta_integrations:
meta_pkg_ints = [
{"package": pkg, "integration": None}
for pkg in meta_integrations
if pkg in packages_manifest
]
add_accumulated_integration_targets(
synthetic_sequence,
meta_pkg_ints,
"Datasetless subquery validation against metadata integrations",
)
# Also validate datasetless subquery against Beats/ECS if beats indices are present
if beat_types_present:
add_stack_targets(synthetic_sequence, include_endgame=False)
else:
# Add stack targets for this datasetless subquery
add_stack_targets(synthetic_sequence, include_endgame=True)
elif packaged_integrations:
# Non-sequence queries: accumulate integrations per stack if available
add_accumulated_integration_targets(
self.query,
packaged_integrations,
"Try adding event.module or event.dataset to specify integration module",
)
# Stack targets for whole query:
# - always when no integrations are resolved; OR
# - for non-sequence queries when beats or endgame indices are present
need_stack_targets = (not packaged_integrations) or (
(not is_sequence) and (beat_types_present or endgame_present)
)
if need_stack_targets:
add_stack_targets(self.query, include_endgame=True)
return targets
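# Illustrative sketch (hypothetical helper): condensed truth table for the
# whole-query stack-target decision above.
def _need_stack(packaged: bool, is_sequence: bool, beats: bool, endgame: bool) -> bool:
    return (not packaged) or ((not is_sequence) and (beats or endgame))

assert _need_stack(False, False, False, False)   # no integrations -> stack fallback
assert _need_stack(True, False, True, False)     # non-sequence with beats indices
assert not _need_stack(True, True, True, False)  # sequences handle beats per subquery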
def validate(self, data: "QueryRuleData", meta: RuleMeta, max_attempts: int = 10) -> None: # type: ignore[reportIncompatibleMethodOverride]
"""Validate an EQL query using a unified plan of schema combinations."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
if data.language != "lucene":
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
if data.language == "lucene":
return
for _ in range(max_attempts):
validation_checks = {"stack": None, "integrations": None}
# validate the query against fields within beats
validation_checks["stack"] = self.validate_stack_combos(data, meta) # type: ignore[reportArgumentType]
# Validate rule type configuration fields against ECS schema
set_fields, has_invalid = self.validate_rule_type_configurations(data, meta) # type: ignore[reportArgumentType]
if has_invalid and set_fields:
raise ValueError(f"Rule type configuration fields not in ECS schema: {', '.join(set_fields)}")
stack_check = validation_checks["stack"]
if package_integrations:
# validate the query against related integration fields
validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations) # type: ignore[reportArgumentType]
if stack_check and not package_integrations:
# if auto add, try auto adding and then validate again
if (
"Field not recognized" in str(stack_check) # type: ignore[reportUnknownMemberType]
and RULES_CONFIG.auto_gen_schema_file
):
# auto add the field and re-validate
self.auto_add_field(stack_check, data.index_or_dataview[0]) # type: ignore[reportArgumentType]
else:
raise stack_check
elif stack_check and validation_checks["integrations"]:
# if auto add, try auto adding and then validate again
if (
"Field not recognized" in stack_check.error_msg # type: ignore[reportUnknownMemberType]
and RULES_CONFIG.auto_gen_schema_file
):
# auto add the field and re-validate
self.auto_add_field(stack_check, data.index_or_dataview[0]) # type: ignore[reportArgumentType]
else:
click.echo(f"Stack Error Trace: {stack_check}")
click.echo(f"Integrations Error Trace: {validation_checks['integrations']}")
raise ValueError("Error in both stack and integrations checks")
else:
for _ in range(max_attempts):
all_targets = self.build_validation_plan(data, meta)
has_integration = any(t.kind == "integration" for t in all_targets)
# Order targets: integrations first (if any), then stack; otherwise just stack
ordered_targets = (
[t for t in all_targets if t.kind == "integration"] + [t for t in all_targets if t.kind == "stack"]
if has_integration
else [t for t in all_targets if t.kind == "stack"]
)
first_error: EQL_ERROR_TYPES | ValueError | None = None
for t in ordered_targets:
exc = self.validate_query_text_with_schema(
t.query_text,
t.schema,
err_trailer=t.err_trailer,
min_stack_version=t.min_stack_version,
beat_types=t.beat_types,
integration_types=t.integration_types,
)
if exc is not None:
first_error = exc
break
else:
raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}")
if first_error is None:
# All targets passed
return
rule_type_config_fields, rule_type_config_validation_failed = self.validate_rule_type_configurations(
data, # type: ignore[reportArgumentType]
meta,
)
if rule_type_config_validation_failed:
raise ValueError(
f"""Rule type config values are not ECS compliant, check these values:
{rule_type_config_fields}"""
)
# Attempt auto-add only when unknown field and enabled; then retry
if (
isinstance(first_error, eql.EqlParseError)
and "Field not recognized" in str(first_error)
and RULES_CONFIG.auto_gen_schema_file
and data.index_or_dataview
):
self.auto_add_field(first_error, data.index_or_dataview[0]) # type: ignore[reportArgumentType]
continue
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> EQL_ERROR_TYPES | None | ValueError:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping["beats"]
ecs_version = mapping["ecs"]
endgame_version = mapping["endgame"]
err_trailer = (
f"stack: {stack_version}, beats: {beats_version},ecs: {ecs_version}, endgame: {endgame_version}"
)
# Raise the enriched parse error (includes target trailer + metadata)
raise first_error
beat_types, _, schema = self.get_beats_schema(data.index_or_dataview, beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index_or_dataview, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)
raise ValueError(f"Maximum validation attempts exceeded for {data.rule_id} - {data.name}")
# validate query against the beats and eql schema
exc = self.validate_query_with_schema( # type: ignore[reportUnknownVariableType]
data=data,
schema=eql_schema,
err_trailer=err_trailer,
beat_types=beat_types,
min_stack_version=meta.min_stack_version, # type: ignore[reportArgumentType]
)
if exc:
return exc
if endgame_schema:
# validate query against the endgame schema
exc = self.validate_query_with_schema(
data=data,
schema=endgame_schema,
err_trailer=err_trailer,
min_stack_version=meta.min_stack_version, # type: ignore[reportArgumentType]
)
if exc:
raise exc
return None
def validate_integration( # noqa: PLR0912
def validate_query_text_with_schema( # noqa: PLR0913
self,
data: QueryRuleData,
meta: RuleMeta,
package_integrations: list[dict[str, Any]],
) -> EQL_ERROR_TYPES | None | ValueError:
"""Validate an EQL query while checking TOMLRule against integration schemas."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return None
error_fields = {}
package_schemas: dict[str, Any] = {}
# Initialize package_schemas with a nested structure
for integration_data in package_integrations:
package = integration_data["package"]
integration = integration_data["integration"]
if integration:
package_schemas.setdefault(package, {}).setdefault(integration, {})
else:
package_schemas.setdefault(package, {})
# Process each integration schema
for integration_schema_data in get_integration_schema_data(data, meta, package_integrations):
ecs_version = integration_schema_data["ecs_version"]
package, integration = (
integration_schema_data["package"],
integration_schema_data["integration"],
)
package_version = integration_schema_data["package_version"]
integration_schema = integration_schema_data["schema"]
stack_version = integration_schema_data["stack_version"]
# add non-ecs-schema fields for edge cases not added to the integration
if data.index_or_dataview:
for index_name in data.index_or_dataview:
integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
# Add custom schema fields for appropriate stack version
if data.index_or_dataview and CUSTOM_RULES_DIR:
for index_name in data.index_or_dataview:
integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))
# add endpoint schema fields for multi-line fields
integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))
package_schemas[package].update(**integration_schema)
eql_schema = ecs.KqlSchema2Eql(integration_schema)
err_trailer = (
f"stack: {stack_version}, integration: {integration},"
f"ecs: {ecs_version}, package: {package}, package_version: {package_version}"
)
# Validate the query against the schema
exc = self.validate_query_with_schema(
data=data,
schema=eql_schema,
err_trailer=err_trailer,
min_stack_version=meta.min_stack_version, # type: ignore[reportArgumentType]
)
if isinstance(exc, eql.EqlParseError):
message = exc.error_msg # type: ignore[reportUnknownVariableType]
if message == "Unknown field" or "Field not recognized" in message:
field = extract_error_field(self.query, exc)
trailer = (
f"\n\tTry adding event.module or data_stream.dataset to specify integration module\n\t"
f"Will check against integrations {meta.integration} combined.\n\t"
f"{package=}, {integration=}, {package_version=}, "
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {
"error": exc,
"trailer": trailer,
"package": package,
"integration": integration,
}
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
return exc
# Check error fields against schemas of different packages or different integrations
for field, error_data in list(error_fields.items()): # type: ignore[reportUnknownArgumentType]
error_package, error_integration = ( # type: ignore[reportUnknownVariableType]
error_data["package"],
error_data["integration"],
)
for package, integrations_or_schema in package_schemas.items():
if error_integration is None:
# Compare against the schema directly if there's no integration
if error_package != package and field in integrations_or_schema:
del error_fields[field]
else:
# Compare against integration schemas
for integration, schema in integrations_or_schema.items():
check_alt_schema = ( # type: ignore[reportUnknownVariableType]
error_package != package or (error_package == package and error_integration != integration)
)
if check_alt_schema and field in schema:
del error_fields[field]
# raise the first error
if error_fields:
_, data = next(iter(error_fields.items())) # type: ignore[reportUnknownArgumentType]
return data["error"] # type: ignore[reportIndexIssue]
return None
def validate_query_with_schema(
self,
data: "QueryRuleData", # noqa: ARG002
query_text: str,
schema: ecs.KqlSchema2Eql | endgame.EndgameSchema,
err_trailer: str,
min_stack_version: str,
beat_types: list[str] | None = None,
integration_types: list[str] | None = None,
) -> EQL_ERROR_TYPES | ValueError | None:
"""Validate the query against the schema."""
"""Validate the provided EQL query text against the schema (variant of validate_query_with_schema)."""
try:
config = set_eql_config(min_stack_version)
with config, schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions:
_ = eql.parse_query(self.query) # type: ignore[reportUnknownMemberType]
_ = eql.parse_query(query_text) # type: ignore[reportUnknownMemberType]
except eql.EqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
trailer_parts: list[str] = []
# If the error is an unknown field and the field was referenced as optional (prefixed with '?'),
# treat this target as non-fatal to honor EQL optional semantics.
field = extract_error_field(query_text, exc)
if (
field
and ("Unknown field" in message or "Field not recognized" in message)
and f"?{field}" in self.query
):
return None
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or data_stream.dataset to specify beats module\n\n{trailer}"
elif "Field not recognized" in message:
trailer_parts.insert(0, "Try adding event.module or event.dataset to specify beats module")
elif "Field not recognized" in message and isinstance(schema, ecs.KqlSchema2Eql):
text_fields = self.text_fields(schema)
if text_fields:
fields_str = ", ".join(text_fields)
trailer = f"\neql does not support text fields: {fields_str}\n\n{trailer}"
trailer_parts.insert(0, f"eql does not support text fields: {fields_str}")
# Surface integration packages if available
if integration_types:
pkgs = ", ".join(integration_types)
trailer_parts.append(f"integration_types: [{pkgs}]")
# Surface beat types if available (stack plan)
if beat_types:
trailer_parts.append(f"beat_types: [{', '.join(beat_types)}]")
if err_trailer:
trailer_parts.append(err_trailer)
trailer = "\n\n".join(tp for tp in trailer_parts if tp)
return exc.__class__(
exc.error_msg, # type: ignore[reportUnknownArgumentType]
exc.line, # type: ignore[reportUnknownArgumentType]
@@ -586,30 +680,34 @@ class EQLValidator(QueryValidator):
len(exc.caret.lstrip()),
trailer=trailer,
)
except Exception as exc: # noqa: BLE001
print(err_trailer)
return exc # type: ignore[reportReturnType]
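# Illustrative sketch (hypothetical field name): the optional-field escape
# hatch in the except-branch above. An unknown-field error is tolerated when
# the query referenced the field with EQL's optional "?" prefix.
query = "process where ?process.custom_marker != null"
field = "process.custom_marker"
assert f"?{field}" in query  # optional reference -> this target is not failed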
def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> tuple[list[str | None], bool]:
"""Validate EQL rule type configurations."""
if data.timestamp_field or data.event_category_override or data.tiebreaker_field:
# get a list of rule type configuration fields
# Get a list of rule type configuration fields
fields = ["timestamp_field", "event_category_override", "tiebreaker_field"]
set_fields = list(filter(None, (data.get(field) for field in fields))) # type: ignore[reportUnknownVariableType]
def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> tuple[list[str], bool]:
"""Validate EQL rule type configurations (timestamp_field, event_category_override, tiebreaker_field).
# get stack_version and ECS schema
min_stack_version = meta.get("min_stack_version")
if min_stack_version is None:
min_stack_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
ecs_version = get_stack_schemas()[str(min_stack_version)]["ecs"]
schema = ecs.get_schema(ecs_version)
Returns a tuple of the list of configured field names (non-empty) and a boolean indicating whether
any are not present in the ECS schema for the rule's minimum stack version (or current package version).
"""
configured: list[str] = []
if data.timestamp_field:
configured.append(data.timestamp_field)
if data.event_category_override:
configured.append(data.event_category_override)
if data.tiebreaker_field:
configured.append(data.tiebreaker_field)
# return a list of rule type config field values and whether any are not in the schema
return (set_fields, any(f not in schema for f in set_fields)) # type: ignore[reportUnknownVariableType]
# if rule type fields are not set, return an empty list and False
return [], False
if not configured:
return [], False
stack_version = str(meta.min_stack_version or load_current_package_version())
min_stack_version = str(Version.parse(stack_version, optional_minor_and_patch=True))
stack_map = get_stack_schemas(stack_version)
ecs_version = stack_map[min_stack_version]["ecs"]
schema = ecs.get_schema(ecs_version)
return configured, any(f not in schema for f in configured)
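# Illustrative sketch (hypothetical two-field schema): the final ECS
# membership check above.
schema = {"@timestamp": "date", "event.category": "keyword"}
configured = ["@timestamp", "custom.tiebreaker"]
assert any(f not in schema for f in configured)  # non-ECS field -> validate() raises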
class ESQLValidator(QueryValidator):
+1 -1
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.33"
version = "1.4.0"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Securitys Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
+2 -32
@@ -34,7 +34,6 @@ from detection_rules.rule import (
TOMLRuleContents,
)
from detection_rules.rule_loader import FILE_PATTERN, RULES_CONFIG
from detection_rules.rule_validators import EQLValidator, KQLValidator
from detection_rules.schemas import definitions, get_min_supported_stack_version, get_stack_schemas
from detection_rules.utils import INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump, make_git
from detection_rules.version_lock import loaded_version_lock
@@ -997,7 +996,7 @@ class TestRuleMetadata(BaseRuleTest):
build_rule(query, "eql")
for query in invalid_integration_queries_eql:
with self.assertRaises(ValueError):
with self.assertRaises(eql.EqlSchemaError):
build_rule(query, "eql")
# kql
for query in valid_queries_kql:
@@ -1008,38 +1007,9 @@ class TestRuleMetadata(BaseRuleTest):
build_rule(query, "kuery")
for query in invalid_integration_queries_kql:
with self.assertRaises(ValueError):
with self.assertRaises(kql.KqlParseError):
build_rule(query, "kuery")
def test_event_dataset(self):
for rule in self.all_rules:
if isinstance(rule.contents.data, QueryRuleData):
# Need to pick validator based on language
if rule.contents.data.language == "kuery":
test_validator = KQLValidator(rule.contents.data.query)
elif rule.contents.data.language == "eql":
test_validator = EQLValidator(rule.contents.data.query)
else:
continue
data = rule.contents.data
meta = rule.contents.metadata
if (meta.query_schema_validation is not False or meta.maturity != "deprecated") and (
isinstance(data, QueryRuleData) and data.language != "lucene"
):
packages_manifest = load_integrations_manifests()
pkg_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
validation_integrations_check = None
if pkg_integrations:
# validate the query against related integration fields
validation_integrations_check = test_validator.validate_integration(
data, meta, pkg_integrations
)
if validation_integrations_check and "event.dataset" in rule.contents.data.query:
raise validation_integrations_check
def test_min_stack_version_supported(self):
"""Test that rules have a min_stack_version that is supported in stack-schema-map.yaml."""
failures = []
+269 -50
@@ -3,64 +3,283 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import eql
from detection_rules.rule_loader import RuleCollection
from .base import BaseRuleTest
class TestEQLInSet(BaseRuleTest):
"""Test EQL rule query in set override."""
def mk_metadata(integrations: list[str], comments: str = "Test metadata") -> dict:
"""Create rule metadata dictionary."""
return {
"creation_date": "2020/12/15",
"integration": integrations,
"maturity": "production",
"min_stack_comments": comments,
"min_stack_version": "8.3.0",
"updated_date": "2024/08/30",
}
def test_eql_in_set(self):
"""Test that the query validation is working correctly."""
def mk_rule(
*,
name: str,
rule_id: str,
description: str,
risk_score: int,
query: str,
) -> dict:
"""Create rule dictionary."""
return {
"author": ["Elastic"],
"description": description,
"language": "eql",
"name": name,
"risk_score": risk_score,
"rule_id": rule_id,
"severity": "low",
"type": "eql",
"query": query,
}
class TestEQLInSet(BaseRuleTest):
"""Test EQL rule query in_set override (separate failing and passing cases)."""
def test_eql_in_set_invalid_ip(self) -> None:
rc = RuleCollection()
eql_rule = {
"metadata": {
"creation_date": "2020/12/15",
"integration": ["endpoint", "windows"],
"maturity": "production",
"min_stack_comments": "New fields added: required_fields, related_integrations, setup",
"min_stack_version": "8.3.0",
"updated_date": "2024/03/26",
},
"rule": {
"author": ["Elastic"],
"description": """
Test Rule.
""",
"false_positives": ["Fake."],
"from": "now-9m",
"index": ["winlogbeat-*", "logs-endpoint.events.*", "logs-windows.sysmon_operational-*"],
"language": "eql",
"license": "Elastic License v2",
"name": "Fake Test Rule",
"references": [
"https://example.com",
],
"risk_score": 47,
"rule_id": "4fffae5d-8b7d-4e48-88b1-979ed42fd9a3",
"severity": "medium",
"tags": [
"Domain: Endpoint",
"OS: Windows",
"Use Case: Threat Detection",
"Tactic: Execution",
"Data Source: Elastic Defend",
"Data Source: Sysmon",
],
"type": "eql",
"query": """
sequence by host.id, process.entity_id with maxspan = 5s
[network where destination.ip in ("127.0.0.1", "::1")]
""",
},
query = """
sequence by host.id, process.entity_id with maxspan = 5s
[network where destination.ip in ("127.0.0.1", "::1")]
"""
rule_dict = {
"metadata": mk_metadata(
["endpoint", "windows"], comments="New fields added: required_fields, related_integrations, setup"
),
"rule": mk_rule(
name="Fake Test Rule",
rule_id="4fffae5d-8b7d-4e48-88b1-979ed42fd9a3",
description="Test Rule.",
risk_score=47,
query=query,
),
}
expected_error_message = r"Error in both stack and integrations checks"
with self.assertRaisesRegex(ValueError, expected_error_message):
rc.load_dict(eql_rule)
# Change to appropriate destination.address field
eql_rule["rule"]["query"] = """
with self.assertRaisesRegex(eql.EqlTypeMismatchError, r"Unable to compare ip to string"):
rc.load_dict(rule_dict)
def test_eql_in_set_valid_address(self) -> None:
rc = RuleCollection()
query = """
sequence by host.id, process.entity_id with maxspan = 10s
[network where destination.address in ("192.168.1.1", "::1")]
"""
rc.load_dict(eql_rule)
rule_dict = {
"metadata": mk_metadata(
["endpoint", "windows"], comments="New fields added: required_fields, related_integrations, setup"
),
"rule": mk_rule(
name="Fake Test Rule",
rule_id="4fffae5d-8b7d-4e48-88b1-979ed42fd9a3",
description="Test Rule.",
risk_score=47,
query=query,
),
}
rc.load_dict(rule_dict)
class TestEQLSequencePerIntegration(BaseRuleTest):
"""Tests for per-subquery EQL validation against the correct integration.package schema."""
def test_sequence_valid_per_package(self) -> None:
"""Test that a sequence with subquerys from different packages validates correctly."""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where event.dataset == "azure.identity_protection"] by azure.identityprotection.properties.user_principal_name
[any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
"""
rule = {
"metadata": mk_metadata(["azure"], comments="Per-subquery integration validation"),
"rule": mk_rule(
name="EQL sequence per integration test",
rule_id="1b6e2f77-8e1f-4f8d-9f72-1d8e5f3e5f11",
description="Validate per-subquery integration.package schemas.",
risk_score=40,
query=query,
),
}
# Should load without error because each subquery validates against its own package schema
rc.load_dict(rule)
def test_sequence_invalid_join_field_wrong_package(self) -> None:
"""Test that a sequence with a join field from a different package fails validation."""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where event.dataset == "azure.identity_protection"] by azure.identityprotection.properties.user_principal_name
[any where event.dataset == "azure.identity_protection"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
"""
bad_rule = {
"metadata": mk_metadata(["azure"], comments="Per-subquery integration validation"),
"rule": mk_rule(
name="EQL sequence per integration test",
rule_id="1b6e2f77-8e1f-4f8d-9f72-1d8e5f3e5f11",
description="Validate per-subquery integration.package schemas.",
risk_score=40,
query=query,
),
}
# Expect failure: join field belongs to a different package than the subquery dataset
with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"):
rc.load_dict(bad_rule)
def test_sequence_top_level_by_and_runs_across_integrations_valid(self) -> None:
"""Sequence-level by and per-subquery runs; subqueries use different integrations and validate correctly."""
rc = RuleCollection()
query = """
sequence by host.id, user.id with maxspan=1s
[any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName with runs=5
[authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by okta.actor.id
"""
rule = {
"metadata": mk_metadata(["azure", "okta"], comments="Top-level sequence by and runs"),
"rule": mk_rule(
name="EQL sequence with top-level by and runs",
rule_id="4e5f6a99-4567-4f8d-9f72-1d8e5f3e5f15",
description="Validate top-level sequence by and per-subquery runs across integrations.",
risk_score=42,
query=query,
),
}
rc.load_dict(rule)
def test_sequence_top_level_by_and_runs_across_integrations_invalid_join(self) -> None:
"""Sequence-level by with runs; okta subquery incorrectly uses an azure join field causing validation failure."""
rc = RuleCollection()
query = """
sequence by host.id, user.id with maxspan=1s
[any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName with runs=5
[authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
"""
bad_rule = {
"metadata": mk_metadata(["azure", "okta"], comments="Top-level sequence by and runs invalid join"),
"rule": mk_rule(
name="EQL sequence with top-level by and runs invalid",
rule_id="4e5f6a99-4567-4f8d-9f72-1d8e5f3e5f16",
description="Invalid: okta subquery uses azure join field.",
risk_score=42,
query=query,
),
}
with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"):
rc.load_dict(bad_rule)
def test_sequence_okta_missing_in_metadata_but_present_in_dataset(self) -> None:
"""Okta dataset appears in a subquery but is not listed in metadata; dataset should drive schema selection."""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where event.dataset == "azure.identity_protection"] by azure.identityprotection.properties.user_principal_name
[any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
[authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by okta.actor.id
"""
rule = {
# Intentionally do not include "okta" in metadata.integrations
"metadata": mk_metadata(["azure"], comments="Okta present via dataset only"),
"rule": mk_rule(
name="EQL sequence with okta dataset only",
rule_id="3c4d5e77-2345-4f8d-9f72-1d8e5f3e5f13",
description="Validate that dataset usage includes okta schema even if not in metadata.",
risk_score=50,
query=query,
),
}
# Should load without error because get_packaged_integrations includes packages parsed from datasets
rc.load_dict(rule)
def test_sequence_across_integrations_valid(self) -> None:
"""Sequence uses azure and crowdstrike datasets; each subquery validates against its own integration."""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
[any where event.dataset == "crowdstrike.fdr"] by process.executable
"""
rule = {
"metadata": mk_metadata(["azure", "crowdstrike"], comments="Cross-integration per-subquery validation"),
"rule": mk_rule(
name="EQL sequence across integrations valid",
rule_id="2a3b4c55-1234-4f8d-9f72-1d8e5f3e5f11",
description="Validate sequence subquerys across azure and crowdstrike integrations.",
risk_score=35,
query=query,
),
}
rc.load_dict(rule)
def test_sequence_across_integrations_invalid_crowdstrike_subquery_azure_field(self) -> None:
"""CrowdStrike subquery incorrectly uses an azure join field, which should fail validation."""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
[any where event.dataset == "crowdstrike.fdr"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
"""
bad_rule = {
"metadata": mk_metadata(["azure", "crowdstrike"], comments="Cross-integration per-subquery validation"),
"rule": mk_rule(
name="EQL sequence across integrations invalid",
rule_id="2a3b4c55-1234-4f8d-9f72-1d8e5f3e5f12",
description="CrowdStrike subquery incorrectly uses an azure join field.",
risk_score=35,
query=query,
),
}
with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"):
rc.load_dict(bad_rule)
def test_sequence_mixed_dataset_and_datasetless_subquery_invalid_field(self) -> None:
"""First subquery has dataset; second is datasetless with an invalid vendor field; with no metadata integration
for the datasetless subquery, integration validation so overall validation should fail.
"""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
[any where foo.invalid_field == "badfield"] by host.id
"""
bad_rule = {
# No integrations in metadata: datasetless subquery should not be validated against any integration
"metadata": mk_metadata([], comments="Mixed dataset and datasetless invalid field"),
"rule": mk_rule(
name="EQL sequence mixed dataset and datasetless invalid",
rule_id="5f6071aa-5678-4f8d-9f72-1d8e5f3e5f17",
description="Second datasetless subquery contains an invalid field; expect failure.",
risk_score=33,
query=query,
),
}
with self.assertRaisesRegex(eql.EqlSchemaError, r"Field not recognized"):
rc.load_dict(bad_rule)
def test_sequence_datasetless_subquery_with_metadata_integration_valid(self) -> None:
"""Datasetless azure subquery uses azure.* fields with metadata including azure; should validate and pass."""
rc = RuleCollection()
query = """
sequence with maxspan=30m
[any where azure.identityprotection.properties.user_principal_name != null] by azure.identityprotection.properties.user_principal_name
[any where event.dataset == "azure.auditlogs"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
"""
rule = {
"metadata": mk_metadata(["azure"], comments="Datasetless subquery with azure fields"),
"rule": mk_rule(
name="EQL sequence datasetless azure subquery",
rule_id="3d4e5f88-3456-4f8d-9f72-1d8e5f3e5f14",
description="Datasetless azure subquery relies on metadata/field inference for package schema.",
risk_score=30,
query=query,
),
}
rc.load_dict(rule)