Add test commands to search and survey rule hits (#485)

This commit is contained in:
Justin Ibarra
2020-11-17 23:08:00 +01:00
committed by GitHub
parent abea5d0779
commit ad4a2ef0eb
9 changed files with 571 additions and 123 deletions
+13 -13
View File
@@ -111,11 +111,11 @@ Usage: detection_rules es [OPTIONS] COMMAND [ARGS]...
Commands for integrating with Elasticsearch.
Options:
-e, --elasticsearch-url TEXT
-et, --timeout INTEGER Timeout for elasticsearch client
-ep, --es-password TEXT
-eu, --es-user TEXT
--cloud-id TEXT
-u, --es-user TEXT
-p, --es-password TEXT
-t, --timeout INTEGER Timeout for elasticsearch client
-e, --elasticsearch-url TEXT
-h, --help Show this message and exit.
Commands:
@@ -130,12 +130,12 @@ Usage: detection_rules kibana [OPTIONS] COMMAND [ARGS]...
Commands for integrating with Kibana.
Options:
-k, --kibana-url TEXT
--space TEXT Kibana space
-kp, --kibana-password TEXT
-ku, --kibana-user TEXT
--cloud-id TEXT
-u, --kibana-user TEXT
-p, --kibana-password TEXT
-t, --timeout INTEGER Timeout for kibana client
-h, --help Show this message and exit.
-k, --kibana-url TEXT
-h, --help Show this message and exit.
Commands:
upload-rule Upload a list of rule .toml files to Kibana.
@@ -153,11 +153,11 @@ python -m detection_rules kibana upload-rule -h
Kibana client:
Options:
-k, --kibana-url TEXT
--space TEXT Kibana space
-kp, --kibana-password TEXT
-ku, --kibana-user TEXT
--cloud-id TEXT
-u, --kibana-user TEXT
-p, --kibana-password TEXT
-t, --timeout INTEGER Timeout for kibana client
-k, --kibana-url TEXT
Usage: detection_rules kibana upload-rule [OPTIONS] TOML_FILES...
+154 -1
View File
@@ -9,15 +9,20 @@ import json
import os
import shutil
import subprocess
import time
import click
from elasticsearch import Elasticsearch
from eql import load_dump
from kibana.connector import Kibana
from . import rule_loader
from .eswrap import CollectEvents, add_range_to_dsl
from .main import root
from .misc import PYTHON_LICENSE, client_error
from .misc import PYTHON_LICENSE, add_client, client_error
from .packaging import PACKAGE_FILE, Package, manage_versions, RELEASE_DIR
from .rule import Rule
from .rule_loader import get_rule
from .utils import get_path
@@ -201,3 +206,151 @@ def license_check(ctx):
click.echo(relative_path, err=True)
ctx.exit(int(failed))
# Sub-group under `dev` for commands that exercise live stack resources
# (Elasticsearch searches, Kibana alerts) against the local rule set.
@dev_group.group('test')
def test_group():
    """Commands for testing against stack resources."""
@test_group.command('event-search')
@click.argument('query')
@click.option('--index', '-i', multiple=True, help='Index patterns to search against')
@click.option('--eql/--lucene', '-e/-l', 'language', default=None, help='Query language used (default: kql)')
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
@click.option('--count', '-c', is_flag=True, help='Return count of results only')
@click.option('--max-results', '-m', type=click.IntRange(1, 1000), default=100,
              help='Max results to return (capped at 1000)')
@click.option('--verbose', '-v', is_flag=True, default=True)
@add_client('elasticsearch')
def event_search(query, index, language, date_range, count, max_results, verbose=True,
                 elasticsearch_client: Elasticsearch = None):
    """Search using a query against an Elasticsearch instance."""
    start_time, end_time = date_range
    index = index or ('*',)

    # the --eql/--lucene flag is tri-state: None -> kql (default),
    # True -> eql, False -> lucene
    if language is None:
        language_used = "kql"
    elif language is True:
        language_used = "eql"
    else:
        language_used = "lucene"

    collector = CollectEvents(elasticsearch_client, max_results)

    if verbose:
        click.echo(f'searching {",".join(index)} from {start_time} to {end_time}')
        click.echo(f'{language_used}: {query}')

    if count:
        results = collector.count(query, language_used, index, start_time, end_time)
        click.echo(f'total results: {results}')
        return results

    results = collector.search(query, language_used, index, start_time, end_time, max_results)
    click.echo(f'total results: {len(results)} (capped at {max_results})')
    click.echo_via_pager(json.dumps(results, indent=2, sort_keys=True))
    return results
@test_group.command('rule-event-search')
@click.argument('rule-file', type=click.Path(dir_okay=False), required=False)
@click.option('--rule-id', '-id')
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
@click.option('--count', '-c', is_flag=True, help='Return count of results only')
@click.option('--max-results', '-m', type=click.IntRange(1, 1000), default=100,
              help='Max results to return (capped at 1000)')
@click.option('--verbose', '-v', is_flag=True)
@click.pass_context
@add_client('elasticsearch')
def rule_event_search(ctx, rule_file, rule_id, date_range, count, max_results, verbose,
                      elasticsearch_client: Elasticsearch = None):
    """Search using a rule file against an Elasticsearch instance.

    The rule is resolved either by --rule-id (via the local rule loader) or
    from an explicit rule file path; one of the two must be supplied. The
    rule's own query, language, and index patterns are then forwarded to the
    generic `event-search` command via ctx.invoke.
    """
    rule = None

    if rule_id:
        rule = get_rule(rule_id, verbose=False)
    elif rule_file:
        rule = Rule(rule_file, load_dump(rule_file))
    else:
        # client_error raises, so execution never continues with rule=None
        client_error('Must specify a rule file or rule ID')

    if rule.query and rule.contents.get('language'):
        if verbose:
            click.echo(f'Searching rule: {rule.name}')

        # event_search takes a tri-state `language` flag:
        # None -> kql, True -> eql, False -> lucene
        rule_lang = rule.contents.get('language')
        if rule_lang == 'kuery':
            language = None
        elif rule_lang == 'eql':
            language = True
        else:
            language = False

        ctx.invoke(event_search, query=rule.query, index=rule.contents.get('index', ['*']), language=language,
                   date_range=date_range, count=count, max_results=max_results, verbose=verbose,
                   elasticsearch_client=elasticsearch_client)
    else:
        client_error('Rule is not a query rule!')
@test_group.command('rule-survey')
@click.argument('query', required=False)
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
@click.option('--dump-file', type=click.Path(dir_okay=False),
              default=get_path('surveys', f'{time.strftime("%Y%m%dT%H%M%SL")}.json'),
              help='Save details of results (capped at 1000 results/rule)')
@click.option('--hide-zero-counts', '-z', is_flag=True, help='Exclude rules with zero hits from printing')
@click.option('--hide-errors', '-e', is_flag=True, help='Exclude rules with errors from printing')
@click.pass_context
@add_client('elasticsearch', 'kibana', add_to_ctx=True)
def rule_survey(ctx: click.Context, query, date_range, dump_file, hide_zero_counts, hide_errors,
                elasticsearch_client: Elasticsearch = None, kibana_client: Kibana = None):
    """Survey search-hit and alert counts for rules, optionally scoped by a rule query.

    Prints a summary table (rule_id, name, search_count, alert_count) and
    saves a detailed JSON dump of per-rule results to --dump-file.
    NOTE: the --dump-file default timestamp is computed once at import time
    (click evaluates option defaults when the decorator runs).
    """
    from collections import defaultdict

    from eql.table import Table
    from kibana.resources import Signal
    from . import rule_loader
    from .main import search_rules

    survey_results = []
    start_time, end_time = date_range

    if query:
        rule_paths = [r['file'] for r in ctx.invoke(search_rules, query=query, verbose=False)]
        rules = rule_loader.load_rules(rule_loader.load_rule_files(paths=rule_paths, verbose=False), verbose=False)
        rules = rules.values()
    else:
        rules = rule_loader.load_rules(verbose=False).values()

    click.echo(f'Running survey against {len(rules)} rules')
    click.echo(f'Saving detailed dump to: {dump_file}')

    collector = CollectEvents(elasticsearch_client)
    details = collector.search_from_rule(*rules, start_time=start_time, end_time=end_time)
    counts = collector.count_from_rule(*rules, start_time=start_time, end_time=end_time)

    # add alerts from the detection engine signals index, grouped per rule.
    # BUG FIX: the original built a plain dict keyed by rule_id, so multiple
    # alerts for one rule clobbered each other and len() counted the keys of
    # a single _source dict rather than the number of alerts.
    with kibana_client:
        range_dsl = {'query': {'bool': {'filter': []}}}
        add_range_to_dsl(range_dsl['query']['bool']['filter'], start_time, end_time)
        alerts = defaultdict(list)
        for a in Signal.search(range_dsl)['hits']['hits']:
            alerts[a['_source']['signal']['rule']['rule_id']].append(a['_source'])

    for rule_id, count in counts.items():
        alert_count = len(alerts.get(rule_id, []))
        if alert_count > 0:
            count['alert_count'] = alert_count

        details[rule_id].update(count)

        search_count = count['search_count']
        # BUG FIX: the original condition `not alert_count and (A) or (B)`
        # bound as `(not alert_count and A) or B` by operator precedence, so
        # --hide-errors hid rules even when they had alerts. Only hide a rule
        # when it produced no alerts AND matches a requested hide filter.
        if not alert_count and ((hide_zero_counts and search_count == 0) or
                                (hide_errors and search_count == -1)):
            continue

        survey_results.append(count)

    fields = ['rule_id', 'name', 'search_count', 'alert_count']
    table = Table.from_list(fields, survey_results)

    if len(survey_results) > 200:
        click.echo_via_pager(table)
    else:
        click.echo(table)

    os.makedirs(get_path('surveys'), exist_ok=True)
    with open(dump_file, 'w') as f:
        json.dump(details, f, indent=2, sort_keys=True)

    return survey_results
+228 -80
View File
@@ -6,37 +6,56 @@
import json
import os
import time
from collections import defaultdict
from typing import Union
import click
from elasticsearch import AuthenticationException, Elasticsearch
import elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch.client import AsyncSearchClient
import kql
from .main import root
from .misc import client_error, getdefault
from .misc import add_params, client_error, elasticsearch_options
from .utils import format_command_options, normalize_timing_and_sort, unix_time_to_formatted, get_path
from .rule import Rule
from .rule_loader import get_rule, rta_mappings
COLLECTION_DIR = get_path('collections')
MATCH_ALL = {'bool': {'filter': [{'match_all': {}}]}}
def get_es_client(es_user, es_password, elasticsearch_url=None, cloud_id=None, **kwargs):
"""Get an auth-validated elsticsearch client."""
assert elasticsearch_url or cloud_id, \
'You must specify a host or cloud_id to authenticate to an elasticsearch instance'
def get_elasticsearch_client(cloud_id=None, elasticsearch_url=None, es_user=None, es_password=None, ctx=None, **kwargs):
"""Get an authenticated elasticsearch client."""
if not (cloud_id or elasticsearch_url):
client_error("Missing required --cloud-id or --elasticsearch-url")
hosts = [elasticsearch_url] if elasticsearch_url else elasticsearch_url
# don't prompt for these until there's a cloud id or elasticsearch URL
es_user = es_user or click.prompt("es_user")
es_password = es_password or click.prompt("es_password", hide_input=True)
hosts = [elasticsearch_url] if elasticsearch_url else None
client = Elasticsearch(hosts=hosts, cloud_id=cloud_id, http_auth=(es_user, es_password), **kwargs)
# force login to test auth
client.info()
return client
try:
client = Elasticsearch(hosts=hosts, cloud_id=cloud_id, http_auth=(es_user, es_password), **kwargs)
# force login to test auth
client.info()
return client
except elasticsearch.AuthenticationException as e:
error_msg = f'Failed authentication for {elasticsearch_url or cloud_id}'
client_error(error_msg, e, ctx=ctx, err=True)
class Events(object):
def add_range_to_dsl(dsl_filter, start_time, end_time='now'):
    """Append an @timestamp range clause to a DSL filter list, in place."""
    timestamp_range = {
        "gt": start_time,
        "lte": end_time,
        "format": "strict_date_optional_time",
    }
    dsl_filter.append({"range": {"@timestamp": timestamp_range}})
class RtaEvents(object):
"""Events collected from Elasticsearch."""
def __init__(self, agent_hostname, events):
self.agent_hostname = agent_hostname
self.events = self._normalize_event_timing(events)
def __init__(self, events):
self.events: dict = self._normalize_event_timing(events)
@staticmethod
def _normalize_event_timing(events):
@@ -46,7 +65,8 @@ class Events(object):
return events
def _get_dump_dir(self, rta_name=None):
@staticmethod
def _get_dump_dir(rta_name=None, host_id=None):
"""Prepare and get the dump path."""
if rta_name:
dump_dir = get_path('unit_tests', 'data', 'true_positives', rta_name)
@@ -54,7 +74,7 @@ class Events(object):
return dump_dir
else:
time_str = time.strftime('%Y%m%dT%H%M%SL')
dump_dir = os.path.join(COLLECTION_DIR, self.agent_hostname, time_str)
dump_dir = os.path.join(COLLECTION_DIR, host_id or 'unknown_host', time_str)
os.makedirs(dump_dir, exist_ok=True)
return dump_dir
@@ -81,11 +101,11 @@ class Events(object):
echo_fn = click.echo_via_pager if pager else click.echo
echo_fn(json.dumps(self.events, indent=2 if pretty else None, sort_keys=True))
def save(self, rta_name=None, dump_dir=None):
def save(self, rta_name=None, dump_dir=None, host_id=None):
"""Save collected events."""
assert self.events, 'Nothing to save. Run Collector.run() method first'
assert self.events, 'Nothing to save. Run Collector.run() method first or verify logging'
dump_dir = dump_dir or self._get_dump_dir(rta_name)
dump_dir = dump_dir or self._get_dump_dir(rta_name=rta_name, host_id=host_id)
for source, events in self.events.items():
path = os.path.join(dump_dir, source + '.jsonl')
@@ -98,8 +118,8 @@ class CollectEvents(object):
"""Event collector for elastic stack."""
def __init__(self, client, max_events=3000):
self.client = client
self.MAX_EVENTS = max_events
self.client: Elasticsearch = client
self.max_events = max_events
def _build_timestamp_map(self, index_str):
"""Build a mapping of indexes to timestamp data formats."""
@@ -107,16 +127,17 @@ class CollectEvents(object):
timestamp_map = {n: m['mappings'].get('properties', {}).get('@timestamp', {}) for n, m in mappings.items()}
return timestamp_map
def _get_current_time(self, agent_hostname, index_str):
def _get_last_event_time(self, index_str, dsl=None):
"""Get timestamp of most recent event."""
# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
timestamp_map = self._build_timestamp_map(index_str)
last_event = self._search_window(agent_hostname, index_str, start_time='now-1m', size=1, sort='@timestamp:desc')
last_event = last_event['hits']['hits'][0]
last_event = self.client.search(dsl, index_str, size=1, sort='@timestamp:desc')['hits']['hits']
if not last_event:
return
last_event = last_event[0]
index = last_event['_index']
timestamp = last_event['_source']['@timestamp']
timestamp_map = self._build_timestamp_map(index_str)
event_date_format = timestamp_map[index].get('format', '').split('||')
# there are many native supported date formats and even custom data formats, but most, including beats use the
@@ -127,44 +148,183 @@ class CollectEvents(object):
return timestamp
def _search_window(self, agent_hostname, index_str, start_time, end_time='now', size=None, sort='@timestamp:asc',
**match):
"""Collect all events within a time window and parse by source."""
match = match.copy()
match.update({"agent.hostname": agent_hostname})
body = {"query": {"bool": {"filter": [
{"match": {"agent.hostname": agent_hostname}},
{"range": {"@timestamp": {"gt": start_time, "lte": end_time, "format": "strict_date_optional_time"}}}]
}}}
@staticmethod
def _prep_query(query, language, index, start_time=None, end_time=None):
"""Prep a query for search."""
index_str = ','.join(index if isinstance(index, (list, tuple)) else index.split(','))
lucene_query = query if language == 'lucene' else None
if match:
body['query']['bool']['filter'].extend([{'match': {k: v}} for k, v in match.items()])
if language in ('kql', 'kuery'):
formatted_dsl = {'query': kql.to_dsl(query)}
elif language == 'eql':
formatted_dsl = {'query': query, 'filter': MATCH_ALL}
elif language == 'lucene':
formatted_dsl = {'query': {'bool': {'filter': []}}}
elif language == 'dsl':
formatted_dsl = {'query': query}
else:
raise ValueError('Unknown search language')
return self.client.search(index=index_str, body=body, size=size or self.MAX_EVENTS, sort=sort)
if start_time or end_time:
end_time = end_time or 'now'
dsl = formatted_dsl['filter']['bool']['filter'] if language == 'eql' else \
formatted_dsl['query']['bool'].setdefault('filter', [])
add_range_to_dsl(dsl, start_time, end_time)
return index_str, formatted_dsl, lucene_query
def search(self, query, language, index: Union[str, list] = '*', start_time=None, end_time=None, size=None,
           **kwargs):
    """Search an elasticsearch instance.

    `query`/`language`/`index` are normalized by `_prep_query`; EQL queries
    go through the dedicated EQL endpoint, all others through the regular
    search API (with `q=` carrying the raw lucene query when applicable).
    Returns the raw hit/event dicts, capped at `size` or `self.max_events`.
    """
    index_str, formatted_dsl, lucene_query = self._prep_query(query=query, language=language, index=index,
                                                              start_time=start_time, end_time=end_time)
    formatted_dsl.update(size=size or self.max_events)

    if language == 'eql':
        # EQL responses nest results under hits.events, or hits.sequences
        # for sequence queries
        results = self.client.eql.search(body=formatted_dsl, index=index_str, **kwargs)['hits']
        results = results.get('events') or results.get('sequences', [])
    else:
        results = self.client.search(body=formatted_dsl, q=lucene_query, index=index_str,
                                     allow_no_indices=True, ignore_unavailable=True, **kwargs)['hits']['hits']
    return results
def search_from_rule(self, *rules: Rule, start_time=None, end_time='now', size=None):
    """Search an elasticsearch instance using a rule.

    Returns a dict of rule.id -> unique-field hit tallies (or an error
    marker). Searches are dispatched per rule language:
    kuery -> one combined msearch | lucene -> async search API | eql -> EQL API.
    Rules without a query are skipped entirely.
    """
    from .misc import nested_get

    async_client = AsyncSearchClient(self.client)
    survey_results = {}

    def parse_unique_field_results(rule_type, unique_fields, search_results):
        # tally hits per unique-field value; list values are sorted and
        # comma-joined so they form a stable, hashable tally key
        parsed_results = defaultdict(lambda: defaultdict(int))
        hits = search_results['hits']
        hits = hits['hits'] if rule_type != 'eql' else hits.get('events') or hits.get('sequences', [])
        for hit in hits:
            for field in unique_fields:
                match = nested_get(hit['_source'], field)
                match = ','.join(sorted(match)) if isinstance(match, list) else match
                parsed_results[field][match] += 1

        # if rule.type == eql, structure is different
        return {'results': parsed_results} if parsed_results else {}

    multi_search = []
    multi_search_rules = []
    async_searches = {}
    eql_searches = {}

    for rule in rules:
        if not rule.query:
            continue

        index_str, formatted_dsl, lucene_query = self._prep_query(query=rule.query,
                                                                  language=rule.contents.get('language'),
                                                                  index=rule.contents.get('index', '*'),
                                                                  start_time=start_time,
                                                                  end_time=end_time)
        formatted_dsl.update(size=size or self.max_events)

        # prep for searches: msearch for kql | async search for lucene | eql client search for eql
        if rule.contents['language'] == 'kuery':
            # msearch bodies are newline-delimited JSON: header line, then query line
            multi_search_rules.append(rule)
            multi_search.append(json.dumps(
                {'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'}))
            multi_search.append(json.dumps(formatted_dsl))
        elif rule.contents['language'] == 'lucene':
            # wait for 0 to try and force async with no immediate results (not guaranteed)
            result = async_client.submit(body=formatted_dsl, q=rule.query, index=index_str,
                                         allow_no_indices=True, ignore_unavailable=True,
                                         wait_for_completion_timeout=0)
            if result['is_running'] is True:
                async_searches[rule] = result['id']
            else:
                # search finished synchronously despite the 0 timeout
                survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields,
                                                                     result['response'])
        elif rule.contents['language'] == 'eql':
            eql_body = {
                'index': index_str,
                'params': {'ignore_unavailable': 'true', 'allow_no_indices': 'true'},
                'body': {'query': rule.query, 'filter': formatted_dsl['filter']}
            }
            eql_searches[rule] = eql_body

    # assemble search results
    multi_search_results = self.client.msearch('\n'.join(multi_search) + '\n')
    for index, result in enumerate(multi_search_results['responses']):
        try:
            rule = multi_search_rules[index]
            survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result)
        except KeyError:
            # response carried an error payload instead of hits
            survey_results[multi_search_rules[index].id] = {'error_retrieving_results': True}

    for rule, search_args in eql_searches.items():
        try:
            result = self.client.eql.search(**search_args)
            survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result)
        except (elasticsearch.NotFoundError, elasticsearch.RequestError) as e:
            survey_results[rule.id] = {'error_retrieving_results': True, 'error': e.info['error']['reason']}

    for rule, async_id in async_searches.items():
        # NOTE(review): fetches the async result without polling for
        # completion — assumes the search has finished by this point; confirm
        result = async_client.get(async_id)['response']
        survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result)

    return survey_results
