From 9f291295855f3de2b8759cdbe256385b94dbb718 Mon Sep 17 00:00:00 2001 From: Terrance DeJesus <99630311+terrancedejesus@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:20:14 -0400 Subject: [PATCH] [FR] Add EQL Rule Type Configuration Fields (#2918) * adding initial EQL fields to EQLRuleData * added validation * adjusted validation * fixed flake errors * adjusted type linting; variable names * added a min_compat to EQL Rule fields * Update detection_rules/rule_validators.py * Update detection_rules/rule_validators.py Co-authored-by: Mika Ayenson --------- Co-authored-by: Mika Ayenson --- detection_rules/rule.py | 3 +++ detection_rules/rule_validators.py | 36 ++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 7f7f85898..00c97d61a 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -698,6 +698,9 @@ class EQLRuleData(QueryRuleData): """EQL rules are a special case of query rules.""" type: Literal["eql"] language: Literal["eql"] + timestamp_field: Optional[str] = field(metadata=dict(metadata=dict(min_compat="8.0"))) + event_category_override: Optional[str] = field(metadata=dict(metadata=dict(min_compat="8.0"))) + tiebreaker_field: Optional[str] = field(metadata=dict(metadata=dict(min_compat="8.0"))) def convert_relative_delta(self, lookback: str) -> int: now = len("now") diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 079a6edee..08606d02c 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -5,7 +5,8 @@ """Validation logic for rules containing queries.""" from functools import cached_property -from typing import List, Optional, Union +from typing import List, Optional, Union, Tuple +from semver import Version import eql @@ -13,7 +14,9 @@ import kql from . import ecs, endgame from .integrations import get_integration_schema_data, load_integrations_manifests -from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents +from .misc import load_current_package_version +from .schemas import get_stack_schemas +from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, EQLRuleData EQL_ERROR_TYPES = Union[eql.EqlCompileError, eql.EqlError, @@ -194,6 +197,12 @@ class EQLValidator(QueryValidator): if validation_checks["stack"] and validation_checks["integrations"]: raise ValueError(f"Error in both stack and integrations checks: {validation_checks}") + rule_type_config_fields, rule_type_config_validation_failed = \ + self.validate_rule_type_configurations(data, meta) + if rule_type_config_validation_failed: + raise ValueError(f"""Rule type config values are not ECS compliant, check these values: + {rule_type_config_fields}""") + def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> Union[EQL_ERROR_TYPES, None, ValueError]: """Validate the query against ECS and beats schemas across stack combinations.""" for stack_version, mapping in meta.get_validation_stack_versions().items(): @@ -308,6 +317,29 @@ class EQLValidator(QueryValidator): print(err_trailer) return exc + def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> \ + Tuple[List[Optional[str]], bool]: + """Validate EQL rule type configurations.""" + if data.timestamp_field or data.event_category_override or data.tiebreaker_field: + + # get a list of rule type configuration fields + # Get a list of rule type configuration fields + fields = ["timestamp_field", "event_category_override", "tiebreaker_field"] + set_fields = list(filter(None, (data.get(field) for field in fields))) + + # get stack_version and ECS schema + min_stack_version = meta.get("min_stack_version") + if min_stack_version is None: + min_stack_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) + ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs'] + schema = ecs.get_schema(ecs_version) + + # return a list of rule type config field values and whether any are not in the schema + return (set_fields, any([f not in schema.keys() for f in set_fields])) + else: + # if rule type fields are not set, return an empty list and False + return [], False + def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]: """Extract the field name from an EQL or KQL parse error."""