Ensure EQL rules with maxspan have a long enough lookback window (#1361)
* Add the following properties to EQLRuleData:
- max_span
- look_back
- interval_ratio
* Add the following tests:
- test_eql_lookback
- test_eql_interval_to_maxspan
Co-authored-by: Ross Wolf <31489089+rw-access@users.noreply.github.com>
(cherry picked from commit 7759fa2500)
This commit is contained in:
committed by
github-actions[bot]
parent
0ae93632fc
commit
3c9079faf3
+46
-1
@@ -12,9 +12,9 @@ from pathlib import Path
|
||||
from typing import Literal, Union, Optional, List, Any, Dict
|
||||
from uuid import uuid4
|
||||
|
||||
import eql
|
||||
from marshmallow import ValidationError, validates_schema
|
||||
|
||||
|
||||
from . import utils
|
||||
from .mixins import MarshmallowDataclassMixin
|
||||
from .rule_formatter import toml_write, nested_normalize
|
||||
@@ -270,6 +270,51 @@ class EQLRuleData(QueryRuleData):
|
||||
type: Literal["eql"]
|
||||
language: Literal["eql"]
|
||||
|
||||
@staticmethod
|
||||
def convert_time_span(span: str) -> int:
|
||||
"""Convert time span in datemath to value in milliseconds."""
|
||||
amount = int("".join(char for char in span if char.isdigit()))
|
||||
unit = eql.ast.TimeUnit("".join(char for char in span if char.isalpha()))
|
||||
return eql.ast.TimeRange(amount, unit).as_milliseconds()
|
||||
|
||||
def convert_relative_delta(self, lookback: str) -> int:
|
||||
now = len("now")
|
||||
min_length = now + len('+5m')
|
||||
|
||||
if lookback.startswith("now") and len(lookback) >= min_length:
|
||||
lookback = lookback[len("now"):]
|
||||
sign = lookback[0] # + or -
|
||||
span = lookback[1:]
|
||||
amount = self.convert_time_span(span)
|
||||
return amount * (-1 if sign == "-" else 1)
|
||||
else:
|
||||
return self.convert_time_span(lookback)
|
||||
|
||||
@cached_property
|
||||
def max_span(self) -> Optional[int]:
|
||||
"""Maxspan value for sequence rules if defined."""
|
||||
if eql.utils.get_query_type(self.ast) == 'sequence' and hasattr(self.ast.first, 'max_span'):
|
||||
return self.ast.first.max_span.as_milliseconds() if self.ast.first.max_span else None
|
||||
|
||||
@cached_property
|
||||
def look_back(self) -> Optional[Union[int, Literal['unknown']]]:
|
||||
"""Lookback value of a rule."""
|
||||
# https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
|
||||
to = self.convert_relative_delta(self.to) if self.to else 0
|
||||
from_ = self.convert_relative_delta(self.from_ or "now-6m")
|
||||
|
||||
if not (to or from_):
|
||||
return 'unknown'
|
||||
else:
|
||||
return to - from_
|
||||
|
||||
@cached_property
|
||||
def interval_ratio(self) -> Optional[float]:
|
||||
"""Ratio of interval time window / max_span time window."""
|
||||
if self.max_span:
|
||||
interval = self.convert_time_span(self.interval or '5m')
|
||||
return interval / self.max_span
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ThreatMatchRuleData(QueryRuleData):
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
"""Test that all rules have valid metadata and syntax."""
|
||||
import os
|
||||
import re
|
||||
import warnings
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
@@ -457,6 +458,55 @@ class TestRuleTiming(BaseRuleTest):
|
||||
err_msg = f'The following rules should have a longer `from` defined, due to indexes used\n {rules_str}'
|
||||
self.fail(err_msg)
|
||||
|
||||
def test_eql_lookback(self):
|
||||
"""Ensure EQL rules lookback => max_span, when defined."""
|
||||
unknowns = []
|
||||
invalids = []
|
||||
ten_minutes = 10 * 60 * 1000
|
||||
|
||||
for rule in self.all_rules:
|
||||
if rule.contents.data.type == 'eql' and rule.contents.data.max_span:
|
||||
if rule.contents.data.look_back == 'unknown':
|
||||
unknowns.append(self.rule_str(rule, trailer=None))
|
||||
else:
|
||||
look_back = rule.contents.data.look_back
|
||||
max_span = rule.contents.data.max_span
|
||||
expected = look_back + ten_minutes
|
||||
|
||||
if expected < max_span:
|
||||
invalids.append(f'{self.rule_str(rule)} lookback: {look_back}, maxspan: {max_span}, '
|
||||
f'expected: >={expected}')
|
||||
|
||||
if unknowns:
|
||||
warn_str = '\n'.join(unknowns)
|
||||
warnings.warn(f'Unable to determine lookbacks for the following rules:\n{warn_str}')
|
||||
|
||||
if invalids:
|
||||
invalids_str = '\n'.join(invalids)
|
||||
self.fail(f'The following rules have longer max_spans than lookbacks:\n{invalids_str}')
|
||||
|
||||
def test_eql_interval_to_maxspan(self):
|
||||
"""Check the ratio of interval to maxspan for eql rules."""
|
||||
invalids = []
|
||||
five_minutes = 5 * 60 * 1000
|
||||
|
||||
for rule in self.all_rules:
|
||||
if rule.contents.data.type == 'eql':
|
||||
interval = rule.contents.data.interval or five_minutes
|
||||
maxspan = rule.contents.data.max_span
|
||||
ratio = rule.contents.data.interval_ratio
|
||||
|
||||
# we want to test for at least a ratio of: interval >= 1/2 maxspan
|
||||
# but we only want to make an exception and cap the ratio at 5m interval (2.5m maxspan)
|
||||
if maxspan and maxspan > (five_minutes / 2) and ratio and ratio < .5:
|
||||
expected = maxspan // 2
|
||||
err_msg = f'{self.rule_str(rule)} interval: {interval}, maxspan: {maxspan}, expected: >={expected}'
|
||||
invalids.append(err_msg)
|
||||
|
||||
if invalids:
|
||||
invalids_str = '\n'.join(invalids)
|
||||
self.fail(f'The following rules have intervals too short for their given max_spans (ms):\n{invalids_str}')
|
||||
|
||||
|
||||
class TestLicense(BaseRuleTest):
|
||||
"""Test rule license."""
|
||||
|
||||
Reference in New Issue
Block a user