Files
sigma-rules/detection_rules/misc.py
T
2020-07-08 18:58:00 -05:00

212 lines
7.3 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
"""Misc support."""
import json
import os
import re
import time
import uuid
import click
import requests
from .utils import ROOT_DIR
# Module-level cache for the parsed config file; parse_config() fills it in the first time a config
# file is found and returns this same object on later calls.
_CONFIG = {}
# License text with no comment markers; surrounding blank lines are stripped.
LICENSE_HEADER = """
Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
or more contributor license agreements. Licensed under the Elastic License;
you may not use this file except in compliance with the Elastic License.
""".strip()
# The license text as a list of individual lines.
LICENSE_LINES = LICENSE_HEADER.splitlines()
# The license with every line prefixed by "# " (line-comment form).
PYTHON_LICENSE = "\n".join("# " + line for line in LICENSE_LINES)
# The license wrapped in a /* ... */ block comment, every line prefixed by " * ".
JS_LICENSE = """
/*
{}
*/
""".strip().format("\n".join(' * ' + line for line in LICENSE_LINES))
def nested_get(_dict, dot_key, default=None):
    """Get a nested field from a nested dict with dot notation.

    Args:
        _dict: The (possibly nested) dict to read from.
        dot_key: Path to the field, with one key per level separated by ``.``.
        default: Value returned when the path cannot be resolved.

    Returns:
        The value stored at ``dot_key``. ``default`` is returned when ``_dict`` or ``dot_key`` is
        ``None``, when a key along the path is missing, or when the path runs into a value that is
        not a dict (for example a string or a list) before it is fully consumed.
    """
    if _dict is None or dot_key is None:
        return default

    if not isinstance(_dict, dict):
        # The path descended into a scalar/list, so there is nothing left to look up.
        return default

    this_key, separator, remainder = dot_key.partition('.')
    if separator:
        # Do not hand `default` down as the next container: a missing level yields None here, which
        # the guard at the top turns into `default` without ever calling .get() on it.
        return nested_get(_dict.get(this_key), remainder, default)

    return _dict.get(dot_key, default)
def nested_set(_dict, dot_key, value):
    """Set a nested field from a key in dot notation.

    Intermediate levels that do not exist yet are created as empty dicts.

    Args:
        _dict: The dict to modify in place.
        dot_key: Path to the field, with one key per level separated by ``.``.
        value: The value to store at the end of the path.

    Raises:
        ValueError: If any level along the path already holds a value that is not a dict.
    """
    keys = dot_key.split('.')
    node = _dict

    for key in keys[:-1]:
        if not isinstance(node, dict):
            raise ValueError('dict cannot set a value to a non-dict for {}'.format(dot_key))
        node = node.setdefault(key, {})

    if not isinstance(node, dict):
        raise ValueError('dict cannot set a value to a non-dict for {}'.format(dot_key))

    # Index the split keys, not the original string: dot_key[-1] would be the path's last character.
    node[keys[-1]] = value
