From 033c82858cde5c0b8822efc07dc1dafcd59da622 Mon Sep 17 00:00:00 2001 From: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com> Date: Wed, 16 Apr 2025 15:41:09 -0400 Subject: [PATCH] [FR] Add Support for Local Dates Flag (#4582) * Add support for local dates flag * Use two variables * Add support for import-rules-to-repo * Revert arg formatting * Update comment * Pass Rule Path as Path Object * Update to rule loader function * Streamline metadata function * Also support dictionaries * Bump patch version * Reduce complexity * Add if path exists check * Fix version bump --- detection_rules/cli_utils.py | 15 ++++----------- detection_rules/kbwrap.py | 33 +++++++++++++++++++-------------- detection_rules/main.py | 13 +++++++++++-- detection_rules/rule_loader.py | 17 ++++++++++++++++- pyproject.toml | 2 +- 5 files changed, 51 insertions(+), 29 deletions(-) diff --git a/detection_rules/cli_utils.py b/detection_rules/cli_utils.py index 96aa258c0..eb19579fb 100644 --- a/detection_rules/cli_utils.py +++ b/detection_rules/cli_utils.py @@ -210,18 +210,11 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos # DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name'] path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve() - # Inherit maturity from the rule already exists - maturity = "development" - if path.exists(): - rules = RuleCollection() - rules.load_file(path) - if rules: - maturity = rules.rules[0].contents.metadata.maturity - + # Inherit maturity and optionally local dates from the rule if it already exists meta = { - "creation_date": creation_date, - "updated_date": creation_date, - "maturity": maturity, + "creation_date": kwargs.get("creation_date") or creation_date, + "updated_date": kwargs.get("updated_date") or creation_date, + "maturity": "development" or kwargs.get("maturity"), } try: diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index 27bcd2e79..500b445f0 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -24,7 +24,7 @@ from .generic_loader import GenericCollection from .main import root from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set from .rule import downgrade_contents_from_rule, TOMLRuleContents, TOMLRule -from .rule_loader import RuleCollection +from .rule_loader import RuleCollection, update_metadata_from_file from .utils import format_command_options, rulename_to_filename RULES_CONFIG = parse_rules_config() @@ -199,12 +199,14 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op @click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export") @click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules") @click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules") +@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") +@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") @click.pass_context def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path], exceptions_directory: Optional[Path], default_author: str, rule_id: Optional[Iterable[str]] = None, export_action_connectors: bool = False, - export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False - ) -> List[TOMLRule]: + export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False, + local_creation_date: bool = False, local_updated_date: bool = False) -> List[TOMLRule]: """Export custom rules from Kibana.""" kibana = ctx.obj["kibana"] kibana_include_details = export_exceptions or export_action_connectors @@ -232,6 +234,8 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d return [] rules_results = results + action_connector_results = [] + exception_results = [] if kibana_include_details: # Assign counts to variables rules_count = results[-1]["exported_rules_count"] @@ -259,22 +263,23 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")] if isinstance(rule_resource["author"], str): rule_resource["author"] = [rule_resource["author"]] - # Inherit maturity from the rule already exists - maturity = "development" + # Inherit maturity and optionally local dates from the rule if it already exists + params = { + "rule": rule_resource, + "maturity": "development", + } threat = rule_resource.get("threat") first_tactic = threat[0].get("tactic").get("name") if threat else "" rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=first_tactic) - # check if directory / f"{rule_name}" exists - if (directory / f"{rule_name}").exists(): - rules = RuleCollection() - rules.load_file(directory / f"{rule_name}") - if rules: - maturity = rules.rules[0].contents.metadata.maturity - contents = TOMLRuleContents.from_rule_resource( - rule_resource, maturity=maturity + save_path = directory / f"{rule_name}" + params.update( + update_metadata_from_file( + save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date} + ) ) - rule = TOMLRule(contents=contents, path=directory / f"{rule_name}") + contents = TOMLRuleContents.from_rule_resource(**params) + rule = TOMLRule(contents=contents, path=save_path) except Exception as e: if skip_errors: print(f'- skipping {rule_resource.get("name")} - {type(e).__name__}') diff --git a/detection_rules/main.py b/detection_rules/main.py index b07e4c776..1e1dcfcac 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -32,7 +32,7 @@ from .misc import ( ) from .rule import TOMLRule, TOMLRuleContents, QueryRuleData from .rule_formatter import toml_write -from .rule_loader import RuleCollection +from .rule_loader import RuleCollection, update_metadata_from_file from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename @@ -128,10 +128,13 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True): @click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors") @click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one") @click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule") +@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") +@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool, exceptions_import: bool, directory: click.Path, save_directory: click.Path, action_connectors_directory: click.Path, exceptions_directory: click.Path, - skip_errors: bool, default_author: str, strip_none_values: bool): + skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool, + local_updated_date: bool): """Import rules from json, toml, or yaml files containing Kibana exported rule(s).""" errors = [] rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else [] @@ -179,6 +182,12 @@ def import_rules_into_repo(input_file: click.Path, required_only: bool, action_c if isinstance(contents["author"], str): contents["author"] = [contents["author"]] + contents.update( + update_metadata_from_file( + Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date} + ) + ) + output = rule_prompt( rule_path, required_only=required_only, diff --git a/detection_rules/rule_loader.py b/detection_rules/rule_loader.py index b04d00b79..a56253686 100644 --- a/detection_rules/rule_loader.py +++ b/detection_rules/rule_loader.py @@ -18,7 +18,8 @@ from marshmallow.exceptions import ValidationError from . import utils from .config import parse_rules_config from .rule import ( - DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, TOMLRuleContents + DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, + TOMLRuleContents ) from .schemas import definitions from .utils import cached, get_path @@ -116,6 +117,20 @@ def load_locks_from_tag(remote: str, tag: str, version_lock: str = 'detection_ru return commit_hash, version, deprecated +def update_metadata_from_file(rule_path: Path, fields_to_update: dict) -> dict: + """Update metadata fields for a rule with local contents.""" + contents = {} + if not rule_path.exists(): + return contents + local_metadata = RuleCollection().load_file(rule_path).contents.metadata.to_dict() + if local_metadata: + contents["maturity"] = local_metadata.get("maturity", "development") + for field_name, should_update in fields_to_update.items(): + if should_update and field_name in local_metadata: + contents[field_name] = local_metadata[field_name] + return contents + + @dataclass class BaseCollection: """Base class for collections.""" diff --git a/pyproject.toml b/pyproject.toml index 1f8c928ef..03ac2c7c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.0.9" +version = "1.0.10" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"