Files
blue-team-tools/tests/regression_tests_runner.py
Swachchhanda Shrawan Poudel 889b07d952 Merge PR #5943 from @swachchhanda000 - Add regression test count mismatch finder
chore: regression test count mismatch finder
2026-04-20 14:38:44 +02:00

719 lines
25 KiB
Python

"""Run regression tests for Sigma rules based on their regression_tests_path attribute."""
import argparse
import json
import os
import subprocess
import sys
from typing import Dict, List
import yaml
def get_absolute_path(base_path: str, relative_path: str) -> str:
    """Resolve *relative_path* against *base_path*, walking up parent dirs.

    Absolute paths are returned unchanged. For relative paths, climb from
    *base_path* toward the filesystem root until a directory is found under
    which the path exists; if none is found, join onto the topmost directory
    reached (the filesystem root).
    """
    if os.path.isabs(relative_path):
        return relative_path
    # Make both "/" and "\" separators native before probing the filesystem.
    normalized = relative_path.replace("/", os.sep).replace("\\", os.sep)
    current = base_path
    while not os.path.exists(os.path.join(current, normalized)):
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() is a fixed point at the filesystem root: give up.
            break
        current = parent
    return os.path.join(current, normalized)
def load_info_yaml(
    regression_tests_path: str, rule_id: str, file_path: str
) -> tuple[List[Dict], List[Dict]]:
    """Load and parse the regression test info YAML file.

    Args:
        regression_tests_path: Absolute path to the rule's info.yml file.
        rule_id: ID of the rule referencing the info file.
        file_path: Path of the rule file, used in reports and to resolve
            relative test paths.

    Returns:
        tuple: (results, missing_files). ``results`` holds at most one entry
        describing the rule's tests; ``missing_files`` lists referenced files
        that do not exist on disk.
    """
    results: List[Dict] = []
    missing_files: List[Dict] = []
    if not os.path.exists(regression_tests_path):
        missing_files.append(
            {
                "rule_path": file_path,
                "rule_id": rule_id,
                "missing_file": regression_tests_path,
                "file_type": "regression_tests_path",
            }
        )
        return results, missing_files
    try:
        with open(regression_tests_path, "r", encoding="utf-8") as f:
            info_data = yaml.safe_load(f)
        if not info_data or "regression_tests_info" not in info_data:
            print(f"Warning: No regression_tests_info found in {regression_tests_path}")
            return results, missing_files
        # Extract test data from regression_tests_info
        test_data = []
        regression_tests = info_data.get("regression_tests_info", [])
        rule_metadata = info_data.get("rule_metadata", [])
        for test in regression_tests:
            if not isinstance(test, dict):
                continue
            test_path = get_absolute_path(
                os.path.dirname(file_path), test.get("path", "")
            )
            # Record missing test data files so the caller can fail early.
            if not os.path.exists(test_path):
                missing_files.append(
                    {
                        "rule_path": file_path,
                        "rule_id": rule_id,
                        "missing_file": test_path,
                        "file_type": "test_file",
                        "test_name": test.get("name", "Unnamed Test"),
                        "test_type": test.get("type", "unknown"),
                    }
                )
            test_data.append(
                {
                    "type": test.get("type", "unknown"),
                    "path": test_path,
                    "name": test.get("name", "Unnamed Test"),
                    "provider": test.get("provider", ""),
                    "match_count": test.get("match_count"),
                }
            )
        # BUG FIX: use the FIRST metadata entry's id (rule_metadata[0].id),
        # which is the contract enforced by check_rule_id_consistency.
        # Previously the loop kept overwriting, effectively using the LAST
        # entry when multiple were present.
        info_metadata_rule_id = None
        for metadata_entry in rule_metadata:
            if isinstance(metadata_entry, dict):
                info_metadata_rule_id = metadata_entry.get("id", "")
                break
        if test_data:
            results.append(
                {
                    "path": file_path,
                    "id": rule_id,
                    "tests": test_data,
                    "info_metadata_rule_id": info_metadata_rule_id,
                }
            )
    except yaml.YAMLError as e:
        print(f"Warning: Could not parse info file {regression_tests_path}: {e}")
    return results, missing_files