def count(self, query, language, index: Union[str, list], start_time=None, end_time='now'):
    """Get a count of documents from elasticsearch.

    Uses the _count API where possible. The EQL API has no count endpoint,
    so EQL queries fall back to a search capped at 1000 results and return
    how many were retrieved — an undercount when more than 1000 match.
    """
    index_str, formatted_dsl, lucene_query = self._prep_query(query=query, language=language, index=index,
                                                              start_time=start_time, end_time=end_time)

    # EQL API has no count endpoint
    if language == 'eql':
        results = self.search(query=query, language=language, index=index, start_time=start_time, end_time=end_time,
                              size=1000)
        return len(results)
    else:
        return self.client.count(body=formatted_dsl, index=index_str, q=lucene_query, allow_no_indices=True,
                                 ignore_unavailable=True)['count']
def count_from_rule(self, *rules, start_time=None, end_time='now'):
    """Get a count of documents from elasticsearch using a rule.

    Returns a dict of rule.id -> {rule_id, name, search_count}, where
    search_count is -1 when the search failed. Rules without a query are
    skipped.
    """
    survey_results = {}

    for rule in rules:
        entry = {'rule_id': rule.id, 'name': rule.name}

        if not rule.query:
            continue

        try:
            hits = self.count(query=rule.query,
                              language=rule.contents.get('language'),
                              index=rule.contents.get('index', '*'),
                              start_time=start_time,
                              end_time=end_time)
        except (elasticsearch.NotFoundError, elasticsearch.RequestError):
            hits = -1  # sentinel: the search errored for this rule

        entry['search_count'] = hits
        survey_results[rule.id] = entry

    return survey_results
