Files
sigma-rules/detection_rules/kbwrap.py
T
David French 90aa65aed3 Generate detection rule to alert on traffic to typosquatting/homonym domains (#1199)
* create new cli commands

* add kibana object to create_dnstwist_rule

* Adding code for index-dnstwist-results

* Changed es to es_client

* Tested. it works!

* flake8-ed

* Adding timestamps

* use eql.utils.load_dump to load json file

* rename data to dnstwist_data

* start working on create-dnstwist-rule command

* add print statements for user

* tweak formatting for line length

* add template threat match rule file

* continue working on threat match rule creation

* create rule using TomlRuleContents

* save rule to toml file

* Moving rule creation to eswrap.py

* Moving create dnstwist rule stuff to eswrap

* Fixed imports

* flake8 fixes

* More flake8 fixes

* fix usage of @add_client('kibana')

* use ctx.invoke to upload rule

* cleanup record assembly and use bulk api

* swap order of notes in `note` for sample rule

* small modifications

* move command to root click group

* remove unused click group

* Update detection_rules/main.py

Co-authored-by: Justin Ibarra <brokensound77@users.noreply.github.com>

* remove rule upload and convert template to ndjson

* Adding docs for typosquatting rule

* renaming the file

* Adding a note

* separate index and rule prep commands

* Final changes

Co-authored-by: Apoorva <appujo@gmail.com>
Co-authored-by: brokensound77 <brokensound77@users.noreply.github.com>
Co-authored-by: Apoorva Joshi <30438249+ajosh0504@users.noreply.github.com>
2021-09-03 13:35:59 -07:00

92 lines
3.2 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Kibana cli commands."""
import uuid
import click
import kql
from kibana import Signal, RuleResource
from .cli_utils import multi_collection
from .main import root
from .misc import add_params, client_error, kibana_options, get_kibana_client
from .schemas import downgrade
from .utils import format_command_options
@root.group('kibana')
@add_params(*kibana_options)
@click.pass_context
def kibana_group(ctx: click.Context, **kibana_kwargs):
"""Commands for integrating with Kibana."""
ctx.ensure_object(dict)
# only initialize an kibana client if the subcommand is invoked without help (hacky)
if click.get_os_args()[-1] in ctx.help_option_names:
click.echo('Kibana client:')
click.echo(format_command_options(ctx))
else:
ctx.obj['kibana'] = get_kibana_client(**kibana_kwargs)
@kibana_group.command("upload-rule")
@multi_collection
@click.option('--replace-id', '-r', is_flag=True, help='Replace rule IDs with new IDs before export')
@click.pass_context
def upload_rule(ctx, rules, replace_id):
"""Upload a list of rule .toml files to Kibana."""
kibana = ctx.obj['kibana']
api_payloads = []
for rule in rules:
try:
payload = rule.contents.to_api_format()
payload.setdefault("meta", {}).update(rule.contents.metadata.to_dict())
if replace_id:
payload["rule_id"] = str(uuid.uuid4())
payload = downgrade(payload, target_version=kibana.version)
except ValueError as e:
client_error(f'{e} in version:{kibana.version}, for rule: {rule.name}', e, ctx=ctx)
rule = RuleResource(payload)
api_payloads.append(rule)
with kibana:
rules = RuleResource.bulk_create(api_payloads)
click.echo(f"Successfully uploaded {len(rules)} rules")
@kibana_group.command('search-alerts')
@click.argument('query', required=False)
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
@click.option('--columns', '-c', multiple=True, help='Columns to display in table')
@click.option('--extend', '-e', is_flag=True, help='If columns are specified, extend the original columns')
@click.pass_context
def search_alerts(ctx, query, date_range, columns, extend):
"""Search detection engine alerts with KQL."""
from eql.table import Table
from .eswrap import MATCH_ALL, add_range_to_dsl
kibana = ctx.obj['kibana']
start_time, end_time = date_range
kql_query = kql.to_dsl(query) if query else MATCH_ALL
add_range_to_dsl(kql_query['bool'].setdefault('filter', []), start_time, end_time)
with kibana:
alerts = [a['_source'] for a in Signal.search({'query': kql_query})['hits']['hits']]
table_columns = ['host.hostname', 'signal.rule.name', 'signal.status', 'signal.original_time']
if columns:
columns = list(columns)
table_columns = table_columns + columns if extend else columns
click.echo(Table.from_list(table_columns, alerts))
return alerts