Files
atomic-red-team/bin/validate/validate.py
T
2023-11-28 09:58:02 -06:00

169 lines
6.1 KiB
Python

import fnmatch
import glob
import os
from os import DirEntry
import sys
from jsonschema import validate, ValidationError
from collections import defaultdict
from ruamel.yaml import YAML
# Shared ruamel.yaml parser configured with typ="safe"; the Validator uses it
# to load both the schema file and every atomic YAML file it checks.
yaml = YAML(typ="safe")
class BaseError(Exception):
    """Common base for the validation problems defined in this script.

    Instances are collected per path by the validator rather than raised;
    each one remembers the path it refers to.
    """

    def __init__(self, path):
        # Exception.__init__ is intentionally not invoked here: ``args`` keeps
        # whatever positional arguments the concrete subclass was built with.
        self.path = path
class InvalidPath(BaseError):
    """Problem record for a directory whose name is not `src` or `bin`."""

    def __str__(self):
        message = "Invalid path. `src` and `bin` are the only two directories supported."
        return message
class InvalidFileName(BaseError):
    """Problem record for a YAML file that uses the `.yml` extension."""

    def __str__(self):
        message = "Invalid filename. Rename file from .yml to .yaml"
        return message
class ReusedGuid(BaseError):
    """Problem record for a test whose ``auto_generated_guid`` was already seen.

    Args:
        path: Path of the YAML file containing the offending test.
        guid: The GUID value that was encountered more than once.
        test_number: 1-based position of the offending test inside the
            file's ``atomic_tests`` list.
    """

    def __init__(self, path, guid, test_number):
        super().__init__(path)
        self.guid = guid
        self.test_number = test_number

    def __str__(self):
        # ``test_number`` is 1-based, while the bracketed hint addresses the
        # ``atomic_tests`` list by index, so it has to be shifted back by one
        # to point at the element that actually carries the duplicate GUID.
        list_index = self.test_number - 1
        # The trailing space after "auto generated." keeps the two sentences
        # from running together when the adjacent literals are concatenated.
        return (
            f"GUID {self.guid} reused for test {self.test_number}. GUIDs are auto generated. "
            f"You can remove atomic_tests[{list_index}].auto_generated_guid"
        )
class UnusedArgument(BaseError):
    """Problem record for an input argument that no command of its test uses.

    Stores the argument name and the number of the test declaring it, in
    addition to the file path kept by the base class.
    """

    def __init__(self, path, argument, test_number):
        super().__init__(path)
        self.test_number = test_number
        self.argument = argument

    def __str__(self):
        description = f"Unused Input Argument {self.argument} for test number {self.test_number}"
        return description
