import csv import json import re from concurrent.futures import ThreadPoolExecutor, as_completed from functools import lru_cache from io import StringIO from pathlib import Path from typing import Pattern, get_args import yaml from jinja2 import Environment, FileSystemLoader from atomic_red_team.attack_api import Attack, PlatformFilter, _matches_platform from atomic_red_team.common import atomics_path, base_path from atomic_red_team.models import Platform, get_language, get_supported_platform PLATFORMS: list[str] = list(get_args(Platform)) class AtomicRedTeam: """Loads Atomic Red Team YAML files and formats technique links.""" def __init__( self, atomics_dir: str | Path = atomics_path, attack_api: Attack | None = None ): self.atomics_dir = Path(atomics_dir) self.attack_api = attack_api or Attack() self._atomic_tests: list[dict] | None = None self._tests_by_id: dict[str, list[dict]] | None = None @property def atomic_test_paths(self) -> list[Path]: return sorted(self.atomics_dir.glob("T*/T*.yaml")) @property def atomic_tests(self) -> list[dict]: if self._atomic_tests is None: self._atomic_tests = [] for path in self.atomic_test_paths: atomic_yaml = yaml.load(path.read_text(), Loader=yaml.CSafeLoader) atomic_yaml["atomic_yaml_path"] = str(path) self._atomic_tests.append(atomic_yaml) return self._atomic_tests def atomic_tests_for_technique( self, technique_or_identifier: dict | str ) -> list[dict]: if self._tests_by_id is None: self._tests_by_id = { atomic["attack_technique"].upper(): atomic.get("atomic_tests", []) for atomic in self.atomic_tests } technique_identifier = self._technique_identifier(technique_or_identifier) return self._tests_by_id.get(technique_identifier.upper(), []) def atomic_tests_for_technique_by_platform( self, technique_or_identifier: dict | str, platform: str, ) -> list[dict]: tests = self.atomic_tests_for_technique(technique_or_identifier) return [ test for test in tests if platform in test.get("supported_platforms", []) ] def github_link_to_technique( self, technique: dict, include_identifier: bool = False, only_platform: PlatformFilter = ".*", ) -> str: technique_identifier = self.attack_api.technique_identifier_for_technique( technique ).upper() link_display = ( f"{technique_identifier} {technique['name']}" if include_identifier else technique["name"] ) yaml_file = ( self.atomics_dir / technique_identifier / f"{technique_identifier}.yaml" ) markdown_file = ( self.atomics_dir / technique_identifier / f"{technique_identifier}.md" ) if ( self.atomic_yaml_has_test_for_platform(yaml_file, only_platform) and markdown_file.exists() ): return f"[{link_display}](../../{technique_identifier}/{technique_identifier}.md)" return f"{link_display} [CONTRIBUTE A TEST](https://github.com/redcanaryco/atomic-red-team/wiki/Contributing)" def atomic_yaml_has_test_for_platform( self, yaml_file: Path, only_platform: PlatformFilter, ) -> bool: if not yaml_file.exists(): return False atomic_yaml = yaml.load(yaml_file.read_text(), Loader=yaml.CSafeLoader) return any( _matches_platform(platform, only_platform) for atomic in atomic_yaml.get("atomic_tests", []) for platform in atomic.get("supported_platforms", []) ) def _technique_identifier(self, technique_or_identifier: dict | str) -> str: if isinstance(technique_or_identifier, dict): return self.attack_api.technique_identifier_for_technique( technique_or_identifier ) return technique_or_identifier class AtomicRedTeamDocs: """Generates Atomic Red Team markdown, CSV, YAML, and Navigator indexes.""" def __init__( self, base_dir: str | Path = base_path, attack_file: str | Path | None = None, atomic_red_team: AtomicRedTeam | None = None, ): self.base_dir = Path(base_dir) self.attack_api = Attack(attack_file) self.atomic_red_team = atomic_red_team or AtomicRedTeam( self.base_dir / "atomics", self.attack_api, ) def generate_all_the_docs(self) -> tuple[list[str], list[str]]: # Pre-warm all caches on the main thread before handing off to workers. # atomic_tests loads all YAML; _tests_by_id builds the lookup dict; # techniques_by_id is used by generate_technique_docs per technique. _ = self.atomic_red_team.atomic_tests _ = self.attack_api.techniques_by_id def _render_one(atomic_yaml: dict) -> tuple[str, str, str]: output_path = Path(atomic_yaml["atomic_yaml_path"]).with_suffix(".md") try: self.generate_technique_docs(atomic_yaml, output_path) return ("ok", atomic_yaml["atomic_yaml_path"], str(output_path)) except Exception as ex: return ("fail", atomic_yaml["atomic_yaml_path"], str(ex)) oks: list[str] = [] fails: list[str] = [] with ThreadPoolExecutor() as executor: futures = [ executor.submit(_render_one, ay) for ay in self.atomic_red_team.atomic_tests ] for future in as_completed(futures): status, src_path, detail = future.result() if status == "ok": print(f"Generating docs for {src_path} => {detail} => OK") oks.append(src_path) else: print(f"Generating docs for {src_path} FAIL\n{detail}") fails.append(src_path) print() print(f"Generated docs for {len(oks)} techniques, {len(fails)} failures") self.generate_indexes() return oks, fails def generate_technique_docs( self, atomic_yaml: dict, output_doc_path: str | Path ) -> None: technique = self.attack_api.technique_info(atomic_yaml["attack_technique"]) if technique is None: raise ValueError( f"Unknown ATT&CK technique {atomic_yaml['attack_technique']}" ) technique = {**technique, "identifier": atomic_yaml["attack_technique"].upper()} output_doc_path = Path(output_doc_path) output_doc_path.write_text(_render_technique_markdown(technique, atomic_yaml)) def generate_indexes(self) -> None: index_dir = self.base_dir / "atomics" / "Indexes" for subdir in ( "Matrices", "Indexes-Markdown", "Indexes-CSV", "Attack-Navigator-Layers", ): (index_dir / subdir).mkdir(parents=True, exist_ok=True) self.generate_attack_matrix("All", index_dir / "Matrices" / "matrix.md") for title, platform, filename in _platform_outputs("matrix.md"): self.generate_attack_matrix( title, index_dir / "Matrices" / filename, platform ) self.generate_index("All", index_dir / "Indexes-Markdown" / "index.md") for title, platform, filename in _platform_outputs("index.md"): # Use ".*" for attack_platform so STIX v19 platform renames don't break # cloud-platform indexes; rely on YAML supported_platforms for filtering. self.generate_index( title, index_dir / "Indexes-Markdown" / filename, platform ) self.generate_index_csv(index_dir / "Indexes-CSV" / "index.csv") for title, platform, filename in _platform_outputs("index.csv"): self.generate_index_csv(index_dir / "Indexes-CSV" / filename, platform) self.generate_yaml_index(index_dir / "index.yaml") for platform in PLATFORMS: self.generate_yaml_index_by_platform( index_dir / f"{platform.replace(':', '_')}-index.yaml", platform, ) self.generate_navigator_layers(index_dir / "Attack-Navigator-Layers") def generate_attack_matrix( self, title_prefix: str, output_doc_path: str | Path, only_platform: PlatformFilter = ".*", ) -> None: rows = [f"# {title_prefix} Atomic Tests by ATT&CK Tactic & Technique\n"] tactics = self.attack_api.ordered_tactics() rows.append(f"| {' | '.join(tactics)} |\n") rows.append(f"|{'-----|' * len(tactics)}\n") for row_of_techniques in self.attack_api.ordered_tactic_to_technique_matrix( only_platform ): row = [ self.atomic_red_team.github_link_to_technique( technique, only_platform=only_platform ) if technique else "" for technique in row_of_techniques ] rows.append(f"| {' | '.join(row)} |\n") Path(output_doc_path).write_text("".join(rows)) def generate_index( self, title_prefix: str, output_doc_path: str | Path, only_platform: PlatformFilter = ".*", attack_platform: PlatformFilter = ".*", ) -> None: rows = [f"# {title_prefix} Atomic Tests by ATT&CK Tactic & Technique\n"] by_tactic = self.attack_api.techniques_by_tactic(attack_platform) for tactic in self.attack_api.ordered_tactics(): techniques = sorted( by_tactic.get(tactic, []), key=lambda t: _technique_sort_key( self.attack_api.technique_identifier_for_technique(t) ), ) if not techniques: continue # For platform-specific indexes exclude techniques with no tests on that platform. if only_platform != ".*": techniques = [ t for t in techniques if any( _matches_platform(platform, only_platform) for test in self.atomic_red_team.atomic_tests_for_technique(t) for platform in test.get("supported_platforms", []) ) ] if not techniques: continue rows.append(f"# {tactic}\n") for technique in techniques: rows.append( f"- {self.atomic_red_team.github_link_to_technique(technique, True, only_platform)}\n" ) tests = self.atomic_red_team.atomic_tests_for_technique(technique) for index, atomic_test in enumerate(tests, start=1): platforms = atomic_test.get("supported_platforms", []) if any( _matches_platform(platform, only_platform) for platform in platforms ): rows.append( f" - Atomic Test #{index}: {atomic_test['name']} [{', '.join(platforms)}]\n" ) rows.append("\n") Path(output_doc_path).write_text("".join(rows)) def generate_index_csv( self, output_doc_path: str | Path, only_platform: PlatformFilter = ".*", attack_platform: PlatformFilter = ".*", ) -> None: output = StringIO() writer = csv.writer(output, lineterminator="\n") writer.writerow( [ "Tactic", "Technique #", "Technique Name", "Test #", "Test Name", "Test GUID", "Executor Name", ] ) by_tactic = self.attack_api.techniques_by_tactic(attack_platform) for tactic in self.attack_api.ordered_tactics(): techniques = sorted( by_tactic.get(tactic, []), key=lambda t: _technique_sort_key( self.attack_api.technique_identifier_for_technique(t) ), ) for technique in techniques: tests = self.atomic_red_team.atomic_tests_for_technique(technique) for index, atomic_test in enumerate(tests, start=1): if any( _matches_platform(platform, only_platform) for platform in atomic_test.get("supported_platforms", []) ): writer.writerow( [ tactic, technique["external_references"][0]["external_id"], technique["name"], index, atomic_test["name"], atomic_test.get("auto_generated_guid", ""), atomic_test["executor"]["name"], ] ) Path(output_doc_path).write_text(output.getvalue()) def generate_yaml_index(self, output_doc_path: str | Path) -> None: result = self._yaml_index() Path(output_doc_path).write_text( yaml.dump( json.loads(json.dumps(result)), Dumper=yaml.CSafeDumper, sort_keys=False, allow_unicode=True, ) ) def generate_yaml_index_by_platform( self, output_doc_path: str | Path, platform: str ) -> None: result = self._yaml_index(platform) Path(output_doc_path).write_text( yaml.dump( json.loads(json.dumps(result)), Dumper=yaml.CSafeDumper, sort_keys=False, allow_unicode=True, ) ) def generate_navigator_layers(self, output_dir: str | Path) -> None: output_dir = Path(output_dir) layer_specs = [("art-navigator-layer.json", None)] layer_specs.extend( (f"art-navigator-layer-{platform.replace(':', '-')}.json", platform) for platform in PLATFORMS ) for filename, platform in layer_specs: layer = _navigator_layer( techniques=self._navigator_techniques(platform), layer_name=_layer_name(filename), platform=platform, ) (output_dir / filename).write_text( json.dumps(layer, separators=(",", ":")), encoding="utf-8" ) def _yaml_index(self, platform: str | None = None) -> dict: result = {} for tactic, techniques in self.attack_api.techniques_by_tactic().items(): result[tactic] = {} for technique in techniques: identifier = technique["external_references"][0]["external_id"] tests = self.atomic_red_team.atomic_tests_for_technique(technique) if platform: tests = [ test for test in tests if platform in test.get("supported_platforms", []) ] result[tactic][identifier] = { "technique": technique, "atomic_tests": tests, } return result def _navigator_techniques(self, platform: str | None = None) -> list[dict]: entries: dict[str, dict] = {} parent_scores: dict[str, int] = {} for atomic_yaml in self.atomic_red_team.atomic_tests: tests = atomic_yaml.get("atomic_tests", []) if platform: tests = [ test for test in tests if platform in test.get("supported_platforms", []) ] if not tests: continue technique_id = atomic_yaml["attack_technique"] entry: dict = { "techniqueID": technique_id, "score": len(tests), "enabled": True, } if platform is not None: entry["comment"] = "\n" + "".join( f"- {t['name']}\n" for t in tests ) entry["links"] = [{"label": "View Atomic", "url": _atomic_url(technique_id)}] entries[technique_id] = entry if "." in technique_id: parent_id = technique_id.split(".")[0] parent_scores[parent_id] = parent_scores.get(parent_id, 0) + len(tests) for parent_id, score in parent_scores.items(): if parent_id in entries: entries[parent_id]["score"] += score else: entries[parent_id] = { "techniqueID": parent_id, "score": score, "enabled": True, "links": [{"label": "View Atomic", "url": _atomic_url(parent_id)}], } return list(entries.values()) @lru_cache(maxsize=None) def _get_template(): environment = Environment( autoescape=False, keep_trailing_newline=True, loader=FileSystemLoader(Path(__file__).parent), ) environment.filters.update( { "anchor": _anchor, "attack_url_identifier": lambda value: str(value).replace(".", "/"), "cleanup": _cleanup, "language": get_language, "platform_list": _platform_list, } ) return environment.get_template("atomic_doc_template.md.j2") def _render_technique_markdown(technique: dict, atomic_yaml: dict) -> str: template = _get_template() technique = { **technique, "name": atomic_yaml.get("display_name", technique["name"]), } return template.render( atomic_yaml=atomic_yaml, attack_description_lines=_attack_description_lines(technique), technique=technique, ) def _attack_description_lines(technique: dict) -> list[str]: description = technique.get("description", "").replace("%\\<", "%<") description = re.sub( r".*?", lambda match: match.group(0).replace("~", r"\~"), description, ) return description.splitlines() def _platform_list(platforms: list[str]) -> str: return ", ".join(get_supported_platform(platform) for platform in platforms) def _cleanup(value: object) -> str: return str(value or "").strip().replace("\\", "\") def _anchor(title: str) -> str: return re.sub( r"[`~!@#$%^&*()+=<>?,./:;\"'|{}\[\]\\–—]", "", title.lower().replace(" ", "-") ) _PLATFORM_TITLES: dict[str, str] = { "windows": "Windows", "macos": "macOS", "linux": "Linux", "office-365": "Office 365", "azure-ad": "Azure AD", "google-workspace": "Google Workspace", "saas": "SaaS", "iaas": "IaaS", "containers": "Containers", "iaas:gcp": "IaaS:GCP", "iaas:azure": "IaaS:Azure", "iaas:aws": "IaaS:AWS", "esxi": "ESXi", } # Platforms whose filename prefix differs from their identifier (e.g. colon is invalid). _PLATFORM_FILENAME_PREFIX: dict[str, str] = { "iaas:gcp": "gcp", "iaas:azure": "azure", "iaas:aws": "aws", } def _platform_outputs(suffix: str) -> list[tuple[str, str, str]]: return [ ( _PLATFORM_TITLES.get(p, p), p, f"{_PLATFORM_FILENAME_PREFIX.get(p, p)}-{suffix}", ) for p in PLATFORMS ] def _atomic_url(technique_id: str) -> str: return f"https://github.com/redcanaryco/atomic-red-team/blob/master/atomics/{technique_id}/{technique_id}.md" def _navigator_layer( techniques: list[dict], layer_name: str, platform: str | None = None ) -> dict: filters: dict = {} if platform in ("windows", "macos", "linux"): filters = { "platforms": [ platform.replace("macos", "macOS") .replace("windows", "Windows") .replace("linux", "Linux") ] } return { "name": layer_name, "versions": {"attack": "19", "navigator": "5.3.0", "layer": "4.5"}, "description": f"{layer_name} MITRE ATT&CK Navigator Layer", "domain": "enterprise-attack", "filters": filters, "gradient": {"colors": ["#ffffff", "#ce232e"], "minValue": 0, "maxValue": 10}, "legendItems": [ {"label": "10 or more tests", "color": "#ce232e"}, {"label": "1 or more tests", "color": "#ffffff"}, ], "techniques": techniques, } def _technique_sort_key(technique_id: str) -> tuple[int, ...]: """Return a sort key that orders T1001 < T1001.001 < T1001.002 < T1002.""" parts = technique_id.lstrip("Tt").split(".") return tuple(int(p) for p in parts) def _layer_name(filename: str) -> str: if filename == "art-navigator-layer.json": return "Atomic Red Team" platform = filename.removeprefix("art-navigator-layer-").removesuffix(".json") names = { "windows": "Windows", "macos": "macOS", "linux": "Linux", "iaas": "Iaas", "iaas-aws": "Iaas:AWS", "iaas-azure": "Iaas:Azure", "iaas-gcp": "Iaas:GCP", "containers": "Containers", "saas": "SaaS", "google-workspace": "Google-Workspace", "azure-ad": "Azure-AD", "office-365": "Office-365", "esxi": "ESXi", } return f"Atomic Red Team ({names.get(platform, platform)})"