Add better CLI support for handling Kibana exported rules (#83)

This commit is contained in:
Justin Ibarra
2020-07-27 23:31:19 -05:00
committed by GitHub
parent d15da0ada1
commit 8f5ddbb121
3 changed files with 84 additions and 38 deletions
+38 -25
View File
@@ -7,6 +7,7 @@ import glob
import io
import json
import os
import re
import shutil
import subprocess
@@ -21,7 +22,7 @@ from .packaging import PACKAGE_FILE, Package, manage_versions, RELEASE_DIR
from .rule import Rule
from .rule_formatter import toml_write
from .schemas import CurrentSchema
from .utils import get_path, clear_caches
from .utils import get_path, clear_caches, load_rule_contents
RULES_DIR = get_path('rules')
@@ -39,33 +40,36 @@ def root():
@click.option('--rule-type', '-t', type=click.Choice(CurrentSchema.RULE_TYPES), help='Type of rule to create')
def create_rule(path, config, required_only, rule_type):
"""Create a detection rule."""
config = load_dump(config) if config else {}
contents = load_rule_contents(config, single_only=True)[0] if config else {}
try:
return Rule.build(path, rule_type=rule_type, required_only=required_only, save=True, **config)
return Rule.build(path, rule_type=rule_type, required_only=required_only, save=True, **contents)
finally:
rule_loader.reset()
@root.command('load-from-file')
@root.command('import-rules')
@click.argument('infile', type=click.Path(dir_okay=False, exists=True), nargs=-1, required=False)
@click.option('--directory', '-d', type=click.Path(file_okay=False, exists=True), help='Load files from a directory')
def load_from_file(infile, directory):
"""Load rules from file(s)."""
if infile:
for rule_file in infile:
rule_path = os.path.join(RULES_DIR, os.path.basename(rule_file))
rule = Rule(rule_path, load_dump(rule_file))
rule.save(as_rule=True, verbose=True)
elif directory:
for rule_file in glob.glob(os.path.join(directory, '**', '*.*'), recursive=True):
try:
rule_path = os.path.join(RULES_DIR, os.path.basename(rule_file))
rule = Rule(rule_path, load_dump(rule_file))
rule.save(as_rule=True, verbose=True)
except ValueError:
click.echo('Unable to load file: {}'.format(rule_file))
else:
click.echo('No files specified!')
def import_rules(infile, directory):
"""Import rules from json, toml, or Kibana exported rule file(s)."""
rule_files = glob.glob(os.path.join(directory, '**', '*.*'), recursive=True) if directory else []
rule_files = sorted(set(rule_files + list(infile)))
rule_contents = []
for rule_file in rule_files:
rule_contents.extend(load_rule_contents(rule_file))
if not rule_contents:
click.echo('Must specify at least one file!')
def name_to_filename(name):
return re.sub(r'[^_a-z0-9]+', '_', name.strip().lower()).strip('_') + '.toml'
for contents in rule_contents:
base_path = contents.get('name') or contents.get('rule', {}).get('name')
base_path = name_to_filename(base_path) if base_path else base_path
rule_path = os.path.join(RULES_DIR, base_path) if base_path else None
Rule.build(rule_path, required_only=True, save=True, verbose=True, **contents)
@root.command('toml-lint')
@@ -120,19 +124,28 @@ def mass_update(ctx, query, field):
@click.argument('rule-id', required=False)
@click.option('--rule-file', '-f', type=click.Path(dir_okay=False), help='Optionally view a rule from a specified file')
@click.option('--as-api/--as-rule', default=True, help='Print the rule in final api or rule format')
def view_rule(rule_id, rule_file, as_api):
@click.pass_context
def view_rule(ctx, rule_id, rule_file, as_api):
"""View an internal rule or specified rule file."""
rule = None
if rule_id:
rule = rule_loader.get_rule(rule_id, verbose=False)
elif rule_file:
rule = Rule(rule_file, load_dump(rule_file))
contents = {k: v for k, v in load_rule_contents(rule_file, single_only=True)[0].items() if v}
try:
rule = Rule(rule_file, contents)
except jsonschema.ValidationError as e:
click.secho(e.args[0], fg='red')
ctx.exit(1)
else:
click.secho('Unknown rule!', fg='red')
return
ctx.exit(1)
if not rule:
click.secho('Unknown format!', fg='red')
return
ctx.exit(1)
click.echo(toml_write(rule.rule_format()) if not as_api else json.dumps(rule.contents, indent=2, sort_keys=True))
+19 -12
View File
@@ -208,14 +208,22 @@ class Rule(object):
return hashlib.sha256(contents).hexdigest()
@classmethod
def build(cls, path=None, rule_type=None, required_only=True, save=True, **kwargs):
def build(cls, path=None, rule_type=None, required_only=True, save=True, verbose=False, **kwargs):
"""Build a rule from data and prompts."""
from .misc import schema_prompt
if verbose and path:
click.echo(f'[+] Building rule for {path}')
kwargs = copy.deepcopy(kwargs)
rule_type = click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)),
type=click.Choice(CurrentSchema.RULE_TYPES))
if 'rule' in kwargs and 'metadata' in kwargs:
kwargs.update(kwargs.pop('metadata'))
kwargs.update(kwargs.pop('rule'))
rule_type = rule_type or kwargs.get('type') or \
click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)),
type=click.Choice(CurrentSchema.RULE_TYPES))
schema = CurrentSchema.get_schema(role=rule_type)
props = schema['properties']
@@ -256,15 +264,15 @@ class Rule(object):
continue
if name == 'threshold':
contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts)
contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts.copy())
for n, opts in options['properties'].items()}
continue
if kwargs.get(name):
contents[name] = schema_prompt(kwargs.pop(name))
contents[name] = schema_prompt(name, value=kwargs.pop(name))
continue
result = schema_prompt(name, required=name in opt_reqs, **options)
result = schema_prompt(name, required=name in opt_reqs, **options.copy())
if result:
if name not in opt_reqs and result == options.get('default', ''):
@@ -274,13 +282,12 @@ class Rule(object):
contents[name] = result
metadata = {}
ecs_version = schema_prompt('ecs_version', required=False, value=None,
**TomlMetadata.get_schema()['properties']['ecs_version'])
if ecs_version:
metadata['ecs_version'] = ecs_version
# validate before creating
CurrentSchema.toml_schema().validate(contents)
if not required_only:
ecs_version = schema_prompt('ecs_version', required=False, value=None,
**TomlMetadata.get_schema()['properties']['ecs_version'])
if ecs_version:
metadata['ecs_version'] = ecs_version
suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE
path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path)
+27 -1
View File
@@ -16,7 +16,7 @@ from datetime import datetime, date
import kql
import eql.utils
from eql.utils import stream_json_lines
from eql.utils import load_dump, stream_json_lines
CURR_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(CURR_DIR)
@@ -192,3 +192,29 @@ def cached(f):
def clear_caches():
_cache.clear()
def load_rule_contents(rule_file: str, single_only=False) -> list:
"""Load a rule file from multiple formats."""
_, extension = os.path.splitext(rule_file)
if extension in ('.ndjson', '.jsonl'):
# kibana exported rule object is ndjson with the export metadata on the last line
with open(rule_file, 'r') as f:
contents = [json.loads(line) for line in f.readlines()]
if len(contents) > 1 and 'exported_count' in contents[-1]:
contents.pop(-1)
if single_only and len(contents) > 1:
raise ValueError('Multiple rules not allowed')
return contents or [{}]
else:
rule = load_dump(rule_file)
if isinstance(rule, dict):
return [rule]
elif isinstance(rule, list):
return rule
else:
raise ValueError(f"Expected a list or dictionary in {rule_file}")