Files
2025-09-05 04:51:14 -06:00

164 lines
5.7 KiB
Python

import fnmatch
import json
import os
import re
from dataclasses import dataclass
import requests
import yaml
from yaml.loader import SafeLoader
def get_technique_from_filename(filename):
"""Returns technique(Txxx.xxx) from the filename specified"""
return re.findall(r"T[.\d]{4,8}", filename)[0]
@dataclass
class ChangedAtomic:
"""Returns atomic technique with test number which can be later used to run atomics in CI/CD pipelines."""
technique: str
test_number: int
data: dict
class SafeLineLoader(SafeLoader):
def construct_mapping(self, node, deep=False):
"""Add line number to each block of the atomic test."""
mapping = super(SafeLineLoader, self).construct_mapping(node, deep=deep)
# Add 1 so line numbering starts at 1
mapping["__line__"] = node.start_mark.line + 1
return mapping
class GithubAPI:
labels = {
"windows": "windows",
"macos": "macOS",
"linux": "linux",
"azure-ad": "ADFS",
"containers": "containers",
"iaas:gcp": "cloud",
"iaas:aws": "cloud",
"iaas:azure": "cloud",
"office-365": "cloud",
"google-workspace": "cloud",
}
maintainers = {
"windows": ["clr2of8", "MHaggis", "cyberbuff"],
"linux": ["josehelps", "cyberbuff"],
"macos": ["josehelps", "cyberbuff"],
"containers": ["patel-bhavin"],
"iaas:gcp": ["patel-bhavin"],
"iaas:aws": ["patel-bhavin"],
"iaas:azure": ["patel-bhavin"],
"azure-ad": ["patel-bhavin"],
"google-workspace": ["patel-bhavin"],
"office-365": ["patel-bhavin"],
}
def __init__(self, token):
self.token = token
@property
def headers(self):
return {
"Authorization": f"Bearer {self.token}",
"X-GitHub-Api-Version": "2022-11-28",
"Accept": "application/vnd.github+json",
}
def get_atomic_with_lines(self, file_url: str):
"""Get Atomic Technique along with line number for each of the atomics."""
r = requests.get(file_url, headers=self.headers)
assert r.status_code == 200
return yaml.load(r.text, Loader=SafeLineLoader)
def get_files_for_pr(self, pr):
"""Get new and modified files in the `atomics` directory changed in a PR."""
response = requests.get(
f"https://api.github.com/repos/{os.getenv('GITHUB_REPOSITORY')}/pulls/{pr}/files",
headers=self.headers,
timeout=15,
)
assert response.status_code == 200
files = response.json()
return filter(
lambda x: x["status"] in ["added", "modified"]
and fnmatch.fnmatch(x["filename"], "atomics/T*/T*.yaml"),
files,
)
def get_tests_changed(self, pr: str):
"""Get all the tests changed in a PR"""
tests = []
start = 0
files = self.get_files_for_pr(pr)
for file in files:
data = self.get_atomic_with_lines(file["raw_url"])
technique = get_technique_from_filename(file["filename"])
if file["status"] == "added":
# New file; run the entire technique; Invoke-AtomicTest Txxxx
tests += [
ChangedAtomic(technique=technique, test_number=index + 1, data=t)
for index, t in enumerate(data["atomic_tests"])
]
else:
changed_lines = []
count = 0
for line in file["patch"].split("\n"):
if line.startswith("@@"):
x, y = re.findall(r"\d{1,3},\d{1,3}", line)
start = int(x.split(",")[0])
count = -1
elif line.startswith("+"): # only take count of added lines
changed_lines.append(start + count)
elif line.startswith("-"):
count -= 1
count += 1
atomics = data["atomic_tests"]
for index, t in enumerate(atomics):
curr_atomic_start = atomics[index]["__line__"]
if index + 1 < len(atomics):
curr_atomic_end = atomics[index + 1]["__line__"]
else:
curr_atomic_end = start + 60
changes_in_current_atomic = [
i
for i in changed_lines
if i > curr_atomic_start and i < curr_atomic_end
]
if len(changes_in_current_atomic) > 0:
tests.append(
ChangedAtomic(
technique=technique, test_number=index + 1, data=t
)
)
return tests
def save_labels_and_maintainers(self, pr):
"""Saves labels and maintainers into `pr/labels.json` which would be later used by a workflow run."""
tests = self.get_tests_changed(pr)
platforms = set()
for t in tests:
platforms.update(t.data["supported_platforms"])
labels = []
maintainers = []
for p in platforms:
if p in self.labels:
labels.append(self.labels[p])
if p in self.maintainers:
maintainers += self.maintainers[p]
os.mkdir("pr")
with open("pr/changedfiles.json", "w") as f:
x = [{"name": t.technique, "test_number": t.test_number} for t in tests]
f.write(json.dumps(x))
with open("pr/labels.json", "w") as f:
j = {"pr": pr, "labels": labels, "maintainers": maintainers}
f.write(json.dumps(j))