Files
2026-05-02 18:30:22 -04:00

581 lines
21 KiB
Python

import csv
import json
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from io import StringIO
from pathlib import Path
from typing import Pattern, get_args
import yaml
from jinja2 import Environment, FileSystemLoader
from atomic_red_team.attack_api import Attack, PlatformFilter, _matches_platform
from atomic_red_team.common import atomics_path, base_path
from atomic_red_team.models import Platform, get_language, get_supported_platform
# Platform identifiers, taken from the type arguments of ``Platform`` in the
# order they are declared (presumably a ``Literal[...]`` alias — see
# atomic_red_team.models).
PLATFORMS: list[str] = [*get_args(Platform)]
class AtomicRedTeam:
    """Loads Atomic Red Team YAML files and formats technique links.

    Parsed technique files are cached on the instance after first access, so
    an instance reflects the state of ``atomics_dir`` at the time of that
    first load.
    """

    def __init__(
        self, atomics_dir: str | Path = atomics_path, attack_api: Attack | None = None
    ):
        """Remember where the atomics live and which ATT&CK API to consult.

        Args:
            atomics_dir: Directory containing the ``T*/T*.yaml`` technique files.
            attack_api: ATT&CK lookup object; a default ``Attack()`` is created
                when omitted.
        """
        self.atomics_dir = Path(atomics_dir)
        self.attack_api = attack_api or Attack()
        # Lazily populated caches; ``None`` means "not loaded yet".
        self._atomic_tests: list[dict] | None = None
        self._tests_by_id: dict[str, list[dict]] | None = None

    @property
    def atomic_test_paths(self) -> list[Path]:
        """Return the sorted ``T*/T*.yaml`` paths found under ``atomics_dir``."""
        return sorted(self.atomics_dir.glob("T*/T*.yaml"))

    @property
    def atomic_tests(self) -> list[dict]:
        """Return every parsed technique YAML document, loading them on first use.

        Each document gains an ``atomic_yaml_path`` key holding the path it was
        read from. The same list object is returned on every access.
        """
        if self._atomic_tests is None:
            # Build into a local list and publish it only once complete, so a
            # failure part-way through does not leave a truncated cache behind.
            loaded: list[dict] = []
            for path in self.atomic_test_paths:
                # Atomics are read as UTF-8 explicitly rather than with the
                # locale's default encoding.
                atomic_yaml = yaml.load(
                    path.read_text(encoding="utf-8"), Loader=yaml.CSafeLoader
                )
                atomic_yaml["atomic_yaml_path"] = str(path)
                loaded.append(atomic_yaml)
            self._atomic_tests = loaded
        return self._atomic_tests

    def atomic_tests_for_technique(
        self, technique_or_identifier: dict | str
    ) -> list[dict]:
        """Return the atomic tests defined for a technique.

        Args:
            technique_or_identifier: Either a technique dict (resolved through
                the ATT&CK API) or a technique identifier string; the lookup is
                case-insensitive.

        Returns:
            The technique's ``atomic_tests`` list, or an empty list when no
            YAML document declares that technique.
        """
        if self._tests_by_id is None:
            # Index once by upper-cased technique identifier.
            self._tests_by_id = {
                atomic["attack_technique"].upper(): atomic.get("atomic_tests", [])
                for atomic in self.atomic_tests
            }
        technique_identifier = self._technique_identifier(technique_or_identifier)
        return self._tests_by_id.get(technique_identifier.upper(), [])

    def atomic_tests_for_technique_by_platform(
        self,
        technique_or_identifier: dict | str,
        platform: str,
    ) -> list[dict]:
        """Return the technique's tests whose ``supported_platforms`` contain ``platform``.

        The match is an exact membership test, not a pattern match.
        """
        tests = self.atomic_tests_for_technique(technique_or_identifier)
        return [
            test for test in tests if platform in test.get("supported_platforms", [])
        ]

    def github_link_to_technique(
        self,
        technique: dict,
        include_identifier: bool = False,
        only_platform: PlatformFilter = ".*",
    ) -> str:
        """Return the markdown fragment used to reference a technique in an index.

        Args:
            technique: ATT&CK technique dict; its ``name`` is used as link text.
            include_identifier: Prefix the link text with the technique identifier.
            only_platform: Platform filter a test must match for the technique
                to be linked.

        Returns:
            A relative markdown link to the technique page when its YAML has a
            test matching ``only_platform`` and the ``.md`` page exists on disk;
            otherwise the plain name followed by a "CONTRIBUTE A TEST" link.
        """
        technique_identifier = self.attack_api.technique_identifier_for_technique(
            technique
        ).upper()
        link_display = (
            f"{technique_identifier} {technique['name']}"
            if include_identifier
            else technique["name"]
        )
        technique_dir = self.atomics_dir / technique_identifier
        yaml_file = technique_dir / f"{technique_identifier}.yaml"
        markdown_file = technique_dir / f"{technique_identifier}.md"
        if (
            self.atomic_yaml_has_test_for_platform(yaml_file, only_platform)
            and markdown_file.exists()
        ):
            return f"[{link_display}](../../{technique_identifier}/{technique_identifier}.md)"
        return f"{link_display} [CONTRIBUTE A TEST](https://github.com/redcanaryco/atomic-red-team/wiki/Contributing)"

    def atomic_yaml_has_test_for_platform(
        self,
        yaml_file: Path,
        only_platform: PlatformFilter,
    ) -> bool:
        """Report whether ``yaml_file`` declares a test matching ``only_platform``.

        The file is read from disk on every call. A missing file, an empty
        document, or a null ``atomic_tests`` / ``supported_platforms`` value all
        count as "no matching test" rather than raising.
        """
        if not yaml_file.exists():
            return False
        atomic_yaml = (
            yaml.load(yaml_file.read_text(encoding="utf-8"), Loader=yaml.CSafeLoader)
            or {}
        )
        return any(
            _matches_platform(platform, only_platform)
            for atomic in (atomic_yaml.get("atomic_tests") or [])
            for platform in (atomic.get("supported_platforms") or [])
        )

    def _technique_identifier(self, technique_or_identifier: dict | str) -> str:
        """Resolve a technique dict to its identifier; pass strings through unchanged."""
        if isinstance(technique_or_identifier, dict):
            return self.attack_api.technique_identifier_for_technique(
                technique_or_identifier
            )
        return technique_or_identifier
