# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. """CLI commands for detection_rules.""" import dataclasses import json import os import time from collections.abc import Iterable from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, get_args from uuid import uuid4 import click import pytoml # type: ignore[reportMissingTypeStubs] from marshmallow_dataclass import class_schema from semver import Version from .action_connector import ( TOMLActionConnectorContents, build_action_connector_objects, parse_action_connector_results_from_api, ) from .attack import build_threat_map_entry from .cli_utils import multi_collection, rule_prompt from .config import load_current_package_version, parse_rules_config from .exception import TOMLExceptionContents, build_exception_objects, parse_exceptions_results_from_api from .generic_loader import GenericCollection from .misc import ( add_client, getdefault, nested_set, parse_user_config, raise_client_error, ) from .rule import DeprecatedRule, ESQLRuleData, QueryRuleData, RuleMeta, TOMLRule, TOMLRuleContents from .rule_formatter import toml_write from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file from .rule_validators import ESQLValidator from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file from .utils import ( Ndjson, clear_caches, get_etc_path, get_path, load_dump, # type: ignore[reportUnknownVariableType] load_rule_contents, rulename_to_filename, ) if TYPE_CHECKING: from elasticsearch import Elasticsearch RULES_CONFIG = parse_rules_config() RULES_DIRS = RULES_CONFIG.rule_dirs @click.group( "detection-rules", context_settings={ "help_option_names": ["-h", "--help"], "max_content_width": int(os.getenv("DR_CLI_MAX_WIDTH", 240)), # noqa: PLW1508 }, ) @click.option( "--debug/--no-debug", "-D/-N", is_flag=True, default=None, help="Print full exception stacktrace on errors", ) @click.pass_context def root(ctx: click.Context, debug: bool) -> None: """Commands for detection-rules repository.""" debug = debug if debug else parse_user_config().get("debug") ctx.obj = {"debug": debug, "rules_config": RULES_CONFIG} if debug: click.secho("DEBUG MODE ENABLED", fg="yellow") @root.command("create-rule") @click.argument("path", type=Path) @click.option( "--config", "-c", type=click.Path(exists=True, dir_okay=False, path_type=Path), help="Rule or config file" ) @click.option("--required-only", is_flag=True, help="Only prompt for required fields") @click.option( "--rule-type", "-t", type=click.Choice(sorted(TOMLRuleContents.all_rule_types())), help="Type of rule to create" ) def create_rule(path: Path, config: Path, required_only: bool, rule_type: str): # noqa: ANN201 """Create a detection rule.""" contents: dict[str, Any] = load_rule_contents(config, single_only=True)[0] if config else {} return rule_prompt(path, rule_type=rule_type, required_only=required_only, save=True, **contents) @root.command("generate-rules-index") @click.option("--query", "-q", help="Optional KQL query to limit to specific rules") @click.option("--overwrite", is_flag=True, help="Overwrite files in an existing folder") @click.pass_context def generate_rules_index( ctx: click.Context, query: str, overwrite: bool, save_files: bool = True, ) -> tuple[Ndjson, Ndjson]: """Generate enriched indexes of rules, based on a KQL search, for indexing/importing into elasticsearch/kibana.""" from .packaging import Package if query: rule_paths = [r["file"] for r in ctx.invoke(search_rules, query=query, verbose=False)] rules = RuleCollection() rules.load_files(Path(p) for p in rule_paths) else: rules = RuleCollection.default() rule_count = len(rules) package = Package(rules, name=load_current_package_version(), verbose=False) package_hash = package.get_package_hash() bulk_upload_docs, importable_rules_docs = package.create_bulk_index_body() if save_files: path = get_path(["enriched-rule-indexes", package_hash]) path.mkdir(parents=True, exist_ok=overwrite) bulk_upload_docs.dump(path.joinpath("enriched-rules-index-uploadable.ndjson"), sort_keys=True) importable_rules_docs.dump(path.joinpath("enriched-rules-index-importable.ndjson"), sort_keys=True) click.echo(f"files saved to: {path}") click.echo(f"{rule_count} rules included") return bulk_upload_docs, importable_rules_docs @root.command("import-rules-to-repo") @click.argument("input-file", type=click.Path(dir_okay=False, exists=True, path_type=Path), nargs=-1, required=False) @click.option("--action-connector-import", "-ac", is_flag=True, help="Include action connectors in export") @click.option("--exceptions-import", "-e", is_flag=True, help="Include exceptions in export") @click.option("--required-only", is_flag=True, help="Only prompt for required fields") @click.option("--directory", "-d", type=click.Path(file_okay=False, exists=True), help="Load files from a directory") @click.option( "--save-directory", "-s", type=click.Path(file_okay=False, exists=True, path_type=Path), help="Save imported rules to a directory", ) @click.option( "--exceptions-directory", "-se", type=click.Path(file_okay=False, exists=True, path_type=Path), help="Save imported exceptions to a directory", ) @click.option( "--action-connectors-directory", "-sa", type=click.Path(file_okay=False, exists=True, path_type=Path), help="Save imported actions to a directory", ) @click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors") @click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one") @click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule") @click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") @click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") @click.option( "--load-rule-loading", "-lr", is_flag=True, help="Enable arbitrary rule loading from the rules directories (Can be very slow!)", ) def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 input_file: tuple[Path, ...] | None, required_only: bool, action_connector_import: bool, exceptions_import: bool, directory: Path | None, save_directory: Path, action_connectors_directory: Path | None, exceptions_directory: Path | None, skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool, local_updated_date: bool, load_rule_loading: bool, ) -> None: """Import rules from json, toml, or yaml files containing Kibana exported rule(s).""" errors: list[str] = [] rule_files: list[Path] = [] if directory: rule_files = list(directory.glob("**/*.*")) if input_file: rule_files = sorted({*rule_files, *input_file}) file_contents: list[Any] = [] for rule_file in rule_files: file_contents.extend(load_rule_contents(Path(rule_file))) if not file_contents: click.echo("Must specify at least one file!") raw_rule_collection = RawRuleCollection() if load_rule_loading: raw_rule_collection = raw_rule_collection.default() exceptions_containers = {} exceptions_items = {} exceptions_containers, exceptions_items, _, unparsed_results = parse_exceptions_results_from_api(file_contents) action_connectors, unparsed_results = parse_action_connector_results_from_api(unparsed_results) file_contents = unparsed_results exception_list_rule_table: dict[str, Any] = {} action_connector_rule_table: dict[str, Any] = {} rule_count = 0 for contents in file_contents: # Don't load exceptions as rules if contents.get("type") not in get_args(definitions.RuleType): click.echo(f"Skipping - {contents.get('type')} is not a supported rule type") continue base_path = contents.get("name") or contents.get("rule", {}).get("name") base_path = rulename_to_filename(base_path) if base_path else base_path if base_path is None: raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}") rule_base_path = Path(save_directory or RULES_DIRS[0]) rule_path = rule_base_path / base_path rule_id = contents.get("rule_id") if rule_id in raw_rule_collection.id_map: rule_path = raw_rule_collection.id_map[rule_id].path or rule_path # handle both rule json formats loaded from kibana and toml data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id") additional = ["index"] if not data_view_id else ["data_view_id"] # Use additional to store all available fields for the rule additional += [key for key in contents if key not in additional and contents.get(key, None)] # use default author if not provided contents["author"] = contents.get("author") or default_author or [contents.get("created_by")] if isinstance(contents["author"], str): contents["author"] = [contents["author"]] contents.update( update_metadata_from_file( rule_path, {"creation_date": local_creation_date, "updated_date": local_updated_date} ) ) output = rule_prompt( rule_path, required_only=required_only, save=True, verbose=True, additional_required=additional, skip_errors=skip_errors, strip_none_values=strip_none_values, **contents, ) # If output is not a TOMLRule if isinstance(output, str): errors.append(output) else: rule_count += 1 if contents.get("exceptions_list"): # For each item in rule.contents.data.exceptions_list to the exception_list_rule_table under the list_id for exception in contents["exceptions_list"]: exception_id = exception["list_id"] if exception_id not in exception_list_rule_table: exception_list_rule_table[exception_id] = [] exception_list_rule_table[exception_id].append({"id": contents["id"], "name": contents["name"]}) if contents.get("actions"): # If rule has actions with connectors, add them to the action_connector_rule_table under the action_id for action in contents["actions"]: action_id = action["id"] if action_id not in action_connector_rule_table: action_connector_rule_table[action_id] = [] action_connector_rule_table[action_id].append({"id": contents["id"], "name": contents["name"]}) # Build TOMLException Objects if exceptions_import: _, e_output, e_errors = build_exception_objects( exceptions_containers, exceptions_items, exception_list_rule_table, exceptions_directory, save_toml=True, skip_errors=skip_errors, verbose=True, ) for line in e_output: click.echo(line) errors.extend(e_errors) # Build TOMLActionConnector Objects if action_connector_import: _, ac_output, ac_errors = build_action_connector_objects( action_connectors, action_connector_rule_table, action_connectors_directory, save_toml=True, skip_errors=skip_errors, verbose=True, ) for line in ac_output: click.echo(line) errors.extend(ac_errors) exceptions_count = 0 if not exceptions_import else len(exceptions_containers) + len(exceptions_items) click.echo(f"{rule_count + exceptions_count + len(action_connectors)} results exported") click.echo(f"{rule_count} rules converted") click.echo(f"{exceptions_count} exceptions exported") click.echo(f"{len(action_connectors)} actions connectors exported") if errors: _dir = save_directory if save_directory else RULES_DIRS[0] err_file = _dir / "_errors.txt" _ = err_file.write_text("\n".join(errors)) click.echo(f"{len(errors)} errors saved to {err_file}") @root.command("build-limited-rules") @click.option( "--stack-version", type=click.Choice(all_versions()), required=True, help="Version to downgrade to be compatible with the older instance of Kibana", ) @click.option("--output-file", "-o", type=click.Path(dir_okay=False, exists=False), required=True) def build_limited_rules(stack_version: str, output_file: str) -> None: """ Import rules from json, toml, or Kibana exported rule file(s), filter out unsupported ones, and write to output NDJSON file. """ # Schema generation and incompatible fields detection query_rule_data = class_schema(QueryRuleData)() fields = getattr(query_rule_data, "fields", {}) incompatible_fields = get_incompatible_fields( list(fields.values()), Version.parse(stack_version, optional_minor_and_patch=True) ) # Load all rules rules = RuleCollection.default() # Define output path output_path = Path(output_file) # Define ndjson instance for output ndjson_output = Ndjson() # Get API schema for rule type api_schema = get_schema_file(stack_version, "base")["properties"]["type"]["enum"] # Function to process each rule def process_rule(rule: TOMLRule, incompatible_fields: list[str]) -> dict[str, Any] | None: if rule.contents.type not in api_schema: click.secho( f"{rule.contents.name} - Skipping unsupported rule type: {rule.contents.get('type')}", fg="yellow" ) return None # Remove unsupported fields from rule rule_contents = rule.contents.to_api_format() for field in incompatible_fields: rule_contents.pop(field, None) return rule_contents # Process each rule and add to ndjson_output for rule in rules.rules: processed_rule = process_rule(rule, incompatible_fields) if processed_rule is not None: ndjson_output.append(processed_rule) # Write ndjson_output to file ndjson_output.dump(output_path) click.echo(f"Success: Rules written to {output_file}") @root.command("toml-lint") @click.option( "--rule-file", "-f", multiple=True, type=click.Path(exists=True, path_type=Path), help="Specify one or more rule files.", ) def toml_lint(rule_file: list[Path]) -> None: """Cleanup files with some simple toml formatting.""" if rule_file: rules = RuleCollection() rules.load_files(Path(p) for p in rule_file) else: rules = RuleCollection.default() # re-save the rules to force TOML reformatting for rule in rules: rule.save_toml() click.echo("TOML file linting complete") @root.command("mass-update") @click.argument("query") @click.option("--metadata", "-m", is_flag=True, help="Make an update to the rule metadata rather than contents.") @click.option("--language", type=click.Choice(["eql", "kql"]), default="kql") @click.option( "--field", type=(str, str), multiple=True, help="Use rule-search to retrieve a subset of rules and modify values " "(ex: --field management.ecs_version 1.1.1).\n" "Note this is limited to string fields only. Nested fields should use dot notation.", ) @click.pass_context def mass_update( ctx: click.Context, query: str, metadata: bool, language: Literal["eql", "kql"], field: tuple[str, str], ) -> Any: """Update multiple rules based on eql results.""" rules = RuleCollection().default() results = ctx.invoke(search_rules, query=query, language=language, verbose=False) matching_ids = {r["rule_id"] for r in results} rules = rules.filter(lambda r: r.id in matching_ids) for rule in rules: for key, value in field: nested_set(rule.metadata if metadata else rule.contents, key, value) # type: ignore[reportAttributeAccessIssue] rule.validate(as_rule=True) # type: ignore[reportAttributeAccessIssue] rule.save(as_rule=True) # type: ignore[reportAttributeAccessIssue] return ctx.invoke( search_rules, query=query, language=language, columns=["rule_id", "name"] + [k[0].split(".")[-1] for k in field], ) @root.command("view-rule") @click.argument("rule-file", type=Path) @click.option("--api-format/--rule-format", default=True, help="Print the rule in final api or rule format") @click.option("--esql-remote-validation", is_flag=True, default=False, help="Enable remote validation for the rule") @click.pass_context def view_rule( _: click.Context, rule_file: Path, api_format: str, esql_remote_validation: bool ) -> TOMLRule | DeprecatedRule: """View an internal rule or specified rule file.""" rule = RuleCollection().load_file(rule_file) if ( esql_remote_validation and isinstance(rule.contents.data, ESQLRuleData) and isinstance(rule.contents.data.validator, ESQLValidator) and isinstance(rule.contents.metadata, RuleMeta) and not getdefault("remote_esql_validation")() ): rule.contents.data.validator.validate(rule.contents.data, rule.contents.metadata, force_remote_validation=True) if api_format: click.echo(json.dumps(rule.contents.to_api_format(), indent=2, sort_keys=True)) else: click.echo(toml_write(rule.contents.to_dict())) # type: ignore[reportAttributeAccessIssue] return rule def _export_rules( # noqa: PLR0913 rules: RuleCollection, outfile: Path, downgrade_version: definitions.SemVer | None = None, verbose: bool = True, skip_unsupported: bool = False, include_metadata: bool = False, include_action_connectors: bool = False, include_exceptions: bool = False, ) -> None: """Export rules and exceptions into a consolidated ndjson file.""" from .rule import downgrade_contents_from_rule outfile = outfile.with_suffix(".ndjson") unsupported: list[str] = [] output_lines: list[str] = [] if downgrade_version: for rule in rules: try: output_lines.append( json.dumps( downgrade_contents_from_rule(rule, downgrade_version, include_metadata=include_metadata), sort_keys=True, ) ) except ValueError as e: if skip_unsupported: unsupported.append(f"{e}: {rule.id} - {rule.name}") else: raise else: output_lines = [ json.dumps(r.contents.to_api_format(include_metadata=include_metadata), sort_keys=True) for r in rules ] # Add exceptions to api format here and add to output_lines if include_exceptions or include_action_connectors: cl = GenericCollection.default() # Get exceptions in API format if include_exceptions: exceptions = [d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)] exceptions = [e for sublist in exceptions for e in sublist] output_lines.extend(json.dumps(e, sort_keys=True) for e in exceptions) if include_action_connectors: action_connectors = [ d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents) ] actions = [a for sublist in action_connectors for a in sublist] output_lines.extend(json.dumps(a, sort_keys=True) for a in actions) _ = outfile.write_text("\n".join(output_lines) + "\n") if verbose: click.echo(f"Exported {len(rules) - len(unsupported)} rules into {outfile}") if skip_unsupported and unsupported: unsupported_str = "\n- ".join(unsupported) click.echo(f"Skipped {len(unsupported)} unsupported rules: \n- {unsupported_str}") @root.command("export-rules-from-repo") @multi_collection @click.option( "--outfile", "-o", default=Path(get_path(["exports", f"{time.strftime('%Y%m%dT%H%M%SL')}.ndjson"])), type=Path, help="Name of file for exported rules", ) @click.option("--replace-id", "-r", is_flag=True, help="Replace rule IDs with new IDs before export") @click.option( "--stack-version", type=click.Choice(all_versions()), help="Downgrade a rule version to be compatible with older instances of Kibana", ) @click.option( "--skip-unsupported", "-s", is_flag=True, help="If `--stack-version` is passed, skip rule types which are unsupported (an error will be raised otherwise)", ) @click.option("--include-metadata", type=bool, is_flag=True, default=False, help="Add metadata to the exported rules") @click.option( "--include-action-connectors", "-ac", type=bool, is_flag=True, default=False, help="Include Action Connectors in export", ) @click.option( "--include-exceptions", "-e", type=bool, is_flag=True, default=False, help="Include Exceptions Lists in export" ) def export_rules_from_repo( # noqa: PLR0913 rules: RuleCollection, outfile: Path, replace_id: bool, stack_version: str, skip_unsupported: bool, include_metadata: bool, include_action_connectors: bool, include_exceptions: bool, ) -> RuleCollection: """Export rule(s) and exception(s) into an importable ndjson file.""" if len(rules) == 0: raise ValueError("No rules found") if replace_id: # if we need to replace the id, take each rule object and create a copy # of it, with only the rule_id field changed old_rules = rules rules = RuleCollection() for rule in old_rules: new_data = dataclasses.replace(rule.contents.data, rule_id=str(uuid4())) new_contents = dataclasses.replace(rule.contents, data=new_data) rules.add_rule(TOMLRule(contents=new_contents)) outfile.parent.mkdir(exist_ok=True) _export_rules( rules=rules, outfile=outfile, downgrade_version=stack_version, skip_unsupported=skip_unsupported, include_metadata=include_metadata, include_action_connectors=include_action_connectors, include_exceptions=include_exceptions, ) return rules @root.command("validate-rule") @click.argument("path") @click.pass_context def validate_rule(_: click.Context, path: str) -> TOMLRule | DeprecatedRule: """Check if a rule staged in rules dir validates against a schema.""" rule = RuleCollection().load_file(Path(path)) click.echo("Rule validation successful") return rule @root.command("validate-all") def validate_all() -> None: """Check if all rules validates against a schema.""" _ = RuleCollection.default() click.echo("Rule validation successful") @root.command("rule-search") @click.argument("query", required=False) @click.option("--columns", "-c", multiple=True, help="Specify columns to add the table") @click.option("--language", type=click.Choice(["eql", "kql"]), default="kql") @click.option("--count", is_flag=True, help="Return a count rather than table") def search_rules( # noqa: PLR0913 query: str | None, columns: list[str], language: Literal["eql", "kql"], count: bool, verbose: bool = True, rules: dict[str, TOMLRule] | None = None, pager: bool = False, ) -> list[dict[str, Any]]: """Use KQL or EQL to find matching rules.""" from eql import parse_query # type: ignore[reportMissingTypeStubs] from eql.build import get_engine # type: ignore[reportMissingTypeStubs] from eql.pipes import CountPipe # type: ignore[reportMissingTypeStubs] from eql.table import Table # type: ignore[reportMissingTypeStubs] from kql import get_evaluator # type: ignore[reportMissingTypeStubs] from .rule import get_unique_query_fields flattened_rules: list[dict[str, Any]] = [] rules = rules or {str(rule.path): rule for rule in RuleCollection.default()} for file_name, rule in rules.items(): flat: dict[str, Any] = {"file": os.path.relpath(file_name)} flat.update(rule.contents.to_dict()) flat.update(flat["metadata"]) flat.update(flat["rule"]) tactic_names: list[str] = [] technique_ids: list[str] = [] subtechnique_ids: list[str] = [] for entry in flat["rule"].get("threat", []): if entry["framework"] != "MITRE ATT&CK": continue techniques = entry.get("technique", []) tactic_names.append(entry["tactic"]["name"]) technique_ids.extend([t["id"] for t in techniques]) subtechnique_ids.extend([st["id"] for t in techniques for st in t.get("subtechnique", [])]) flat.update( techniques=technique_ids, tactics=tactic_names, subtechniques=subtechnique_ids, unique_fields=get_unique_query_fields(rule), ) flattened_rules.append(flat) flattened_rules.sort(key=lambda dct: dct["name"]) filtered: list[dict[str, Any]] = [] if language == "kql": evaluator = get_evaluator(query) if query else lambda _: True # type: ignore[reportUnknownLambdaType] filtered = list(filter(evaluator, flattened_rules)) # type: ignore[reportCallIssue] elif language == "eql": parsed = parse_query(query, implied_any=True, implied_base=True) # type: ignore[reportUnknownVariableType] evaluator = get_engine(parsed) # type: ignore[reportUnknownVariableType] filtered = [result.events[0].data for result in evaluator(flattened_rules)] # type: ignore[reportUnknownVariableType] if not columns and any(isinstance(pipe, CountPipe) for pipe in parsed.pipes): # type: ignore[reportAttributeAccessIssue] columns = ["key", "count", "percent"] if count: click.echo(f"{len(filtered)} rules") return filtered columns = ",".join(columns).split(",") if columns else ["rule_id", "file", "name"] table: Table = Table.from_list(columns, filtered) # type: ignore[reportUnknownMemberType] if verbose: click.echo_via_pager(table) if pager else click.echo(table) return filtered @root.command("build-threat-map-entry") @click.argument("tactic") @click.argument("technique-ids", nargs=-1) def build_threat_map(tactic: str, technique_ids: Iterable[str]) -> dict[str, Any]: """Build a threat map entry.""" entry = build_threat_map_entry(tactic, *technique_ids) rendered = pytoml.dumps({"rule": {"threat": [entry]}}) # type: ignore[reportUnknownMemberType] # strip out [rule] cleaned = "\n".join(rendered.splitlines()[2:]) print(cleaned) return entry @root.command("test") @click.pass_context def test_rules(ctx: click.Context) -> None: """Run unit tests over all of the rules.""" import pytest rules_config = ctx.obj["rules_config"] test_config = rules_config.test_config tests, skipped = test_config.get_test_names(formatted=True) if skipped: click.echo(f"Tests skipped per config ({len(skipped)}):") click.echo("\n".join(skipped)) clear_caches() if tests: ctx.exit(pytest.main(["-v", *tests])) else: click.echo("No tests found to execute!") @root.group("typosquat") def typosquat_group() -> None: """Commands for generating typosquat detections.""" @typosquat_group.command("create-dnstwist-index") @click.argument("input-file", type=click.Path(exists=True, dir_okay=False), required=True) @click.pass_context @add_client(["elasticsearch"], add_func_arg=False) def create_dnstwist_index(ctx: click.Context, input_file: click.Path) -> None: """Create a dnstwist index in Elasticsearch to work with a threat match rule.""" es_client: Elasticsearch = ctx.obj["es"] click.echo(f"Attempting to load dnstwist data from {input_file}") dnstwist_data: list[dict[str, Any]] = load_dump(str(input_file)) # type: ignore[reportAssignmentType] click.echo(f"{len(dnstwist_data)} records loaded") original_domain = next(r["domain-name"] for r in dnstwist_data if r.get("fuzzer", "") == "original*") # type: ignore[reportAttributeAccessIssue] click.echo(f"Original domain name identified: {original_domain}") domain = original_domain.split(".")[0] domain_index = f"dnstwist-{domain}" # If index already exists, prompt user to confirm if they want to overwrite if es_client.indices.exists(index=domain_index) and click.confirm( f"dnstwist index: {domain_index} already exists for {original_domain}. Do you want to overwrite?", abort=True, ): _ = es_client.indices.delete(index=domain_index) fields = [ "dns-a", "dns-aaaa", "dns-mx", "dns-ns", "banner-http", "fuzzer", "original-domain", "dns.question.registered_domain", ] timestamp_field = "@timestamp" mappings = {"mappings": {"properties": {f: {"type": "keyword"} for f in fields}}} mappings["mappings"]["properties"][timestamp_field] = {"type": "date"} _ = es_client.indices.create(index=domain_index, body=mappings) # handle dns.question.registered_domain separately _ = fields.pop() es_updates: list[dict[str, Any]] = [] now = datetime.now(UTC) for item in dnstwist_data: if item["fuzzer"] == "original*": continue record = item.copy() record.setdefault("dns", {}).setdefault("question", {}).setdefault("registered_domain", item.get("domain-name")) for field in fields: _ = record.setdefault(field, None) record["@timestamp"] = now es_updates.extend([{"create": {"_index": domain_index}}, record]) click.echo(f"Indexing data for domain {original_domain}") results = es_client.bulk(body=es_updates) if results["errors"]: error = {r["create"]["result"] for r in results["items"] if r["create"]["status"] != 201} # noqa: PLR2004 raise_client_error(f"Errors occurred during indexing:\n{error}") click.echo(f"{len(results['items'])} watchlist domains added to index") click.echo("Run `prep-rule` and import to Kibana to create alerts on this index") @typosquat_group.command("prep-rule") @click.argument("author") def prep_rule(author: str) -> None: """Prep the detection threat match rule for dnstwist data with a rule_id and author.""" rule_template_file = get_etc_path(["rule_template_typosquatting_domain.json"]) template_rule = json.loads(rule_template_file.read_text()) template_rule.update(author=[author], rule_id=str(uuid4())) updated_rule = get_path(["rule_typosquatting_domain.ndjson"]) _ = updated_rule.write_text(json.dumps(template_rule, sort_keys=True)) click.echo(f"Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes") click.echo("Note: you only need to import and enable this rule one time for all dnstwist-* indexes")