import glob import json import os import shlex import sys import urllib.parse from collections import defaultdict from functools import partial from typing import Annotated import typer from pydantic import ValidationError from atomic_red_team.common import used_guids_file, atomics_path from atomic_red_team.docs import AtomicRedTeamDocs from atomic_red_team.guid import ( generate_guids_for_yaml, get_unique_guid, ) from atomic_red_team.labels import GithubAPI from atomic_red_team.models import Technique from atomic_red_team.new_atomic import create_or_append_atomic, open_in_editor from atomic_red_team.validator import Validator, format_validation_error, yaml app = typer.Typer(help="Atomic Red Team Maintenance tool CLI helper") @app.command() def generate_docs(): """Generates markdown docs, indexes, and ATT&CK Navigator layers.""" _oks, fails = AtomicRedTeamDocs().generate_all_the_docs() if fails: sys.exit(len(fails)) @app.command() def new_atomic( technique_identifier: Annotated[ str, typer.Argument(help="Technique identifier, such as T1234") ], edit: Annotated[bool, typer.Option("--edit/--no-edit")] = True, ): """Creates a new atomic YAML file or appends a blank test to an existing file.""" output_path = create_or_append_atomic(technique_identifier) if edit: editor = os.environ.get("EDITOR", "vi") if len(shlex.split(editor)) == 0: raise typer.BadParameter("EDITOR must not be empty") sys.exit(open_in_editor(output_path, editor)) @app.command() def generate_guids(): """Generates missing GUIDs for the atomic files""" with open(used_guids_file, "r") as file: used_guids = file.readlines() for file in glob.glob(f"{atomics_path}/T*/T*.yaml"): generate_guids_for_yaml(file, partial(get_unique_guid, guids=used_guids)) @app.command() def generate_schemas(): """Generates JSON and YAML schemas for techniques""" schema = Technique.model_json_schema() # (1)! with open("schema.yaml", "w") as f: yaml.default_flow_style = False yaml.dump(schema, f) with open("schema.json", "w") as f: f.write(json.dumps(schema, indent=2)) @app.command() def generate_counter(): """Generate atomic tests count svg""" test_count = 0 for file in glob.glob(f"{atomics_path}/T*/T*.yaml"): with open(file, "r") as f: yaml_data = yaml.load(f) if yaml_data is not None and "atomic_tests" in yaml_data: test_count += len(yaml_data["atomic_tests"]) # Generate the shields.io badge URL params = {"label": "Atomics", "message": str(test_count), "style": "flat"} url = "https://img.shields.io/badge/{}-{}-{}.svg".format( urllib.parse.quote_plus(params["label"]), urllib.parse.quote_plus(params["message"]), urllib.parse.quote_plus(params["style"]), ) # Save shields URL in GitHub Output to be used in the next step. with open(os.environ["GITHUB_OUTPUT"], "a") as fh: print(f"result={url}", file=fh) @app.command() def generate_labels( pull_request: Annotated[str, typer.Option("--pr")], token: Annotated[str, typer.Option("--token")], ): """Generate labels for a pull request.""" api = GithubAPI(token) api.save_labels_and_maintainers(pull_request) @app.command() def validate(): """ Validate all the atomic techniques in a directory. """ validator = Validator() errors = defaultdict(list) for folder in glob.glob(f"{atomics_path}/T*"): for item in os.scandir(folder): try: validator.validate(item) except ValidationError as error: errors[item.path].append(error) if len(errors) == 0: print("Validation successful") else: print("Validation failed") for i, errors in errors.items(): print(f"Error occurred with {i.replace(f'{atomics_path}/', '')}.") print("Each of the following are why it failed:") for error in errors: if isinstance(error, ValidationError): for k, v in format_validation_error(error).items(): print(f"\n\tInvalid {'.'.join(map(str, v))}: {k}\n") else: print(f"\n\t{error}\n") sys.exit(1) if __name__ == "__main__": app()