[Bug] [DAC] Add filtering to export-rules-from-repo (#5769)

* Add filtering to export-rules-from-repo
This commit is contained in:
Eric Forte
2026-03-10 13:03:52 -04:00
committed by GitHub
parent 1d3dad243c
commit 57bf1546dd
4 changed files with 49 additions and 26 deletions
+14
View File
@@ -26,6 +26,12 @@ GenericCollectionTypes = TOMLAction | TOMLActionConnector | TOMLException
GenericCollectionContentTypes = TOMLActionContents | TOMLActionConnectorContents | TOMLExceptionContents
def matches_rule_ids(item: GenericCollectionTypes, rule_ids: set[str]) -> bool:
"""Check if the item is associated with any of the given rule IDs."""
rule_ids_list = getattr(item.contents.metadata, "rule_ids", []) + getattr(item.contents.metadata, "rule_id", [])
return any(rule_id in rule_ids for rule_id in rule_ids_list)
def metadata_filter(**metadata: Any) -> Callable[[GenericCollectionTypes], bool]:
"""Get a filter callback based off item metadata"""
flt = dict_filter(metadata)
@@ -77,6 +83,14 @@ class GenericCollection:
return filtered_collection
def items_matching(
self,
contents_type: type[GenericCollectionContentTypes],
rule_ids: set[str],
) -> list[GenericCollectionTypes]:
"""Return items whose contents are of the given type and match any of the rule IDs."""
return [d for d in self.items if isinstance(d.contents, contents_type) and matches_rule_ids(d, rule_ids)]
@staticmethod
def deserialize_toml_string(contents: bytes | str) -> dict[str, Any]:
"""Deserialize a TOML string into a dictionary."""
+11 -17
View File
@@ -23,7 +23,7 @@ from .action_connector import (
from .cli_utils import multi_collection
from .config import parse_rules_config
from .exception import TOMLException, TOMLExceptionContents, build_exception_objects, parse_exceptions_results_from_api
from .generic_loader import GenericCollection, GenericCollectionTypes
from .generic_loader import GenericCollection
from .main import root
from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error
from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule
@@ -123,8 +123,8 @@ def kibana_import_rules( # noqa: PLR0915
workaround_errors: list[str] = []
workaround_error_types: set[str] = set()
flattened_exceptions = [e for sublist in exception_dicts for e in sublist]
all_exception_list_ids = {exception["list_id"] for exception in flattened_exceptions}
flattened_exceptions: list[dict[str, Any]] = [e for sublist in exception_dicts for e in sublist]
all_exception_list_ids: set[str] = {exception["list_id"] for exception in flattened_exceptions}
click.echo(f"{len(response['errors'])} rule(s) failed to import!")
@@ -160,10 +160,6 @@ def kibana_import_rules( # noqa: PLR0915
)
click.echo()
def _matches_rule_ids(item: GenericCollectionTypes, rule_ids: set[str]) -> bool:
"""Check if the item matches any of the rule IDs in the provided set."""
return any(rule_id in rule_ids for rule_id in item.contents.metadata.get("rule_ids", []))
def _process_imported_items(
imported_items_list: list[list[dict[str, Any]]],
item_type_description: str,
@@ -181,15 +177,13 @@ def kibana_import_rules( # noqa: PLR0915
rule_ids = {rule["rule_id"] for rule in rule_dicts}
with kibana:
cl = GenericCollection.default()
exception_dicts = [
d.contents.to_api_format()
for d in cl.items
if isinstance(d.contents, TOMLExceptionContents) and _matches_rule_ids(d, rule_ids)
exception_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLExceptionContents, rule_ids)
]
action_connectors_dicts = [
d.contents.to_api_format()
for d in cl.items
if isinstance(d.contents, TOMLActionConnectorContents) and _matches_rule_ids(d, rule_ids)
action_connectors_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLActionConnectorContents, rule_ids)
]
response, successful_rule_ids, results = RuleResource.import_rules( # type: ignore[reportUnknownMemberType]
rule_dicts,
@@ -207,8 +201,8 @@ def kibana_import_rules( # noqa: PLR0915
if response["errors"]:
_handle_response_errors(response) # type: ignore[reportUnknownArgumentType]
else:
_process_imported_items(exception_dicts, "exception list(s)", "list_id")
_process_imported_items(action_connectors_dicts, "action connector(s)", "id")
_process_imported_items(exception_dicts, "exception list(s)", "list_id") # type: ignore[reportUnknownArgumentType]
_process_imported_items(action_connectors_dicts, "action connector(s)", "id") # type: ignore[reportUnknownArgumentType]
return response, results # type: ignore[reportUnknownVariableType]
+23 -8
View File
@@ -300,7 +300,12 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
exception_id = exception["list_id"]
if exception_id not in exception_list_rule_table:
exception_list_rule_table[exception_id] = []
exception_list_rule_table[exception_id].append({"id": contents["id"], "name": contents["name"]})
exception_list_rule_table[exception_id].append(
{
"id": contents.get("rule_id"),
"name": contents["name"],
}
)
if contents.get("actions"):
# If rule has actions with connectors, add them to the action_connector_rule_table under the action_id
@@ -308,7 +313,12 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
action_id = action["id"]
if action_id not in action_connector_rule_table:
action_connector_rule_table[action_id] = []
action_connector_rule_table[action_id].append({"id": contents["id"], "name": contents["name"]})
action_connector_rule_table[action_id].append(
{
"id": contents.get("rule_id"),
"name": contents["name"],
}
)
# Build TOMLException Objects
if exceptions_import:
@@ -541,16 +551,21 @@ def _export_rules( # noqa: PLR0913
# Add exceptions to api format here and add to output_lines
if include_exceptions or include_action_connectors:
cl = GenericCollection.default()
# Get exceptions in API format
rule_ids = {r.id for r in rules}
# Get exceptions in API format (only those linked to the exported rules)
if include_exceptions:
exceptions = [d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)]
exceptions = [e for sublist in exceptions for e in sublist]
exceptions_raw: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLExceptionContents, rule_ids)
]
exceptions: list[dict[str, Any]] = [e for sublist in exceptions_raw for e in sublist]
output_lines.extend(json.dumps(e, sort_keys=True) for e in exceptions)
if include_action_connectors:
action_connectors = [
d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents)
action_connectors: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLActionConnectorContents, rule_ids)
]
actions = [a for sublist in action_connectors for a in sublist]
actions: list[dict[str, Any]] = [a for sublist in action_connectors for a in sublist]
output_lines.extend(json.dumps(a, sort_keys=True) for a in actions)
_ = outfile.write_text("\n".join(output_lines) + "\n")
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.5.54"
version = "1.5.56"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Securitys Detection Engine."
readme = "README.md"
requires-python = ">=3.12"