class AtomicRedTeamDocs:
    """Generates Atomic Red Team markdown, CSV, YAML, and Navigator indexes.

    Every generated file is written as UTF-8, independent of the platform's
    locale encoding.
    """

    def __init__(
        self,
        base_dir: str | Path = base_path,
        attack_file: str | Path | None = None,
        atomic_red_team: AtomicRedTeam | None = None,
    ):
        """Set up the ATT&CK API and the atomics loader.

        Args:
            base_dir: Repository root; indexes go under ``<base_dir>/atomics/Indexes``.
            attack_file: Forwarded to ``Attack``.
            atomic_red_team: Loader to use; by default one is created over
                ``<base_dir>/atomics`` sharing this instance's ATT&CK API.
        """
        self.base_dir = Path(base_dir)
        self.attack_api = Attack(attack_file)
        self.atomic_red_team = atomic_red_team or AtomicRedTeam(
            self.base_dir / "atomics",
            self.attack_api,
        )

    def generate_all_the_docs(self) -> tuple[list[str], list[str]]:
        """Render every technique's markdown page, then rebuild all indexes.

        Pages are rendered on a thread pool; a failure in one page is reported
        and recorded without stopping the others.

        Returns:
            ``(oks, fails)``: YAML source paths that rendered successfully and
            those whose rendering raised, each in completion order.
        """
        # Touch the lazily-built caches on the main thread before handing off
        # to workers, so the worker threads only read them: atomic_tests loads
        # all YAML, and techniques_by_id is presumably what the per-technique
        # ATT&CK lookup in generate_technique_docs relies on.
        _ = self.atomic_red_team.atomic_tests
        _ = self.attack_api.techniques_by_id

        def _render_one(atomic_yaml: dict) -> tuple[str, str, str]:
            # Returns (status, source path, output path or error text).
            output_path = Path(atomic_yaml["atomic_yaml_path"]).with_suffix(".md")
            try:
                self.generate_technique_docs(atomic_yaml, output_path)
                return ("ok", atomic_yaml["atomic_yaml_path"], str(output_path))
            except Exception as ex:
                # Per-item boundary: report the failure instead of aborting the run.
                return ("fail", atomic_yaml["atomic_yaml_path"], str(ex))

        oks: list[str] = []
        fails: list[str] = []
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(_render_one, ay)
                for ay in self.atomic_red_team.atomic_tests
            ]
            for future in as_completed(futures):
                status, src_path, detail = future.result()
                if status == "ok":
                    print(f"Generating docs for {src_path} => {detail} => OK")
                    oks.append(src_path)
                else:
                    print(f"Generating docs for {src_path} FAIL\n{detail}")
                    fails.append(src_path)
        print()
        print(f"Generated docs for {len(oks)} techniques, {len(fails)} failures")
        self.generate_indexes()
        return oks, fails

    def generate_technique_docs(
        self, atomic_yaml: dict, output_doc_path: str | Path
    ) -> None:
        """Render one technique's markdown page to ``output_doc_path``.

        Raises:
            ValueError: If the ATT&CK API does not know ``attack_technique``.
        """
        technique = self.attack_api.technique_info(atomic_yaml["attack_technique"])
        if technique is None:
            raise ValueError(
                f"Unknown ATT&CK technique {atomic_yaml['attack_technique']}"
            )
        # Copy so the API's own technique record is not modified.
        technique = {**technique, "identifier": atomic_yaml["attack_technique"].upper()}
        output_doc_path = Path(output_doc_path)
        output_doc_path.write_text(
            _render_technique_markdown(technique, atomic_yaml), encoding="utf-8"
        )

    def generate_indexes(self) -> None:
        """Write every matrix, markdown/CSV/YAML index and Navigator layer."""
        index_dir = self.base_dir / "atomics" / "Indexes"
        for subdir in (
            "Matrices",
            "Indexes-Markdown",
            "Indexes-CSV",
            "Attack-Navigator-Layers",
        ):
            (index_dir / subdir).mkdir(parents=True, exist_ok=True)

        self.generate_attack_matrix("All", index_dir / "Matrices" / "matrix.md")
        for title, platform, filename in _platform_outputs("matrix.md"):
            self.generate_attack_matrix(
                title, index_dir / "Matrices" / filename, platform
            )

        self.generate_index("All", index_dir / "Indexes-Markdown" / "index.md")
        for title, platform, filename in _platform_outputs("index.md"):
            # attack_platform is left at its ".*" default so STIX v19 platform
            # renames don't break cloud-platform indexes; filtering relies on
            # the YAML supported_platforms via only_platform.
            self.generate_index(
                title, index_dir / "Indexes-Markdown" / filename, platform
            )

        self.generate_index_csv(index_dir / "Indexes-CSV" / "index.csv")
        for title, platform, filename in _platform_outputs("index.csv"):
            self.generate_index_csv(index_dir / "Indexes-CSV" / filename, platform)

        self.generate_yaml_index(index_dir / "index.yaml")
        for platform in PLATFORMS:
            self.generate_yaml_index_by_platform(
                index_dir / f"{platform.replace(':', '_')}-index.yaml",
                platform,
            )

        self.generate_navigator_layers(index_dir / "Attack-Navigator-Layers")

    def generate_attack_matrix(
        self,
        title_prefix: str,
        output_doc_path: str | Path,
        only_platform: PlatformFilter = ".*",
    ) -> None:
        """Write a markdown table with one column per tactic.

        Cells hold technique links; falsy entries in a matrix row are written
        as empty cells.
        """
        rows = [f"# {title_prefix} Atomic Tests by ATT&CK Tactic & Technique\n"]
        tactics = self.attack_api.ordered_tactics()
        rows.append(f"| {' | '.join(tactics)} |\n")
        rows.append(f"|{'-----|' * len(tactics)}\n")
        for row_of_techniques in self.attack_api.ordered_tactic_to_technique_matrix(
            only_platform
        ):
            row = [
                self.atomic_red_team.github_link_to_technique(
                    technique, only_platform=only_platform
                )
                if technique
                else ""
                for technique in row_of_techniques
            ]
            rows.append(f"| {' | '.join(row)} |\n")
        Path(output_doc_path).write_text("".join(rows), encoding="utf-8")

    def generate_index(
        self,
        title_prefix: str,
        output_doc_path: str | Path,
        only_platform: PlatformFilter = ".*",
        attack_platform: PlatformFilter = ".*",
    ) -> None:
        """Write a markdown index of techniques and their tests, grouped by tactic.

        Args:
            title_prefix: Text placed at the start of the document heading.
            output_doc_path: Destination file.
            only_platform: Filter applied to each test's ``supported_platforms``.
            attack_platform: Filter forwarded to ``techniques_by_tactic``.
        """
        rows = [f"# {title_prefix} Atomic Tests by ATT&CK Tactic & Technique\n"]
        by_tactic = self.attack_api.techniques_by_tactic(attack_platform)
        for tactic in self.attack_api.ordered_tactics():
            techniques = self._sorted_by_identifier(by_tactic.get(tactic, []))
            if not techniques:
                continue
            # For platform-specific indexes exclude techniques with no tests on that platform.
            if only_platform != ".*":
                techniques = [
                    t
                    for t in techniques
                    if any(
                        self._supports_platform(test, only_platform)
                        for test in self.atomic_red_team.atomic_tests_for_technique(t)
                    )
                ]
                if not techniques:
                    continue
            rows.append(f"# {tactic}\n")
            for technique in techniques:
                rows.append(
                    f"- {self.atomic_red_team.github_link_to_technique(technique, True, only_platform)}\n"
                )
                tests = self.atomic_red_team.atomic_tests_for_technique(technique)
                # Number over the unfiltered list so "#N" stays the test's
                # position within its technique even when some are skipped.
                for index, atomic_test in enumerate(tests, start=1):
                    platforms = atomic_test.get("supported_platforms", [])
                    if any(
                        _matches_platform(platform, only_platform)
                        for platform in platforms
                    ):
                        # NOTE(review): one leading space does not nest this
                        # bullet under the technique bullet in CommonMark —
                        # confirm the intended indentation.
                        rows.append(
                            f" - Atomic Test #{index}: {atomic_test['name']} [{', '.join(platforms)}]\n"
                        )
            # One blank line closes each tactic section that was emitted.
            rows.append("\n")
        Path(output_doc_path).write_text("".join(rows), encoding="utf-8")

    def generate_index_csv(
        self,
        output_doc_path: str | Path,
        only_platform: PlatformFilter = ".*",
        attack_platform: PlatformFilter = ".*",
    ) -> None:
        """Write a CSV with one row per (tactic, technique, test) matching ``only_platform``.

        Args:
            output_doc_path: Destination file.
            only_platform: Filter applied to each test's ``supported_platforms``.
            attack_platform: Filter forwarded to ``techniques_by_tactic``.
        """
        output = StringIO()
        writer = csv.writer(output, lineterminator="\n")
        writer.writerow(
            [
                "Tactic",
                "Technique #",
                "Technique Name",
                "Test #",
                "Test Name",
                "Test GUID",
                "Executor Name",
            ]
        )
        by_tactic = self.attack_api.techniques_by_tactic(attack_platform)
        for tactic in self.attack_api.ordered_tactics():
            for technique in self._sorted_by_identifier(by_tactic.get(tactic, [])):
                tests = self.atomic_red_team.atomic_tests_for_technique(technique)
                for index, atomic_test in enumerate(tests, start=1):
                    if self._supports_platform(atomic_test, only_platform):
                        writer.writerow(
                            [
                                tactic,
                                technique["external_references"][0]["external_id"],
                                technique["name"],
                                index,
                                atomic_test["name"],
                                atomic_test.get("auto_generated_guid", ""),
                                atomic_test["executor"]["name"],
                            ]
                        )
        Path(output_doc_path).write_text(output.getvalue(), encoding="utf-8")

    def generate_yaml_index(self, output_doc_path: str | Path) -> None:
        """Write the YAML index covering every platform."""
        self._write_yaml(output_doc_path, self._yaml_index())

    def generate_yaml_index_by_platform(
        self, output_doc_path: str | Path, platform: str
    ) -> None:
        """Write the YAML index restricted to tests that list ``platform``."""
        self._write_yaml(output_doc_path, self._yaml_index(platform))

    def generate_navigator_layers(self, output_dir: str | Path) -> None:
        """Write one ATT&CK Navigator layer for all tests plus one per platform."""
        output_dir = Path(output_dir)
        # (filename, platform) pairs; ``None`` selects tests of every platform.
        layer_specs = [("art-navigator-layer.json", None)]
        layer_specs.extend(
            (f"art-navigator-layer-{platform.replace(':', '-')}.json", platform)
            for platform in PLATFORMS
        )
        for filename, platform in layer_specs:
            layer = _navigator_layer(
                techniques=self._navigator_techniques(platform),
                layer_name=_layer_name(filename),
                platform=platform,
            )
            (output_dir / filename).write_text(
                json.dumps(layer, separators=(",", ":")), encoding="utf-8"
            )

    def _sorted_by_identifier(self, techniques: list[dict]) -> list[dict]:
        """Return ``techniques`` ordered numerically by technique identifier."""
        return sorted(
            techniques,
            key=lambda t: _technique_sort_key(
                self.attack_api.technique_identifier_for_technique(t)
            ),
        )

    @staticmethod
    def _supports_platform(atomic_test: dict, only_platform: PlatformFilter) -> bool:
        """Report whether any of the test's ``supported_platforms`` matches the filter."""
        return any(
            _matches_platform(platform, only_platform)
            for platform in atomic_test.get("supported_platforms", [])
        )

    @staticmethod
    def _write_yaml(output_doc_path: str | Path, index: dict) -> None:
        """Dump ``index`` as YAML to ``output_doc_path``, preserving key order."""
        # The JSON round-trip yields a deep copy made only of plain JSON types,
        # so no object is shared between entries (presumably to keep the dumper
        # from emitting anchors/aliases for repeated techniques and tests).
        plain = json.loads(json.dumps(index))
        Path(output_doc_path).write_text(
            yaml.dump(
                plain,
                Dumper=yaml.CSafeDumper,
                sort_keys=False,
                allow_unicode=True,
            ),
            # allow_unicode=True emits non-ASCII characters verbatim, so the
            # file encoding must be able to represent them.
            encoding="utf-8",
        )

    def _yaml_index(self, platform: str | None = None) -> dict:
        """Build ``{tactic: {technique id: {"technique", "atomic_tests"}}}``.

        When ``platform`` is given (and non-empty), each technique's tests are
        limited to those listing exactly that platform; techniques are kept
        even when no test remains.
        """
        result = {}
        for tactic, techniques in self.attack_api.techniques_by_tactic().items():
            result[tactic] = {}
            for technique in techniques:
                identifier = technique["external_references"][0]["external_id"]
                tests = self.atomic_red_team.atomic_tests_for_technique(technique)
                if platform:
                    tests = [
                        test
                        for test in tests
                        if platform in test.get("supported_platforms", [])
                    ]
                result[tactic][identifier] = {
                    "technique": technique,
                    "atomic_tests": tests,
                }
        return result

    def _navigator_techniques(self, platform: str | None = None) -> list[dict]:
        """Build the Navigator ``techniques`` entries, scored by number of tests.

        A sub-technique's test count is also added to its parent technique; a
        parent with no tests of its own gets an entry created for it. Entries
        for a specific platform carry a ``comment`` listing the test names.
        """
        entries: dict[str, dict] = {}
        parent_scores: dict[str, int] = {}
        for atomic_yaml in self.atomic_red_team.atomic_tests:
            tests = atomic_yaml.get("atomic_tests", [])
            if platform:
                tests = [
                    test
                    for test in tests
                    if platform in test.get("supported_platforms", [])
                ]
            # Techniques without any (matching) test are left out of the layer.
            if not tests:
                continue
            technique_id = atomic_yaml["attack_technique"]
            entry: dict = {
                "techniqueID": technique_id,
                "score": len(tests),
                "enabled": True,
            }
            if platform is not None:
                entry["comment"] = "\n" + "".join(
                    f"- {t['name']}\n" for t in tests
                )
            entry["links"] = [{"label": "View Atomic", "url": _atomic_url(technique_id)}]
            entries[technique_id] = entry
            if "." in technique_id:
                # "T1234.001" contributes its tests to parent "T1234".
                parent_id = technique_id.split(".")[0]
                parent_scores[parent_id] = parent_scores.get(parent_id, 0) + len(tests)
        # Applied after the main pass so every parent's own entry (if any) exists.
        for parent_id, score in parent_scores.items():
            if parent_id in entries:
                entries[parent_id]["score"] += score
            else:
                entries[parent_id] = {
                    "techniqueID": parent_id,
                    "score": score,
                    "enabled": True,
                    "links": [{"label": "View Atomic", "url": _atomic_url(parent_id)}],
                }
        return list(entries.values())