class CollectRtaEvents(CollectEvents):
"""Collect RTA events from elasticsearch."""
@staticmethod
def _group_events_by_type(events):
"""Group events by agent.type."""
event_by_type = {}
for event in events['hits']['hits']:
for event in events:
event_by_type.setdefault(event['_source']['agent']['type'], []).append(event['_source'])
return event_by_type
def run(self, agent_hostname, indexes, verbose=True, **match):
def run(self, dsl, indexes, start_time):
"""Collect the events."""
index_str = ','.join(indexes)
start_time = self._get_current_time(agent_hostname, index_str)
if verbose:
click.echo('Setting start of event capture to: {}'.format(click.style(start_time, fg='yellow')))
click.pause('Press any key once detonation is complete ...')
time.sleep(5)
events = self._group_events_by_type(self._search_window(agent_hostname, index_str, start_time, **match))
return Events(agent_hostname, events)
results = self.search(dsl, language='dsl', index=indexes, start_time=start_time, end_time='now', size=5000,
sort='@timestamp:asc')
events = self._group_events_by_type(results)
return RtaEvents(events)
@root.command('normalize-data')
@@ -172,18 +332,14 @@ class CollectEvents(object):
def normalize_data(events_file):
"""Normalize Elasticsearch data timestamps and sort."""
file_name = os.path.splitext(os.path.basename(events_file.name))[0]
events = Events('_', {file_name: [json.loads(e) for e in events_file.readlines()]})
events = RtaEvents({file_name: [json.loads(e) for e in events_file.readlines()]})
events.save(dump_dir=os.path.dirname(events_file.name))
@root.group('es')
@click.option('--elasticsearch-url', '-e', default=getdefault("elasticsearch_url"))
@click.option('--cloud-id', default=getdefault("cloud_id"))
@click.option('--es-user', '-u', default=getdefault("es_user"))
@click.option('--es-password', '-p', default=getdefault("es_password"))
@click.option('--timeout', '-t', default=60, help='Timeout for elasticsearch client')
@add_params(*elasticsearch_options)
@click.pass_context
def es_group(ctx: click.Context, **es_kwargs):
def es_group(ctx: click.Context, **kwargs):
"""Commands for integrating with Elasticsearch."""
ctx.ensure_object(dict)
@@ -193,38 +349,30 @@ def es_group(ctx: click.Context, **es_kwargs):
click.echo(format_command_options(ctx))
else:
if not (es_kwargs['cloud_id'] or es_kwargs['elasticsearch_url']):
client_error("Missing required --cloud-id or --elasticsearch-url")
# don't prompt for these until there's a cloud id or elasticsearch URL
es_kwargs['es_user'] = es_kwargs['es_user'] or click.prompt("es_user")
es_kwargs['es_password'] = es_kwargs['es_password'] or click.prompt("es_password", hide_input=True)
try:
client = get_es_client(use_ssl=True, **es_kwargs)
ctx.obj['es'] = client
except AuthenticationException as e:
error_msg = f'Failed authentication for {es_kwargs.get("elasticsearch_url") or es_kwargs.get("cloud_id")}'
client_error(error_msg, e, ctx=ctx, err=True)
ctx.obj['es'] = get_elasticsearch_client(ctx=ctx, **kwargs)
@es_group.command('collect-events')
@click.argument('agent-hostname')
@click.argument('host-id')
@click.option('--query', '-q', help='KQL query to scope search')
@click.option('--index', '-i', multiple=True, help='Index(es) to search against (default: all indexes)')
@click.option('--agent-type', '-a', help='Restrict results to a source type (agent.type) ex: auditbeat')
@click.option('--rta-name', '-r', help='Name of RTA in order to save events directly to unit tests data directory')
@click.option('--rule-id', help='Updates rule mapping in rule-mapping.yml file (requires --rta-name)')
@click.option('--view-events', is_flag=True, help='Print events after saving')
@click.pass_context
def collect_events(ctx, agent_hostname, index, agent_type, rta_name, rule_id, view_events):
def collect_events(ctx, host_id, query, index, rta_name, rule_id, view_events):
"""Collect events from Elasticsearch."""
match = {'agent.type': agent_type} if agent_type else {}
client = ctx.obj['es']
dsl = kql.to_dsl(query) if query else MATCH_ALL
dsl['bool'].setdefault('filter', []).append({'bool': {'should': [{'match_phrase': {'host.id': host_id}}]}})
try:
collector = CollectEvents(client)
events = collector.run(agent_hostname, index, **match)
events.save(rta_name)
collector = CollectRtaEvents(client)
start = time.time()
click.pause('Press any key once detonation is complete ...')
start_time = f'now-{round(time.time() - start) + 5}s'
events = collector.run(dsl, index or '*', start_time)
events.save(rta_name=rta_name, host_id=host_id)
if rta_name and rule_id:
events.evaluate_against_rule_and_update_mapping(rule_id, rta_name)
+46 -17
View File
@@ -4,20 +4,31 @@
"""Kibana cli commands."""
import click
from kibana import Kibana, RuleResource
import kql
from kibana import Kibana, Signal, RuleResource
from .main import root
from .misc import client_error, getdefault
from .misc import add_params, client_error, kibana_options
from .rule_loader import load_rule_files, load_rules
from .utils import format_command_options
def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, **kwargs):
    """Get an authenticated Kibana client.

    Requires either a cloud id or an explicit Kibana URL (otherwise raises
    via client_error). Missing credentials are prompted for interactively,
    with the password hidden.
    """
    if not (cloud_id or kibana_url):
        client_error("Missing required --cloud-id or --kibana-url")

    # don't prompt for these until there's a cloud id or Kibana URL
    kibana_user = kibana_user or click.prompt("kibana_user")
    kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True)

    # NOTE(review): the client is returned after the `with` block exits —
    # assumes Kibana.__exit__ leaves the logged-in session usable; confirm
    # against the kibana connector implementation
    with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, **kwargs) as kibana:
        kibana.login(kibana_user, kibana_password)
    return kibana