def find_rule_missing_test(rule_data: Dict, file_path: str) -> tuple[bool, List[Dict]]:
    """Find missing test files for a single rule based on its data.

    Returns:
        skip: True if the rule should be skipped, False otherwise
        missing_regression_tests_path: List of dicts with missing regression_tests_path info
    """
    missing: List[Dict] = []
    rule_id = rule_data.get("id", "unknown")
    status = rule_data.get("status", "").lower()
    if "regression_tests_path" in rule_data:
        # The rule declares tests: process it regardless of its status.
        return False, missing
    if status in ("test", "stable"):
        # test/stable rules must declare regression tests; record the gap.
        missing.append(
            {
                "rule_path": file_path,
                "rule_id": rule_id,
                "status": status,
            }
        )
    # No regression_tests_path declared: nothing to run either way.
    return True, missing
def find_rule_tests(rule_data: Dict, file_path: str) -> tuple[List[Dict], List[Dict]]:
    """Find regression tests and missing files for a single rule based on its data."""
    if not rule_data or "regression_tests_path" not in rule_data:
        # No tests declared for this rule.
        return [], []
    rule_id = rule_data.get("id", "unknown")
    # Resolve the info.yml location relative to the rule file's directory.
    info_path = get_absolute_path(
        os.path.dirname(file_path),
        rule_data.get("regression_tests_path", ""),
    )
    tests, missing = load_info_yaml(info_path, rule_id, file_path)
    return tests, missing
# pylint: disable=too-many-locals
def find_rules_with_tests(
    rules_paths: List[str],
) -> tuple[List[Dict], List[Dict], List[Dict]]:
    """Find all rules that have a 'regression_tests_path' attribute pointing to test info files.

    Returns:
        tuple: (rules_with_tests, missing_files, missing_regression_tests_path)
    """
    rules_with_tests: List[Dict] = []
    missing_files: List[Dict] = []
    missing_paths: List[Dict] = []
    for rules_path in rules_paths:
        if not os.path.exists(rules_path):
            print(f"Warning: Rules path {rules_path} does not exist")
            continue
        for root, _, filenames in os.walk(rules_path):
            for filename in filenames:
                if not filename.endswith(".yml"):
                    continue
                file_path = os.path.join(root, filename)
                try:
                    with open(file_path, "r", encoding="utf-8") as handle:
                        rule_data = yaml.safe_load(handle)
                    if not rule_data:
                        continue
                    # Validate status/test-path requirements first.
                    skip, missing_test = find_rule_missing_test(rule_data, file_path)
                    missing_paths.extend(missing_test)
                    if skip:
                        continue
                    # Collect the rule's declared regression tests.
                    found, absent = find_rule_tests(rule_data, file_path)
                    rules_with_tests.extend(found)
                    missing_files.extend(absent)
                except yaml.YAMLError as err:
                    print(f"Warning: Could not parse {file_path}: {err}")
    return rules_with_tests, missing_files, missing_paths
