[FR] Add --include-metadata argument to export-rules command (#3365)

* added --include-metadata argument to export-rules command

* added type hinting in method definitions

* changed add_metadata to include_metadata

* adjusted argument name to include_metadata in command

* Update detection_rules/main.py

Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>

* fixed flake error

* Update detection_rules/rule.py

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>

---------

Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>
Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
This commit is contained in:
Terrance DeJesus
2024-01-04 16:02:48 -05:00
committed by GitHub
parent 7b1215ccf1
commit d7b62395e7
2 changed files with 20 additions and 10 deletions
+11 -6
View File
@@ -233,7 +233,7 @@ def view_rule(ctx, rule_file, api_format):
def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optional[definitions.SemVer] = None,
verbose=True, skip_unsupported=False):
verbose=True, skip_unsupported=False, include_metadata: bool = False):
"""Export rules into a consolidated ndjson file."""
from .rule import downgrade_contents_from_rule
@@ -246,17 +246,20 @@ def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optio
for rule in rules:
try:
output_lines.append(json.dumps(downgrade_contents_from_rule(rule, downgrade_version),
output_lines.append(json.dumps(downgrade_contents_from_rule(rule, downgrade_version,
include_metadata=include_metadata),
sort_keys=True))
except ValueError as e:
unsupported.append(f'{e}: {rule.id} - {rule.name}')
continue
else:
output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version), sort_keys=True)
output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version,
include_metadata=include_metadata), sort_keys=True)
for r in rules]
else:
output_lines = [json.dumps(r.contents.to_api_format(), sort_keys=True) for r in rules]
output_lines = [json.dumps(r.contents.to_api_format(include_metadata=include_metadata),
sort_keys=True) for r in rules]
outfile.write_text('\n'.join(output_lines) + '\n')
@@ -278,7 +281,9 @@ def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optio
@click.option('--skip-unsupported', '-s', is_flag=True,
help='If `--stack-version` is passed, skip rule types which are unsupported '
'(an error will be raised otherwise)')
def export_rules(rules, outfile: Path, replace_id, stack_version, skip_unsupported) -> RuleCollection:
@click.option('--include-metadata', type=bool, is_flag=True, default=False, help='Add metadata to the exported rules')
def export_rules(rules, outfile: Path, replace_id, stack_version,
skip_unsupported, include_metadata: bool) -> RuleCollection:
"""Export rule(s) into an importable ndjson file."""
assert len(rules) > 0, "No rules found"
@@ -295,7 +300,7 @@ def export_rules(rules, outfile: Path, replace_id, stack_version, skip_unsupport
outfile.parent.mkdir(exist_ok=True)
_export_rules(rules=rules, outfile=outfile, downgrade_version=stack_version,
skip_unsupported=skip_unsupported)
skip_unsupported=skip_unsupported, include_metadata=include_metadata)
return rules
+9 -4
View File
@@ -1159,11 +1159,15 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
flattened.update(self.metadata.to_dict())
return flattened
def to_api_format(self, include_version=True) -> dict:
def to_api_format(self, include_version: bool = True, include_metadata: bool = False) -> dict:
"""Convert the TOML rule to the API format."""
converted_data = self.to_dict()['rule']
rule_dict = self.to_dict()
converted_data = rule_dict['rule']
converted = self._post_dict_conversion(converted_data)
if include_metadata:
converted["meta"] = rule_dict['metadata']
if include_version:
converted["version"] = self.autobumped_version
@@ -1295,7 +1299,8 @@ class DeprecatedRule(dict):
return self.contents.name
def downgrade_contents_from_rule(rule: TOMLRule, target_version: str, replace_id: bool = True) -> dict:
def downgrade_contents_from_rule(rule: TOMLRule, target_version: str,
replace_id: bool = True, include_metadata: bool = False) -> dict:
"""Generate the downgraded contents from a rule."""
rule_dict = rule.contents.to_dict()["rule"]
min_stack_version = target_version or rule.contents.metadata.min_stack_version or "8.3.0"
@@ -1314,7 +1319,7 @@ def downgrade_contents_from_rule(rule: TOMLRule, target_version: str, replace_id
rule_contents_dict["transform"] = rule.contents.transform.to_dict()
rule_contents = TOMLRuleContents.from_dict(rule_contents_dict)
payload = rule_contents.to_api_format()
payload = rule_contents.to_api_format(include_metadata=include_metadata)
payload = strip_non_public_fields(min_stack_version, payload)
return payload