cc66323d1d
* Omit Esql.* columns from ES|QL rule required_fields

  Kibana treats required_fields as index mappings. ES|QL stats and similar commands expose Esql.* and Esql_priv.* result columns that are not mapped on source indices, which produced noisy validation warnings for shipped rules. Filter those names when building required_fields. Add a check in test_esql_endpoint_alerts_index when remote ES|QL validation runs. Fixes #6026.

* Move required_fields check to its own remote test

* Iterate production rules in required_fields test

* Use direct get_required_fields call in remote test

  Skip to_api_format() and call data.get_required_fields(index) directly, gated on ESQLRuleData. Mirrors the ESQLValidator scope of the fix and avoids the unrelated packaging steps that to_api_format runs per rule.

* Bump version to 1.6.30

* Centralize ES|QL dynamic field prefix tuple

  Define ESQL_DYNAMIC_FIELD_PREFIXES = ("Esql.", "Esql_priv.") in schemas/definitions.py and reuse it in QueryValidator.get_required_fields, ESQLValidator.validate_columns_index_mapping, and the remote test. Single source of truth and consistent ordering across the codebase.
347 lines
18 KiB
Python
347 lines
18 KiB
Python
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
# or more contributor license agreements. Licensed under the Elastic License
|
|
# 2.0; you may not use this file except in compliance with the Elastic License
|
|
# 2.0.
|
|
|
|
import unittest
|
|
from copy import deepcopy
|
|
|
|
import pytest
|
|
|
|
from detection_rules.esql_errors import (
|
|
EsqlSchemaError,
|
|
EsqlSemanticError,
|
|
EsqlSyntaxError,
|
|
EsqlTypeMismatchError,
|
|
EsqlUnknownIndexError,
|
|
)
|
|
from detection_rules.misc import (
|
|
get_default_config,
|
|
getdefault,
|
|
)
|
|
from detection_rules.rule import ESQLRuleData
|
|
from detection_rules.rule_loader import RuleCollection
|
|
from detection_rules.schemas.definitions import ESQL_DYNAMIC_FIELD_PREFIXES
|
|
from detection_rules.utils import get_path, load_rule_contents
|
|
|
|
from .base import BaseRuleTest
|
|
|
|
|
|
@unittest.skipIf(get_default_config() is None, "Skipping remote validation due to missing config")
@unittest.skipIf(
    not getdefault("remote_esql_validation")(), "Skipping remote validation because remote_esql_validation is False"
)
class TestRemoteRules(BaseRuleTest):
    """Test rules against a remote Elastic stack instance.

    The whole class is skipped when no default config is available or when the
    ``remote_esql_validation`` setting is falsy. Most tests start from the same
    dummy production rule, override its query and/or integration metadata, and
    then check the outcome of ``RuleCollection().load_dict``.
    """

    @staticmethod
    def _load_dummy_production_rule():
        """Return a fresh deep copy of the first rule in the dummy production rule fixture.

        The fixture is re-read and copied on every call so each test can overwrite
        ``metadata`` / ``rule`` entries without affecting any other test.
        """
        file_path = get_path(["tests", "data", "command_control_dummy_production_rule.toml"])
        return deepcopy(load_rule_contents(file_path))[0]

    def test_get_hashable_content_required_fields_popped_when_keep_star_used(self):
        """Hashable content must not contain required_fields when query uses keep * or field wildcards."""
        production_rule = self._load_dummy_production_rule()
        # Non-aggregate queries must include _id, _version, _index in keep when keep is not exactly "*"
        base = "from logs-aws.cloudtrail* metadata _id, _version, _index\n"
        base += '| where event.action == "start"\n | eval Esql.entity_type = cloud.target.entity.type\n | keep '
        keep_star_queries = [
            base + "*",
            base + "Esql.*, _id, _version, _index",
            base + "host.name, Esql.*, _id, _version, _index",
            base + "event.*, _id, _version, _index",
        ]
        for query in keep_star_queries:
            # Work on a copy so every query variant starts from the same untouched rule.
            production_rule_copy = deepcopy(production_rule)
            production_rule_copy["rule"]["query"] = query
            rule = RuleCollection().load_dict(production_rule_copy)
            hashable = rule.contents.get_hashable_content()
            assert "required_fields" not in hashable, f"required_fields should be popped for keep-star query: {query!r}"

    def test_get_hashable_content_required_fields_kept_when_no_keep_star(self):
        """Hashable content keeps required_fields when query uses explicit keep (no wildcards)."""
        production_rule = self._load_dummy_production_rule()
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where event.action == "start"
        | keep _id, _version, _index
        """
        rule = RuleCollection().load_dict(production_rule)
        api = rule.contents.to_api_format()
        hashable = rule.contents.get_hashable_content()
        # Only compare when the API payload actually carries required_fields.
        if "required_fields" in api:
            assert "required_fields" in hashable, "required_fields must not be popped when keep has no wildcards"

    def test_get_hashable_content_required_fields_kept_for_explicit_keep_only(self):
        """Hashable content keeps required_fields when keep lists only explicit fields."""
        production_rule = self._load_dummy_production_rule()
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where event.action == "start"
        | keep host.name, user.name, _id, _version, _index
        """
        rule = RuleCollection().load_dict(production_rule)
        api = rule.contents.to_api_format()
        hashable = rule.contents.get_hashable_content()
        # Only compare when the API payload actually carries required_fields.
        if "required_fields" in api:
            assert "required_fields" in hashable

    def test_esql_related_integrations(self):
        """Test an ESQL rule has its related integrations built correctly."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and event.dataset in ("aws.cloudtrail", "aws.billing")
        and aws.cloudtrail.user_identity.arn is not null
        and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep
        aws.cloudtrail.user_identity.type, _id, _version, _index
        """
        rule = RuleCollection().load_dict(production_rule)
        related_integrations = rule.contents.to_api_format()["related_integrations"]
        # NOTE(review): this loop asserts nothing when related_integrations is empty;
        # consider also asserting that the list is non-empty.
        for integration in related_integrations:
            assert integration["package"] == "aws", f"Expected 'aws', but got {integration['package']}"

    def test_esql_non_dataset_package_related_integrations(self):
        """Test an ESQL rule has its related integrations built correctly with a non dataset package."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws_bedrock"]
        production_rule["rule"]["query"] = """
        from logs-aws_bedrock.invocation-* metadata _id, _version, _index
        // Filter for access denied errors from GenAI responses
        | where gen_ai.response.error_code == "AccessDeniedException"
        // keep ECS and response fields
        | keep
        user.id,
        gen_ai.request.model.id,
        cloud.account.id,
        gen_ai.response.error_code, _id, _version, _index
        """
        rule = RuleCollection().load_dict(production_rule)
        related_integrations = rule.contents.to_api_format()["related_integrations"]
        # NOTE(review): this loop asserts nothing when related_integrations is empty;
        # consider also asserting that the list is non-empty.
        for integration in related_integrations:
            assert integration["package"] == "aws_bedrock", f"Expected 'aws_bedrock', but got {integration['package']}"

    def test_esql_event_dataset_schema_error(self):
        """Test an ESQL rule that uses event.dataset field in the query that restricts the schema failing validation."""
        production_rule = self._load_dummy_production_rule()
        # No integration metadata: the query's event.dataset filter names only
        # aws.billing while the query references an aws.cloudtrail field, so a
        # schema error is expected.
        del production_rule["metadata"]["integration"]
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and event.dataset in ("aws.billing")
        and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep
        aws.cloudtrail.user_identity.type, _id, _version, _index
        """
        with pytest.raises(EsqlSchemaError):
            _ = RuleCollection().load_dict(production_rule)

    def test_esql_type_mismatch_error(self):
        """Test an ESQL rule that produces a type error comparing a keyword to a number."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        # aws.cloudtrail.user_identity.type is compared to the number 5, which
        # is expected to raise a type mismatch error.
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and event.dataset in ("aws.cloudtrail", "aws.billing")
        and aws.cloudtrail.user_identity.type == 5
        | keep
        aws.cloudtrail.user_identity.type, _id, _version, _index
        """
        with pytest.raises(EsqlTypeMismatchError):
            _ = RuleCollection().load_dict(production_rule)

    def test_esql_syntax_error(self):
        """Test an ESQL rule that incorrectly uses = for comparison."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        # The last condition uses a single "=" instead of "==", which is
        # expected to raise a syntax error.
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and event.dataset in ("aws.cloudtrail", "aws.billing")
        and aws.cloudtrail.user_identity.type = "IAMUser"
        | keep
        aws.cloudtrail.user_identity.type, _id, _version, _index
        """
        with pytest.raises(EsqlSyntaxError):
            _ = RuleCollection().load_dict(production_rule)

    def test_esql_filtered_index(self):
        """Test an ESQL rule's schema validation to properly reduce it by the index and handle implicit fields."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        production_rule["rule"]["query"] = """
        from logs-aws.cloud* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep
        aws.*, _id, _version, _index
        """
        # Loading is expected to succeed; any validation error fails the test.
        _ = RuleCollection().load_dict(production_rule)

    def test_esql_filtered_index_error(self):
        """Test an ESQL rule's schema validation when reduced by the index and check if the field is present."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        # The query reads from logs-aws.billing* but references an
        # aws.cloudtrail field, so a schema error is expected.
        production_rule["rule"]["query"] = """
        from logs-aws.billing* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep
        aws.cloudtrail.user_identity.type, _id, _version, _index
        """
        with pytest.raises(EsqlSchemaError):
            _ = RuleCollection().load_dict(production_rule)

    def test_new_line_split_index(self):
        """Test an ESQL rule's index validation to ensure that it can handle new line split indices."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        # The index list in the "from" clause deliberately continues on a second line.
        production_rule["rule"]["query"] = """
        from logs-aws.cloud*, logs-network_traffic.http-*,
        logs-nginx.access-* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep
        aws.*, _id, _version, _index
        """
        # Loading is expected to succeed; any validation error fails the test.
        _ = RuleCollection().load_dict(production_rule)

    def test_esql_endpoint_alerts_index(self):
        """Test an ESQL rule's schema validation using ecs fields in the alerts index."""
        production_rule = self._load_dummy_production_rule()
        production_rule["rule"]["query"] = """
        from logs-endpoint.alerts-* METADATA _id, _version, _index
        | where event.code in ("malicious_file", "memory_signature", "shellcode_thread") and rule.name is not null
        | keep host.id, rule.name, event.code, _id, _version, _index
        | stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code
        | where Esql.host_id_count_distinct >= 3
        """
        # Loading is expected to succeed; any validation error fails the test.
        _ = RuleCollection().load_dict(production_rule)

    def test_esql_required_fields_omit_engine_columns(self):
        """ESQL required_fields must not list Esql.* / Esql_priv.* (not index mappings)."""
        for rule in self.all_rules:
            data = rule.contents.data
            # Only ES|QL rules are in scope for this check.
            if not isinstance(data, ESQLRuleData):
                continue
            index = data.get("index") or []
            for rf in data.get_required_fields(index) or []:
                name = rf["name"]
                # str.startswith accepts the prefix tuple, so one call checks every prefix.
                assert not name.startswith(ESQL_DYNAMIC_FIELD_PREFIXES), (
                    f"{rule.id} - {rule.name}: required_fields must not include ES|QL engine columns "
                    f"(not index mappings): {name!r}"
                )

    def test_esql_endpoint_unknown_index(self):
        """Test an ESQL rule's index validation. This is expected to error on an unknown index."""
        production_rule = self._load_dummy_production_rule()
        production_rule["rule"]["query"] = """
        from logs-endpoint.fake-*
        | where event.code in ("malicious_file", "memory_signature", "shellcode_thread") and rule.name is not null
        | keep host.id, rule.name, event.code, _id, _version, _index
        | stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code
        | where Esql.host_id_count_distinct >= 3
        """
        with pytest.raises(EsqlUnknownIndexError):
            _ = RuleCollection().load_dict(production_rule)

    def test_esql_endpoint_alerts_index_endpoint_fields(self):
        """Test an ESQL rule's schema validation using endpoint integration fields in the alerts index."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = []
        production_rule["rule"]["query"] = """
        from logs-endpoint.alerts-* METADATA _id, _version, _index
        | where event.code in ("malicious_file", "memory_signature", "shellcode_thread") and rule.name is not null and file.Ext.entry_modified > 0
        | keep host.id, rule.name, event.code, file.Ext.entry_modified, _id, _version, _index
        | stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code, file.Ext.entry_modified
        | where Esql.host_id_count_distinct >= 3
        """
        # This is a type mismatch error due to Elastic Container project including the Endpoint integration by default.
        # Otherwise one would expect an EsqlSchemaError due to the field not being present in the alerts index.
        with pytest.raises(EsqlTypeMismatchError):
            _ = RuleCollection().load_dict(production_rule)

    def test_esql_filtered_keep(self):
        """Test an ESQL rule's schema validation."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        # The query reads from logs-aws.billing* but filters on an
        # aws.cloudtrail field, so a schema error is expected.
        production_rule["rule"]["query"] = """
        from logs-aws.billing* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep host.id, rule.name, event.code, _id, _version, _index
        | stats Esql.host_id_count_distinct = count_distinct(host.id) by rule.name, event.code
        | where Esql.host_id_count_distinct >= 3
        """
        with pytest.raises(EsqlSchemaError):
            _ = RuleCollection().load_dict(production_rule)

    def test_esql_non_ecs_schema_conflict_resolution(self):
        """Test an ESQL rule that has a known conflict between non_ecs and integrations for correct handling."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["azure", "o365"]
        production_rule["rule"]["query"] = """
        from logs-azure.signinlogs-* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and event.dataset in ("azure.signinlogs")
        and event.outcome == "success"
        and azure.signinlogs.properties.user_id is not null
        | keep
        event.outcome, _id, _version, _index
        """
        # Loading is expected to succeed; any validation error fails the test.
        _ = RuleCollection().load_dict(production_rule)

    def test_esql_multiple_keeps(self):
        """Test an ESQL rule that has multiple keeps in the query."""
        production_rule = self._load_dummy_production_rule()
        production_rule["metadata"]["integration"] = ["aws"]
        # The final keep lists only Esql.user_type (no _id, _version, _index);
        # loading is expected to raise a semantic error.
        production_rule["rule"]["query"] = """
        from logs-aws.cloudtrail* metadata _id, _version, _index
        | where @timestamp > now() - 30 minutes
        and event.dataset in ("aws.cloudtrail", "aws.billing")
        and aws.cloudtrail.user_identity.type == "IAMUser"
        | keep aws.cloudtrail.user_identity.type, _id, _version, _index
        | eval Esql.user_type = aws.cloudtrail.user_identity.type
        | keep Esql.user_type
        """
        with pytest.raises(EsqlSemanticError):
            _ = RuleCollection().load_dict(production_rule)
|