[FR] Add support for building block rules (BBR) (#2822)

* added test bbr

* initial implementation

* Added Unit test and exempted bbr from integrations

* fixed linting

* Add schema validation to building block rules

* add separate error messages

* fixed linting

* Add testing bbr validation

* fixed linting

* Add default values

* fixed linting

* added defaults

* fixed linting

* cleaned up test rule

* removed .gitkeep

* read .gitkeep

* Switch to using validates_schema

* addressing some linting

* fixed linting

* Update detection_rules/schemas/definitions.py

Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>

* add env variable check

* fix skip function

* updated name

* Update detection_rules/schemas/definitions.py

Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>

* Add bbr validation unit test

* Clean up comments

* fix linting

* Move convert time to utils

* Moved to rules_building_block

* Add check for only bbr in bbr dir

* fix linting

* additional linting fix

* Changed to bbr rule loader

* fixed bbr default

* Updated error messages and README

* fixed more linting

* Updating root level README

* Fixed convert_time_span calls

* fixed typo in unit test logic and updated txt

* fixed error message

* updated comment for clarity

* Update detection_rules/rule.py

Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>

* Update detection_rules/rule.py

Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>

* Updated validation methods for clarity

* fix doctring location

* Fixed typo

* updated error messages.

* removed excess whitespace

* Add per rule bypass

* Add single rule bypass

* Split unit tests

* Update detection_rules/rule.py

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>

* Update detection_rules/rule.py

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>

---------

Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>
Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
This commit is contained in:
eric-forte-elastic
2023-06-20 13:00:30 +00:00
committed by GitHub
parent dc05f1d8f3
commit 6449cecd08
9 changed files with 165 additions and 23 deletions
+3
View File
@@ -36,6 +36,9 @@ EX: `DR_USER=joe`
Using the environment variable `DR_BYPASS_NOTE_VALIDATION_AND_PARSE` will bypass the Detection Rules validation on the `note` field in toml files.
Using the environment variable `DR_BYPASS_BBR_LOOKBACK_VALIDATION` will bypass the Detection Rules lookback and interval validation
on the building block rules.
## Importing rules into the repo
+10 -9
View File
@@ -24,15 +24,16 @@ This repository was first announced on Elastic's blog post, [Elastic Security op
Detection Rules contains more than just static rule files. This repository also contains code for unit testing in Python and integrating with the Detection Engine in Kibana.
| folder | description |
|-------------------------------------- |------------------------------------------------------------------------------------ |
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`detection_rules/etc/`](etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`kibana/`](kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
| [`rules/`](rules) | Root directory where rules are stored |
| [`tests/`](tests) | Python code for unit testing rules |
| folder | description |
|------------------------------------------------ |------------------------------------------------------------------------------------ |
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`detection_rules/etc/`](etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`kibana/`](kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
| [`rules/`](rules) | Root directory where rules are stored |
| [`rules_building_block/`](rules_building_block) | Root directory where building block rules are stored |
| [`tests/`](tests) | Python code for unit testing rules |
## Getting started
+65 -12
View File
@@ -33,7 +33,7 @@ from .rule_formatter import nested_normalize, toml_write
from .schemas import (SCHEMA_DIR, definitions, downgrade,
get_min_supported_stack_version, get_stack_schemas)
from .schemas.stack_compat import get_restricted_fields
from .utils import cached, PatchedTemplate
from .utils import cached, convert_time_span, PatchedTemplate
_META_SCHEMA_REQ_DEFAULTS = {}
MIN_FLEET_PACKAGE_VERSION = '7.13.0'
@@ -53,6 +53,7 @@ class RuleMeta(MarshmallowDataclassMixin):
deprecation_date: Optional[definitions.Date]
# Optional fields
bypass_bbr_timing: Optional[bool]
comments: Optional[str]
integration: Optional[Union[str, List[str]]]
maturity: Optional[definitions.Maturity]
@@ -247,7 +248,7 @@ class BaseRuleData(MarshmallowDataclassMixin, StackCompatMixin):
actions: Optional[list]
alert_suppression: Optional[AlertSuppressionMapping] = field(metadata=dict(metadata=dict(min_compat="8.6")))
author: List[str]
building_block_type: Optional[str]
building_block_type: Optional[definitions.BuildingBlockType]
description: str
enabled: Optional[bool]
exceptions_list: Optional[list]
@@ -369,12 +370,18 @@ class DataValidator:
name: definitions.RuleName,
is_elastic_rule: bool,
note: Optional[definitions.Markdown] = None,
interval: Optional[definitions.Interval] = None,
building_block_type: Optional[definitions.BuildingBlockType] = None,
setup: Optional[str] = None,
**extras):
# only define fields needing additional validation
self.name = name
self.is_elastic_rule = is_elastic_rule
self.note = note
# Need to use extras because from is a reserved word in python
self.from_ = extras.get('from')
self.interval = interval
self.building_block_type = building_block_type
self.setup = setup
self._setup_in_note = False
@@ -395,6 +402,58 @@ class DataValidator:
def skip_validate_note(self) -> bool:
return os.environ.get('DR_BYPASS_NOTE_VALIDATION_AND_PARSE') is not None
@cached_property
def skip_validate_bbr(self) -> bool:
return os.environ.get('DR_BYPASS_BBR_LOOKBACK_VALIDATION') is not None
def validate_bbr(self, bypass: bool = False):
"""Validate building block type and rule type."""
if self.skip_validate_bbr or bypass:
return
def validate_lookback(str_time: str) -> bool:
"""Validate that the time is at least now-119m and at least 60m respectively."""
try:
if "now-" in str_time:
str_time = str_time[4:]
time = convert_time_span(str_time)
# if from time is less than 119m as milliseconds
if time < 119 * 60 * 1000:
return False
else:
return False
except Exception as e:
raise ValidationError(f"Invalid time format: {e}")
return True
def validate_interval(str_time: str) -> bool:
"""Validate that the time is at least now-119m and at least 60m respectively."""
try:
time = convert_time_span(str_time)
# if interval time is less than 60m as milliseconds
if time < 60 * 60 * 1000:
return False
except Exception as e:
raise ValidationError(f"Invalid time format: {e}")
return True
bypass_instructions = "To bypass, use the environment variable `DR_BYPASS_BBR_LOOKBACK_VALIDATION`"
if self.building_block_type:
if not self.from_ or not self.interval:
raise ValidationError(
f"{self.name} is invalid."
"BBR require `from` and `interval` to be defined. "
"Please set or bypass." + bypass_instructions
)
elif not validate_lookback(self.from_) or not validate_interval(self.interval):
raise ValidationError(
f"{self.name} is invalid."
"Default BBR require `from` and `interval` to be at least now-119m and at least 60m respectively "
"(using the now-Xm and Xm format where x is in minuets). "
"Please update values or bypass. " + bypass_instructions
)
def validate_note(self):
if self.skip_validate_note or not self.note:
return
@@ -640,13 +699,6 @@ class EQLRuleData(QueryRuleData):
type: Literal["eql"]
language: Literal["eql"]
@staticmethod
def convert_time_span(span: str) -> int:
"""Convert time span in datemath to value in milliseconds."""
amount = int("".join(char for char in span if char.isdigit()))
unit = eql.ast.TimeUnit("".join(char for char in span if char.isalpha()))
return eql.ast.TimeRange(amount, unit).as_milliseconds()
def convert_relative_delta(self, lookback: str) -> int:
now = len("now")
min_length = now + len('+5m')
@@ -655,10 +707,10 @@ class EQLRuleData(QueryRuleData):
lookback = lookback[len("now"):]
sign = lookback[0] # + or -
span = lookback[1:]
amount = self.convert_time_span(span)
amount = convert_time_span(span)
return amount * (-1 if sign == "-" else 1)
else:
return self.convert_time_span(lookback)
return convert_time_span(lookback)
@cached_property
def is_sequence(self) -> bool:
@@ -687,7 +739,7 @@ class EQLRuleData(QueryRuleData):
def interval_ratio(self) -> Optional[float]:
"""Ratio of interval time window / max_span time window."""
if self.max_span:
interval = self.convert_time_span(self.interval or '5m')
interval = convert_time_span(self.interval or '5m')
return interval / self.max_span
@@ -1088,6 +1140,7 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
data.validate_query(metadata)
data.data_validator.validate_note()
data.data_validator.validate_bbr(metadata.get('bypass_bbr_timing'))
data.validate(metadata) if hasattr(data, 'validate') else False
def to_dict(self, strip_none_values=True) -> dict:
+13
View File
@@ -23,6 +23,7 @@ from .schemas import definitions
from .utils import cached, get_path
DEFAULT_RULES_DIR = Path(get_path("rules"))
DEFAULT_BBR_DIR = Path(get_path("rules_building_block"))
DEFAULT_DEPRECATED_DIR = DEFAULT_RULES_DIR / '_deprecated'
RTA_DIR = get_path("rta")
FILE_PATTERN = r'^([a-z0-9_])+\.(json|toml)$'
@@ -156,6 +157,7 @@ class RuleCollection(BaseCollection):
"""Collection of rule objects."""
__default = None
__default_bbr = None
def __init__(self, rules: Optional[List[TOMLRule]] = None):
from .version_lock import VersionLock
@@ -352,6 +354,17 @@ class RuleCollection(BaseCollection):
return cls.__default
@classmethod
def default_bbr(cls) -> 'RuleCollection':
"""Return the default BBR collection, which retrieves from building_block_rules/."""
if cls.__default_bbr is None:
collection = RuleCollection()
collection.load_directory(DEFAULT_BBR_DIR)
collection.freeze()
cls.__default_bbr = collection
return cls.__default_bbr
def compare_collections(self, other: 'RuleCollection'
) -> (Dict[str, TOMLRule], Dict[str, TOMLRule], Dict[str, DeprecatedRule]):
"""Get the changes between two sets of rules."""
+1 -1
View File
@@ -85,7 +85,7 @@ ThresholdValue = NewType("ThresholdValue", int, validate=validate.Range(min=1))
TimelineTemplateId = NewType('TimelineTemplateId', str, validate=validate.OneOf(list(TIMELINE_TEMPLATES)))
TimelineTemplateTitle = NewType('TimelineTemplateTitle', str, validate=validate.OneOf(TIMELINE_TEMPLATES.values()))
UUIDString = NewType('UUIDString', str, validate=validate.Regexp(UUID_PATTERN))
BuildingBlockType = Literal['default']
# experimental machine learning features and releases
MachineLearningType = Literal['DGA', 'ProblemChild']
+7
View File
@@ -234,6 +234,13 @@ def combine_sources(*sources): # type: (list[list]) -> list
return event_sort(combined)
def convert_time_span(span: str) -> int:
"""Convert time span in Date Math to value in milliseconds."""
amount = int("".join(char for char in span if char.isdigit()))
unit = eql.ast.TimeUnit("".join(char for char in span if char.isalpha()))
return eql.ast.TimeRange(amount, unit).as_milliseconds()
def evaluate(rule, events):
"""Evaluate a query against events."""
evaluator = kql.get_evaluator(kql.parse(rule.query))
View File
+7
View File
@@ -23,6 +23,11 @@ def default_rules() -> RuleCollection:
return RuleCollection.default()
@lru_cache
def default_bbr() -> RuleCollection:
return RuleCollection.default_bbr()
class BaseRuleTest(unittest.TestCase):
"""Base class for shared test cases which need to load rules"""
@@ -40,9 +45,11 @@ class BaseRuleTest(unittest.TestCase):
if not RULE_LOADER_FAIL:
try:
rc = default_rules()
rc_bbr = default_bbr()
cls.all_rules = rc.rules
cls.rule_lookup = rc.id_map
cls.production_rules = rc.filter(production_filter)
cls.bbr = rc_bbr.rules
cls.deprecated_rules: DeprecatedCollection = rc.deprecated
except Exception as e:
RULE_LOADER_FAIL = True
+59 -1
View File
@@ -10,6 +10,7 @@ import unittest
import uuid
import warnings
from collections import defaultdict
from marshmallow import ValidationError
from pathlib import Path
import eql.ast
@@ -96,6 +97,50 @@ class TestValidRules(BaseRuleTest):
"""Test that a rule type did not change for a locked version"""
default_version_lock.manage_versions(self.production_rules)
def test_bbr_validation(self):
base_fields = {
"author": ["Elastic"],
"description": "test description",
"index": ["filebeat-*", "logs-aws*"],
"language": "kuery",
"license": "Elastic License v2",
"name": "test rule",
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "query",
"timestamp_override": "event.ingested"
}
def build_rule(query, bbr_type="default", from_field="now-120m", interval="60m"):
metadata = {
"creation_date": "1970/01/01",
"updated_date": "1970/01/01",
"min_stack_version": load_current_package_version(),
"integration": ["cloud_defend"]
}
data = base_fields.copy()
data["query"] = query
data["building_block_type"] = bbr_type
if from_field:
data["from"] = from_field
if interval:
data["interval"] = interval
obj = {"metadata": metadata, "rule": data}
return TOMLRuleContents.from_dict(obj)
query = """
event.dataset:aws.cloudtrail and event.outcome:success
"""
build_rule(query=query)
with self.assertRaises(ValidationError):
build_rule(query=query, bbr_type="invalid")
with self.assertRaises(ValidationError):
build_rule(query=query, from_field="now-10m", interval="10m")
class TestThreatMappings(BaseRuleTest):
"""Test threat mapping data for rules."""
@@ -370,6 +415,18 @@ class TestRuleFiles(BaseRuleTest):
rule_err_str = '\n'.join(bad_name_rules)
self.fail(f'{error_msg}:\n{rule_err_str}')
def test_bbr_in_correct_dir(self):
"""Ensure that BBR are in the correct directory."""
for rule in self.bbr:
self.assertEqual(rule.path.parent.name, 'rules_building_block',
f'{self.rule_str(rule)} should be in the rules_building_block directory')
def test_non_bbr_in_correct_dir(self):
"""Ensure that non-BBR are not in BBR directory."""
for rule in self.all_rules:
if rule.path.parent.name == 'rules_building_block':
self.assertIn(rule, self.bbr, f'{self.rule_str(rule)} should be in the rules_building_block directory')
class TestRuleMetadata(BaseRuleTest):
"""Test the metadata of rules."""
@@ -471,8 +528,9 @@ class TestRuleMetadata(BaseRuleTest):
failures.append(err_msg)
# checks if the rule path matches the intended integration
# excludes BBR rules
if rule_integration in valid_integration_folders:
if rule.path.parent.name not in rule_integrations:
if rule.path.parent.name not in rule_integrations and rule.path.parent.name != "bbr":
err_msg = f'{self.rule_str(rule)} {rule_integration} tag, path is {rule.path.parent.name}'
failures.append(err_msg)