@lru_cache(maxsize=None)
def _get_template():
    """Return the technique-page Jinja template, building it once per process.

    Templates are looked up next to this module. Autoescaping is off and the
    trailing newline of the template file is kept.
    """

    def attack_url_identifier(value):
        # "T1234.001" -> "T1234/001"
        return str(value).replace(".", "/")

    custom_filters = {
        "anchor": _anchor,
        "attack_url_identifier": attack_url_identifier,
        "cleanup": _cleanup,
        "language": get_language,
        "platform_list": _platform_list,
    }
    template_dir = Path(__file__).parent
    environment = Environment(
        loader=FileSystemLoader(template_dir),
        autoescape=False,
        keep_trailing_newline=True,
    )
    for filter_name, filter_func in custom_filters.items():
        environment.filters[filter_name] = filter_func
    return environment.get_template("atomic_doc_template.md.j2")
def _render_technique_markdown(technique: dict, atomic_yaml: dict) -> str:
    """Render the markdown page for one technique.

    The YAML's ``display_name``, when present, replaces the ATT&CK name in the
    copy of ``technique`` handed to the template; the caller's dict is not
    modified.
    """
    template = _get_template()
    display_name = atomic_yaml.get("display_name", technique["name"])
    page_technique = dict(technique)
    page_technique["name"] = display_name
    description_lines = _attack_description_lines(page_technique)
    return template.render(
        atomic_yaml=atomic_yaml,
        attack_description_lines=description_lines,
        technique=page_technique,
    )