def schema_prompt(name, value=None, required=False, **options):
    """Interactively prompt for a field value and validate it against schema options.

    Validation failures are reported with ``print``. A required field is prompted for again until
    a valid answer is given; an optional field gives up after the first invalid answer.

    Args:
        name: Field name shown in the prompt. A name containing ``date`` defaults to today's date
            (``%Y/%m/%d``) and the name ``rule_id`` defaults to a newly generated UUID4.
        value: Optional pre-supplied answer. It is only used for the first attempt; any retry
            reads from ``input``.
        required: When true, keep prompting until a valid, non-empty answer is given.
        **options: Schema keywords that are honored: ``type``, ``pattern``, ``enum``, ``minimum``,
            ``maximum``, ``min_items``, ``max_items`` and ``default``.

    Returns:
        The validated answer: an ``int`` for ``number``/``integer`` fields, a ``bool`` for
        ``boolean`` fields, a ``list`` for ``array`` fields, otherwise the answer unchanged.
        For an optional field, ``None`` is returned when the answer is blank, ``n/a`` or invalid,
        and ``[]`` when an ``array`` answer is invalid.
    """
    name = str(name)
    field_type = options.get('type')
    pattern = options.get('pattern')
    enum = options.get('enum', [])
    minimum = options.get('minimum')
    maximum = options.get('maximum')
    min_item = options.get('min_items', 0)
    max_items = options.get('max_items', 9999)
    is_numeric = field_type in ('number', 'integer')

    default = options.get('default')
    if default is not None and str(default).lower() in ('true', 'false'):
        # Keep boolean defaults as lowercase text so a False default is still shown and usable.
        default = str(default).lower()

    if 'date' in name:
        default = time.strftime('%Y/%m/%d')

    if name == 'rule_id':
        default = str(uuid.uuid4())

    def _check_type(_val):
        """Return True when a single answer satisfies the schema options; print the reason if not."""
        # Answers normally arrive as text, but a pre-supplied value or default may be an int/bool.
        text = str(_val)

        # isdecimal rather than isdigit: characters such as superscript digits pass isdigit but are
        # rejected by int().
        if is_numeric and not text.isdecimal():
            print('Number expected but got: {}'.format(_val))
            return False

        if pattern and not re.fullmatch(pattern, text):
            print('{} did not match pattern: {}!'.format(_val, pattern))
            return False

        if enum and _val not in enum:
            print('{} not in valid options: {}'.format(_val, ', '.join(str(e) for e in enum)))
            return False

        # Typed answers are strings, so the bounds have to be compared against the parsed number.
        if is_numeric or (isinstance(_val, int) and not isinstance(_val, bool)):
            number = int(text)
            if minimum is not None and number < minimum:
                print('{} is less than the minimum: {}'.format(str(_val), str(minimum)))
                return False
            if maximum is not None and number > maximum:
                print('{} is greater than the maximum: {}'.format(str(_val), str(maximum)))
                return False

        if field_type == 'boolean' and text.lower() not in ('true', 'false'):
            print('Boolean expected but got: {}'.format(str(_val)))
            return False

        return True

    def _convert_type(_val):
        """Convert an already validated answer to the Python type implied by the schema type."""
        if field_type == 'boolean' and not isinstance(_val, bool):
            _val = _val.lower() == 'true'
        return int(_val) if is_numeric else _val

    prompt = '{name}{default}{required}{multi}'.format(
        name=name,
        default=' [{}] ("n/a" to leave blank) '.format(default) if default else '',
        required=' (required) ' if required else '',
        multi=' (multi, comma separated) ' if field_type == 'array' else '').strip() + ': '

    while True:
        result = value or input(prompt) or default
        # The pre-supplied value only counts for the first attempt; every retry must prompt.
        value = None

        if result == 'n/a':
            result = None

        if not result:
            if required:
                continue
            return None

        if field_type == 'array':
            if isinstance(result, (list, tuple)):
                # A default (or pre-supplied value) may already be a sequence.
                result_list = list(result)
            else:
                result_list = str(result).split(',')

            # Both bounds are inclusive: exactly min_items or max_items entries is acceptable.
            if not (min_item <= len(result_list) <= max_items):
                print('Expected between {} and {} items but got: {}'.format(
                    min_item, max_items, len(result_list)))
                valid = False
            else:
                valid = all(_check_type(item) for item in result_list)

            if valid:
                return [_convert_type(item) for item in result_list]
            if not required:
                return []
        else:
            if _check_type(result):
                return _convert_type(result)
            if not required:
                return None
