0455307577
* Downgrade version before uploading to Kibana * Update downgrade exception format * Update s/siem/detection Co-authored-by: Justin Ibarra <brokensound77@users.noreply.github.com>
212 lines
7.3 KiB
Python
212 lines
7.3 KiB
Python
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
# or more contributor license agreements. Licensed under the Elastic License;
|
|
# you may not use this file except in compliance with the Elastic License.
|
|
|
|
"""Misc support."""
|
|
import json
|
|
import os
|
|
import re
|
|
import time
|
|
import uuid
|
|
|
|
import click
|
|
import requests
|
|
|
|
from .utils import ROOT_DIR
|
|
|
|
# Module-level cache for the parsed config file; filled in by parse_config().
_CONFIG = {}

# Plain-text license notice; the language-specific variants below derive from it.
LICENSE_HEADER = (
    "Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one\n"
    "or more contributor license agreements. Licensed under the Elastic License;\n"
    "you may not use this file except in compliance with the Elastic License."
)

# One entry per line of the notice, used to build the commented variants.
LICENSE_LINES = LICENSE_HEADER.splitlines()

# The notice rendered as Python line comments.
PYTHON_LICENSE = "\n".join("# {}".format(text) for text in LICENSE_LINES)

# The notice rendered as a JavaScript block comment.
JS_LICENSE = "/*\n{}\n*/".format("\n".join(" * {}".format(text) for text in LICENSE_LINES))
|
|
|
|
|
|
def nested_get(_dict, dot_key, default=None):
    """Get a nested field from a nested dict with dot notation.

    :param _dict: the (possibly nested) dict to read from; ``None`` yields ``default``
    :param dot_key: path of keys joined by ``.`` (e.g. ``'rule.threat.name'``);
        ``None`` yields ``default``
    :param default: value returned when any part of the path cannot be resolved
    :return: the value stored at the path, or ``default``
    """
    if _dict is None or dot_key is None:
        return default

    current = _dict
    for key in dot_key.split('.'):
        # Stop as soon as the path cannot be followed: either the key is missing or
        # the value reached so far is not a dict (e.g. a string or a number). The
        # default is returned as-is and is never searched for the remaining keys.
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]

    return current
|
|
|
|
|
|
def nested_set(_dict, dot_key, value):
    """Set a nested field from a key in dot notation.

    Intermediate dicts are created as needed; ``_dict`` is modified in place.

    :param _dict: the dict to update
    :param dot_key: path of keys joined by ``.`` (e.g. ``'rule.threat.name'``)
    :param value: the value to store at the end of the path
    :raises ValueError: if a non-dict value sits anywhere along the path
    """
    keys = dot_key.split('.')
    node = _dict

    for key in keys[:-1]:
        if not isinstance(node, dict):
            raise ValueError('dict cannot set a value to a non-dict for {}'.format(dot_key))
        node = node.setdefault(key, {})

    if not isinstance(node, dict):
        raise ValueError('dict cannot set a value to a non-dict for {}'.format(dot_key))

    # Index with the last *key* of the path; indexing the dotted string itself
    # would only yield its final character.
    node[keys[-1]] = value
|
|
|
|
|
|
def schema_prompt(name, value=None, required=False, **options):
    """Interactively prompt for a field and validate the answer against schema options.

    :param name: field name, used as the prompt label. Names containing ``date``
        default to today's date and ``rule_id`` defaults to a fresh UUID4.
    :param value: optional pre-supplied answer; it is tried once before any prompting
    :param required: when true, keep prompting until a valid answer is given
    :param options: schema hints: ``type`` (``number``, ``integer``, ``boolean``,
        ``array``), ``pattern``, ``enum``, ``minimum``, ``maximum``, ``min_items``,
        ``max_items`` and ``default``
    :return: the converted value (``int`` for numeric types, ``bool`` for booleans,
        a list for arrays). For optional fields, ``None`` is returned when nothing
        valid was supplied, or ``[]`` for an invalid array.
    """
    name = str(name)
    field_type = options.get('type')
    pattern = options.get('pattern')
    enum = options.get('enum', [])
    minimum = options.get('minimum')
    maximum = options.get('maximum')
    min_items = options.get('min_items', 0)
    max_items = options.get('max_items', 9999)
    is_numeric = field_type in ('number', 'integer')

    default = options.get('default')
    if default is not None and str(default).lower() in ('true', 'false'):
        default = str(default).lower()

    if 'date' in name:
        default = time.strftime('%Y/%m/%d')

    if name == 'rule_id':
        default = str(uuid.uuid4())

    def _check_type(_val):
        """Return True if a single value satisfies the schema options, else print why not."""
        text = str(_val)

        # isdecimal() matches exactly the digits int() can parse; isdigit() also accepts
        # characters such as superscripts that would make the later int() call fail.
        if is_numeric and not text.isdecimal():
            print('Number expected but got: {}'.format(_val))
            return False
        # The whole value must match, not just a prefix.
        if pattern and not re.fullmatch(pattern, text):
            print('{} did not match pattern: {}!'.format(_val, pattern))
            return False
        if enum and _val not in enum:
            print('{} not in valid options: {}'.format(_val, ', '.join(str(e) for e in enum)))
            return False
        # Answers arrive as strings, so range checks are driven by the declared field
        # type (or by an actual int passed in), not by the Python type of the answer.
        if is_numeric or (isinstance(_val, int) and not isinstance(_val, bool)):
            number = int(_val)
            if minimum is not None and number < minimum:
                print('{} is less than the minimum: {}'.format(str(_val), str(minimum)))
                return False
            if maximum is not None and number > maximum:
                print('{} is greater than the maximum: {}'.format(str(_val), str(maximum)))
                return False
        if field_type == 'boolean' and text.lower() not in ('true', 'false'):
            print('Boolean expected but got: {}'.format(str(_val)))
            return False
        return True

    def _convert_type(_val):
        """Convert a validated value to the Python type implied by the field type."""
        if field_type == 'boolean' and not isinstance(_val, bool):
            return str(_val).lower() == 'true'
        return int(_val) if is_numeric else _val

    prompt = '{name}{default}{required}{multi}'.format(
        name=name,
        default=' [{}] ("n/a" to leave blank) '.format(default) if default else '',
        required=' (required) ' if required else '',
        multi=' (multi, comma separated) ' if field_type == 'array' else '').strip() + ': '

    while True:
        result = value or input(prompt) or default
        # A pre-supplied value is only tried once; every retry prompts the user.
        value = None

        if result == 'n/a':
            result = None

        if not result:
            if required:
                continue
            return

        if field_type == 'array':
            items = result.split(',') if isinstance(result, str) else list(result)

            # Item-count bounds are inclusive, so min_items=1 accepts a single item.
            if not (min_items <= len(items) <= max_items):
                print('Expected between {} and {} items but got: {}'.format(min_items, max_items, len(items)))
                valid = False
            else:
                valid = all(_check_type(item) for item in items)

            if valid:
                return [_convert_type(item) for item in items]
            if required:
                # Re-prompt instead of returning an invalid or partial list.
                continue
            return []

        if _check_type(result):
            return _convert_type(result)
        if not required:
            return
