[FR] [DAC] Add Arbitrary File location Support for Local Creation Date (#4915)

* Add support for local file contents

* Update Rule Params

* Update CLI docs

* Update to Pathlib

* Format updating

* Delete duplicate

* Update logic to handle just local_contents path

* Update to Glob Based Approach

* Updated to use RawRuleCollection

* Fix Logging Typo

* New utils functions no longer needed

* Update naming for convention
This commit is contained in:
Eric Forte
2025-07-31 14:35:00 -04:00
committed by GitHub
parent bf3071d3d1
commit a9ad66935c
5 changed files with 53 additions and 13 deletions
+2
View File
@@ -106,6 +106,7 @@ Options:
-snv, --strip-none-values Strip None values from the rule
-lc, --local-creation-date Preserve the local creation date of the rule
-lu, --local-updated-date Preserve the local updated date of the rule
-lr, --load-rule-loading Enable arbitrary rule loading from the rules directories (Can be very slow!)
-h, --help Show this message and exit.
```
@@ -507,6 +508,7 @@ Options:
-lu, --local-updated-date Preserve the local updated date of the rule
-cro, --custom-rules-only Only export custom rules
-eq, --export-query TEXT Apply a query filter to exporting rules e.g. "alert.attributes.tags: \"test\"" to filter for rules that have the tag "test"
-lr, --load-rule-loading Enable arbitrary rule loading from the rules directories (Can be very slow!)
-h, --help Show this message and exit.
```
+25 -3
View File
@@ -8,7 +8,7 @@
import re
import sys
from pathlib import Path
from typing import Any
from typing import Any, cast
import click
import kql # type: ignore[reportMissingTypeStubs]
@@ -27,7 +27,8 @@ from .generic_loader import GenericCollection, GenericCollectionTypes
from .main import root
from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error
from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule
from .rule_loader import RuleCollection, update_metadata_from_file
from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file
from .schemas import definitions # noqa: TC001
from .utils import format_command_options, rulename_to_filename
RULES_CONFIG = parse_rules_config()
@@ -250,6 +251,12 @@ def kibana_import_rules( # noqa: PLR0915
'"alert.attributes.tags: \\"test\\"" to filter for rules that have the tag "test"'
),
)
@click.option(
"--load-rule-loading",
"-lr",
is_flag=True,
help="Enable arbitrary rule loading from the rules directories (Can be very slow!)",
)
@click.pass_context
def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
ctx: click.Context,
@@ -268,15 +275,20 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
local_updated_date: bool = False,
custom_rules_only: bool = False,
export_query: str | None = None,
load_rule_loading: bool = False,
) -> list[TOMLRule]:
"""Export custom rules from Kibana."""
kibana = ctx.obj["kibana"]
kibana_include_details = export_exceptions or export_action_connectors
kibana_include_details = export_exceptions or export_action_connectors or custom_rules_only or export_query
# Only allow one of rule_id or rule_name
if rule_name and rule_id:
raise click.UsageError("Cannot use --rule-id and --rule-name together. Please choose one.")
raw_rule_collection = RawRuleCollection()
if load_rule_loading:
raw_rule_collection = raw_rule_collection.default()
with kibana:
# Look up rule IDs by name if --rule-name was provided
if rule_name:
@@ -365,6 +377,16 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=tactic_name) # type: ignore[reportUnknownMemberType]
save_path = directory / f"{rule_name}"
# Get local rule data if load_rule_loading is enabled. If not enabled rules variable will be None.
local_rule: dict[str, Any] = params.get("rule", {})
input_rule_id: str | None = None
if local_rule:
input_rule_id = cast("definitions.UUIDString", local_rule.get("rule_id"))
if input_rule_id and input_rule_id in raw_rule_collection.id_map:
save_path = raw_rule_collection.id_map[input_rule_id].path or save_path
params.update(
update_metadata_from_file(
save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
+19 -3
View File
@@ -33,7 +33,7 @@ from .generic_loader import GenericCollection
from .misc import add_client, nested_set, parse_user_config, raise_client_error
from .rule import DeprecatedRule, QueryRuleData, TOMLRule, TOMLRuleContents
from .rule_formatter import toml_write
from .rule_loader import RuleCollection, update_metadata_from_file
from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file
from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file
from .utils import (
Ndjson,
@@ -157,6 +157,12 @@ def generate_rules_index(
@click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule")
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
@click.option(
"--load-rule-loading",
"-lr",
is_flag=True,
help="Enable arbitrary rule loading from the rules directories (Can be very slow!)",
)
def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
input_file: tuple[Path, ...] | None,
required_only: bool,
@@ -171,6 +177,7 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
strip_none_values: bool,
local_creation_date: bool,
local_updated_date: bool,
load_rule_loading: bool,
) -> None:
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
errors: list[str] = []
@@ -189,6 +196,10 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
if not file_contents:
click.echo("Must specify at least one file!")
raw_rule_collection = RawRuleCollection()
if load_rule_loading:
raw_rule_collection = raw_rule_collection.default()
exceptions_containers = {}
exceptions_items = {}
@@ -210,7 +221,12 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
base_path = rulename_to_filename(base_path) if base_path else base_path
if base_path is None:
raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}")
rule_path = Path(os.path.join(str(save_directory) if save_directory else RULES_DIRS[0], base_path)) # noqa: PTH118
rule_base_path = Path(save_directory or RULES_DIRS[0])
rule_path = rule_base_path / base_path
rule_id = contents.get("rule_id")
if rule_id in raw_rule_collection.id_map:
rule_path = raw_rule_collection.id_map[rule_id].path or rule_path
# handle both rule json formats loaded from kibana and toml
data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id")
@@ -226,7 +242,7 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
contents.update(
update_metadata_from_file(
Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
rule_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
)
)
+6 -6
View File
@@ -82,18 +82,18 @@ class DictRule:
@property
def data(self) -> dict[str, Any]:
"""Rule portion of TOML file rule."""
return self.contents.get("data") or self.contents
"""Rule portion of TOML file rule. Supports nested and flattened rule dictionaries"""
return self.contents.get("data", {}) or self.contents or self.contents.get("rule", {})
@property
def id(self) -> str:
"""Get the rule ID."""
return self.data["rule_id"] # type: ignore[reportUnknownMemberType]
"""Get the rule ID. Supports nested and flattened rule dictionaries."""
return self.data.get("rule_id") or self.data.get("rule", {}).get("rule_id")
@property
def name(self) -> str:
"""Get the rule name."""
return self.data["name"] # type: ignore[reportUnknownMemberType]
"""Get the rule name. Supports nested and flattened rule dictionaries"""
return self.data.get("name") or self.data.get("rule", {}).get("name")
def __hash__(self) -> int:
"""Get the hash of the rule."""
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.17"
version = "1.3.18"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Securitys Detection Engine."
readme = "README.md"
requires-python = ">=3.12"