diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 108efca19..6e2bee580 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -12,9 +12,9 @@ from pathlib import Path from typing import Literal, Union, Optional, List, Any, Dict from uuid import uuid4 +import eql from marshmallow import ValidationError, validates_schema - from . import utils from .mixins import MarshmallowDataclassMixin from .rule_formatter import toml_write, nested_normalize @@ -270,6 +270,51 @@ class EQLRuleData(QueryRuleData): type: Literal["eql"] language: Literal["eql"] + @staticmethod + def convert_time_span(span: str) -> int: + """Convert time span in datemath to value in milliseconds.""" + amount = int("".join(char for char in span if char.isdigit())) + unit = eql.ast.TimeUnit("".join(char for char in span if char.isalpha())) + return eql.ast.TimeRange(amount, unit).as_milliseconds() + + def convert_relative_delta(self, lookback: str) -> int: + now = len("now") + min_length = now + len('+5m') + + if lookback.startswith("now") and len(lookback) >= min_length: + lookback = lookback[len("now"):] + sign = lookback[0] # + or - + span = lookback[1:] + amount = self.convert_time_span(span) + return amount * (-1 if sign == "-" else 1) + else: + return self.convert_time_span(lookback) + + @cached_property + def max_span(self) -> Optional[int]: + """Maxspan value for sequence rules if defined.""" + if eql.utils.get_query_type(self.ast) == 'sequence' and hasattr(self.ast.first, 'max_span'): + return self.ast.first.max_span.as_milliseconds() if self.ast.first.max_span else None + + @cached_property + def look_back(self) -> Optional[Union[int, Literal['unknown']]]: + """Lookback value of a rule.""" + # https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math + to = self.convert_relative_delta(self.to) if self.to else 0 + from_ = self.convert_relative_delta(self.from_ or "now-6m") + + if not (to or from_): + return 'unknown' + else: + return to - from_ + + @cached_property + def interval_ratio(self) -> Optional[float]: + """Ratio of interval time window / max_span time window.""" + if self.max_span: + interval = self.convert_time_span(self.interval or '5m') + return interval / self.max_span + @dataclass(frozen=True) class ThreatMatchRuleData(QueryRuleData): diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 1fbee4962..b1f9744ea 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -6,6 +6,7 @@ """Test that all rules have valid metadata and syntax.""" import os import re +import warnings from collections import defaultdict from pathlib import Path @@ -457,6 +458,55 @@ class TestRuleTiming(BaseRuleTest): err_msg = f'The following rules should have a longer `from` defined, due to indexes used\n {rules_str}' self.fail(err_msg) + def test_eql_lookback(self): + """Ensure EQL rules lookback => max_span, when defined.""" + unknowns = [] + invalids = [] + ten_minutes = 10 * 60 * 1000 + + for rule in self.all_rules: + if rule.contents.data.type == 'eql' and rule.contents.data.max_span: + if rule.contents.data.look_back == 'unknown': + unknowns.append(self.rule_str(rule, trailer=None)) + else: + look_back = rule.contents.data.look_back + max_span = rule.contents.data.max_span + expected = look_back + ten_minutes + + if expected < max_span: + invalids.append(f'{self.rule_str(rule)} lookback: {look_back}, maxspan: {max_span}, ' + f'expected: >={expected}') + + if unknowns: + warn_str = '\n'.join(unknowns) + warnings.warn(f'Unable to determine lookbacks for the following rules:\n{warn_str}') + + if invalids: + invalids_str = '\n'.join(invalids) + self.fail(f'The following rules have longer max_spans than lookbacks:\n{invalids_str}') + + def test_eql_interval_to_maxspan(self): + """Check the ratio of interval to maxspan for eql rules.""" + invalids = [] + five_minutes = 5 * 60 * 1000 + + for rule in self.all_rules: + if rule.contents.data.type == 'eql': + interval = rule.contents.data.interval or five_minutes + maxspan = rule.contents.data.max_span + ratio = rule.contents.data.interval_ratio + + # we want to test for at least a ratio of: interval >= 1/2 maxspan + # but we only want to make an exception and cap the ratio at 5m interval (2.5m maxspan) + if maxspan and maxspan > (five_minutes / 2) and ratio and ratio < .5: + expected = maxspan // 2 + err_msg = f'{self.rule_str(rule)} interval: {interval}, maxspan: {maxspan}, expected: >={expected}' + invalids.append(err_msg) + + if invalids: + invalids_str = '\n'.join(invalids) + self.fail(f'The following rules have intervals too short for their given max_spans (ms):\n{invalids_str}') + class TestLicense(BaseRuleTest): """Test rule license."""