From 5c2da0b5c4db0e3b6eada6779a6d881e775532fd Mon Sep 17 00:00:00 2001 From: Ross Wolf <31489089+rw-access@users.noreply.github.com> Date: Tue, 9 Mar 2021 16:37:53 -0700 Subject: [PATCH] Move Rule.build to cli_utils.rule_prompt (#1024) * Move Rule.build to cli_utils.rule_prompt * Fix build_threat_map_entry lint * Fix license and add docstring --- detection_rules/cli_utils.py | 143 +++++++++++++++++++++++++++++++++++ detection_rules/main.py | 5 +- detection_rules/rule.py | 132 +------------------------------- 3 files changed, 148 insertions(+), 132 deletions(-) create mode 100644 detection_rules/cli_utils.py diff --git a/detection_rules/cli_utils.py b/detection_rules/cli_utils.py new file mode 100644 index 000000000..b6be9a683 --- /dev/null +++ b/detection_rules/cli_utils.py @@ -0,0 +1,143 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. + +import copy +import os + +import click + +import kql +from . import ecs +from .attack import matrix, tactics, build_threat_map_entry +from .rule import Rule +from .schemas import CurrentSchema +from .utils import clear_caches, get_path + +RULES_DIR = get_path("rules") + + +def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbose=False, **kwargs) -> Rule: + """Prompt loop to build a rule.""" + from .misc import schema_prompt + + if verbose and path: + click.echo(f'[+] Building rule for {path}') + + kwargs = copy.deepcopy(kwargs) + + if 'rule' in kwargs and 'metadata' in kwargs: + kwargs.update(kwargs.pop('metadata')) + kwargs.update(kwargs.pop('rule')) + + rule_type = rule_type or kwargs.get('type') or \ + click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)), + type=click.Choice(CurrentSchema.RULE_TYPES)) + + schema = CurrentSchema.get_schema(role=rule_type) + props = schema['properties'] + opt_reqs = schema.get('required', []) + contents = {} + skipped = [] + + for name, options in props.items(): + + if name == 'type': + contents[name] = rule_type + continue + + # these are set at package release time + if name == 'version': + continue + + if required_only and name not in opt_reqs: + continue + + # build this from technique ID + if name == 'threat': + threat_map = [] + + while click.confirm('add mitre tactic?'): + tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, required=True) + technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array', + required=False, enum=list(matrix[tactic])) or [] + + try: + threat_map.append(build_threat_map_entry(tactic, *technique_ids)) + except KeyError as e: + click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True) + continue + except ValueError as e: + click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True) + continue + + if len(threat_map) > 0: + contents[name] = threat_map + continue + + if name == 'threshold': + contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts.copy()) + for n, opts in options['properties'].items()} + continue + + if kwargs.get(name): + contents[name] = schema_prompt(name, value=kwargs.pop(name)) + continue + + result = schema_prompt(name, required=name in opt_reqs, **options.copy()) + + if result: + if name not in opt_reqs and result == options.get('default', ''): + skipped.append(name) + continue + + contents[name] = result + + suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE + path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path) + + rule = None + + try: + rule = Rule(path, {'rule': contents}) + except kql.KqlParseError as e: + if e.error_msg == 'Unknown field': + warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or ' + '`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep)) + click.secho(e.args[0], fg='red', err=True) + click.secho(warning, fg='yellow', err=True) + click.pause() + + # if failing due to a query, loop until resolved or terminated + while True: + try: + contents['query'] = click.edit(contents['query'], extension='.eql') + rule = Rule(path, {'rule': contents}) + except kql.KqlParseError as e: + click.secho(e.args[0], fg='red', err=True) + click.pause() + + if e.error_msg.startswith("Unknown field"): + # get the latest schema for schema errors + clear_caches() + ecs.get_kql_schema(indexes=contents.get("index", [])) + continue + + break + + if save: + rule.save(verbose=True, as_rule=True) + + if skipped: + print('Did not set the following values because they are un-required when set to the default value') + print(' - {}'.format('\n - '.join(skipped))) + + # rta_mappings.add_rule_to_mapping_file(rule) + # click.echo('Placeholder added to rule-mapping.yml') + + click.echo('Rule will validate against the latest ECS schema available (and beats if necessary)') + click.echo(' - to have a rule validate against specific ECS schemas, add them to metadata->ecs_versions') + click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version') + + return rule diff --git a/detection_rules/main.py b/detection_rules/main.py index 7209b6eca..33dbe0dfc 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -17,6 +17,7 @@ import jsonschema import pytoml from . import rule_loader +from .cli_utils import rule_prompt from .misc import client_error, nested_set, parse_config from .rule import Rule from .rule_formatter import toml_write @@ -48,7 +49,7 @@ def create_rule(path, config, required_only, rule_type): """Create a detection rule.""" contents = load_rule_contents(config, single_only=True)[0] if config else {} try: - return Rule.build(path, rule_type=rule_type, required_only=required_only, save=True, **contents) + return rule_prompt(path, rule_type=rule_type, required_only=required_only, save=True, **contents) finally: rule_loader.reset() @@ -109,7 +110,7 @@ def import_rules(infile, directory): base_path = contents.get('name') or contents.get('rule', {}).get('name') base_path = name_to_filename(base_path) if base_path else base_path rule_path = os.path.join(RULES_DIR, base_path) if base_path else None - Rule.build(rule_path, required_only=True, save=True, verbose=True, **contents) + rule_prompt(rule_path, required_only=True, save=True, verbose=True, **contents) @root.command('toml-lint') diff --git a/detection_rules/rule.py b/detection_rules/rule.py index dc9b8a3c2..52135bed8 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -11,16 +11,13 @@ import os from pathlib import Path from uuid import uuid4 -import click -import kql import eql +import kql from . import ecs, beats -from .attack import tactics, build_threat_map_entry, matrix from .rule_formatter import nested_normalize, toml_write from .schemas import CurrentSchema, TomlMetadata, downgrade -from .utils import get_path, clear_caches, cached - +from .utils import get_path, cached RULES_DIR = get_path("rules") _META_SCHEMA_REQ_DEFAULTS = {} @@ -400,131 +397,6 @@ class Rule(object): return payload - @classmethod - def build(cls, path=None, rule_type=None, required_only=True, save=True, verbose=False, **kwargs): - """Build a rule from data and prompts.""" - from .misc import schema_prompt - - if verbose and path: - click.echo(f'[+] Building rule for {path}') - - kwargs = copy.deepcopy(kwargs) - - if 'rule' in kwargs and 'metadata' in kwargs: - kwargs.update(kwargs.pop('metadata')) - kwargs.update(kwargs.pop('rule')) - - rule_type = rule_type or kwargs.get('type') or \ - click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)), - type=click.Choice(CurrentSchema.RULE_TYPES)) - - schema = CurrentSchema.get_schema(role=rule_type) - props = schema['properties'] - opt_reqs = schema.get('required', []) - contents = {} - skipped = [] - - for name, options in props.items(): - - if name == 'type': - contents[name] = rule_type - continue - - # these are set at package release time - if name == 'version': - continue - - if required_only and name not in opt_reqs: - continue - - # build this from technique ID - if name == 'threat': - threat_map = [] - - while click.confirm('add mitre tactic?'): - tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, required=True) - technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array', - required=False, enum=list(matrix[tactic])) or [] - - try: - threat_map.append(build_threat_map_entry(tactic, *technique_ids)) - except KeyError as e: - click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True) - continue - except ValueError as e: - click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True) - continue - - if len(threat_map) > 0: - contents[name] = threat_map - continue - - if name == 'threshold': - contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts.copy()) - for n, opts in options['properties'].items()} - continue - - if kwargs.get(name): - contents[name] = schema_prompt(name, value=kwargs.pop(name)) - continue - - result = schema_prompt(name, required=name in opt_reqs, **options.copy()) - - if result: - if name not in opt_reqs and result == options.get('default', ''): - skipped.append(name) - continue - - contents[name] = result - - suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE - path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path) - - rule = None - - try: - rule = cls(path, {'rule': contents}) - except kql.KqlParseError as e: - if e.error_msg == 'Unknown field': - warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or ' - '`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep)) - click.secho(e.args[0], fg='red', err=True) - click.secho(warning, fg='yellow', err=True) - click.pause() - - # if failing due to a query, loop until resolved or terminated - while True: - try: - contents['query'] = click.edit(contents['query'], extension='.eql') - rule = cls(path, {'rule': contents}) - except kql.KqlParseError as e: - click.secho(e.args[0], fg='red', err=True) - click.pause() - - if e.error_msg.startswith("Unknown field"): - # get the latest schema for schema errors - clear_caches() - ecs.get_kql_schema(indexes=contents.get("index", [])) - continue - - break - - if save: - rule.save(verbose=True, as_rule=True) - - if skipped: - print('Did not set the following values because they are un-required when set to the default value') - print(' - {}'.format('\n - '.join(skipped))) - - # rta_mappings.add_rule_to_mapping_file(rule) - # click.echo('Placeholder added to rule-mapping.yml') - - click.echo('Rule will validate against the latest ECS schema available (and beats if necessary)') - click.echo(' - to have a rule validate against specific ECS schemas, add them to metadata->ecs_versions') - click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version') - - return rule - def downgrade_contents_from_rule(rule: Rule, target_version: str) -> dict: """Generate the downgraded contents from a rule."""