diff --git a/detection_rules/main.py b/detection_rules/main.py index f87bacb1b..9ca99a59b 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -7,6 +7,7 @@ import glob import io import json import os +import re import shutil import subprocess @@ -21,7 +22,7 @@ from .packaging import PACKAGE_FILE, Package, manage_versions, RELEASE_DIR from .rule import Rule from .rule_formatter import toml_write from .schemas import CurrentSchema -from .utils import get_path, clear_caches +from .utils import get_path, clear_caches, load_rule_contents RULES_DIR = get_path('rules') @@ -39,33 +40,36 @@ def root(): @click.option('--rule-type', '-t', type=click.Choice(CurrentSchema.RULE_TYPES), help='Type of rule to create') def create_rule(path, config, required_only, rule_type): """Create a detection rule.""" - config = load_dump(config) if config else {} + contents = load_rule_contents(config, single_only=True)[0] if config else {} try: - return Rule.build(path, rule_type=rule_type, required_only=required_only, save=True, **config) + return Rule.build(path, rule_type=rule_type, required_only=required_only, save=True, **contents) finally: rule_loader.reset() -@root.command('load-from-file') +@root.command('import-rules') @click.argument('infile', type=click.Path(dir_okay=False, exists=True), nargs=-1, required=False) @click.option('--directory', '-d', type=click.Path(file_okay=False, exists=True), help='Load files from a directory') -def load_from_file(infile, directory): - """Load rules from file(s).""" - if infile: - for rule_file in infile: - rule_path = os.path.join(RULES_DIR, os.path.basename(rule_file)) - rule = Rule(rule_path, load_dump(rule_file)) - rule.save(as_rule=True, verbose=True) - elif directory: - for rule_file in glob.glob(os.path.join(directory, '**', '*.*'), recursive=True): - try: - rule_path = os.path.join(RULES_DIR, os.path.basename(rule_file)) - rule = Rule(rule_path, load_dump(rule_file)) - rule.save(as_rule=True, verbose=True) - except ValueError: - click.echo('Unable to load file: {}'.format(rule_file)) - else: - click.echo('No files specified!') +def import_rules(infile, directory): + """Import rules from json, toml, or Kibana exported rule file(s).""" + rule_files = glob.glob(os.path.join(directory, '**', '*.*'), recursive=True) if directory else [] + rule_files = sorted(set(rule_files + list(infile))) + + rule_contents = [] + for rule_file in rule_files: + rule_contents.extend(load_rule_contents(rule_file)) + + if not rule_contents: + click.echo('Must specify at least one file!') + + def name_to_filename(name): + return re.sub(r'[^_a-z0-9]+', '_', name.strip().lower()).strip('_') + '.toml' + + for contents in rule_contents: + base_path = contents.get('name') or contents.get('rule', {}).get('name') + base_path = name_to_filename(base_path) if base_path else base_path + rule_path = os.path.join(RULES_DIR, base_path) if base_path else None + Rule.build(rule_path, required_only=True, save=True, verbose=True, **contents) @root.command('toml-lint') @@ -120,19 +124,28 @@ def mass_update(ctx, query, field): @click.argument('rule-id', required=False) @click.option('--rule-file', '-f', type=click.Path(dir_okay=False), help='Optionally view a rule from a specified file') @click.option('--as-api/--as-rule', default=True, help='Print the rule in final api or rule format') -def view_rule(rule_id, rule_file, as_api): +@click.pass_context +def view_rule(ctx, rule_id, rule_file, as_api): """View an internal rule or specified rule file.""" + rule = None + if rule_id: rule = rule_loader.get_rule(rule_id, verbose=False) elif rule_file: - rule = Rule(rule_file, load_dump(rule_file)) + contents = {k: v for k, v in load_rule_contents(rule_file, single_only=True)[0].items() if v} + + try: + rule = Rule(rule_file, contents) + except jsonschema.ValidationError as e: + click.secho(e.args[0], fg='red') + ctx.exit(1) else: click.secho('Unknown rule!', fg='red') - return + ctx.exit(1) if not rule: click.secho('Unknown format!', fg='red') - return + ctx.exit(1) click.echo(toml_write(rule.rule_format()) if not as_api else json.dumps(rule.contents, indent=2, sort_keys=True)) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index d692f3bf2..01cb6286e 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -208,14 +208,22 @@ class Rule(object): return hashlib.sha256(contents).hexdigest() @classmethod - def build(cls, path=None, rule_type=None, required_only=True, save=True, **kwargs): + def build(cls, path=None, rule_type=None, required_only=True, save=True, verbose=False, **kwargs): """Build a rule from data and prompts.""" from .misc import schema_prompt + if verbose and path: + click.echo(f'[+] Building rule for {path}') + kwargs = copy.deepcopy(kwargs) - rule_type = click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)), - type=click.Choice(CurrentSchema.RULE_TYPES)) + if 'rule' in kwargs and 'metadata' in kwargs: + kwargs.update(kwargs.pop('metadata')) + kwargs.update(kwargs.pop('rule')) + + rule_type = rule_type or kwargs.get('type') or \ + click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)), + type=click.Choice(CurrentSchema.RULE_TYPES)) schema = CurrentSchema.get_schema(role=rule_type) props = schema['properties'] @@ -256,15 +264,15 @@ class Rule(object): continue if name == 'threshold': - contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts) + contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts.copy()) for n, opts in options['properties'].items()} continue if kwargs.get(name): - contents[name] = schema_prompt(kwargs.pop(name)) + contents[name] = schema_prompt(name, value=kwargs.pop(name)) continue - result = schema_prompt(name, required=name in opt_reqs, **options) + result = schema_prompt(name, required=name in opt_reqs, **options.copy()) if result: if name not in opt_reqs and result == options.get('default', ''): @@ -274,13 +282,12 @@ class Rule(object): contents[name] = result metadata = {} - ecs_version = schema_prompt('ecs_version', required=False, value=None, - **TomlMetadata.get_schema()['properties']['ecs_version']) - if ecs_version: - metadata['ecs_version'] = ecs_version - # validate before creating - CurrentSchema.toml_schema().validate(contents) + if not required_only: + ecs_version = schema_prompt('ecs_version', required=False, value=None, + **TomlMetadata.get_schema()['properties']['ecs_version']) + if ecs_version: + metadata['ecs_version'] = ecs_version suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path) diff --git a/detection_rules/utils.py b/detection_rules/utils.py index 2c6dc89d5..22428474a 100644 --- a/detection_rules/utils.py +++ b/detection_rules/utils.py @@ -16,7 +16,7 @@ from datetime import datetime, date import kql import eql.utils -from eql.utils import stream_json_lines +from eql.utils import load_dump, stream_json_lines CURR_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(CURR_DIR) @@ -192,3 +192,29 @@ def cached(f): def clear_caches(): _cache.clear() + + +def load_rule_contents(rule_file: str, single_only=False) -> list: + """Load a rule file from multiple formats.""" + _, extension = os.path.splitext(rule_file) + + if extension in ('.ndjson', '.jsonl'): + # kibana exported rule object is ndjson with the export metadata on the last line + with open(rule_file, 'r') as f: + contents = [json.loads(line) for line in f.readlines()] + + if len(contents) > 1 and 'exported_count' in contents[-1]: + contents.pop(-1) + + if single_only and len(contents) > 1: + raise ValueError('Multiple rules not allowed') + + return contents or [{}] + else: + rule = load_dump(rule_file) + if isinstance(rule, dict): + return [rule] + elif isinstance(rule, list): + return rule + else: + raise ValueError(f"Expected a list or dictionary in {rule_file}")