921be01911
When the analysed sample is a `.sys` driver and HolyGrail has been run, the downloaded report now leads with a HolyGrail BYOVD section instead of burying it after the dynamic-scan stuff that doesn't apply to drivers. - helpers._load_file_data also loads byovd_results.json (when present) and threads it through `data['byovd_results']` into the report. - generate_html_report computes a HolyGrail score Python-side (port of holygrail/core.js's calculateScore) and exposes is_driver_report, byovd_score, byovd_label, byovd_class, av_killer_imports to the template. - For driver reports the hero "Risk Assessment" card swaps to "BYOVD Potential" with the HolyGrail score and pill, and the chip row swaps the YARA/PE-Sieve/Moneta/Patriot/HSB counts (all zero on drivers — they don't run dynamic scans) for LOLDrivers / Win10 / Win11 / Critical Imports. - The HolyGrail BYOVD section moves above File Information / Risk Factors so it's the first thing a defender sees in the report. - Non-driver reports are unchanged.
166 lines
6.2 KiB
Python
# app/helpers.py
|
|
"""Shared route-level helpers used across blueprints."""
|
|
import glob
|
|
import json
|
|
import os
|
|
import shutil
|
|
|
|
from .utils import json_helpers, path_manager, risk_analyzer, validators
|
|
|
|
|
|
class RouteHelpers:
    """Loads analysis data, computes risk, saves results, and runs cleanup.

    All loaders return a ``(data, error_message, is_error)`` triple so route
    handlers can branch on failure without raising.
    """

    # Result-file names shared by the per-hash and per-PID loaders, kept in
    # one place so the two code paths cannot drift apart.
    FILE_INFO_NAME = 'file_info.json'
    STATIC_RESULTS_NAME = 'static_analysis_results.json'
    DYNAMIC_RESULTS_NAME = 'dynamic_analysis_results.json'
    BYOVD_RESULTS_NAME = 'byovd_results.json'

    def __init__(self, app_config, logger):
        """Store the app config mapping and a stdlib-style logger."""
        self.config = app_config
        self.logger = logger

    def load_analysis_data(self, target):
        """Unified loader for both file-hash and PID targets.

        A purely numeric *target* is treated as a process ID; anything else
        is treated as a file hash.
        """
        if target.isdigit():
            return self._load_pid_data(target)
        return self._load_file_data(target)

    def _load_pid_data(self, pid):
        """Load dynamic-analysis results for a process ID.

        Returns ``(data, None, False)`` on success or
        ``(None, message, True)`` on any failure.
        """
        is_valid, error_msg = validators.validate_pid(pid)
        if not is_valid:
            return None, error_msg, True

        result_folder = os.path.join(self.config['utils']['result_folder'], f'dynamic_{pid}')
        if not os.path.exists(result_folder):
            return None, f'Process with PID {pid} does not exist', True

        dynamic_path = os.path.join(result_folder, self.DYNAMIC_RESULTS_NAME)
        if not os.path.exists(dynamic_path):
            return None, f'Dynamic analysis results for PID {pid} not found', True

        dynamic_results = json_helpers.load_json_file(dynamic_path)
        if not dynamic_results:
            return None, 'Error loading dynamic analysis results', True

        return {
            'is_pid': True,
            'pid': pid,
            'result_path': result_folder,
            'file_info': None,
            'static_results': None,
            'dynamic_results': dynamic_results,
            # BYOVD/HolyGrail results exist only for file targets.
            'byovd_results': None,
        }, None, False

    def _load_file_data(self, file_hash):
        """Load all per-file analysis artifacts for *file_hash*.

        ``file_info.json`` is mandatory; static, dynamic, and BYOVD results
        are optional and come back as ``None`` when absent.
        """
        result_path = path_manager.find_file_by_hash(file_hash, self.config['utils']['result_folder'])
        if not result_path:
            return None, 'Results not found', True

        file_info_path = os.path.join(result_path, self.FILE_INFO_NAME)
        if not os.path.exists(file_info_path):
            return None, 'File info not found', True

        file_info = json_helpers.load_json_file(file_info_path)
        if not file_info:
            return None, 'Error loading file info', True

        return {
            'is_pid': False,
            'pid': None,
            'result_path': result_path,
            'file_info': file_info,
            'static_results': self._load_optional_json(result_path, self.STATIC_RESULTS_NAME),
            'dynamic_results': self._load_optional_json(result_path, self.DYNAMIC_RESULTS_NAME),
            'byovd_results': self._load_optional_json(result_path, self.BYOVD_RESULTS_NAME),
        }, None, False

    def _load_optional_json(self, result_path, filename):
        """Return parsed JSON from *result_path*/*filename*, or None when the file is absent."""
        path = os.path.join(result_path, filename)
        return json_helpers.load_json_file(path) if os.path.exists(path) else None

    def calculate_and_add_risk(self, data):
        """Compute a risk assessment and attach it to the loaded *data*.

        Process targets are scored from dynamic results only; file targets
        combine file info with static and dynamic results.  The assessment
        dict is written back under ``risk_assessment`` (on the dynamic
        results for PID targets, on file info otherwise) and the
        ``(score, level, factors)`` triple is returned.
        """
        if data['is_pid']:
            risk_score, risk_factors = risk_analyzer.calculate_risk(
                analysis_type='process',
                dynamic_results=data['dynamic_results'],
            )
        else:
            risk_score, risk_factors = risk_analyzer.calculate_risk(
                analysis_type='file',
                file_info=data['file_info'],
                static_results=data['static_results'],
                dynamic_results=data['dynamic_results'],
            )

        risk_level = risk_analyzer.get_risk_level(risk_score)

        risk_data = {
            'score': risk_score,
            'level': risk_level,
            'factors': risk_factors,
        }

        # Attach under whichever results dict this target type actually has.
        if data['is_pid'] and data['dynamic_results']:
            data['dynamic_results']['risk_assessment'] = risk_data
        elif data['file_info']:
            data['file_info']['risk_assessment'] = risk_data

        return risk_score, risk_level, risk_factors

    def get_detection_counts(self, data):
        """Extract detection counts, preferring dynamic over static results."""
        results = data['dynamic_results'] or data['static_results'] or {}
        return json_helpers.extract_detection_counts(results)

    def save_analysis_results(self, results, result_path, results_filename):
        """Serialize *results* as JSON into *result_path* and return the full path."""
        results_file_path = os.path.join(result_path, results_filename)
        with open(results_file_path, 'w') as f:
            json.dump(results, f)
        # Lazy %-args: the message is only formatted when DEBUG is enabled.
        self.logger.debug("Analysis results saved to: %s", results_file_path)
        return results_file_path

    def process_file_cleanup(self, folders_to_clean):
        """Best-effort cleanup of upload/result/analysis folders.

        *folders_to_clean* maps a folder type ('uploads', 'results', or
        'analysis') to its path.  Missing folders are skipped; per-folder
        failures are logged and collected under ``errors`` instead of
        aborting the remaining cleanup.  Returns the counts/errors dict.
        """
        results = {'uploads_cleaned': 0, 'analysis_cleaned': 0, 'result_cleaned': 0, 'errors': []}

        for folder_type, folder_path in folders_to_clean.items():
            if not os.path.exists(folder_path):
                continue

            try:
                if folder_type == 'uploads':
                    results['uploads_cleaned'] += self._clean_files_in_folder(folder_path)
                elif folder_type == 'results':
                    results['result_cleaned'] += self._clean_folders_in_folder(folder_path)
                elif folder_type == 'analysis':
                    results['analysis_cleaned'] += self._clean_process_folders(folder_path)
            except Exception as e:
                # Keep going: one bad folder must not block the others.
                self.logger.error(f"Error cleaning {folder_type}: {e}")
                results['errors'].append(f"Error cleaning {folder_type}: {str(e)}")

        return results

    def _clean_files_in_folder(self, folder_path):
        """Delete regular files directly inside *folder_path*; return the count deleted."""
        count = 0
        for f in os.listdir(folder_path):
            file_path = os.path.join(folder_path, f)
            if os.path.isfile(file_path):
                os.unlink(file_path)
                count += 1
        return count

    def _clean_folders_in_folder(self, folder_path):
        """Delete subdirectories directly inside *folder_path*; return the count deleted."""
        count = 0
        for f in os.listdir(folder_path):
            full_path = os.path.join(folder_path, f)
            if os.path.isdir(full_path):
                shutil.rmtree(full_path)
                count += 1
        return count

    def _clean_process_folders(self, analysis_path):
        """Delete every ``process_*`` folder under *analysis_path*; return the count deleted."""
        count = 0
        for folder in glob.glob(os.path.join(analysis_path, 'process_*')):
            shutil.rmtree(folder)
            count += 1
        return count