c567d3731a
* Refresh Kibana module with API updates * add import/export commands * rename repo commands * add RawRuleCollection and DictRule objects * save exported rules to files; rule.from_rule_resource * strip unknown fields in schema * add remote cli test * update docs * bump kibana lib version --------- Co-authored-by: brokensound77 <brokensound77@users.noreply.github.com>
212 lines
8.1 KiB
Python
212 lines
8.1 KiB
Python
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
# or more contributor license agreements. Licensed under the Elastic License
|
|
# 2.0; you may not use this file except in compliance with the Elastic License
|
|
# 2.0.
|
|
|
|
"""Kibana cli commands."""
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Iterable, List, Optional
|
|
|
|
import click
|
|
|
|
import kql
|
|
from kibana import Signal, RuleResource
|
|
|
|
from .cli_utils import multi_collection
|
|
from .main import root
|
|
from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set
|
|
from .rule import downgrade_contents_from_rule, TOMLRuleContents, TOMLRule
|
|
from .rule_loader import RuleCollection
|
|
from .utils import format_command_options, rulename_to_filename
|
|
|
|
|
|
@root.group('kibana')
@add_params(*kibana_options)
@click.pass_context
def kibana_group(ctx: click.Context, **kibana_kwargs):
    """Commands for integrating with Kibana."""
    ctx.ensure_object(dict)

    # Only build a Kibana client when a subcommand will actually run; if the
    # user asked for --help, just print the client options instead (hacky).
    requesting_help = sys.argv[-1] in ctx.help_option_names
    if requesting_help:
        click.echo('Kibana client:')
        click.echo(format_command_options(ctx))
    else:
        ctx.obj['kibana'] = get_kibana_client(**kibana_kwargs)
|
|
|
|
|
|
@kibana_group.command("upload-rule")
@multi_collection
@click.option('--replace-id', '-r', is_flag=True, help='Replace rule IDs with new IDs before export')
@click.pass_context
def upload_rule(ctx, rules: RuleCollection, replace_id):
    """Upload a list of rule .toml files to Kibana."""
    kibana = ctx.obj['kibana']

    # Downgrade each TOML rule's contents to the connected stack version's
    # API schema before wrapping it in a RuleResource.
    resources = []
    for toml_rule in rules:
        try:
            contents = downgrade_contents_from_rule(toml_rule, kibana.version, replace_id=replace_id)
        except ValueError as exc:
            client_error(f'{exc} in version:{kibana.version}, for rule: {toml_rule.name}', exc, ctx=ctx)

        resources.append(RuleResource(contents))

    with kibana:
        results = RuleResource.bulk_create_legacy(resources)

    # Partition the bulk-create responses into successes and failures.
    succeeded, failed = [], []
    for result in results:
        if 'error' in result:
            failed.append(f'{result["rule_id"]} - {result["error"]["message"]}')
        else:
            succeeded.append(result['rule_id'])

    if succeeded:
        click.echo('Successful uploads:\n - ' + '\n - '.join(succeeded))
    if failed:
        click.echo('Failed uploads:\n - ' + '\n - '.join(failed))

    return results
|
|
|
|
|
|
@kibana_group.command('import-rules')
@multi_collection
@click.option('--overwrite', '-o', is_flag=True, help='Overwrite existing rules')
@click.option('--overwrite-exceptions', '-e', is_flag=True, help='Overwrite exceptions in existing rules')
@click.option('--overwrite-action-connectors', '-a', is_flag=True,
              help='Overwrite action connectors in existing rules')
@click.pass_context
def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Optional[bool] = False,
                        overwrite_exceptions: Optional[bool] = False,
                        overwrite_action_connectors: Optional[bool] = False) -> (dict, List[RuleResource]):
    """Import custom rules into Kibana.

    Converts each loaded rule to its API dict format and submits them through
    the Kibana import endpoint, echoing a summary of successes and failures.
    Returns the raw API response and the list of resulting RuleResources.
    """
    kibana = ctx.obj['kibana']
    rule_dicts = [r.contents.to_api_format() for r in rules]
    with kibana:
        response, successful_rule_ids, results = RuleResource.import_rules(
            rule_dicts,
            overwrite=overwrite,
            overwrite_exceptions=overwrite_exceptions,
            overwrite_action_connectors=overwrite_action_connectors
        )

    if successful_rule_ids:
        click.echo(f'{len(successful_rule_ids)} rule(s) successfully imported')
        rule_str = '\n - '.join(successful_rule_ids)
        # use click.echo (not bare print) for consistency with the rest of the CLI output
        click.echo(f' - {rule_str}')
    if response['errors']:
        click.echo(f'{len(response["errors"])} rule(s) failed to import!')
        for error in response['errors']:
            click.echo(f' - {error["rule_id"]}: ({error["error"]["status_code"]}) {error["error"]["message"]}')

    return response, results