|
|
|
|
|
|
def get_kibana_rules_map(branch='master'):
    """Get the prepackaged rules available in the Kibana repo for a branch.

    :param branch: name of the Kibana branch to inspect
    :return: dict mapping each rule file name (without ``.json``) to its download URL
    :raises ValueError: if the branch does not exist in the Kibana repo
    :raises requests.HTTPError: if GitHub answers with any other error status
    """
    # Ask for the branch directly. Listing every branch and searching the result is
    # unreliable because the GitHub API caps a page at 100 entries.
    branch_check = requests.get('https://api.github.com/repos/elastic/kibana/branches/{}'.format(branch))
    if branch_check.status_code == 404:
        raise ValueError('branch "{}" does not exist in kibana'.format(branch))
    branch_check.raise_for_status()

    url = ('https://api.github.com/repos/elastic/kibana/contents/x-pack/{legacy}plugins/siem/server/lib/'
           'detection_engine/rules/prepackaged_rules?ref={branch}')

    response = requests.get(url.format(legacy='', branch=branch))

    # pre-7.8 the siem was under the legacy directory
    if response.status_code == 404:
        response = requests.get(url.format(legacy='legacy/', branch=branch))

    # Surface API errors (rate limiting, still-missing path) instead of trying to
    # iterate an error payload as if it were a directory listing.
    response.raise_for_status()
    gh_rules = response.json()

    return {os.path.splitext(r['name'])[0]: r['download_url'] for r in gh_rules if r['name'].endswith('.json')}
|
|
|
|
|
|
def get_kibana_rules(*rule_paths, branch='master', verbose=True):
    """Download prepackaged rules from the kibana repo.

    :param rule_paths: optional rule paths or file names; when given, only rules whose
        base name (without extension) matches one of them are downloaded
    :param branch: kibana branch to download from
    :param verbose: echo a progress message before downloading
    :return: dict mapping rule name to the parsed JSON contents of the rule
    """
    if verbose:
        click.echo('Downloading rules from {} branch in kibana repo...'.format(branch))

    # Reduce every requested path to its bare name so it compares against the map keys.
    wanted = None
    if rule_paths:
        wanted = [os.path.splitext(os.path.basename(path))[0] for path in rule_paths]

    downloaded = {}
    for rule_name, download_url in get_kibana_rules_map(branch).items():
        if wanted is None or rule_name in wanted:
            downloaded[rule_name] = requests.get(download_url).json()

    return downloaded
|
|
|
|
|
|
def parse_config():
    """Load the default config file once and return its contents.

    The parsed contents are cached in the module-level ``_CONFIG``. The file is only
    looked for while that cache is still empty.

    :return: the cached config dict (empty when no config file exists)
    """
    global _CONFIG

    # Already loaded: hand back the cached contents.
    if _CONFIG:
        return _CONFIG

    cfg_path = os.path.join(ROOT_DIR, '.detection-rules-cfg.json')
    if not os.path.exists(cfg_path):
        return _CONFIG

    with open(cfg_path) as handle:
        _CONFIG = json.load(handle)

    click.secho('Loaded config file: {}'.format(cfg_path), fg='yellow')
    return _CONFIG
|
|
|
|
|
|
def set_param_values(ctx, param, value):
    """Resolve the value for a parameter, falling back through several sources.

    Lookup order: the supplied ``value``, the ``DR_<NAME>`` environment variable, the
    parsed config file, and finally an interactive prompt (only when the parameter's
    ``hide_input`` is not ``False``). Returns ``None`` when nothing applies.

    :param ctx: unused here
    :param param: object exposing ``name``, ``hide_input`` and ``default``
    :param value: value already supplied for the parameter, if any
    """
    key = param.name
    config = parse_config()
    env_key = 'DR_' + key.upper()
    should_prompt = param.hide_input is not False

    if value:
        return value

    env_value = os.environ.get(env_key)
    if env_value:
        return env_value

    config_value = config.get(key)
    if config_value is not None:
        return config_value

    if not should_prompt:
        return None

    declared_default = param.default
    # NOTE(review): the default is only forwarded when it is falsy, so a truthy default
    # is never offered even though show_default is enabled for it. This looks inverted;
    # confirm the intent before changing it.
    return click.prompt(key, default=None if declared_default else declared_default,
                        hide_input=param.hide_input, show_default=bool(declared_default))
|