def _attack_description_lines(technique: dict) -> list[str]:
description = technique.get("description", "").replace("%\\<", "%<")
description = re.sub(
r"<code>.*?</code>",
lambda match: match.group(0).replace("~", r"\~"),
description,
)
return description.splitlines()
def _platform_list(platforms: list[str]) -> str:
    """Join the display form of each platform with ``", "``."""
    display_names = [get_supported_platform(name) for name in platforms]
    return ", ".join(display_names)
def _cleanup(value: object) -> str:
return str(value or "").strip().replace("\\", "&#92;")
def _anchor(title: str) -> str:
return re.sub(
r"[`~!@#$%^&*()+=<>?,./:;\"'|{}\[\]\\–—]", "", title.lower().replace(" ", "-")
)
# Display title for each platform identifier, used as the heading prefix of the
# per-platform matrices and indexes. Identifiers missing here fall back to the
# identifier itself (see _platform_outputs).
_PLATFORM_TITLES: dict[str, str] = {
    "windows": "Windows",
    "macos": "macOS",
    "linux": "Linux",
    "office-365": "Office 365",
    "azure-ad": "Azure AD",
    "google-workspace": "Google Workspace",
    "saas": "SaaS",
    "iaas": "IaaS",
    "containers": "Containers",
    "iaas:gcp": "IaaS:GCP",
    "iaas:azure": "IaaS:Azure",
    "iaas:aws": "IaaS:AWS",
    "esxi": "ESXi",
}
# Platforms whose filename prefix differs from their identifier (e.g. colon is invalid).
# Platforms not listed here use the identifier itself as the prefix.
_PLATFORM_FILENAME_PREFIX: dict[str, str] = {
    "iaas:gcp": "gcp",
    "iaas:azure": "azure",
    "iaas:aws": "aws",
}
def _platform_outputs(suffix: str) -> list[tuple[str, str, str]]:
    """Return ``(title, platform, filename)`` for every entry of ``PLATFORMS``.

    The filename is ``"<prefix>-<suffix>"``; title and prefix fall back to the
    platform identifier when it has no override.
    """
    outputs: list[tuple[str, str, str]] = []
    for platform in PLATFORMS:
        title = _PLATFORM_TITLES.get(platform, platform)
        prefix = _PLATFORM_FILENAME_PREFIX.get(platform, platform)
        outputs.append((title, platform, f"{prefix}-{suffix}"))
    return outputs
