diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index b6802b7ef..e52dd507c 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -206,6 +206,17 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op "Use same flag for import-rules to prevent warnings and disable its unit test.") @click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") @click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") +@click.option("--custom-rules-only", "-cro", is_flag=True, help="Only export custom rules") +@click.option( + "--export-query", + "-eq", + type=str, + required=False, + help=( + "Apply a query filter to exporting rules e.g. " + "\"alert.attributes.tags: \\\"test\\\"\" to filter for rules that have the tag \"test\"" + ) +) @click.pass_context def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path], exceptions_directory: Optional[Path], default_author: str, @@ -213,7 +224,8 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d export_action_connectors: bool = False, export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False, no_tactic_filename: bool = False, local_creation_date: bool = False, - local_updated_date: bool = False) -> List[TOMLRule]: + local_updated_date: bool = False, custom_rules_only: bool = False, + export_query: Optional[str] = None) -> List[TOMLRule]: """Export custom rules from Kibana.""" kibana = ctx.obj["kibana"] kibana_include_details = export_exceptions or export_action_connectors @@ -227,8 +239,21 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d if rule_name: found = RuleResource.find(filter=f"alert.attributes.name:{rule_name}") rule_id = [r["rule_id"] for r in found] - results = RuleResource.export_rules(list(rule_id), exclude_export_details=not kibana_include_details) + query = ( + export_query if not custom_rules_only + else ( + f"alert.attributes.params.ruleSource.type: \"internal\"" + f"{f' and ({export_query})' if export_query else ''}" + ) + ) + results = ( + RuleResource.bulk_export(rule_ids=list(rule_id), query=query) + if query + else RuleResource.export_rules( + list(rule_id), exclude_export_details=not kibana_include_details + ) + ) # Handle Exceptions Directory Location if results and exceptions_directory: exceptions_directory.mkdir(parents=True, exist_ok=True) diff --git a/lib/kibana/kibana/resources.py b/lib/kibana/kibana/resources.py index b29199210..811405681 100644 --- a/lib/kibana/kibana/resources.py +++ b/lib/kibana/kibana/resources.py @@ -4,7 +4,7 @@ # 2.0. import datetime -from typing import Any, List, Optional, Type +from typing import List, Optional, Type import json @@ -138,7 +138,7 @@ class RuleResource(BaseResource): cls, action: definitions.RuleBulkActions, rule_ids: Optional[List[str]] = None, query: Optional[str] = None, dry_run: Optional[bool] = False, edit_object: Optional[list[definitions.RuleBulkEditActionTypes]] = None, include_exceptions: Optional[bool] = False, **kwargs - ) -> (dict, List['RuleResource']): + ) -> dict | List['RuleResource']: """Perform a bulk action on rules using the _bulk_action API.""" assert not (rule_ids and query), 'Cannot provide both rule_ids and query' @@ -155,17 +155,11 @@ class RuleResource(BaseResource): data['rule_ids'] = rule_ids response = Kibana.current().post(cls.BASE_URI + "/_bulk_action", params=params, data=data, **kwargs) - # export returns ndjson, which requires manual parsing since response.json() fails + # export returns ndjson if action == 'export': - response = [json.loads(r) for r in response.text.splitlines()] - result_ids = [r['rule_id'] for r in response if 'rule_id' in r] - else: - results = response['attributes']['results'] - result_ids = [r['rule_id'] for r in results['updated']] - result_ids.extend([r['rule_id'] for r in results['created']]) + response = [cls(r) for r in [json.loads(r) for r in response.text.splitlines()]] - rule_resources = cls.export_rules(result_ids) - return response, rule_resources + return response @classmethod def bulk_enable( diff --git a/lib/kibana/pyproject.toml b/lib/kibana/pyproject.toml index 7a703adab..a9f01f974 100644 --- a/lib/kibana/pyproject.toml +++ b/lib/kibana/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection-rules-kibana" -version = "0.4.3" +version = "0.4.4" description = "Kibana API utilities for Elastic Detection Rules" license = {text = "Elastic License v2"} keywords = ["Elastic", "Kibana", "Detection Rules", "Security", "Elasticsearch"] diff --git a/pyproject.toml b/pyproject.toml index 4878ac6c3..626282605 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.2.12" +version = "1.2.13" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"