|
|
|
|
|
|
@kibana_group.command('export-rules')
@click.option('--directory', '-d', required=True, type=Path, help='Directory to export rules to')
@click.option('--rule-id', '-r', multiple=True, help='Optional Rule IDs to restrict export to')
@click.option('--skip-errors', '-s', is_flag=True, help='Skip errors when exporting rules')
@click.pass_context
def kibana_export_rules(ctx: click.Context, directory: Path,
                        rule_id: Optional[Iterable[str]] = None, skip_errors: bool = False) -> List[TOMLRule]:
    """Export custom rules from Kibana.

    Fetches rule resources via the export API, converts each to a TOMLRule,
    and saves them under `directory`. With --skip-errors, conversion/save
    failures are recorded in `<directory>/_errors.txt` instead of raising.
    Returns the list of converted TOMLRules (including any that failed to save).
    """
    kibana = ctx.obj['kibana']
    with kibana:
        results = RuleResource.export_rules(list(rule_id))

    # only create the output directory if there is anything to write
    if results:
        directory.mkdir(parents=True, exist_ok=True)

    errors = []
    exported = []
    # Convert each API rule resource into a TOMLRule, naming the file after
    # the rule and its first MITRE tactic (if any).
    for rule_resource in results:
        try:
            contents = TOMLRuleContents.from_rule_resource(rule_resource, maturity='production')
            threat = contents.data.get('threat')
            first_tactic = threat[0].tactic.name if threat else ''
            rule_name = rulename_to_filename(contents.data.name, tactic_name=first_tactic)
            rule = TOMLRule(contents=contents, path=directory / rule_name)
        except Exception as e:
            if skip_errors:
                # click.echo (not bare print) for consistent CLI output handling
                click.echo(f'- skipping {rule_resource.get("name")} - {type(e).__name__}')
                errors.append(f'- {rule_resource.get("name")} - {e}')
                continue
            raise

        exported.append(rule)

    saved = []
    for rule in exported:
        try:
            rule.save_toml()
        except Exception as e:
            if skip_errors:
                click.echo(f'- skipping {rule.contents.data.name} - {type(e).__name__}')
                errors.append(f'- {rule.contents.data.name} - {e}')
                continue
            raise

        saved.append(rule)

    click.echo(f'{len(results)} rules exported')
    click.echo(f'{len(exported)} rules converted')
    click.echo(f'{len(saved)} saved to {directory}')
    if errors:
        err_file = directory / '_errors.txt'
        err_file.write_text('\n'.join(errors))
        click.echo(f'{len(errors)} errors saved to {err_file}')

    return exported
|
|
|
|
|
|
@kibana_group.command('search-alerts')
@click.argument('query', required=False)
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
@click.option('--columns', '-c', multiple=True, help='Columns to display in table')
@click.option('--extend', '-e', is_flag=True, help='If columns are specified, extend the original columns')
@click.option('--max-count', '-m', default=100, help='The max number of alerts to return')
@click.pass_context
def search_alerts(ctx, query, date_range, columns, extend, max_count):
    """Search detection engine alerts with KQL."""
    from eql.table import Table

    from .eswrap import MATCH_ALL, add_range_to_dsl

    kibana = ctx.obj['kibana']
    start_time, end_time = date_range

    # Build the DSL query from the optional KQL string and scope it by time.
    kql_query = kql.to_dsl(query) if query else MATCH_ALL
    add_range_to_dsl(kql_query['bool'].setdefault('filter', []), start_time, end_time)

    with kibana:
        hits = Signal.search({'query': kql_query}, size=max_count)['hits']['hits']
        alerts = [hit['_source'] for hit in hits]

    if not alerts:
        click.echo('No alerts detected')
        return alerts

    # Choose default columns based on which alert schema the first hit uses
    # (nested `signal`, dotted `kibana.alert.*`, or plain event fields).
    table_columns = ['host.hostname']
    first_alert = alerts[0]
    if 'signal' in first_alert:
        table_columns += ['signal.rule.name', 'signal.status', 'signal.original_time']
    elif 'kibana.alert.rule.name' in first_alert:
        table_columns += ['kibana.alert.rule.name', 'kibana.alert.status', 'kibana.alert.original_time']
    else:
        table_columns += ['rule.name', '@timestamp']

    if columns:
        columns = list(columns)
        table_columns = table_columns + columns if extend else columns

    # Table requires nested data, but some versions return dotted keys, so
    # nest those keys explicitly before rendering.
    for alert in alerts:
        for key in table_columns:
            if key in alert:
                nested_set(alert, key, alert[key])

    click.echo(Table.from_list(table_columns, alerts))
    return alerts
|