Collect unique query fields per rule (#296)
This commit is contained in:
+13
-9
@@ -100,24 +100,27 @@ def toml_lint(rule_file):
|
||||
|
||||
@root.command('mass-update')
|
||||
@click.argument('query')
|
||||
@click.option('--metadata', '-m', is_flag=True, help='Make an update to the rule metadata rather than contents.')
|
||||
@click.option('--language', type=click.Choice(["eql", "kql"]), default="kql")
|
||||
@click.option('--field', type=(str, str), multiple=True,
|
||||
help='Use rule-search to retrieve a subset of rules and modify values '
|
||||
'(ex: --field management.ecs_version 1.1.1).\n'
|
||||
'Note this is limited to string fields only. Nested fields should use dot notation.')
|
||||
@click.pass_context
|
||||
def mass_update(ctx, query, field):
|
||||
def mass_update(ctx, query, metadata, language, field):
|
||||
"""Update multiple rules based on eql results."""
|
||||
results = ctx.invoke(search_rules, query=query, verbose=False)
|
||||
rules = [rule_loader.get_rule(r['rule_id']) for r in results]
|
||||
results = ctx.invoke(search_rules, query=query, language=language, verbose=False)
|
||||
rules = [rule_loader.get_rule(r['rule_id'], verbose=False) for r in results]
|
||||
|
||||
for rule in rules:
|
||||
for key, value in field:
|
||||
nested_set(rule.contents, key, value)
|
||||
nested_set(rule.metadata if metadata else rule.contents, key, value)
|
||||
|
||||
rule.validate(as_rule=True)
|
||||
rule.save()
|
||||
rule.save(as_rule=True)
|
||||
|
||||
return ctx.invoke(search_rules, query=query, columns=[k[0].split('.')[-1] for k in field])
|
||||
return ctx.invoke(search_rules, query=query, language=language,
|
||||
columns=['rule_id', 'name'] + [k[0].split('.')[-1] for k in field])
|
||||
|
||||
|
||||
@root.command('view-rule')
|
||||
@@ -226,7 +229,7 @@ def search_rules(query, columns, language, verbose=True):
|
||||
|
||||
flattened_rules = []
|
||||
|
||||
for file_name, rule_doc in rule_loader.load_rule_files().items():
|
||||
for file_name, rule_doc in rule_loader.load_rule_files(verbose=verbose).items():
|
||||
flat = {"file": os.path.relpath(file_name)}
|
||||
flat.update(rule_doc)
|
||||
flat.update(rule_doc["metadata"])
|
||||
@@ -234,7 +237,8 @@ def search_rules(query, columns, language, verbose=True):
|
||||
attacks = [threat for threat in rule_doc["rule"].get("threat", []) if threat["framework"] == "MITRE ATT&CK"]
|
||||
techniques = [t["id"] for threat in attacks for t in threat.get("technique", [])]
|
||||
tactics = [threat["tactic"]["name"] for threat in attacks]
|
||||
flat.update(techniques=techniques, tactics=tactics)
|
||||
flat.update(techniques=techniques, tactics=tactics,
|
||||
unique_fields=Rule.get_unique_query_fields(rule_doc['rule']))
|
||||
flattened_rules.append(flat)
|
||||
|
||||
flattened_rules.sort(key=lambda dct: dct["name"])
|
||||
@@ -272,7 +276,7 @@ def build_release(config_file, update_version_lock):
|
||||
"""Assemble all the rules into Kibana-ready release files."""
|
||||
config = load_dump(config_file)['package']
|
||||
click.echo('[+] Building package {}'.format(config.get('name')))
|
||||
package = Package.from_config(config, update_version_lock=update_version_lock)
|
||||
package = Package.from_config(config, update_version_lock=update_version_lock, verbose=True)
|
||||
package.save()
|
||||
package.get_package_hash(verbose=True)
|
||||
click.echo('- {} rules included'.format(len(package.rules)))
|
||||
|
||||
@@ -45,11 +45,12 @@ def nested_get(_dict, dot_key, default=None):
|
||||
|
||||
def nested_set(_dict, dot_key, value):
|
||||
"""Set a nested field from a a key in dot notation."""
|
||||
for key in dot_key.split('.')[:-1]:
|
||||
keys = dot_key.split('.')
|
||||
for key in keys[:-1]:
|
||||
_dict = _dict.setdefault(key, {})
|
||||
|
||||
if isinstance(_dict, dict):
|
||||
_dict[dot_key[-1]] = value
|
||||
_dict[keys[-1]] = value
|
||||
else:
|
||||
raise ValueError('dict cannot set a value to a non-dict for {}'.format(dot_key))
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ RULE_VERSIONS = get_etc_path('version.lock.json')
|
||||
NOTICE_FILE = get_path('NOTICE.txt')
|
||||
|
||||
|
||||
def filter_rule(rule, config_filter): # type: (Rule,dict) -> bool # rule.contents (not api), filter_dict -> match
|
||||
def filter_rule(rule, config_filter, exclude_fields): # type: (Rule,dict,dict) -> bool
|
||||
"""Filter a rule based off metadata and a package configuration."""
|
||||
flat_rule = rule.flattened_contents
|
||||
for key, values in config_filter.items():
|
||||
@@ -41,6 +41,11 @@ def filter_rule(rule, config_filter): # type: (Rule,dict) -> bool # rule.conte
|
||||
if len(rule_values & values) == 0:
|
||||
return False
|
||||
|
||||
for index, fields in exclude_fields.items():
|
||||
if rule.unique_fields and (rule.contents['index'] == index or index == 'any'):
|
||||
if set(rule.unique_fields) & set(fields):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -219,15 +224,20 @@ class Package(object):
|
||||
return sha256
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, config=None, update_version_lock=False): # type: (dict, bool) -> Package
|
||||
def from_config(cls, config: dict = None, update_version_lock: bool = False, verbose: bool = False) -> 'Package':
|
||||
"""Load a rules package given a config."""
|
||||
all_rules = rule_loader.load_rules(verbose=False).values()
|
||||
config = config or {}
|
||||
exclude_fields = config.pop('exclude_fields', {})
|
||||
rule_filter = config.pop('filter', {})
|
||||
min_version = config.pop('min_version', None)
|
||||
max_version = config.pop('max_version', None)
|
||||
|
||||
rules = filter(lambda rule: filter_rule(rule, rule_filter), all_rules)
|
||||
rules = list(filter(lambda rule: filter_rule(rule, rule_filter, exclude_fields), all_rules))
|
||||
|
||||
if verbose:
|
||||
click.echo(f' - {len(all_rules) - len(rules)} rules excluded from package')
|
||||
|
||||
update = config.pop('update', {})
|
||||
package = cls(rules, min_version=min_version, max_version=max_version, update_version_lock=update_version_lock,
|
||||
**config)
|
||||
|
||||
@@ -94,10 +94,25 @@ class Rule(object):
|
||||
def type(self):
|
||||
return self.contents.get('type')
|
||||
|
||||
@property
|
||||
def unique_fields(self):
|
||||
parsed = self.parsed_query
|
||||
if parsed is not None:
|
||||
return list(set(str(f) for f in parsed if isinstance(f, (eql.ast.Field, kql.ast.Field))))
|
||||
|
||||
def to_eql(self):
|
||||
if self.query and self.contents['language'] == 'kuery':
|
||||
return kql.to_eql(self.query)
|
||||
|
||||
@classmethod
|
||||
def get_unique_query_fields(cls, rule_contents):
|
||||
"""Get a list of unique fields used in a rule query from rule contents."""
|
||||
query = rule_contents.get('query')
|
||||
language = rule_contents.get('language')
|
||||
if language in ('kuery', 'eql'):
|
||||
parsed = kql.parse(query) if language == 'kuery' else eql.parse_query(query)
|
||||
return sorted(set(str(f) for f in parsed if isinstance(f, (eql.ast.Field, kql.ast.Field))))
|
||||
|
||||
@staticmethod
|
||||
@cached
|
||||
def get_meta_schema_required_defaults():
|
||||
|
||||
@@ -2,6 +2,14 @@
|
||||
package:
|
||||
name: "7.10"
|
||||
release: true
|
||||
# exclude rules which have any of the following index <-> field pairs
|
||||
# exclude_fields:
|
||||
# # special field to apply to all indexes
|
||||
# any:
|
||||
# - process.args
|
||||
# - network.direction
|
||||
# logs-endpoint.events.*:
|
||||
# - file.name
|
||||
filter:
|
||||
# ecs_version:
|
||||
# - 1.4.0
|
||||
|
||||
Reference in New Issue
Block a user