@root.group('kibana')
@click.option('--kibana-url', '-k', default=getdefault('kibana_url'))
@click.option('--cloud-id', default=getdefault('cloud_id'))
@click.option('--kibana-user', '-u', default=getdefault('kibana_user'))
@click.option('--kibana-password', '-p', default=getdefault('kibana_password'))
@click.option('--space', default=None)
@add_params(*kibana_options)
@click.pass_context
def kibana_group(ctx: click.Context, **kibana_kwargs):
"""Commands for integrating with Kibana."""
@@ -29,16 +40,7 @@ def kibana_group(ctx: click.Context, **kibana_kwargs):
click.echo(format_command_options(ctx))
else:
if not (kibana_kwargs['cloud_id'] or kibana_kwargs['kibana_url']):
client_error("Missing required --cloud-id or --kibana-url")
# don't prompt for these until there's a cloud id or Kibana URL
kibana_user = kibana_kwargs.pop('kibana_user', None) or click.prompt("kibana_user")
kibana_password = kibana_kwargs.pop('kibana_password', None) or click.prompt("kibana_password", hide_input=True)
with Kibana(**kibana_kwargs) as kibana:
kibana.login(kibana_user, kibana_password)
ctx.obj['kibana'] = kibana
ctx.obj['kibana'] = get_kibana_client(**kibana_kwargs)
@kibana_group.command("upload-rule")
@@ -73,3 +75,30 @@ def upload_rule(ctx, toml_files):
with kibana:
rules = RuleResource.bulk_create(api_payloads)
click.echo(f"Successfully uploaded {len(rules)} rules")
@kibana_group.command('search-alerts')
@click.argument('query', required=False)
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
@click.option('--columns', '-c', multiple=True, help='Columns to display in table')
@click.option('--extend', '-e', is_flag=True, help='If columns are specified, extend the original columns')
@click.pass_context
def search_alerts(ctx, query, date_range, columns, extend):
    """Search detection engine alerts with KQL.

    With no QUERY argument, matches all alerts within the date range.
    Prints a table of the selected columns and returns the raw alert
    `_source` dicts.
    """
    from eql.table import Table
    from .eswrap import MATCH_ALL, add_range_to_dsl

    kibana = ctx.obj['kibana']
    start_time, end_time = date_range
    kql_query = kql.to_dsl(query) if query else MATCH_ALL
    # constrain the query to the requested @timestamp window
    add_range_to_dsl(kql_query['bool'].setdefault('filter', []), start_time, end_time)

    with kibana:
        alerts = [a['_source'] for a in Signal.search({'query': kql_query})['hits']['hits']]

    # default columns; --columns replaces them unless --extend is also set
    table_columns = ['host.hostname', 'signal.rule.name', 'signal.status', 'signal.original_time']
    if columns:
        columns = list(columns)
        table_columns = table_columns + columns if extend else columns
    click.echo(Table.from_list(table_columns, alerts))
    return alerts
