Files
sigma-rules/tests/test_rules_remote.py
Mika Ayenson, PhD cc66323d1d [Bug] Omit ES|QL engine columns from required_fields (#6027)
* Omit Esql.* columns from ES|QL rule required_fields

Kibana treats required_fields as index mappings. ES|QL stats and
similar commands expose Esql.* and Esql_priv.* result columns that
are not mapped on source indices, which produced noisy validation
warnings for shipped rules.

Filter those names when building required_fields. Add a check in
test_esql_endpoint_alerts_index when remote ES|QL validation runs.

Fixes #6026.

* Move required_fields check to its own remote test

* Iterate production rules in required_fields test

* Use direct get_required_fields call in remote test

Skip to_api_format() and call data.get_required_fields(index) directly,
gated on ESQLRuleData. Mirrors the ESQLValidator scope of the fix and
avoids the unrelated packaging steps that to_api_format runs per rule.

* Bump version to 1.6.30

* Centralize ES|QL dynamic field prefix tuple

Define ESQL_DYNAMIC_FIELD_PREFIXES = ("Esql.", "Esql_priv.") in
schemas/definitions.py and reuse it in QueryValidator.get_required_fields,
ESQLValidator.validate_columns_index_mapping, and the remote test.
Single source of truth and consistent ordering across the codebase.
2026-05-01 17:37:31 -05:00

347 lines
18 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import unittest
from copy import deepcopy
import pytest
from detection_rules.esql_errors import (
EsqlSchemaError,
EsqlSemanticError,
EsqlSyntaxError,
EsqlTypeMismatchError,
EsqlUnknownIndexError,
)
from detection_rules.misc import (
get_default_config,
getdefault,
)
from detection_rules.rule import ESQLRuleData
from detection_rules.rule_loader import RuleCollection
from detection_rules.schemas.definitions import ESQL_DYNAMIC_FIELD_PREFIXES
from detection_rules.utils import get_path, load_rule_contents
from .base import BaseRuleTest
@unittest.skipIf(get_default_config() is None, "Skipping remote validation due to missing config")
@unittest.skipIf(
not getdefault("remote_esql_validation")(), "Skipping remote validation because remote_esql_validation is False"
)
class TestRemoteRules(BaseRuleTest):
"""Test rules against a remote Elastic stack instance."""
def test_get_hashable_content_required_fields_popped_when_keep_star_used(self):
"""Hashable content must not contain required_fields when query uses keep * or field wildcards."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
# Non-aggregate queries must include _id, _version, _index in keep when keep is not exactly "*"
base = "from logs-aws.cloudtrail* metadata _id, _version, _index\n"
base += '| where event.action == "start"\n | eval Esql.entity_type = cloud.target.entity.type\n | keep '
keep_star_queries = [
base + "*",
base + "Esql.*, _id, _version, _index",
base + "host.name, Esql.*, _id, _version, _index",
base + "event.*, _id, _version, _index",
]
for query in keep_star_queries:
production_rule_copy = deepcopy(production_rule)
production_rule_copy["rule"]["query"] = query
rule = RuleCollection().load_dict(production_rule_copy)
hashable = rule.contents.get_hashable_content()
assert "required_fields" not in hashable, f"required_fields should be popped for keep-star query: {query!r}"
def test_get_hashable_content_required_fields_kept_when_no_keep_star(self):
"""Hashable content keeps required_fields when query uses explicit keep (no wildcards)."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where event.action == "start"
| keep _id, _version, _index
"""
rule = RuleCollection().load_dict(production_rule)
api = rule.contents.to_api_format()
hashable = rule.contents.get_hashable_content()
if "required_fields" in api:
assert "required_fields" in hashable, "required_fields must not be popped when keep has no wildcards"
def test_get_hashable_content_required_fields_kept_for_explicit_keep_only(self):
"""Hashable content keeps required_fields when keep lists only explicit fields."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where event.action == "start"
| keep host.name, user.name, _id, _version, _index
"""
rule = RuleCollection().load_dict(production_rule)
api = rule.contents.to_api_format()
hashable = rule.contents.get_hashable_content()
if "required_fields" in api:
assert "required_fields" in hashable
def test_esql_related_integrations(self):
"""Test an ESQL rule has its related integrations built correctly."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and event.dataset in ("aws.cloudtrail", "aws.billing")
and aws.cloudtrail.user_identity.arn is not null
and aws.cloudtrail.user_identity.type == "IAMUser"
| keep
aws.cloudtrail.user_identity.type, _id, _version, _index
"""
rule = RuleCollection().load_dict(production_rule)
related_integrations = rule.contents.to_api_format()["related_integrations"]
for integration in related_integrations:
assert integration["package"] == "aws", f"Expected 'aws', but got {integration['package']}"
def test_esql_non_dataset_package_related_integrations(self):
"""Test an ESQL rule has its related integrations built correctly with a non dataset package."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws_bedrock"]
production_rule["rule"]["query"] = """
from logs-aws_bedrock.invocation-* metadata _id, _version, _index
// Filter for access denied errors from GenAI responses
| where gen_ai.response.error_code == "AccessDeniedException"
// keep ECS and response fields
| keep
user.id,
gen_ai.request.model.id,
cloud.account.id,
gen_ai.response.error_code, _id, _version, _index
"""
rule = RuleCollection().load_dict(production_rule)
related_integrations = rule.contents.to_api_format()["related_integrations"]
for integration in related_integrations:
assert integration["package"] == "aws_bedrock", f"Expected 'aws_bedrock', but got {integration['package']}"
def test_esql_event_dataset_schema_error(self):
"""Test an ESQL rule that uses event.dataset field in the query that restricts the schema failing validation."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
# Test that a ValidationError is raised if the query doesn't match the schema
production_rule = deepcopy(original_production_rule)[0]
del production_rule["metadata"]["integration"]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and event.dataset in ("aws.billing")
and aws.cloudtrail.user_identity.type == "IAMUser"
| keep
aws.cloudtrail.user_identity.type, _id, _version, _index
"""
with pytest.raises(EsqlSchemaError):
_ = RuleCollection().load_dict(production_rule)
def test_esql_type_mismatch_error(self):
"""Test an ESQL rule that produces a type error comparing a keyword to a number."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
# Test that a ValidationError is raised if the query doesn't match the schema
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and event.dataset in ("aws.cloudtrail", "aws.billing")
and aws.cloudtrail.user_identity.type == 5
| keep
aws.cloudtrail.user_identity.type, _id, _version, _index
"""
with pytest.raises(EsqlTypeMismatchError):
_ = RuleCollection().load_dict(production_rule)
def test_esql_syntax_error(self):
"""Test an ESQL rule that incorrectly using = for comparison."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
# Test that a ValidationError is raised if the query doesn't match the schema
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and event.dataset in ("aws.cloudtrail", "aws.billing")
and aws.cloudtrail.user_identity.type = "IAMUser"
| keep
aws.cloudtrail.user_identity.type, _id, _version, _index
"""
with pytest.raises(EsqlSyntaxError):
_ = RuleCollection().load_dict(production_rule)
def test_esql_filtered_index(self):
"""Test an ESQL rule's schema validation to properly reduce it by the index and handle implicit fields."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
# Test that a ValidationError is raised if the query doesn't match the schema
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.cloud* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and aws.cloudtrail.user_identity.type == "IAMUser"
| keep
aws.*, _id, _version, _index
"""
_ = RuleCollection().load_dict(production_rule)
def test_esql_filtered_index_error(self):
"""Test an ESQL rule's schema validation when reduced by the index and check if the field is present."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
# Test that a ValidationError is raised if the query doesn't match the schema
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.billing* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and aws.cloudtrail.user_identity.type == "IAMUser"
| keep
aws.cloudtrail.user_identity.type, _id, _version, _index
"""
with pytest.raises(EsqlSchemaError):
_ = RuleCollection().load_dict(production_rule)
def test_new_line_split_index(self):
"""Test an ESQL rule's index validation to ensure that it can handle new line split indices."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.cloud*, logs-network_traffic.http-*,
logs-nginx.access-* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and aws.cloudtrail.user_identity.type == "IAMUser"
| keep
aws.*, _id, _version, _index
"""
_ = RuleCollection().load_dict(production_rule)
def test_esql_endpoint_alerts_index(self):
"""Test an ESQL rule's schema validation using ecs fields in the alerts index."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["rule"]["query"] = """
from logs-endpoint.alerts-* METADATA _id, _version, _index
| where event.code in ("malicious_file", "memory_signature", "shellcode_thread") and rule.name is not null
| keep host.id, rule.name, event.code, _id, _version, _index
| stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code
| where Esql.host_id_count_distinct >= 3
"""
_ = RuleCollection().load_dict(production_rule)
def test_esql_required_fields_omit_engine_columns(self):
"""ESQL required_fields must not list Esql.* / Esql_priv.* (not index mappings)."""
for rule in self.all_rules:
data = rule.contents.data
if not isinstance(data, ESQLRuleData):
continue
index = data.get("index") or []
for rf in data.get_required_fields(index) or []:
name = rf["name"]
assert not name.startswith(ESQL_DYNAMIC_FIELD_PREFIXES), (
f"{rule.id} - {rule.name}: required_fields must not include ES|QL engine columns "
f"(not index mappings): {name!r}"
)
def test_esql_endpoint_unknown_index(self):
"""Test an ESQL rule's index validation. This is expected to error on an unknown index."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["rule"]["query"] = """
from logs-endpoint.fake-*
| where event.code in ("malicious_file", "memory_signature", "shellcode_thread") and rule.name is not null
| keep host.id, rule.name, event.code, _id, _version, _index
| stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code
| where Esql.host_id_count_distinct >= 3
"""
with pytest.raises(EsqlUnknownIndexError):
_ = RuleCollection().load_dict(production_rule)
def test_esql_endpoint_alerts_index_endpoint_fields(self):
"""Test an ESQL rule's schema validation using endpoint integration fields in the alerts index."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = []
production_rule["rule"]["query"] = """
from logs-endpoint.alerts-* METADATA _id, _version, _index
| where event.code in ("malicious_file", "memory_signature", "shellcode_thread") and rule.name is not null and file.Ext.entry_modified > 0
| keep host.id, rule.name, event.code, file.Ext.entry_modified, _id, _version, _index
| stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code, file.Ext.entry_modified
| where Esql.host_id_count_distinct >= 3
"""
# This is a type mismatch error due to Elastic Container project including the Endpoint integration by default.
# Otherwise one would expect an EsqlSchemaError due to the field not being present in the alerts index.
with pytest.raises(EsqlTypeMismatchError):
_ = RuleCollection().load_dict(production_rule)
def test_esql_filtered_keep(self):
"""Test an ESQL rule's schema validation."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
# Test that a ValidationError is raised if the query doesn't match the schema
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.billing* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes and aws.cloudtrail.user_identity.type == "IAMUser"
| keep host.id, rule.name, event.code, _id, _version, _index
| stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code
| where Esql.host_id_count_distinct >= 3
"""
with pytest.raises(EsqlSchemaError):
_ = RuleCollection().load_dict(production_rule)
def test_esql_non_ecs_schema_conflict_resolution(self):
"""Test an ESQL rule that has a known conflict between non_ecs and integrations for correct handling."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["azure", "o365"]
production_rule["rule"]["query"] = """
from logs-azure.signinlogs-* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and event.dataset in ("azure.signinlogs")
and event.outcome == "success"
and azure.signinlogs.properties.user_id is not null
| keep
event.outcome, _id, _version, _index
"""
_ = RuleCollection().load_dict(production_rule)
def test_esql_multiple_keeps(self):
"""Test an ESQL rule that has multiple keeps in the query."""
file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
original_production_rule = load_rule_contents(file_path)
production_rule = deepcopy(original_production_rule)[0]
production_rule["metadata"]["integration"] = ["aws"]
production_rule["rule"]["query"] = """
from logs-aws.cloudtrail* metadata _id, _version, _index
| where @timestamp > now() - 30 minutes
and event.dataset in ("aws.cloudtrail", "aws.billing")
and aws.cloudtrail.user_identity.type == "IAMUser"
| keep aws.cloudtrail.user_identity.type, _id, _version, _index
| eval Esql.user_type = aws.cloudtrail.user_identity.type
| keep Esql.user_type
"""
with pytest.raises(EsqlSemanticError):
_ = RuleCollection().load_dict(production_rule)