From 4edef2ea80b7cc24fe34f11af2f0ad282015e857 Mon Sep 17 00:00:00 2001 From: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com> Date: Wed, 9 Oct 2024 17:19:59 -0400 Subject: [PATCH] [FR][DAC] Import Rules Verbose Message (#4093) * Draft Verbose Message * Fix Linting * Made more descriptive * Updated for readability --- detection_rules/kbwrap.py | 75 ++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index d83b0d66a..5f4719b5f 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -94,6 +94,45 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op overwrite_exceptions: Optional[bool] = False, overwrite_action_connectors: Optional[bool] = False) -> (dict, List[RuleResource]): """Import custom rules into Kibana.""" + def _handle_response_errors(response: dict): + """Handle errors from the import response.""" + def _parse_list_id(s: str): + """Parse the list ID from the error message.""" + match = re.search(r'list_id: "(.*?)"', s) + return match.group(1) if match else None + + # Re-try to address known Kibana issue: https://github.com/elastic/kibana/issues/143864 + workaround_errors = [] + + flattened_exceptions = [e for sublist in exception_dicts for e in sublist] + all_exception_list_ids = {exception["list_id"] for exception in flattened_exceptions} + + click.echo(f'{len(response["errors"])} rule(s) failed to import!') + + for error in response['errors']: + click.echo(f' - {error["rule_id"]}: ({error["error"]["status_code"]}) {error["error"]["message"]}') + + if "references a non existent exception list" in error["error"]["message"]: + list_id = _parse_list_id(error["error"]["message"]) + if list_id in all_exception_list_ids: + workaround_errors.append(error["rule_id"]) + + if workaround_errors: + workaround_errors = list(set(workaround_errors)) + click.echo(f'Missing exception list errors detected for {len(workaround_errors)} rules. ' + 'Try re-importing using the following command and rule IDs:\n') + click.echo('python -m detection_rules kibana import-rules -o ', nl=False) + click.echo(' '.join(f'-id {rule_id}' for rule_id in workaround_errors)) + click.echo() + + def _process_imported_items(imported_items_list, item_type_description, item_key): + """Displays appropriately formatted success message that all items imported successfully.""" + all_ids = {item[item_key] for sublist in imported_items_list for item in sublist} + if all_ids: + click.echo(f'{len(all_ids)} {item_type_description} successfully imported') + ids_str = '\n - '.join(all_ids) + click.echo(f' - {ids_str}') + kibana = ctx.obj['kibana'] rule_dicts = [r.contents.to_api_format() for r in rules] with kibana: @@ -113,43 +152,15 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op overwrite_action_connectors=overwrite_action_connectors ) - def handle_response_errors(response: dict): - """Handle errors from the import response.""" - def parse_list_id(s: str): - """Parse the list ID from the error message.""" - match = re.search(r'list_id: "(.*?)"', s) - return match.group(1) if match else None - - # Re-try to address known Kibana issue: https://github.com/elastic/kibana/issues/143864 - workaround_errors = [] - - flattened_exceptions = [e for sublist in exception_dicts for e in sublist] - all_exception_list_ids = {exception["list_id"] for exception in flattened_exceptions} - - click.echo(f'{len(response["errors"])} rule(s) failed to import!') - - for error in response['errors']: - click.echo(f' - {error["rule_id"]}: ({error["error"]["status_code"]}) {error["error"]["message"]}') - - if "references a non existent exception list" in error["error"]["message"]: - list_id = parse_list_id(error["error"]["message"]) - if list_id in all_exception_list_ids: - workaround_errors.append(error["rule_id"]) - - if workaround_errors: - workaround_errors = list(set(workaround_errors)) - click.echo(f'Missing exception list errors detected for {len(workaround_errors)} rules. ' - 'Try re-importing using the following command and rule IDs:\n') - click.echo('python -m detection_rules kibana import-rules -o ', nl=False) - click.echo(' '.join(f'-id {rule_id}' for rule_id in workaround_errors)) - click.echo() - if successful_rule_ids: click.echo(f'{len(successful_rule_ids)} rule(s) successfully imported') rule_str = '\n - '.join(successful_rule_ids) click.echo(f' - {rule_str}') if response['errors']: - handle_response_errors(response) + _handle_response_errors(response) + else: + _process_imported_items(exception_dicts, 'exception list(s)', 'list_id') + _process_imported_items(action_connectors_dicts, 'action connector(s)', 'id') return response, results