Files
Eric Forte 9736407ef3 [FR] [DAC] Initial Yaml Support (#5821)
* Initial Yaml Support
2026-04-10 11:29:15 -04:00

590 lines
26 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Kibana cli commands."""
import re
import sys
from pathlib import Path
from typing import Any, cast
import click
import kql # type: ignore[reportMissingTypeStubs]
from kibana import RuleResource, Signal # type: ignore[reportMissingTypeStubs]
from .action_connector import (
TOMLActionConnector,
TOMLActionConnectorContents,
build_action_connector_objects,
parse_action_connector_results_from_api,
)
from .cli_utils import multi_collection
from .config import parse_rules_config
from .exception import TOMLException, TOMLExceptionContents, build_exception_objects, parse_exceptions_results_from_api
from .generic_loader import GenericCollection
from .main import root
from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error
from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule
from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file
from .schemas import definitions # noqa: TC001
from .utils import CUSTOM_RULES_KQL, format_command_options, rulename_to_filename
# Project rules configuration, loaded once at module import and shared by the commands below
# (used for default exception/action-connector directories and filename conventions).
RULES_CONFIG = parse_rules_config()
@root.group("kibana")
@add_params(*kibana_options)
@click.pass_context
def kibana_group(ctx: click.Context, **kibana_kwargs: Any) -> None:
    """Commands for integrating with Kibana."""
    _ = ctx.ensure_object(dict)  # type: ignore[reportUnknownVariableType]

    # Skip client construction when the user only asked for help (hacky check on argv);
    # otherwise build the Kibana client once and share it with all subcommands via ctx.obj.
    if sys.argv[-1] not in ctx.help_option_names:
        ctx.obj["kibana"] = get_kibana_client(**kibana_kwargs)
    else:
        click.echo("Kibana client:")
        click.echo(format_command_options(ctx))
@kibana_group.command("upload-rule")
@multi_collection
@click.option("--replace-id", "-r", is_flag=True, help="Replace rule IDs with new IDs before export")
@click.pass_context
def upload_rule(ctx: click.Context, rules: RuleCollection, replace_id: bool) -> list[RuleResource]:
    """[Deprecated] Upload a list of rule .toml files to Kibana."""
    kibana = ctx.obj["kibana"]
    click.secho(
        "WARNING: This command is deprecated as of Elastic Stack version 9.0. Please use `kibana import-rules`.",
        fg="yellow",
    )

    # Downgrade each rule's contents to the target stack version before upload.
    api_payloads: list[RuleResource] = []
    for rule in rules:
        try:
            payload = downgrade_contents_from_rule(rule, kibana.version, replace_id=replace_id)
        except ValueError as e:
            # raise_client_error raises, so the append below only runs on success.
            raise_client_error(f"{e} in version:{kibana.version}, for rule: {rule.name}", e, ctx=ctx)
        api_payloads.append(RuleResource(payload))

    with kibana:
        results: list[RuleResource] = RuleResource.bulk_create_legacy(api_payloads)  # type: ignore[reportUnknownMemberType]

    # Partition the API results into successes and failures for reporting.
    ok: list[str] = []
    failed: list[str] = []
    for res in results:
        if "error" in res:
            failed.append(f"{res['rule_id']} - {res['error']['message']}")
        else:
            ok.append(res["rule_id"])  # type: ignore[reportUnknownArgumentType]

    if ok:
        click.echo("Successful uploads:\n - " + "\n - ".join(ok))
    if failed:
        click.echo("Failed uploads:\n - " + "\n - ".join(failed))

    return results
@kibana_group.command("import-rules")
@multi_collection
@click.option("--overwrite", "-o", is_flag=True, help="Overwrite existing rules")
@click.option("--overwrite-exceptions", "-e", is_flag=True, help="Overwrite exceptions in existing rules")
@click.option(
    "--overwrite-action-connectors",
    "-ac",
    is_flag=True,
    help="Overwrite action connectors in existing rules",
)
@click.pass_context
def kibana_import_rules(  # noqa: PLR0915
    ctx: click.Context,
    rules: RuleCollection,
    overwrite: bool = False,
    overwrite_exceptions: bool = False,
    overwrite_action_connectors: bool = False,
) -> tuple[dict[str, Any], list[RuleResource]]:
    """Import rules into Kibana.

    Sends the selected rules — together with any local exception lists and action
    connectors whose IDs match the rules being imported — to Kibana's import API,
    then reports successes and failures (including known re-import workarounds).

    Returns:
        A tuple of (raw import API response dict, list of imported rule results).
    """

    def _handle_response_errors(response: dict[str, Any]) -> None:
        """Handle errors from the import response."""

        def _parse_list_id(s: str) -> str | None:
            """Parse the list ID from the error message."""
            match = re.search(r'list_id: "(.*?)"', s)
            return match.group(1) if match else None

        # Re-try to address known Kibana issue: https://github.com/elastic/kibana/issues/143864
        workaround_errors: list[str] = []
        workaround_error_types: set[str] = set()

        # NOTE: `exception_dicts` is read via closure; it is assigned later in the enclosing
        # function body, and this helper is only called after that assignment.
        flattened_exceptions: list[dict[str, Any]] = [e for sublist in exception_dicts for e in sublist]
        all_exception_list_ids: set[str] = {exception["list_id"] for exception in flattened_exceptions}

        click.echo(f"{len(response['errors'])} rule(s) failed to import!")

        action_connector_validation_error = "Error validating create data"
        action_connector_type_error = "expected value of type [string] but got [undefined]"
        for error in response["errors"]:
            error_message = error["error"]["message"]
            click.echo(f" - {error['rule_id']}: ({error['error']['status_code']}) {error_message}")

            # A rule can fail because the exception list it references was part of this same
            # import; only flag it for re-import when the list ID is one we actually sent.
            if "references a non existent exception list" in error_message:
                list_id = _parse_list_id(error_message)
                if list_id in all_exception_list_ids:
                    workaround_errors.append(error["rule_id"])
                    workaround_error_types.add("non existent exception list")

            if action_connector_validation_error in error_message and action_connector_type_error in error_message:
                workaround_error_types.add("connector still being built")

        if workaround_errors:
            # De-duplicate rule IDs before printing the suggested re-import command.
            workaround_errors = list(set(workaround_errors))
            if "non existent exception list" in workaround_error_types:
                click.echo(
                    f"Missing exception list errors detected for {len(workaround_errors)} rules. "
                    "Try re-importing using the following command and rule IDs:\n"
                )
                click.echo("python -m detection_rules kibana import-rules -o ", nl=False)
                click.echo(" ".join(f"-id {rule_id}" for rule_id in workaround_errors))
                click.echo()
            if "connector still being built" in workaround_error_types:
                click.echo(
                    f"Connector still being built errors detected for {len(workaround_errors)} rules. "
                    "Please try re-importing the rules again."
                )
                click.echo()

    def _process_imported_items(
        imported_items_list: list[list[dict[str, Any]]],
        item_type_description: str,
        item_key: str,
    ) -> None:
        """Displays appropriately formatted success message that all items imported successfully."""
        all_ids = {item[item_key] for sublist in imported_items_list for item in sublist}
        if all_ids:
            click.echo(f"{len(all_ids)} {item_type_description} successfully imported")
            ids_str = "\n - ".join(all_ids)
            click.echo(f" - {ids_str}")

    kibana = ctx.obj["kibana"]
    rule_dicts = [r.contents.to_api_format() for r in rules]
    rule_ids = {rule["rule_id"] for rule in rule_dicts}
    with kibana:
        cl = GenericCollection.default()
        # Only ship exceptions and connectors whose contents match the rules being imported.
        exception_dicts: list[list[dict[str, Any]]] = [
            d.contents.to_api_format()  # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
            for d in cl.items_matching(TOMLExceptionContents, rule_ids)
        ]
        action_connectors_dicts: list[list[dict[str, Any]]] = [
            d.contents.to_api_format()  # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
            for d in cl.items_matching(TOMLActionConnectorContents, rule_ids)
        ]

        response, successful_rule_ids, results = RuleResource.import_rules(  # type: ignore[reportUnknownMemberType]
            rule_dicts,
            exception_dicts,
            action_connectors_dicts,
            overwrite=overwrite,
            overwrite_exceptions=overwrite_exceptions,
            overwrite_action_connectors=overwrite_action_connectors,
        )
        if successful_rule_ids:
            click.echo(f"{len(successful_rule_ids)} rule(s) successfully imported")  # type: ignore[reportUnknownArgumentType]
            rule_str = "\n - ".join(successful_rule_ids)  # type: ignore[reportUnknownArgumentType]
            click.echo(f" - {rule_str}")
        if response["errors"]:
            _handle_response_errors(response)  # type: ignore[reportUnknownArgumentType]
        else:
            # No rule errors: report the exceptions and connectors that went in with them.
            _process_imported_items(exception_dicts, "exception list(s)", "list_id")  # type: ignore[reportUnknownArgumentType]
            _process_imported_items(action_connectors_dicts, "action connector(s)", "id")  # type: ignore[reportUnknownArgumentType]

    return response, results  # type: ignore[reportUnknownVariableType]
@kibana_group.command("export-rules")
@click.option("--directory", "-d", required=True, type=Path, help="Directory to export rules to")
@click.option(
    "--action-connectors-directory", "-acd", required=False, type=Path, help="Directory to export action connectors to"
)
@click.option("--exceptions-directory", "-ed", required=False, type=Path, help="Directory to export exceptions to")
@click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one")
@click.option("--rule-id", "-r", multiple=True, help="Optional Rule IDs to restrict export to")
@click.option(
    "--rule-name",
    "-rn",
    required=False,
    help="Optional Rule name to restrict export to (KQL, case-insensitive, supports wildcards)",
)
@click.option("--export-action-connectors", "-ac", is_flag=True, help="Include action connectors in export")
@click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export")
@click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules")
@click.option(
    "--save-as-yaml",
    "-sy",
    is_flag=True,
    default=False,
    help="Save exported rules and objects as YAML into --directory (instead of TOML)",
)
@click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules")
@click.option(
    "--no-tactic-filename",
    "-nt",
    is_flag=True,
    help="Exclude tactic prefix in exported filenames for rules. "
    "Use same flag for import-rules to prevent warnings and disable its unit test.",
)
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
@click.option("--custom-rules-only", "-cro", is_flag=True, help="Only export custom rules")
@click.option(
    "--export-query",
    "-eq",
    type=str,
    required=False,
    help=(
        "Apply a query filter to exporting rules e.g. "
        '"alert.attributes.tags: \\"test\\"" to filter for rules that have the tag "test"'
    ),
)
@click.option(
    "--load-rule-loading",
    "-lr",
    is_flag=True,
    help="Enable arbitrary rule loading from the rules directories (Can be very slow!)",
)
@click.pass_context
def kibana_export_rules(  # noqa: PLR0912, PLR0913, PLR0915
    ctx: click.Context,
    directory: Path,
    action_connectors_directory: Path | None,
    exceptions_directory: Path | None,
    default_author: str,
    rule_id: list[str] | None = None,
    rule_name: str | None = None,
    export_action_connectors: bool = False,
    export_exceptions: bool = False,
    skip_errors: bool = False,
    save_as_yaml: bool = False,
    strip_version: bool = False,
    no_tactic_filename: bool = False,
    local_creation_date: bool = False,
    local_updated_date: bool = False,
    custom_rules_only: bool = False,
    export_query: str | None = None,
    load_rule_loading: bool = False,
) -> list[TOMLRule]:
    """Export rules from Kibana.

    Pulls rules (and optionally their exception lists and action connectors) from
    Kibana, converts each into a TOMLRule, and saves the results as TOML or YAML
    files. Per-item failures are collected into ``directory/_errors.txt`` when
    --skip-errors is set; otherwise the first failure re-raises.

    Returns:
        The list of converted TOMLRule objects (before any save errors).
    """

    def _raise_missing_path(message: str) -> None:
        # Helper so a missing-path condition can raise from a statement-per-branch form below.
        raise ValueError(message)

    kibana = ctx.obj["kibana"]
    # Export details (the trailing counts summary) are requested whenever exceptions,
    # connectors, custom-rules filtering, or a query-based export are involved.
    kibana_include_details = export_exceptions or export_action_connectors or custom_rules_only or export_query
    # Only allow one of rule_id or rule_name
    if rule_name and rule_id:
        raise click.UsageError("Cannot use --rule-id and --rule-name together. Please choose one.")

    raw_rule_collection = RawRuleCollection()
    if load_rule_loading:
        # Load all local rules so exported rules can be matched to existing files by rule_id.
        raw_rule_collection = raw_rule_collection.default()

    with kibana:
        # Look up rule IDs by name if --rule-name was provided
        if rule_name:
            found = RuleResource.find(filter=f"alert.attributes.name:{rule_name}")  # type: ignore[reportUnknownMemberType]
            rule_id = [r["rule_id"] for r in found]  # type: ignore[reportUnknownVariableType]
            if not rule_id:
                click.echo(
                    f"No rules found to export matching the provided name '{rule_name}' "
                    f"using filter 'alert.attributes.name:{rule_name}'"
                )
                return []
        # When --custom-rules-only is set, AND the custom-rules KQL with any user query.
        query = (
            export_query
            if not custom_rules_only
            else (f"({CUSTOM_RULES_KQL}){f' and ({export_query})' if export_query else ''}")
        )
        # Query-based exports go through bulk_export; otherwise export by explicit IDs.
        results = (  # type: ignore[reportUnknownVariableType]
            RuleResource.bulk_export(rule_ids=list(rule_id), query=query)  # type: ignore[reportArgumentType]
            if query
            else RuleResource.export_rules(list(rule_id), exclude_export_details=not kibana_include_details)  # type: ignore[reportArgumentType]
        )

    # Handle Exceptions Directory Location
    if results and exceptions_directory:
        exceptions_directory.mkdir(parents=True, exist_ok=True)
    # Fall back to the configured exception directory when none was given on the CLI.
    exceptions_directory = exceptions_directory or RULES_CONFIG.exception_dir
    if not exceptions_directory and export_exceptions:
        click.echo("Warning: Exceptions export requested, but no exceptions directory found")

    # Handle Actions Connector Directory Location
    if results and action_connectors_directory:
        action_connectors_directory.mkdir(parents=True, exist_ok=True)
    action_connectors_directory = action_connectors_directory or RULES_CONFIG.action_connector_dir
    if not action_connectors_directory and export_action_connectors:
        click.echo("Warning: Action Connector export requested, but no Action Connector directory found")

    if results:
        directory.mkdir(parents=True, exist_ok=True)
    else:
        click.echo("No rules found to export")
        return []

    rules_results = results  # type: ignore[reportUnknownVariableType]
    action_connector_results = []
    exception_results = []
    results_len = len(results)  # type: ignore[reportUnknownVariableType]

    if kibana_include_details:
        # With details included, the last element of `results` is a summary holding counts;
        # those counts slice the flat list into rules, exceptions, and connectors in order.
        # Assign counts to variables
        results_len = results_len - 1
        rules_count = results[-1]["exported_rules_count"]  # type: ignore[reportUnknownVariableType]
        exception_list_count = results[-1]["exported_exception_list_count"]  # type: ignore[reportUnknownVariableType]
        exception_list_item_count = results[-1]["exported_exception_list_item_count"]  # type: ignore[reportUnknownVariableType]
        action_connector_count = results[-1]["exported_action_connector_count"]  # type: ignore[reportUnknownVariableType]

        # Parse rules results and exception results from API return
        rules_results = results[:rules_count]  # type: ignore[reportUnknownVariableType]
        exception_results = results[rules_count : rules_count + exception_list_count + exception_list_item_count]  # type: ignore[reportUnknownVariableType]
        rules_and_exceptions_count = rules_count + exception_list_count + exception_list_item_count  # type: ignore[reportUnknownVariableType]
        action_connector_results = results[  # type: ignore[reportUnknownVariableType]
            rules_and_exceptions_count : rules_and_exceptions_count + action_connector_count
        ]

    errors: list[str] = []
    exported: list[TOMLRule] = []
    # Map list_id -> [{id, name}] of rules referencing each exception list / connector,
    # used later when building the exception and connector objects.
    exception_list_rule_table: dict[str, list[dict[str, Any]]] = {}
    action_connector_rule_table: dict[str, list[dict[str, Any]]] = {}
    for rule_resource in rules_results:  # type: ignore[reportUnknownVariableType]
        try:
            if strip_version:
                rule_resource.pop("revision", None)  # type: ignore[reportUnknownMemberType]
                rule_resource.pop("version", None)  # type: ignore[reportUnknownMemberType]

            # Author fallback chain: rule's own author, then --default-author, then created_by.
            rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")]  # type: ignore[reportUnknownMemberType]
            if isinstance(rule_resource["author"], str):
                rule_resource["author"] = [rule_resource["author"]]

            # Inherit maturity and optionally local dates from the rule if it already exists
            params: dict[str, Any] = {
                "rule": rule_resource,
                "maturity": "development",
            }
            threat = rule_resource.get("threat")  # type: ignore[reportUnknownMemberType]
            first_tactic = threat[0].get("tactic").get("name") if threat else ""  # type: ignore[reportUnknownMemberType]
            # Check if flag or config is set to not include tactic in the filename
            no_tactic_filename = no_tactic_filename or RULES_CONFIG.no_tactic_filename
            # Check if the flag is set to not include tactic in the filename
            tactic_name = first_tactic if not no_tactic_filename else None  # type: ignore[reportUnknownMemberType]
            # NOTE(review): this rebinds the `rule_name` CLI parameter; it is not read again
            # after this point, so the shadowing is harmless.
            rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=tactic_name)  # type: ignore[reportUnknownMemberType]
            save_path = directory / f"{rule_name}"
            # Get local rule data if load_rule_loading is enabled. If not enabled rules variable will be None.
            local_rule: dict[str, Any] = params.get("rule", {})
            input_rule_id: str | None = None
            if local_rule:
                input_rule_id = cast("definitions.UUIDString", local_rule.get("rule_id"))
            if input_rule_id and input_rule_id in raw_rule_collection.id_map:
                # Prefer the existing local file path so re-exports overwrite in place.
                save_path = raw_rule_collection.id_map[input_rule_id].path or save_path
            params.update(
                update_metadata_from_file(
                    save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
                )
            )
            contents = TOMLRuleContents.from_rule_resource(**params)  # type: ignore[reportArgumentType]
            rule = TOMLRule(contents=contents, path=save_path)
        except Exception as e:
            if skip_errors:
                print(f"- skipping {rule_resource.get('name')} - {type(e).__name__}")  # type: ignore[reportUnknownMemberType]
                errors.append(f"- {rule_resource.get('name')} - {e}")  # type: ignore[reportUnknownMemberType]
                continue
            raise

        if rule.contents.data.exceptions_list:
            # For each item in rule.contents.data.exceptions_list to the exception_list_rule_table under the list_id
            for exception in rule.contents.data.exceptions_list:
                exception_id = exception["list_id"]
                if exception_id not in exception_list_rule_table:
                    exception_list_rule_table[exception_id] = []
                exception_list_rule_table[exception_id].append({"id": rule.id, "name": rule.name})

        if rule.contents.data.actions:
            # use connector ids as rule source
            for action in rule.contents.data.actions:
                action_id = action["id"]
                if action_id not in action_connector_rule_table:
                    action_connector_rule_table[action_id] = []
                action_connector_rule_table[action_id].append({"id": rule.id, "name": rule.name})

        exported.append(rule)

    # Parse exceptions results from API return
    exceptions = []
    if export_exceptions:
        exceptions_containers = {}
        exceptions_items = {}

        exceptions_containers, exceptions_items, parse_errors, _ = parse_exceptions_results_from_api(exception_results)  # type: ignore[reportArgumentType]
        errors.extend(parse_errors)

        # Build TOMLException Objects
        exceptions, e_output, e_errors = build_exception_objects(
            exceptions_containers,
            exceptions_items,
            exception_list_rule_table,
            exceptions_directory if exceptions_directory else None,
            save_toml=False,
            skip_errors=skip_errors,
            verbose=False,
        )
        for line in e_output:
            click.echo(line)
        errors.extend(e_errors)

    # Parse action connector results from API return
    action_connectors = []
    if export_action_connectors:
        action_connector_results, _ = parse_action_connector_results_from_api(action_connector_results)  # type: ignore[reportArgumentType]

        # Build TOMLActionConnector Objects
        action_connectors, ac_output, ac_errors = build_action_connector_objects(
            action_connector_results,
            action_connector_rule_table,
            action_connectors_directory=action_connectors_directory if action_connectors_directory else None,
            save_toml=False,
            skip_errors=skip_errors,
            verbose=False,
        )
        for line in ac_output:
            click.echo(line)
        errors.extend(ac_errors)

    # Persist rules: YAML always writes into --directory (by file name only); TOML writes
    # to each rule's own path.
    saved: list[TOMLRule] = []
    for rule in exported:
        try:
            if save_as_yaml:
                rule_path = rule.path
                if isinstance(rule_path, Path):
                    rule.save_yaml(directory / rule_path.name)
                else:
                    _raise_missing_path(f"Can't save rule {rule.name} ({rule.id}) without a path")
            else:
                rule.save_toml()
        except Exception as e:
            if skip_errors:
                print(f"- skipping {rule.contents.data.name} - {type(e).__name__}")
                errors.append(f"- {rule.contents.data.name} - {e}")
                continue
            raise
        saved.append(rule)

    # Persist exceptions with the same YAML/TOML split.
    saved_exceptions: list[TOMLException] = []
    for exception in exceptions:
        try:
            if save_as_yaml:
                exception_path = exception.path
                if isinstance(exception_path, Path):
                    exception.save_yaml(directory / exception_path.name)
                else:
                    _raise_missing_path(f"Can't save exception {exception.name} without a path")
            else:
                exception.save_toml()
        except Exception as e:
            if skip_errors:
                print(f"- skipping {exception.rule_name} - {type(e).__name__}")  # type: ignore[reportUnknownMemberType]
                errors.append(f"- {exception.rule_name} - {e}")  # type: ignore[reportUnknownMemberType]
                continue
            raise
        saved_exceptions.append(exception)

    # Persist action connectors with the same YAML/TOML split.
    saved_action_connectors: list[TOMLActionConnector] = []
    for action in action_connectors:
        try:
            if save_as_yaml:
                action.save_yaml(directory / action.path.name)
            else:
                action.save_toml()
        except Exception as e:
            if skip_errors:
                print(f"- skipping {action.name} - {type(e).__name__}")
                errors.append(f"- {action.name} - {e}")
                continue
            raise
        saved_action_connectors.append(action)

    # Summary output: counts of converted vs. actually-saved objects.
    click.echo(f"{results_len} results exported")  # type: ignore[reportUnknownArgumentType]
    click.echo(f"{len(exported)} rules converted")
    click.echo(f"{len(exceptions)} exceptions exported")
    click.echo(f"{len(action_connectors)} action connectors exported")
    click.echo(f"{len(saved)} rules saved to {directory}")
    click.echo(f"{len(saved_exceptions)} exception lists saved to {exceptions_directory}")
    click.echo(f"{len(saved_action_connectors)} action connectors saved to {action_connectors_directory}")
    if errors:
        err_file = directory / "_errors.txt"
        _ = err_file.write_text("\n".join(errors))
        click.echo(f"{len(errors)} errors saved to {err_file}")

    return exported
@kibana_group.command("search-alerts")
@click.argument("query", required=False)
@click.option("--date-range", "-d", type=(str, str), default=("now-7d", "now"), help="Date range to scope search")
@click.option("--columns", "-c", multiple=True, help="Columns to display in table")
@click.option("--extend", "-e", is_flag=True, help="If columns are specified, extend the original columns")
@click.option("--max-count", "-m", default=100, help="The max number of alerts to return")
@click.pass_context
def search_alerts(  # noqa: PLR0913
    ctx: click.Context,
    query: str,
    date_range: tuple[str, str],
    columns: list[str],
    extend: bool,
    max_count: int,
) -> None:
    """Search detection engine alerts with KQL."""
    from eql.table import Table  # type: ignore[reportMissingTypeStubs]

    from .eswrap import MATCH_ALL, add_range_to_dsl

    kibana = ctx.obj["kibana"]
    start_time, end_time = date_range
    kql_query = kql.to_dsl(query) if query else MATCH_ALL  # type: ignore[reportUnknownMemberType]
    add_range_to_dsl(kql_query["bool"].setdefault("filter", []), start_time, end_time)  # type: ignore[reportUnknownArgumentType]

    with kibana:
        alerts = [a["_source"] for a in Signal.search({"query": kql_query}, size=max_count)["hits"]["hits"]]  # type: ignore[reportUnknownMemberType]

    if not alerts:
        click.echo("No alerts detected")
        return

    # Pick default columns by sniffing the first alert's field layout, which varies
    # between stack versions (nested `signal` fields vs. dotted `kibana.alert.*` keys).
    first_alert = alerts[0]
    if "signal" in first_alert:
        extra_cols = ["signal.rule.name", "signal.status", "signal.original_time"]
    elif "kibana.alert.rule.name" in first_alert:
        extra_cols = ["kibana.alert.rule.name", "kibana.alert.status", "kibana.alert.original_time"]
    else:
        extra_cols = ["rule.name", "@timestamp"]
    table_columns = ["host.hostname"] + extra_cols

    if columns:
        columns = list(columns)
        table_columns = table_columns + columns if extend else columns

    # Table expects nested data, but some versions return dotted keys — nest those
    # explicitly before rendering.
    for alert in alerts:  # type: ignore[reportUnknownVariableType]
        for key in table_columns:
            if key in alert:
                nested_set(alert, key, alert[key])  # type: ignore[reportUnknownArgumentType]

    click.echo(Table.from_list(table_columns, alerts))  # type: ignore[reportUnknownMemberType]