def get_kibana_rules_map(branch='master'):
    """Map the prepackaged rules available in the Kibana repo to their download URLs.

    Args:
        branch: Name of the Kibana branch to read the prepackaged rules from.

    Returns:
        A dict of rule file name (without the ``.json`` extension) to the ``download_url`` GitHub
        reports for that file.

    Raises:
        ValueError: If the branch does not exist, or GitHub does not return a directory listing
            for the prepackaged rules.
        requests.HTTPError: If the branch lookup fails for any reason other than "not found".
    """
    from urllib.parse import quote

    repo_api = 'https://api.github.com/repos/elastic/kibana'

    # Ask for the branch directly instead of scanning the branch list: the list endpoint is
    # paginated (GitHub caps per_page at 100), so a single page can miss branches that exist.
    branch_response = requests.get('{}/branches/{}'.format(repo_api, quote(branch, safe='/')))
    if branch_response.status_code == 404:
        raise ValueError('branch "{}" does not exist in kibana'.format(branch))
    branch_response.raise_for_status()

    url = (repo_api + '/contents/x-pack/{legacy}plugins/siem/server/lib/'
           'detection_engine/rules/prepackaged_rules')
    query = {'ref': branch}

    gh_rules = requests.get(url.format(legacy=''), params=query).json()

    # pre-7.8 the siem was under the legacy directory
    if isinstance(gh_rules, dict) and gh_rules.get('message', '') == 'Not Found':
        gh_rules = requests.get(url.format(legacy='legacy/'), params=query).json()

    if not isinstance(gh_rules, list):
        # Anything other than a list is an error payload (e.g. not found, rate limited); surface
        # its message rather than failing later while indexing into it.
        message = gh_rules.get('message', gh_rules) if isinstance(gh_rules, dict) else gh_rules
        raise ValueError('unable to list prepackaged rules for branch "{}": {}'.format(branch, message))

    return {os.path.splitext(entry['name'])[0]: entry['download_url']
            for entry in gh_rules if entry['name'].endswith('.json')}
def get_kibana_rules(*rule_paths, branch='master', verbose=True):
    """Download prepackaged rules from the kibana repo.

    Args:
        *rule_paths: Optional rule paths or file names; only rules whose name matches the base
            name (extension removed) of one of these are downloaded. With none given, every
            available rule is downloaded.
        branch: Kibana branch to download from.
        verbose: When true, echo a progress message before downloading.

    Returns:
        A dict of rule name to the parsed JSON contents of that rule.
    """
    if verbose:
        click.echo('Downloading rules from {} branch in kibana repo...'.format(branch))

    # Reduce every requested path to a bare rule name so it can be compared with the map's keys.
    wanted = [os.path.splitext(os.path.basename(path))[0] for path in rule_paths]

    downloaded = {}
    for rule_name, download_url in get_kibana_rules_map(branch).items():
        if wanted and rule_name not in wanted:
            continue
        downloaded[rule_name] = requests.get(download_url).json()

    return downloaded
def parse_config():
    """Load the default config file once and return the cached contents.

    Returns:
        The module-level config dict: the parsed ``.siem-rules-cfg.json`` from ``ROOT_DIR`` when
        that file exists, otherwise whatever the cache currently holds (initially empty).
    """
    global _CONFIG

    # Something was already loaded on an earlier call; reuse it.
    if _CONFIG:
        return _CONFIG

    config_path = os.path.join(ROOT_DIR, '.siem-rules-cfg.json')
    if not os.path.exists(config_path):
        return _CONFIG

    with open(config_path) as handle:
        _CONFIG = json.load(handle)

    click.secho('Loaded config file: {}'.format(config_path), fg='yellow')
    return _CONFIG
def set_param_values(ctx, param, value):
    """Resolve the value for a parameter, trying several sources in order.

    The sources are, in order: the value passed in, the ``SR_<name>`` environment variable, the
    config file entry for the parameter name, and finally an interactive prompt. The prompt is
    skipped when ``param.hide_input`` is ``False``, in which case nothing is returned.

    Args:
        ctx: Unused by this function.
        param: The parameter being resolved; its ``name``, ``hide_input`` and ``default``
            attributes are read.
        value: The value supplied for the parameter, if any.

    Returns:
        The first truthy value found among the sources above, or ``None``.
    """
    key = param.name
    config = parse_config()
    env_key = 'SR_' + key
    should_prompt = param.hide_input is not False

    if value:
        return value

    env_value = os.environ.get(env_key)
    if env_value:
        return env_value

    config_value = config.get(key)
    if config_value:
        return config_value

    if not should_prompt:
        return None

    prompt_default = None if param.default else param.default
    return click.prompt(key, default=prompt_default, hide_input=param.hide_input,
                        show_default=bool(param.default))