From d7b62395e78354122d77d142ab6b3312ebee9549 Mon Sep 17 00:00:00 2001 From: Terrance DeJesus <99630311+terrancedejesus@users.noreply.github.com> Date: Thu, 4 Jan 2024 16:02:48 -0500 Subject: [PATCH] [FR] Add `--include-metadata` argument to `export-rules` command (#3365) * added --include-metadata argument to export-rules command * added type hinting in method definitions * changed add_metadata to include_metadata * adjusted argument name to include_metadata in command * Update detection_rules/main.py Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com> * fixed flake error * Update detection_rules/rule.py Co-authored-by: Mika Ayenson --------- Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com> Co-authored-by: Mika Ayenson --- detection_rules/main.py | 17 +++++++++++------ detection_rules/rule.py | 13 +++++++++---- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/detection_rules/main.py b/detection_rules/main.py index 40f46270d..be68aecc3 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -233,7 +233,7 @@ def view_rule(ctx, rule_file, api_format): def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optional[definitions.SemVer] = None, - verbose=True, skip_unsupported=False): + verbose=True, skip_unsupported=False, include_metadata: bool = False): """Export rules into a consolidated ndjson file.""" from .rule import downgrade_contents_from_rule @@ -246,17 +246,20 @@ def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optio for rule in rules: try: - output_lines.append(json.dumps(downgrade_contents_from_rule(rule, downgrade_version), + output_lines.append(json.dumps(downgrade_contents_from_rule(rule, downgrade_version, + include_metadata=include_metadata), sort_keys=True)) except ValueError as e: unsupported.append(f'{e}: {rule.id} - {rule.name}') continue else: - output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version), sort_keys=True) + output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version, + include_metadata=include_metadata), sort_keys=True) for r in rules] else: - output_lines = [json.dumps(r.contents.to_api_format(), sort_keys=True) for r in rules] + output_lines = [json.dumps(r.contents.to_api_format(include_metadata=include_metadata), + sort_keys=True) for r in rules] outfile.write_text('\n'.join(output_lines) + '\n') @@ -278,7 +281,9 @@ def _export_rules(rules: RuleCollection, outfile: Path, downgrade_version: Optio @click.option('--skip-unsupported', '-s', is_flag=True, help='If `--stack-version` is passed, skip rule types which are unsupported ' '(an error will be raised otherwise)') -def export_rules(rules, outfile: Path, replace_id, stack_version, skip_unsupported) -> RuleCollection: +@click.option('--include-metadata', type=bool, is_flag=True, default=False, help='Add metadata to the exported rules') +def export_rules(rules, outfile: Path, replace_id, stack_version, + skip_unsupported, include_metadata: bool) -> RuleCollection: """Export rule(s) into an importable ndjson file.""" assert len(rules) > 0, "No rules found" @@ -295,7 +300,7 @@ def export_rules(rules, outfile: Path, replace_id, stack_version, skip_unsupport outfile.parent.mkdir(exist_ok=True) _export_rules(rules=rules, outfile=outfile, downgrade_version=stack_version, - skip_unsupported=skip_unsupported) + skip_unsupported=skip_unsupported, include_metadata=include_metadata) return rules diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 31f216015..456363a79 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -1159,11 +1159,15 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin): flattened.update(self.metadata.to_dict()) return flattened - def to_api_format(self, include_version=True) -> dict: + def to_api_format(self, include_version: bool = True, include_metadata: bool = False) -> dict: """Convert the TOML rule to the API format.""" - converted_data = self.to_dict()['rule'] + rule_dict = self.to_dict() + converted_data = rule_dict['rule'] converted = self._post_dict_conversion(converted_data) + if include_metadata: + converted["meta"] = rule_dict['metadata'] + if include_version: converted["version"] = self.autobumped_version @@ -1295,7 +1299,8 @@ class DeprecatedRule(dict): return self.contents.name -def downgrade_contents_from_rule(rule: TOMLRule, target_version: str, replace_id: bool = True) -> dict: +def downgrade_contents_from_rule(rule: TOMLRule, target_version: str, + replace_id: bool = True, include_metadata: bool = False) -> dict: """Generate the downgraded contents from a rule.""" rule_dict = rule.contents.to_dict()["rule"] min_stack_version = target_version or rule.contents.metadata.min_stack_version or "8.3.0" @@ -1314,7 +1319,7 @@ def downgrade_contents_from_rule(rule: TOMLRule, target_version: str, replace_id rule_contents_dict["transform"] = rule.contents.transform.to_dict() rule_contents = TOMLRuleContents.from_dict(rule_contents_dict) - payload = rule_contents.to_api_format() + payload = rule_contents.to_api_format(include_metadata=include_metadata) payload = strip_non_public_fields(min_stack_version, payload) return payload