+94 -5
View File
@@ -8,11 +8,12 @@ import os
import re
import time
import uuid
from functools import wraps
import click
import requests
from .utils import cached, get_path
from .utils import add_params, cached, get_path
_CONFIG = {}
@@ -37,12 +38,13 @@ class ClientError(click.ClickException):
def __init__(self, message, original_error=None):
super(ClientError, self).__init__(message)
self.original_error = original_error
self.original_error_type = type(original_error).__name__ if original_error else ''
def show(self, file=None, err=True):
"""Print the error to the console."""
err = f' ({self.original_error})' if self.original_error else ''
click.echo(f'{click.style(f"CLI Error{err}", fg="red", bold=True)}: {self.format_message()}',
err=err, file=file)
err = f' {self.original_error_type}' if self.original_error else ''
msg = f'{click.style(f"CLI Error{self.original_error_type}", fg="red", bold=True)}: {self.format_message()}'
click.echo(msg, err=err, file=file)
def client_error(message, exc: Exception = None, debug=None, ctx: click.Context = None, file=None, err=None):
@@ -53,7 +55,7 @@ def client_error(message, exc: Exception = None, debug=None, ctx: click.Context
click.echo(click.style('DEBUG: ', fg='yellow') + message, err=err, file=file)
raise
else:
raise ClientError(message, original_error=exc and type(exc).__name__)
raise ClientError(message, original_error=exc)
def nested_get(_dict, dot_key, default=None):
@@ -246,3 +248,90 @@ def getdefault(name):
envvar = f"DR_{name.upper()}"
config = parse_config()
return lambda: os.environ.get(envvar, config.get(name))
# Reusable click.Option definitions for stack clients, keyed by client type.
# Defaults resolve via `getdefault` (DR_<NAME> env var, falling back to the
# parsed config file). Consumed by `add_client` and the es/kibana groups.
client_options = {
    'kibana': {
        'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')),
        'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id')),
        'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')),
        'kibana_password': click.Option(['--kibana-password', '-kp'], default=getdefault('kibana_password')),
        'space': click.Option(['--space'], default=None, help='Kibana space')
    },
    'elasticsearch': {
        'elasticsearch_url': click.Option(['--elasticsearch-url'], default=getdefault("elasticsearch_url")),
        'cloud_id': click.Option(['--cloud-id'], default=getdefault("cloud_id")),
        'es_user': click.Option(['--es-user', '-eu'], default=getdefault("es_user")),
        'es_password': click.Option(['--es-password', '-ep'], default=getdefault("es_password")),
        'timeout': click.Option(['--timeout', '-et'], default=60, help='Timeout for elasticsearch client')
    }
}
# Flat option lists for attaching to group-level commands via `add_params`.
kibana_options = list(client_options['kibana'].values())
elasticsearch_options = list(client_options['elasticsearch'].values())
def add_client(*client_type, add_to_ctx=True):
    """Wrapper to add authed client.

    Decorator factory: attaches the click options for the requested client
    type(s) ('elasticsearch' and/or 'kibana') to the command, pops their
    values off the command kwargs, and injects authenticated
    `elasticsearch_client` / `kibana_client` keyword arguments instead.
    When add_to_ctx is True and a click.Context is among the positional
    args, the clients are also stashed on ctx.obj ('es' / 'kibana').
    """
    from elasticsearch import Elasticsearch, ElasticsearchException
    from kibana import Kibana
    from .eswrap import get_elasticsearch_client
    from .kbwrap import get_kibana_client

    def _wrapper(func):
        client_ops_dict = {}
        client_ops_keys = {}
        for c_type in client_type:
            ops = client_options.get(c_type)
            client_ops_dict.update(ops)
            client_ops_keys[c_type] = list(ops)

        if not client_ops_dict:
            raise ValueError(f'Unknown client: {client_type} in {func.__name__}')

        client_ops = list(client_ops_dict.values())

        @wraps(func)
        @add_params(*client_ops)
        def _wrapped(*args, **kwargs):
            ctx: click.Context = next((a for a in args if isinstance(a, click.Context)), None)
            # pop the client option values so the wrapped command only
            # receives its own parameters plus the client object(s)
            es_client_args = {k: kwargs.pop(k, None) for k in client_ops_keys.get('elasticsearch', [])}
            # shared args like cloud_id
            kibana_client_args = {k: kwargs.pop(k, es_client_args.get(k)) for k in client_ops_keys.get('kibana', [])}

            if 'elasticsearch' in client_type:
                # for nested ctx invocation, no need to re-auth if an existing client is already passed
                elasticsearch_client: Elasticsearch = kwargs.get('elasticsearch_client')
                try:
                    if elasticsearch_client and isinstance(elasticsearch_client, Elasticsearch) and \
                            elasticsearch_client.info():
                        pass
                    else:
                        elasticsearch_client = get_elasticsearch_client(use_ssl=True, **es_client_args)
                except ElasticsearchException:
                    # stale or broken client: re-authenticate from scratch
                    elasticsearch_client = get_elasticsearch_client(use_ssl=True, **es_client_args)

                kwargs['elasticsearch_client'] = elasticsearch_client
                if ctx and add_to_ctx:
                    ctx.obj['es'] = elasticsearch_client

            if 'kibana' in client_type:
                # for nested ctx invocation, no need to re-auth if an existing client is already passed
                kibana_client: Kibana = kwargs.get('kibana_client')
                try:
                    # NOTE(review): when kibana_client is None, `with None:`
                    # raises AttributeError, which is caught below and
                    # triggers a fresh login — presumably intentional; confirm
                    with kibana_client:
                        if kibana_client and isinstance(kibana_client, Kibana) and kibana_client.version:
                            pass
                        else:
                            kibana_client = get_kibana_client(**kibana_client_args)
                except (requests.HTTPError, AttributeError):
                    kibana_client = get_kibana_client(**kibana_client_args)

                kwargs['kibana_client'] = kibana_client
                if ctx and add_to_ctx:
                    ctx.obj['kibana'] = kibana_client

            return func(*args, **kwargs)
        return _wrapped
    return _wrapper
