From 57bf1546dd1f7575bf485088a535ffe82d82f177 Mon Sep 17 00:00:00 2001 From: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com> Date: Tue, 10 Mar 2026 13:03:52 -0400 Subject: [PATCH] [Bug] [DAC] Add filtering to export-rules-from-repo (#5769) * Add filtering to export-rules-from-repo --- detection_rules/generic_loader.py | 14 ++++++++++++++ detection_rules/kbwrap.py | 28 +++++++++++----------------- detection_rules/main.py | 31 +++++++++++++++++++++++-------- pyproject.toml | 2 +- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/detection_rules/generic_loader.py b/detection_rules/generic_loader.py index d58e4f61f..b69379e7f 100644 --- a/detection_rules/generic_loader.py +++ b/detection_rules/generic_loader.py @@ -26,6 +26,12 @@ GenericCollectionTypes = TOMLAction | TOMLActionConnector | TOMLException GenericCollectionContentTypes = TOMLActionContents | TOMLActionConnectorContents | TOMLExceptionContents +def matches_rule_ids(item: GenericCollectionTypes, rule_ids: set[str]) -> bool: + """Check if the item is associated with any of the given rule IDs.""" + rule_ids_list = getattr(item.contents.metadata, "rule_ids", []) + getattr(item.contents.metadata, "rule_id", []) + return any(rule_id in rule_ids for rule_id in rule_ids_list) + + def metadata_filter(**metadata: Any) -> Callable[[GenericCollectionTypes], bool]: """Get a filter callback based off item metadata""" flt = dict_filter(metadata) @@ -77,6 +83,14 @@ class GenericCollection: return filtered_collection + def items_matching( + self, + contents_type: type[GenericCollectionContentTypes], + rule_ids: set[str], + ) -> list[GenericCollectionTypes]: + """Return items whose contents are of the given type and match any of the rule IDs.""" + return [d for d in self.items if isinstance(d.contents, contents_type) and matches_rule_ids(d, rule_ids)] + @staticmethod def deserialize_toml_string(contents: bytes | str) -> dict[str, Any]: """Deserialize a TOML string into a dictionary.""" diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index ed2a2e1c7..517a90a67 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -23,7 +23,7 @@ from .action_connector import ( from .cli_utils import multi_collection from .config import parse_rules_config from .exception import TOMLException, TOMLExceptionContents, build_exception_objects, parse_exceptions_results_from_api -from .generic_loader import GenericCollection, GenericCollectionTypes +from .generic_loader import GenericCollection from .main import root from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule @@ -123,8 +123,8 @@ def kibana_import_rules( # noqa: PLR0915 workaround_errors: list[str] = [] workaround_error_types: set[str] = set() - flattened_exceptions = [e for sublist in exception_dicts for e in sublist] - all_exception_list_ids = {exception["list_id"] for exception in flattened_exceptions} + flattened_exceptions: list[dict[str, Any]] = [e for sublist in exception_dicts for e in sublist] + all_exception_list_ids: set[str] = {exception["list_id"] for exception in flattened_exceptions} click.echo(f"{len(response['errors'])} rule(s) failed to import!") @@ -160,10 +160,6 @@ def kibana_import_rules( # noqa: PLR0915 ) click.echo() - def _matches_rule_ids(item: GenericCollectionTypes, rule_ids: set[str]) -> bool: - """Check if the item matches any of the rule IDs in the provided set.""" - return any(rule_id in rule_ids for rule_id in item.contents.metadata.get("rule_ids", [])) - def _process_imported_items( imported_items_list: list[list[dict[str, Any]]], item_type_description: str, @@ -181,15 +177,13 @@ def kibana_import_rules( # noqa: PLR0915 rule_ids = {rule["rule_id"] for rule in rule_dicts} with kibana: cl = GenericCollection.default() - exception_dicts = [ - d.contents.to_api_format() - for d in cl.items - if isinstance(d.contents, TOMLExceptionContents) and _matches_rule_ids(d, rule_ids) + exception_dicts: list[list[dict[str, Any]]] = [ + d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType] + for d in cl.items_matching(TOMLExceptionContents, rule_ids) ] - action_connectors_dicts = [ - d.contents.to_api_format() - for d in cl.items - if isinstance(d.contents, TOMLActionConnectorContents) and _matches_rule_ids(d, rule_ids) + action_connectors_dicts: list[list[dict[str, Any]]] = [ + d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType] + for d in cl.items_matching(TOMLActionConnectorContents, rule_ids) ] response, successful_rule_ids, results = RuleResource.import_rules( # type: ignore[reportUnknownMemberType] rule_dicts, @@ -207,8 +201,8 @@ def kibana_import_rules( # noqa: PLR0915 if response["errors"]: _handle_response_errors(response) # type: ignore[reportUnknownArgumentType] else: - _process_imported_items(exception_dicts, "exception list(s)", "list_id") - _process_imported_items(action_connectors_dicts, "action connector(s)", "id") + _process_imported_items(exception_dicts, "exception list(s)", "list_id") # type: ignore[reportUnknownArgumentType] + _process_imported_items(action_connectors_dicts, "action connector(s)", "id") # type: ignore[reportUnknownArgumentType] return response, results # type: ignore[reportUnknownVariableType] diff --git a/detection_rules/main.py b/detection_rules/main.py index 4636fa018..95e541390 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -300,7 +300,12 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 exception_id = exception["list_id"] if exception_id not in exception_list_rule_table: exception_list_rule_table[exception_id] = [] - exception_list_rule_table[exception_id].append({"id": contents["id"], "name": contents["name"]}) + exception_list_rule_table[exception_id].append( + { + "id": contents.get("rule_id"), + "name": contents["name"], + } + ) if contents.get("actions"): # If rule has actions with connectors, add them to the action_connector_rule_table under the action_id @@ -308,7 +313,12 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 action_id = action["id"] if action_id not in action_connector_rule_table: action_connector_rule_table[action_id] = [] - action_connector_rule_table[action_id].append({"id": contents["id"], "name": contents["name"]}) + action_connector_rule_table[action_id].append( + { + "id": contents.get("rule_id"), + "name": contents["name"], + } + ) # Build TOMLException Objects if exceptions_import: @@ -541,16 +551,21 @@ def _export_rules( # noqa: PLR0913 # Add exceptions to api format here and add to output_lines if include_exceptions or include_action_connectors: cl = GenericCollection.default() - # Get exceptions in API format + rule_ids = {r.id for r in rules} + # Get exceptions in API format (only those linked to the exported rules) if include_exceptions: - exceptions = [d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)] - exceptions = [e for sublist in exceptions for e in sublist] + exceptions_raw: list[list[dict[str, Any]]] = [ + d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType] + for d in cl.items_matching(TOMLExceptionContents, rule_ids) + ] + exceptions: list[dict[str, Any]] = [e for sublist in exceptions_raw for e in sublist] output_lines.extend(json.dumps(e, sort_keys=True) for e in exceptions) if include_action_connectors: - action_connectors = [ - d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents) + action_connectors: list[list[dict[str, Any]]] = [ + d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType] + for d in cl.items_matching(TOMLActionConnectorContents, rule_ids) ] - actions = [a for sublist in action_connectors for a in sublist] + actions: list[dict[str, Any]] = [a for sublist in action_connectors for a in sublist] output_lines.extend(json.dumps(a, sort_keys=True) for a in actions) _ = outfile.write_text("\n".join(output_lines) + "\n") diff --git a/pyproject.toml b/pyproject.toml index 29c3b0d76..355f6e2f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.5.54" +version = "1.5.56" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"