def _atomic_url(technique_id: str) -> str:
return f"https://github.com/redcanaryco/atomic-red-team/blob/master/atomics/{technique_id}/{technique_id}.md"
def _navigator_layer(
techniques: list[dict], layer_name: str, platform: str | None = None
) -> dict:
filters: dict = {}
if platform in ("windows", "macos", "linux"):
filters = {
"platforms": [
platform.replace("macos", "macOS")
.replace("windows", "Windows")
.replace("linux", "Linux")
]
}
return {
"name": layer_name,
"versions": {"attack": "19", "navigator": "5.3.0", "layer": "4.5"},
"description": f"{layer_name} MITRE ATT&CK Navigator Layer",
"domain": "enterprise-attack",
"filters": filters,
"gradient": {"colors": ["#ffffff", "#ce232e"], "minValue": 0, "maxValue": 10},
"legendItems": [
{"label": "10 or more tests", "color": "#ce232e"},
{"label": "1 or more tests", "color": "#ffffff"},
],
"techniques": techniques,
}
def _technique_sort_key(technique_id: str) -> tuple[int, ...]:
"""Return a sort key that orders T1001 < T1001.001 < T1001.002 < T1002."""
parts = technique_id.lstrip("Tt").split(".")
return tuple(int(p) for p in parts)
def _layer_name(filename: str) -> str:
if filename == "art-navigator-layer.json":
return "Atomic Red Team"
platform = filename.removeprefix("art-navigator-layer-").removesuffix(".json")
names = {
"windows": "Windows",
"macos": "macOS",
"linux": "Linux",
"iaas": "Iaas",
"iaas-aws": "Iaas:AWS",
"iaas-azure": "Iaas:Azure",
"iaas-gcp": "Iaas:GCP",
"containers": "Containers",
"saas": "SaaS",
"google-workspace": "Google-Workspace",
"azure-ad": "Azure-AD",
"office-365": "Office-365",
"esxi": "ESXi",
}
return f"Atomic Red Team ({names.get(platform, platform)})"