[DaC] Beta Release (#3889)
Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com> Co-authored-by: brokensound77 <brokensound77@users.noreply.github.com> Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com> Co-authored-by: Mika Ayenson <mika.ayenson@elastic.co>
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import functools
|
||||
import os
|
||||
import typing
|
||||
from pathlib import Path
|
||||
@@ -13,18 +14,15 @@ from typing import List, Optional
|
||||
import click
|
||||
|
||||
import kql
|
||||
import functools
|
||||
|
||||
from . import ecs
|
||||
from .attack import matrix, tactics, build_threat_map_entry
|
||||
from .rule import TOMLRule, TOMLRuleContents
|
||||
from .rule_loader import (RuleCollection,
|
||||
DEFAULT_RULES_DIR,
|
||||
DEFAULT_BBR_DIR,
|
||||
from .attack import build_threat_map_entry, matrix, tactics
|
||||
from .rule import BYPASS_VERSION_LOCK, TOMLRule, TOMLRuleContents
|
||||
from .rule_loader import (DEFAULT_PREBUILT_BBR_DIRS,
|
||||
DEFAULT_PREBUILT_RULES_DIRS, RuleCollection,
|
||||
dict_filter)
|
||||
from .schemas import definitions
|
||||
from .utils import clear_caches, get_path
|
||||
|
||||
RULES_DIR = get_path("rules")
|
||||
from .utils import clear_caches
|
||||
|
||||
|
||||
def single_collection(f):
|
||||
@@ -49,7 +47,7 @@ def single_collection(f):
|
||||
rules.load_directories(Path(d) for d in directories)
|
||||
|
||||
if rule_id:
|
||||
rules.load_directories((DEFAULT_RULES_DIR, DEFAULT_BBR_DIR),
|
||||
rules.load_directories(DEFAULT_PREBUILT_RULES_DIRS + DEFAULT_PREBUILT_BBR_DIRS,
|
||||
obj_filter=dict_filter(rule__rule_id=rule_id))
|
||||
if len(rules) != 1:
|
||||
client_error(f"Could not find rule with ID {rule_id}")
|
||||
@@ -64,10 +62,10 @@ def multi_collection(f):
|
||||
"""Add arguments to get a RuleCollection by file, directory or a list of IDs"""
|
||||
from .misc import client_error
|
||||
|
||||
@click.option('--rule-file', '-f', multiple=True, type=click.Path(dir_okay=False), required=False)
|
||||
@click.option('--directory', '-d', multiple=True, type=click.Path(file_okay=False), required=False,
|
||||
help='Recursively load rules from a directory')
|
||||
@click.option('--rule-id', '-id', multiple=True, required=False)
|
||||
@click.option("--rule-file", "-f", multiple=True, type=click.Path(dir_okay=False), required=False)
|
||||
@click.option("--directory", "-d", multiple=True, type=click.Path(file_okay=False), required=False,
|
||||
help="Recursively load rules from a directory")
|
||||
@click.option("--rule-id", "-id", multiple=True, required=False)
|
||||
@functools.wraps(f)
|
||||
def get_collection(*args, **kwargs):
|
||||
rule_id: List[str] = kwargs.pop("rule_id", [])
|
||||
@@ -76,20 +74,23 @@ def multi_collection(f):
|
||||
|
||||
rules = RuleCollection()
|
||||
|
||||
if not (directories or rule_id or rule_files):
|
||||
client_error('Required: at least one of --rule-id, --rule-file, or --directory')
|
||||
if not (directories or rule_id or rule_files or (DEFAULT_PREBUILT_RULES_DIRS + DEFAULT_PREBUILT_BBR_DIRS)):
|
||||
client_error("Required: at least one of --rule-id, --rule-file, or --directory")
|
||||
|
||||
rules.load_files(Path(p) for p in rule_files)
|
||||
rules.load_directories(Path(d) for d in directories)
|
||||
|
||||
if rule_id:
|
||||
rules.load_directories((DEFAULT_RULES_DIR, DEFAULT_BBR_DIR),
|
||||
obj_filter=dict_filter(rule__rule_id=rule_id))
|
||||
rules.load_directories(
|
||||
DEFAULT_PREBUILT_RULES_DIRS + DEFAULT_PREBUILT_BBR_DIRS, obj_filter=dict_filter(rule__rule_id=rule_id)
|
||||
)
|
||||
found_ids = {rule.id for rule in rules}
|
||||
missing = set(rule_id).difference(found_ids)
|
||||
|
||||
if missing:
|
||||
client_error(f'Could not find rules with IDs: {", ".join(missing)}')
|
||||
elif not rule_files and not directories:
|
||||
rules.load_directories(Path(d) for d in (DEFAULT_PREBUILT_RULES_DIRS + DEFAULT_PREBUILT_BBR_DIRS))
|
||||
|
||||
if len(rules) == 0:
|
||||
client_error("No rules found")
|
||||
@@ -101,7 +102,8 @@ def multi_collection(f):
|
||||
|
||||
|
||||
def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbose=False,
|
||||
additional_required: Optional[list] = None, **kwargs) -> TOMLRule:
|
||||
additional_required: Optional[list] = None, skip_errors: bool = False, strip_none_values=True, **kwargs,
|
||||
) -> TOMLRule:
|
||||
"""Prompt loop to build a rule."""
|
||||
from .misc import schema_prompt
|
||||
|
||||
@@ -112,6 +114,8 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
|
||||
kwargs = copy.deepcopy(kwargs)
|
||||
|
||||
rule_name = kwargs.get('name')
|
||||
|
||||
if 'rule' in kwargs and 'metadata' in kwargs:
|
||||
kwargs.update(kwargs.pop('metadata'))
|
||||
kwargs.update(kwargs.pop('rule'))
|
||||
@@ -132,8 +136,8 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
contents[name] = rule_type
|
||||
continue
|
||||
|
||||
# these are set at package release time
|
||||
if name == 'version':
|
||||
# these are set at package release time depending on the version strategy
|
||||
if (name == 'version' or name == 'revision') and not BYPASS_VERSION_LOCK:
|
||||
continue
|
||||
|
||||
if required_only and name not in required_fields:
|
||||
@@ -142,20 +146,20 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
# build this from technique ID
|
||||
if name == 'threat':
|
||||
threat_map = []
|
||||
if not skip_errors:
|
||||
while click.confirm('add mitre tactic?'):
|
||||
tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, is_required=True)
|
||||
technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
|
||||
is_required=False, enum=list(matrix[tactic])) or []
|
||||
|
||||
while click.confirm('add mitre tactic?'):
|
||||
tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, is_required=True)
|
||||
technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
|
||||
is_required=False, enum=list(matrix[tactic])) or []
|
||||
|
||||
try:
|
||||
threat_map.append(build_threat_map_entry(tactic, *technique_ids))
|
||||
except KeyError as e:
|
||||
click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True)
|
||||
continue
|
||||
except ValueError as e:
|
||||
click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True)
|
||||
continue
|
||||
try:
|
||||
threat_map.append(build_threat_map_entry(tactic, *technique_ids))
|
||||
except KeyError as e:
|
||||
click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True)
|
||||
continue
|
||||
except ValueError as e:
|
||||
click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True)
|
||||
continue
|
||||
|
||||
if len(threat_map) > 0:
|
||||
contents[name] = threat_map
|
||||
@@ -178,8 +182,11 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
]
|
||||
|
||||
else:
|
||||
result = schema_prompt(name, is_required=name in required_fields, **options.copy())
|
||||
|
||||
if skip_errors:
|
||||
# return missing information
|
||||
return f"Rule: {kwargs["id"]}, Rule Name: {rule_name} is missing {name} information"
|
||||
else:
|
||||
result = schema_prompt(name, is_required=name in required_fields, **options.copy())
|
||||
if result:
|
||||
if name not in required_fields and result == options.get('default', ''):
|
||||
skipped.append(name)
|
||||
@@ -187,13 +194,16 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
|
||||
contents[name] = result
|
||||
|
||||
suggested_path = os.path.join(RULES_DIR, contents['name']) # TODO: UPDATE BASED ON RULE STRUCTURE
|
||||
path = os.path.realpath(path or input('File path for rule [{}]: '.format(suggested_path)) or suggested_path)
|
||||
# DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion
|
||||
suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name']
|
||||
path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve()
|
||||
meta = {'creation_date': creation_date, 'updated_date': creation_date, 'maturity': 'development'}
|
||||
|
||||
try:
|
||||
rule = TOMLRule(path=Path(path), contents=TOMLRuleContents.from_dict({'rule': contents, 'metadata': meta}))
|
||||
except kql.KqlParseError as e:
|
||||
if skip_errors:
|
||||
return f"Rule: {kwargs['id']}, Rule Name: {rule_name} query failed to parse: {e.error_msg}"
|
||||
if e.error_msg == 'Unknown field':
|
||||
warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or '
|
||||
'`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep))
|
||||
@@ -218,9 +228,13 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
continue
|
||||
|
||||
break
|
||||
except Exception as e:
|
||||
if skip_errors:
|
||||
return f"Rule: {kwargs['id']}, Rule Name: {rule_name} failed: {e}"
|
||||
raise e
|
||||
|
||||
if save:
|
||||
rule.save_toml()
|
||||
rule.save_toml(strip_none_values=strip_none_values)
|
||||
|
||||
if skipped:
|
||||
print('Did not set the following values because they are un-required when set to the default value')
|
||||
|
||||
Reference in New Issue
Block a user