Move Rule.build to cli_utils.rule_prompt (#1024)

* Move Rule.build to cli_utils.rule_prompt
* Fix build_threat_map_entry lint
* Fix license and add docstring
This commit is contained in:
Ross Wolf
2021-03-09 16:37:53 -07:00
committed by GitHub
parent fc9dfde2c4
commit 5c2da0b5c4
3 changed files with 148 additions and 132 deletions
+143
View File
@@ -0,0 +1,143 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import copy
import os
import click
import kql
from . import ecs
from .attack import matrix, tactics, build_threat_map_entry
from .rule import Rule
from .schemas import CurrentSchema
from .utils import clear_caches, get_path
RULES_DIR = get_path("rules")
def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbose=False, **kwargs) -> Rule:
"""Prompt loop to build a rule."""
from .misc import schema_prompt
if verbose and path:
click.echo(f'[+] Building rule for {path}')
kwargs = copy.deepcopy(kwargs)
if 'rule' in kwargs and 'metadata' in kwargs:
kwargs.update(kwargs.pop('metadata'))
kwargs.update(kwargs.pop('rule'))
rule_type = rule_type or kwargs.get('type') or \
click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)),
type=click.Choice(CurrentSchema.RULE_TYPES))
schema = CurrentSchema.get_schema(role=rule_type)
props = schema['properties']
opt_reqs = schema.get('required', [])
contents = {}
skipped = []
for name, options in props.items():
if name == 'type':
contents[name] = rule_type
continue
# these are set at package release time
if name == 'version':
continue
if required_only and name not in opt_reqs:
continue
# build this from technique ID
if name == 'threat':
threat_map = []
while click.confirm('add mitre tactic?'):
tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, required=True)
technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
required=False, enum=list(matrix[tactic])) or []
try:
threat_map.append(build_threat_map_entry(tactic, *technique_ids))
except KeyError as e:
click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True)
continue
except ValueError as e:
click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True)
continue
if len(threat_map) > 0:
contents[name] = threat_map
continue
if name == 'threshold':
contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts.copy())
for n, opts in options['properties'].items()}
continue
if kwargs.get(name):
contents[name] = schema_prompt(name, value=kwargs.pop(name))
continue
result = schema_prompt(name, required=name in opt_reqs, **options.copy())
if result:
if name not in opt_reqs and result == options.get('default', ''):
skipped.append(name)
continue
contents[name] = result
suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE
path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path)
rule = None
try:
rule = Rule(path, {'rule': contents})
except kql.KqlParseError as e:
if e.error_msg == 'Unknown field':
warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or '
'`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep))
click.secho(e.args[0], fg='red', err=True)
click.secho(warning, fg='yellow', err=True)
click.pause()
# if failing due to a query, loop until resolved or terminated
while True:
try:
contents['query'] = click.edit(contents['query'], extension='.eql')
rule = Rule(path, {'rule': contents})
except kql.KqlParseError as e:
click.secho(e.args[0], fg='red', err=True)
click.pause()
if e.error_msg.startswith("Unknown field"):
# get the latest schema for schema errors
clear_caches()
ecs.get_kql_schema(indexes=contents.get("index", []))
continue
break
if save:
rule.save(verbose=True, as_rule=True)
if skipped:
print('Did not set the following values because they are un-required when set to the default value')
print(' - {}'.format('\n - '.join(skipped)))
# rta_mappings.add_rule_to_mapping_file(rule)
# click.echo('Placeholder added to rule-mapping.yml')
click.echo('Rule will validate against the latest ECS schema available (and beats if necessary)')
click.echo(' - to have a rule validate against specific ECS schemas, add them to metadata->ecs_versions')
click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version')
return rule
+3 -2
View File
@@ -17,6 +17,7 @@ import jsonschema
import pytoml
from . import rule_loader
from .cli_utils import rule_prompt
from .misc import client_error, nested_set, parse_config
from .rule import Rule
from .rule_formatter import toml_write
@@ -48,7 +49,7 @@ def create_rule(path, config, required_only, rule_type):
"""Create a detection rule."""
contents = load_rule_contents(config, single_only=True)[0] if config else {}
try:
return Rule.build(path, rule_type=rule_type, required_only=required_only, save=True, **contents)
return rule_prompt(path, rule_type=rule_type, required_only=required_only, save=True, **contents)
finally:
rule_loader.reset()
@@ -109,7 +110,7 @@ def import_rules(infile, directory):
base_path = contents.get('name') or contents.get('rule', {}).get('name')
base_path = name_to_filename(base_path) if base_path else base_path
rule_path = os.path.join(RULES_DIR, base_path) if base_path else None
Rule.build(rule_path, required_only=True, save=True, verbose=True, **contents)
rule_prompt(rule_path, required_only=True, save=True, verbose=True, **contents)
@root.command('toml-lint')
+2 -130
View File
@@ -11,16 +11,13 @@ import os
from pathlib import Path
from uuid import uuid4
import click
import kql
import eql
import kql
from . import ecs, beats
from .attack import tactics, build_threat_map_entry, matrix
from .rule_formatter import nested_normalize, toml_write
from .schemas import CurrentSchema, TomlMetadata, downgrade
from .utils import get_path, clear_caches, cached
from .utils import get_path, cached
RULES_DIR = get_path("rules")
_META_SCHEMA_REQ_DEFAULTS = {}
@@ -400,131 +397,6 @@ class Rule(object):
return payload
@classmethod
def build(cls, path=None, rule_type=None, required_only=True, save=True, verbose=False, **kwargs):
"""Build a rule from data and prompts."""
from .misc import schema_prompt
if verbose and path:
click.echo(f'[+] Building rule for {path}')
kwargs = copy.deepcopy(kwargs)
if 'rule' in kwargs and 'metadata' in kwargs:
kwargs.update(kwargs.pop('metadata'))
kwargs.update(kwargs.pop('rule'))
rule_type = rule_type or kwargs.get('type') or \
click.prompt('Rule type ({})'.format(', '.join(CurrentSchema.RULE_TYPES)),
type=click.Choice(CurrentSchema.RULE_TYPES))
schema = CurrentSchema.get_schema(role=rule_type)
props = schema['properties']
opt_reqs = schema.get('required', [])
contents = {}
skipped = []
for name, options in props.items():
if name == 'type':
contents[name] = rule_type
continue
# these are set at package release time
if name == 'version':
continue
if required_only and name not in opt_reqs:
continue
# build this from technique ID
if name == 'threat':
threat_map = []
while click.confirm('add mitre tactic?'):
tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, required=True)
technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
required=False, enum=list(matrix[tactic])) or []
try:
threat_map.append(build_threat_map_entry(tactic, *technique_ids))
except KeyError as e:
click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True)
continue
except ValueError as e:
click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True)
continue
if len(threat_map) > 0:
contents[name] = threat_map
continue
if name == 'threshold':
contents[name] = {n: schema_prompt(f'threshold {n}', required=n in options['required'], **opts.copy())
for n, opts in options['properties'].items()}
continue
if kwargs.get(name):
contents[name] = schema_prompt(name, value=kwargs.pop(name))
continue
result = schema_prompt(name, required=name in opt_reqs, **options.copy())
if result:
if name not in opt_reqs and result == options.get('default', ''):
skipped.append(name)
continue
contents[name] = result
suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE
path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path)
rule = None
try:
rule = cls(path, {'rule': contents})
except kql.KqlParseError as e:
if e.error_msg == 'Unknown field':
warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or '
'`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep))
click.secho(e.args[0], fg='red', err=True)
click.secho(warning, fg='yellow', err=True)
click.pause()
# if failing due to a query, loop until resolved or terminated
while True:
try:
contents['query'] = click.edit(contents['query'], extension='.eql')
rule = cls(path, {'rule': contents})
except kql.KqlParseError as e:
click.secho(e.args[0], fg='red', err=True)
click.pause()
if e.error_msg.startswith("Unknown field"):
# get the latest schema for schema errors
clear_caches()
ecs.get_kql_schema(indexes=contents.get("index", []))
continue
break
if save:
rule.save(verbose=True, as_rule=True)
if skipped:
print('Did not set the following values because they are un-required when set to the default value')
print(' - {}'.format('\n - '.join(skipped)))
# rta_mappings.add_rule_to_mapping_file(rule)
# click.echo('Placeholder added to rule-mapping.yml')
click.echo('Rule will validate against the latest ECS schema available (and beats if necessary)')
click.echo(' - to have a rule validate against specific ECS schemas, add them to metadata->ecs_versions')
click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version')
return rule
def downgrade_contents_from_rule(rule: Rule, target_version: str) -> dict:
"""Generate the downgraded contents from a rule."""