+11
View File
@@ -252,3 +252,14 @@ def format_command_options(ctx):
formatter.write_dl(opts)
return formatter.getvalue()
def add_params(*params):
    """Return a decorator that appends click parameter objects to a command callback."""
    def decorator(func):
        # click collects parameters from the callback's __click_params__ list;
        # extend it in place if present, otherwise seed it with ours
        try:
            func.__click_params__.extend(params)
        except AttributeError:
            func.__click_params__ = list(params)
        return func
    return decorator
+22 -4
View File
@@ -29,7 +29,7 @@ class Kibana(object):
self.cloud_id = cloud_id
self.kibana_url = kibana_url
self.elastic_url = None
self.space = space
self.space = space if space and space.lower() != 'default' else None
self.status = None
if self.cloud_id:
@@ -62,7 +62,7 @@ class Kibana(object):
uri = "s/{}/{}".format(self.space, uri)
return f"{self.kibana_url}/{uri}"
def request(self, method, uri, params=None, data=None, error=True, verbose=True):
def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False):
"""Perform a RESTful HTTP request with JSON responses."""
params = params or {}
url = self.url(uri)
@@ -83,7 +83,7 @@ class Kibana(object):
if not response.content:
return
return response.json()
return response.content if raw else response.json()
def get(self, uri, params=None, data=None, error=True, **kwargs):
"""Perform an HTTP GET."""
@@ -120,6 +120,10 @@ class Kibana(object):
else:
raise
# Kibana will authenticate against URLs which contain invalid spaces
if self.space:
self.verify_space(self.space)
self.authenticated = True
self.status = self.get("/api/status")
@@ -133,7 +137,14 @@ class Kibana(object):
def logout(self):
    """Quit the current session and reset all connection state."""
    # best-effort server-side logout; raw=True because the endpoint does not
    # return JSON, error=False so a non-2xx response is ignored
    try:
        self.get('/logout', raw=True, error=False)
    except requests.exceptions.ConnectionError:
        # for really short scoping from buildup to teardown, ES will cause a Max retry error
        pass
    # drop all local session state regardless of whether the request succeeded
    self.status = None
    self.authenticated = False
    self.session = requests.Session()
    self.elasticsearch = None