def run_evtx_checker(
    rule_path: str,
    rule_id: str,
    test_data: Dict,
    evtx_checker_path: str,
    thor_config: str,
) -> tuple[bool, str]:
    """Run evtx-sigma-checker and check if rule ID is in output.

    Args:
        rule_path: Path to the rule file; its directory is passed as --rule-path.
        rule_id: Rule ID expected in the checker's JSON-lines output.
        test_data: Test definition with at least 'path' and optional 'match_count'.
        evtx_checker_path: Path to the evtx-sigma-checker binary.
        thor_config: Path to the thor.yml log-source configuration.

    Returns:
        tuple: (success, newline-joined matching output lines)
    """
    evtx_path = test_data["path"]
    # File existence is now checked upfront in find_rules_with_tests
    # No need to check again here
    cmd = [
        evtx_checker_path,
        "--log-source",
        thor_config,
        "--evtx-path",
        evtx_path,
        "--rule-level",
        "informational",
        "--rule-path",
        os.path.dirname(rule_path),
    ]
    try:
        # BUG FIX: check=False. With check=True a non-zero exit raised
        # CalledProcessError before the `returncode != 0` branch below could
        # run, making it dead code; the explicit branch now handles failures.
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=300, check=False
        )
        if result.returncode != 0:
            print(f" Warning: evtx-sigma-checker failed: {result.stderr}")
            return False, ""
        # The checker emits one JSON object per line; keep lines for our rule.
        match_outputs = []
        for line in result.stdout.strip().splitlines():
            try:
                json_obj = json.loads(line)
            except json.JSONDecodeError:
                # Skip lines that aren't valid JSON
                print(f" Warning: Skipping non-JSON line: {line}")
                continue
            if json_obj.get("RuleId") == rule_id:
                match_outputs.append(line)
        match_count = len(match_outputs)
        all_output = "\n ".join(match_outputs)
        expected_count = test_data.get("match_count")
        if expected_count is not None:
            if match_count < expected_count:
                print(
                    f" Error: {rule_id}: Match count too low: expected {expected_count}, got {match_count}"
                )
                return False, all_output
            if match_count > expected_count:
                # More matches than documented is tolerated, but flagged.
                print(
                    f" Warning: {rule_id}: Got {match_count} matches but only {expected_count} expected - consider updating match_count in info.yml"
                )
                return True, all_output
        # No expected count declared: any match at all counts as a pass.
        return match_count > 0, all_output
    except subprocess.TimeoutExpired:
        print(" Timeout: evtx-sigma-checker timed out")
        return False, ""
def run_test(
    rule_path: str,
    rule_id: str,
    test_data: Dict,
    evtx_checker_path: str,
    thor_config: str,
) -> tuple[bool, str]:
    """Run a test based on its type."""
    # Dispatch on the declared type; only 'evtx' tests are supported today.
    test_type = test_data.get("type", "unknown")
    if test_type != "evtx":
        print(f" Warning: Unknown test type '{test_type}', skipping")
        return False, ""
    return run_evtx_checker(
        rule_path, rule_id, test_data, evtx_checker_path, thor_config
    )
