[FR] Add EQL Rule Type Configuration Fields (#2918)

* adding initial EQL fields to EQLRuleData

* added validation

* adjusted validation

* fixed flake errors

* adjusted type linting; variable names

* added a min_compat to EQL Rule fields

* Update detection_rules/rule_validators.py

* Update detection_rules/rule_validators.py

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>

---------

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
This commit is contained in:
Terrance DeJesus
2023-07-13 11:20:14 -04:00
committed by GitHub
parent 9414095d96
commit 9f29129585
2 changed files with 37 additions and 2 deletions
+3
View File
@@ -698,6 +698,9 @@ class EQLRuleData(QueryRuleData):
"""EQL rules are a special case of query rules."""
type: Literal["eql"]
language: Literal["eql"]
timestamp_field: Optional[str] = field(metadata=dict(metadata=dict(min_compat="8.0")))
event_category_override: Optional[str] = field(metadata=dict(metadata=dict(min_compat="8.0")))
tiebreaker_field: Optional[str] = field(metadata=dict(metadata=dict(min_compat="8.0")))
def convert_relative_delta(self, lookback: str) -> int:
now = len("now")
+34 -2
View File
@@ -5,7 +5,8 @@
"""Validation logic for rules containing queries."""
from functools import cached_property
from typing import List, Optional, Union
from typing import List, Optional, Union, Tuple
from semver import Version
import eql
@@ -13,7 +14,9 @@ import kql
from . import ecs, endgame
from .integrations import get_integration_schema_data, load_integrations_manifests
from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents
from .misc import load_current_package_version
from .schemas import get_stack_schemas
from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, EQLRuleData
EQL_ERROR_TYPES = Union[eql.EqlCompileError,
eql.EqlError,
@@ -194,6 +197,12 @@ class EQLValidator(QueryValidator):
if validation_checks["stack"] and validation_checks["integrations"]:
raise ValueError(f"Error in both stack and integrations checks: {validation_checks}")
rule_type_config_fields, rule_type_config_validation_failed = \
self.validate_rule_type_configurations(data, meta)
if rule_type_config_validation_failed:
raise ValueError(f"""Rule type config values are not ECS compliant, check these values:
{rule_type_config_fields}""")
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> Union[EQL_ERROR_TYPES, None, ValueError]:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
@@ -308,6 +317,29 @@ class EQLValidator(QueryValidator):
print(err_trailer)
return exc
def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) -> \
Tuple[List[Optional[str]], bool]:
"""Validate EQL rule type configurations."""
if data.timestamp_field or data.event_category_override or data.tiebreaker_field:
# get a list of rule type configuration fields
# Get a list of rule type configuration fields
fields = ["timestamp_field", "event_category_override", "tiebreaker_field"]
set_fields = list(filter(None, (data.get(field) for field in fields)))
# get stack_version and ECS schema
min_stack_version = meta.get("min_stack_version")
if min_stack_version is None:
min_stack_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs']
schema = ecs.get_schema(ecs_version)
# return a list of rule type config field values and whether any are not in the schema
return (set_fields, any([f not in schema.keys() for f in set_fields]))
else:
# if rule type fields are not set, return an empty list and False
return [], False
def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]:
"""Extract the field name from an EQL or KQL parse error."""