class Validator:
    """Validate atomic test definitions and collect every problem found.

    Problems are never raised to the caller. They are appended to
    ``self.errors``, a mapping of file/directory path to the list of error
    objects recorded for that path, and can be rendered with
    :meth:`print_errors`.
    """

    def __init__(self):
        """Load the schema and the list of used GUIDs.

        Raises:
            OSError: If the schema file next to this script or
                ``./atomics/used_guids.txt`` (relative to the current
                working directory) cannot be opened.
        """
        schema_path = f"{os.path.dirname(os.path.abspath(__file__))}/atomic-red-team.schema.yaml"
        used_guids_path = "./atomics/used_guids.txt"
        # Per-instance mapping: a class-level defaultdict would be shared (and
        # keep accumulating) across every Validator ever created.
        self.errors = defaultdict(list)
        with open(used_guids_path, "r", encoding="utf-8") as f:
            # NOTE(review): loaded but not consulted anywhere in this class.
            self.used_guids = [line.strip() for line in f]
        with open(schema_path, "r", encoding="utf-8") as f:
            self.schema = yaml.load(f)
        # GUIDs seen so far across all validated files, for reuse detection.
        self.guids = []

    def validate(self, obj: DirEntry) -> None:
        """Dispatch a directory entry to file or directory validation.

        Files are only validated when their name matches ``*.y*ml``; other
        files are ignored.
        """
        if obj.is_file():
            if fnmatch.fnmatch(obj.name, "*.y*ml"):
                self.validate_file(obj)
        elif obj.is_dir():
            self.validate_directory(obj)

    def validate_file(self, file: DirEntry) -> None:
        """Run every file-level check on one YAML file."""
        self.validate_yaml_extension(file)
        self.validate_atomic(file)
        self.validate_json_schema(file)

    def validate_atomic(self, file: DirEntry) -> None:
        """Record reused GUIDs and unused input arguments for one file.

        Each entry of ``atomic_tests`` is checked for an
        ``auto_generated_guid`` that was already seen (``ReusedGuid``) and for
        input arguments not referenced by any command (``UnusedArgument``).
        Test numbers stored in the errors are 1-based.
        """
        with open(file.path, "r", encoding="utf-8") as f:
            atomic = yaml.load(f)

        tests = atomic.get("atomic_tests") if isinstance(atomic, dict) else None
        if not isinstance(tests, list):
            # Structurally broken document: leave the reporting to the schema
            # check that runs right after this one instead of crashing here.
            return

        for test_number, test in enumerate(tests, start=1):
            if not isinstance(test, dict):
                continue  # malformed entry; the schema check reports it

            guid = test.get("auto_generated_guid")
            if guid:
                if guid in self.guids:
                    self.errors[file.path].append(ReusedGuid(file.path, guid, test_number))
                else:
                    self.guids.append(guid)

            for argument in self._unused_arguments(test):
                self.errors[file.path].append(
                    UnusedArgument(file.path, argument, test_number)
                )

    @staticmethod
    def _unused_arguments(test):
        """Return the input-argument names of *test* that no command references.

        An argument ``name`` counts as used when the text ``#{name}`` occurs
        in the executor's ``command``, ``cleanup_command`` or ``steps``, or in
        any dependency's ``get_prereq_command`` / ``prereq_command``.
        Names are returned in declaration order.
        """
        arguments = test.get("input_arguments")
        if not isinstance(arguments, dict) or not arguments:
            return []

        # Start from an empty list so a test without an executor (but with
        # dependencies) is handled instead of hitting an unbound name.
        commands = []
        executor = test.get("executor")
        if isinstance(executor, dict):
            commands += [
                executor.get("command"),
                executor.get("cleanup_command"),
                executor.get("steps"),
            ]
        for dependency in test.get("dependencies") or []:
            if isinstance(dependency, dict):
                commands += [
                    dependency.get("get_prereq_command"),
                    dependency.get("prereq_command"),
                ]

        # Missing (None) and non-text entries cannot contain a placeholder.
        scripts = [command for command in commands if isinstance(command, str)]
        return [
            name
            for name in arguments
            if not any(f"#{{{name}}}" in script for script in scripts)
        ]

    def validate_yaml_extension(self, file: DirEntry) -> None:
        """Record an ``InvalidFileName`` error when the path ends in ``.yml``."""
        if fnmatch.fnmatch(file.path, "*.yml"):
            self.errors[file.path].append(InvalidFileName(file.path))

    def validate_json_schema(self, file: DirEntry) -> None:
        """Validate the YAML file against the loaded schema."""
        with open(file.path, "r", encoding="utf-8") as f:
            atomic = yaml.load(f)
        try:
            validate(instance=atomic, schema=self.schema)
        except Exception as e:
            # Deliberately broad: any failure is recorded for this file so the
            # remaining files are still checked; print_errors renders it.
            self.errors[file.path].append(e)

    def validate_directory(self, directory: DirEntry) -> None:
        """Run every directory-level check on one directory."""
        self.validate_directory_path(directory)

    def validate_directory_path(self, directory: DirEntry) -> None:
        """Record an ``InvalidPath`` error unless the directory is `src` or `bin`."""
        if directory.name not in ["src", "bin"]:
            self.errors[directory.path].append(InvalidPath(directory.path))

    def print_errors(self) -> None:
        """Output errors in a human friendly way."""
        for location, problems in self.errors.items():
            print(f"Error occurred with {location}.")
            print("Each of the following are why it failed:")
            for error in problems:
                if isinstance(error, BaseError):
                    print(f"\n\t{error}\n")
                elif isinstance(error, ValidationError):
                    if "auto_generated_guid" in error.json_path:
                        print(f"\n\tGUIDs are auto generated. You can remove {error.json_path}\n")
                    else:
                        if context := error.context:
                            # Several sub-errors: list each candidate reason.
                            print("\n\tIt failed because of one of the following reasons:")
                            messages = '\n\t\t'.join([c.message for c in context])
                            print(f"\n\t\t{messages}")
                        else:
                            print(f"\n\t{error}\n")
                        print(f"\nThe JSON Path is {error.json_path}\n")
                else:
                    print(f"\n\t{error}\n")
# Script driver: validate every entry found directly inside each
# ./atomics/T* folder, then report the outcome through stdout and exit code.
validator = Validator()
for folder in glob.glob('./atomics/T*'):
    for item in os.scandir(folder):
        validator.validate(item)

if not validator.errors:
    print("Validation Successful")
else:
    print("Validation Failed")
    validator.print_errors()
    # Non-zero exit status signals the failure to the calling process.
    sys.exit(1)