def parse_arguments() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns:
        argparse.Namespace with rules_paths, evtx_checker, thor_config,
        validate_only, ignore_validation and verbose attributes.
    """
    parser = argparse.ArgumentParser(
        description="Run regression tests for Sigma rules with regression_tests_path"
    )
    parser.add_argument(
        "--rules-paths",
        required=True,
        action="extend",
        nargs="+",
        # BUG FIX: nargs="+" takes space-separated values (and the flag may be
        # repeated); the previous help text wrongly said "Comma-separated".
        help="One or more paths to rule directories (flag may be repeated)",
    )
    parser.add_argument(
        "--evtx-checker",
        help="Path to evtx-sigma-checker binary (required unless using --validate-only)",
    )
    parser.add_argument(
        "--thor-config",
        help="Path to thor.yml configuration file (required unless using --validate-only)",
    )
    parser.add_argument(
        "--validate-only",
        action="store_true",
        help="Only validate rule status requirements without running tests",
    )
    parser.add_argument(
        "--ignore-validation",
        action="store_true",
        help="Ignore rule status validation requirements",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose output, showing successful test results as well",
    )
    return parser.parse_args()
def init_checks(args: argparse.Namespace) -> None:
    """Initialization that checks for functional environment.

    In validate-only mode, no checker binary or THOR config is needed;
    otherwise both must be supplied and exist, or the process exits(1).
    """
    if args.validate_only:
        print("Starting Rule Status Validation...")
    else:
        print("Starting Regression Tests...")
        # Check required arguments for test execution
        if not args.evtx_checker or not args.thor_config:
            print(
                "Error: --evtx-checker and --thor-config are required unless using --validate-only"
            )
            sys.exit(1)
        # Check if evtx-sigma-checker exists
        if not os.path.exists(args.evtx_checker):
            print(f"Error: evtx-sigma-checker not found at {args.evtx_checker}")
            sys.exit(1)
        # Check if THOR config exists
        if not os.path.exists(args.thor_config):
            print(f"Error: Thor config not found at {args.thor_config}")
            sys.exit(1)
    print(f"Rules paths: {args.rules_paths}")
    if not args.validate_only:
        print(f"EVTX checker: {args.evtx_checker}")
        print(f"Thor config: {args.thor_config}")
    print()
# pylint: disable=too-many-locals
def run_tests(
    args: argparse.Namespace, rules_with_tests
) -> tuple[int, int, List[Dict]]:
    """Run tests for all rules with test data.

    Returns:
        tuple: (total_tests, passed_tests, failures)
    """
    total = 0
    passed = 0
    failures: List[Dict] = []
    for rule_info in rules_with_tests:
        rule_path = rule_info["path"]
        rule_id = rule_info["id"]
        if args.verbose:
            print(f"\nTesting rule: {rule_id}")
            print(f" File: {rule_path}")
        # enumerate from 1 so the index doubles as the human test number.
        for number, test_data in enumerate(rule_info["tests"], start=1):
            test_name = test_data.get("name", f"Test {number}")
            test_type = test_data.get("type", "unknown")
            test_path = test_data.get("path", "unknown")
            if args.verbose:
                print(f" {test_name} (type: {test_type}): {test_path}")
            total += 1
            success, output = run_test(
                rule_path, rule_id, test_data, args.evtx_checker, args.thor_config
            )
            if not success:
                failures.append(
                    {
                        "rule_id": rule_id,
                        "rule_path": rule_path,
                        "test_name": test_name,
                        "test_type": test_type,
                        "test_path": test_path,
                        "test_number": number,
                    }
                )
                if args.verbose:
                    print(" ✗ FAIL")
                continue
            passed += 1
            if args.verbose:
                print(f" ✓ PASS - Match found for Rule ID: {rule_id}\n")
                print(f" Output: {output}")
        if args.verbose:
            print()
    return total, passed, failures
def validate_missing_tests(
    args: argparse.Namespace,
    rules_with_tests: List[Dict],
    missing_regression_tests_path: List[Dict],
) -> None:
    """Print rules missing regression_tests_path and handle validation.

    Exits(1) when violations exist and validation is enforced; exits(0)
    after reporting when --validate-only was requested.
    """
    offenders = missing_regression_tests_path
    if offenders and not args.ignore_validation:
        # Hard failure: test/stable rules must declare regression tests.
        print()
        print("-" * 50)
        print("RULES MISSING REGRESSION_TESTS_PATH:")
        print("-" * 50)
        for entry in offenders:
            print(f"Rule: {entry['rule_id']} (status: {entry['status']})")
            print(f" File: {entry['rule_path']}")
            print()
        print("=" * 70)
        print(
            "Rules with status 'test' or 'stable' must have a 'regression_tests_path' field."
        )
        print("Please add regression tests for these rules or change their status.")
        print("=" * 70)
        print(
            f"\nERROR: Found {len(offenders)} "
            "test/stable rule(s) without regression_tests_path."
        )
        sys.exit(1)
    if offenders and args.ignore_validation:
        print(
            f"\nWARNING: Found {len(offenders)} "
            "test/stable rule(s) without regression_tests_path (validation ignored)"
        )
        print(
            "Consider adding regression tests for these rules "
            "or changing their status to 'experimental'."
        )
    # If validate-only mode, exit successfully after validation
    if args.validate_only:
        if args.ignore_validation and offenders:
            print("✅ All rules passed validation (validation ignored)!")
        else:
            print("✅ All rules passed validation!")
        print(f"Found {len(rules_with_tests)} rules with regression tests configured.")
        sys.exit(0)
def check_missing_test_files(missing_files: List[Dict]) -> None:
    """Check for missing test files and print errors if any are found.

    Exits(1) when any referenced file is missing; returns None otherwise.
    """
    if not missing_files:
        return
    print(f"\nERROR: Found {len(missing_files)} missing file(s):")
    print("=" * 60)
    # Partition by file_type in a single pass.
    info_files: List[Dict] = []
    data_files: List[Dict] = []
    for entry in missing_files:
        if entry["file_type"] == "regression_tests_path":
            info_files.append(entry)
        elif entry["file_type"] == "test_file":
            data_files.append(entry)
    if info_files:
        print(f"\nMISSING REGRESSION TEST INFO FILES ({len(info_files)}):")
        print("-" * 50)
        for entry in info_files:
            print(f"Rule: {entry['rule_id']}")
            print(f" File: {entry['rule_path']}")
            print(f" Missing: {entry['missing_file']}")
            print()
    if data_files:
        print(f"\nMISSING TEST DATA FILES ({len(data_files)}):")
        print("-" * 50)
        for entry in data_files:
            print(f"Rule: {entry['rule_id']}")
            print(f" File: {entry['rule_path']}")
            print(f" Test: {entry['test_name']} (type: {entry['test_type']})")
            print(f" Missing: {entry['missing_file']}")
            print()
    print("=" * 60)
    print("Please ensure all referenced files exist before running tests.")
    sys.exit(1)
def print_summary(total_tests: int, passed_tests: int, failures: List[Dict]) -> None:
    """Print a summary of the test results."""
    print("=" * 60)
    print("REGRESSION TEST SUMMARY")
    print("=" * 60)
    print(f"Total tests run: {total_tests}")
    print(f"Passed: {passed_tests}")
    print(f"Failed: {len(failures)}")
    if total_tests:
        # Guard against division by zero when no tests ran.
        print(f"Success rate: {passed_tests / total_tests * 100:.1f}%")
    if failures:
        print(f"\nFAILED TESTS ({len(failures)}):")
        print("-" * 40)
        for failure in failures:
            print(f"Rule: {failure['rule_id']}")
            print(f" File: {failure['rule_path']}")
            print(f" Test: {failure['test_name']} (type: {failure['test_type']})")
            print(f" Path: {failure['test_path']}")
            print()
    print("=" * 60)
def check_rule_id_consistency(rules_with_tests: List[Dict]) -> List[Dict]:
    """Check if rule IDs are consistent between rule files and their info.yml files.

    Also checks if rule IDs match the test file names.

    Args:
        rules_with_tests: Entries produced by find_rules_with_tests, each with
            'id', 'path', 'tests' and optional 'info_metadata_rule_id'.

    Returns:
        List of dicts containing information about inconsistent rule IDs
    """
    inconsistent_rules = []
    for rule_info in rules_with_tests:
        rule_id = rule_info["id"]
        info_metadata_rule_id = rule_info.get("info_metadata_rule_id", "")
        rule_path = rule_info["path"]
        tests = rule_info.get("tests", [])
        # Check rule ID vs info.yml rule_metadata[0].id consistency
        if not info_metadata_rule_id:
            inconsistent_rules.append(
                {
                    "rule_id": rule_id,
                    "info_metadata_rule_id": info_metadata_rule_id,
                    "rule_path": rule_path,
                    "issue": "missing_info_metadata_rule_id",
                    "expected": rule_id,
                    "actual": info_metadata_rule_id,
                    "message": "info.yml is missing rule_metadata or rule_metadata[0].id",
                }
            )
        elif rule_id != info_metadata_rule_id:
            inconsistent_rules.append(
                {
                    "rule_id": rule_id,
                    "info_metadata_rule_id": info_metadata_rule_id,
                    "rule_path": rule_path,
                    "issue": "rule_vs_info_metadata_mismatch",
                    "expected": rule_id,
                    "actual": info_metadata_rule_id,
                    "message": f"Rule ID '{rule_id}' in rule file does not match "
                    f"info.yml rule_metadata[0].id '{info_metadata_rule_id}'",
                }
            )
        # Check rule ID vs test file name consistency
        for test in tests:
            test_path = test.get("path", "")
            if not test_path:
                continue
            # Extract filename without extension
            filename = os.path.basename(test_path)
            name_without_ext, file_ext = os.path.splitext(filename)
            file_ext = file_ext.lower()
            # Only check .evtx and .json files (.json is an optional
            # conversion of .evtx)
            if file_ext in (".evtx", ".json") and name_without_ext != rule_id:
                inconsistent_rules.append(
                    {
                        "rule_id": rule_id,
                        "test_filename": filename,
                        "rule_path": rule_path,
                        "test_path": test_path,
                        "issue": "rule_vs_testfile_mismatch",
                        "expected": f"{rule_id}{file_ext}",
                        "actual": filename,
                        # BUG FIX: the f-string continuation previously joined
                        # "test file" + "name" into "test filename" with no
                        # space between the lines.
                        "message": f"Rule ID '{rule_id}' does not match test file "
                        f"name '{name_without_ext}' (expected: {rule_id}{file_ext})",
                    }
                )
    if inconsistent_rules:
        print("\nERROR: Found rule ID inconsistencies:")
        print("=" * 60)
        print()
        # Group by issue type for better readability
        rule_vs_info_issues = [
            r
            for r in inconsistent_rules
            if r.get("issue")
            in ["rule_vs_info_metadata_mismatch", "missing_info_metadata_rule_id"]
        ]
        rule_vs_testfile_issues = [
            r
            for r in inconsistent_rules
            if r.get("issue") == "rule_vs_testfile_mismatch"
        ]
        if rule_vs_info_issues:
            print("RULE ID vs INFO.YML RULE_METADATA[0].ID MISMATCHES:")
            print("-" * 50)
            for inconsistent in rule_vs_info_issues:
                print(f"Rule file ID: {inconsistent['rule_id']}")
                print(
                    f"Info.yml rule_metadata[0].id: {inconsistent['info_metadata_rule_id']}"
                )
                print(f"Expected: {inconsistent['expected']}")
                print(f"Actual: {inconsistent['actual']}")
                print(f"Rule file: {inconsistent['rule_path']}")
                print(f"Message: {inconsistent['message']}")
                print("-" * 50)
                print()
        if rule_vs_testfile_issues:
            print("RULE ID vs TEST FILE NAME MISMATCHES:")
            print("-" * 40)
            for inconsistent in rule_vs_testfile_issues:
                print(f"Rule ID: {inconsistent['rule_id']}")
                print(f"Expected filename: {inconsistent['expected']}")
                print(f"Actual filename: {inconsistent['actual']}")
                print(f"Rule file: {inconsistent['rule_path']}")
                print(f"Test file: {inconsistent['test_path']}")
                print(f"{inconsistent['message']}")
                print()
        print("<=>" * 20)
        print("Rule IDs must match between:")
        print("1. Rule files ID and their info.yml rule_metadata[0].id")
        print("2. Rule files ID and their test file names (EVTX/JSON files)")
        print(" Note: JSON files are optional conversions of EVTX files")
    return inconsistent_rules
def main():
    """Main function to run regression tests for Sigma rules."""
    args = parse_arguments()
    init_checks(args)
    # Discover rules and their declared regression tests.
    print("Scanning for rules with test data...")
    rules_with_tests, missing_files, missing_paths = find_rules_with_tests(
        args.rules_paths
    )
    print(f"Found {len(rules_with_tests)} rule(s) with regression tests configured.\n")
    # Verify IDs line up between rule files, info.yml and test file names.
    print("Checking for consistent rule <--> test mapping...")
    if check_rule_id_consistency(rules_with_tests):
        sys.exit(1)
    print("All rules are mapped correctly.")
    validate_missing_tests(args, rules_with_tests, missing_paths)
    check_missing_test_files(missing_files)
    print()
    if not rules_with_tests:
        print("No rules with test data found")
        sys.exit(1)
    # Execute every discovered test and report.
    print("Running tests...\n")
    total_tests, passed_tests, failures = run_tests(args, rules_with_tests)
    print_summary(total_tests, passed_tests, failures)
    # A non-zero exit signals CI that at least one test failed.
    if failures:
        sys.exit(1)
# Entry point: invoke main() only when executed as a script, not on import.
if __name__ == "__main__":
    main()