def __del__(self):
@@ -161,3 +172,10 @@ class Kibana(object):
raise RuntimeError("No Kibana connector in scope!")
return stack[-1]
def verify_space(self, space):
    """Verify a space is valid.

    :param space: Kibana space name to check for
    :raises ValueError: if the space is not found on the server
    """
    spaces = self.get('/api/spaces/space')
    # NOTE(review): this matches on the space *name*; Kibana spaces also have
    # a distinct id — confirm callers pass the display name, not the id
    space_names = [s['name'] for s in spaces]
    if space not in space_names:
        raise ValueError(f'Unknown Kibana space: {space}')
+1 -1
View File
@@ -6,7 +6,7 @@ requests==2.22.0
Click==7.0
PyYAML~=5.3
eql~=0.9.5
elasticsearch~=7.5.1
elasticsearch~=7.9
XlsxWriter==1.3.6
# test deps
+2 -2
View File
@@ -8,7 +8,7 @@ import time
import unittest
from detection_rules.utils import normalize_timing_and_sort, cached
from detection_rules.eswrap import Events
from detection_rules.eswrap import RtaEvents
from detection_rules.ecs import get_kql_schema
@@ -56,7 +56,7 @@ class TestTimeUtils(unittest.TestCase):
"""Test that events are normalized properly within Events."""
events_data = self.get_events()
for date_format, events in events_data.items():
normalized = Events('_', {'winlogbeat': events})
normalized = RtaEvents({'winlogbeat': events})
self.assert_sort(normalized.events['winlogbeat'], date_format)
def test_schema_multifields(self):