# app/utils/risk_analyzer.py
"""Risk scoring for file, process, and BYOVD driver analyses."""
class RiskCalculator:
    """YARA + PE risk aggregation with severity weighting."""

    # Score contributed by a single match of each severity level.
    SEVERITY_WEIGHTS = {
        'CRITICAL': 100,
        'HIGH': 80,
        'MEDIUM': 50,
        'LOW': 20,
        'INFO': 5,
    }

    # Some rules carry a numeric severity in metadata; map it back to a label.
    NUMERIC_SEVERITY_MAP = {
        100: 'CRITICAL',
        80: 'HIGH',
        50: 'MEDIUM',
        20: 'LOW',
        5: 'INFO',
    }

    @classmethod
    def calculate_yara_risk(cls, matches):
        """Aggregate YARA matches into a 0-100 score plus factor strings.

        Each match's ``metadata.severity`` may be a label ('HIGH'), a
        numeric weight (80), or absent (defaults to MEDIUM). Repeated
        matches of the same severity decay geometrically (1, 0.5,
        0.25, ...) so piles of identical hits saturate instead of
        growing linearly.

        Args:
            matches: list of YARA match dicts with optional 'metadata'.

        Returns:
            (score, factors): score capped at 100; factors is a list of
            human-readable strings (empty when there are no matches).
        """
        if not matches:
            # Fixed: previously returned (0, None); an empty list keeps the
            # return type consistent for callers that iterate the factors.
            return 0, []
        severity_counts = {level: 0 for level in cls.SEVERITY_WEIGHTS}
        for match in matches:
            meta = match.get('metadata', {})
            severity = meta.get('severity', 'MEDIUM')
            if isinstance(severity, int):
                severity = cls.NUMERIC_SEVERITY_MAP.get(severity, 'MEDIUM')
            # str() guards against non-string metadata values that would
            # otherwise raise AttributeError on .upper().
            severity = str(severity).upper()
            if severity in cls.SEVERITY_WEIGHTS:
                severity_counts[severity] += 1
        total_score = 0
        risk_factors = []
        for severity, count in severity_counts.items():
            if count > 0:
                severity_score = cls.SEVERITY_WEIGHTS[severity]
                # First match at full weight, each repeat at half the prior.
                total_score += sum(
                    severity_score * (0.5 ** i) for i in range(count)
                )
                risk_factors.append(
                    f"{count} {severity.lower()} severity YARA match"
                    f"{'es' if count > 1 else ''}"
                )
        normalized_score = min(100, total_score / 2)
        return normalized_score, risk_factors

    @classmethod
    def calculate_pe_risk(cls, pe_info):
        """Score PE structural indicators: entropy, imports, checksum.

        Args:
            pe_info: parsed PE metadata with optional 'sections',
                'suspicious_imports', and 'checksum_info' keys.

        Returns:
            (score, factors): additive score (section entropy capped at
            40, suspicious imports at 30, checksum mismatch +25) and the
            matching human-readable factor strings.
        """
        pe_risk = 0
        risk_factors = []
        high_entropy_sections = 0
        very_high_entropy_sections = 0
        for section in pe_info.get('sections', []):
            entropy = section.get('entropy', 0)
            if entropy > 7.5:
                very_high_entropy_sections += 1
                risk_factors.append(
                    f"Critical entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}"
                )
            elif entropy > 7.0:
                high_entropy_sections += 1
                risk_factors.append(
                    f"High entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}"
                )
        # Packed/encrypted sections; capped so entropy alone can't dominate.
        pe_risk += min(high_entropy_sections * 10 + very_high_entropy_sections * 20, 40)
        suspicious_imports = pe_info.get('suspicious_imports', [])
        if suspicious_imports:
            # Classic remote-injection primitives weigh more than the
            # generic dynamic-loading/handle APIs.
            critical_functions = {
                'createremotethread', 'virtualallocex', 'writeprocessmemory',
                'ntmapviewofsection', 'zwmapviewofsection',
            }
            high_risk_functions = {
                'loadlibrarya', 'loadlibraryw', 'getprocaddress',
                'openprocess', 'virtualallocexnuma',
            }
            critical_imports = sum(
                1 for imp in suspicious_imports
                if imp.get('function', '').lower() in critical_functions
            )
            high_risk_imports = sum(
                1 for imp in suspicious_imports
                if imp.get('function', '').lower() in high_risk_functions
            )
            pe_risk += min(critical_imports * 15 + high_risk_imports * 8, 30)
            if critical_imports > 0 or high_risk_imports > 0:
                risk_factors.append(
                    f"{critical_imports} critical process manipulation and "
                    f"{high_risk_imports} sensitive dynamic loading imports observed"
                )
        if pe_info.get('checksum_info'):
            checksum = pe_info['checksum_info']
            if checksum.get('stored_checksum') != checksum.get('calculated_checksum'):
                build_with = checksum.get('build_with')
                # Go/Rust toolchains legitimately leave the checksum
                # unset, so only flag other builders.
                if build_with not in ('go', 'rust'):
                    pe_risk += 25
                    risk_factors.append("PE checksum mismatch observed")
        return pe_risk, risk_factors
def calculate_yara_risk(matches):
    """Module-level convenience wrapper.

    Delegates straight to :meth:`RiskCalculator.calculate_yara_risk` so
    call sites don't need to reference the class directly.
    """
    return RiskCalculator.calculate_yara_risk(matches)
def get_risk_level(risk_score):
    """Convert numerical risk score to a categorical risk level."""
    # Descending (floor, label) pairs; first floor met wins.
    bands = (
        (75, "Critical"),
        (50, "High"),
        (25, "Medium"),
    )
    for floor, label in bands:
        if risk_score >= floor:
            return label
    return "Low"
def get_entropy_risk_level(entropy):
    """Map an entropy value to a risk band ('High'/'Medium'/'Low')."""
    for floor, band in ((7.2, 'High'), (6.8, 'Medium')):
        if entropy > floor:
            return band
    return 'Low'
def calculate_risk(analysis_type='process', file_info=None,
                   static_results=None, dynamic_results=None,
                   byovd_results=None, edr_results=None):
    """Unified risk calculation for file, process, and driver analyses.

    `edr_results` maps profile name → orchestrator findings (see
    app/analyzers/edr/elastic_edr_analyzer.py). When non-empty, the
    file's score gains a contribution scaled by the most severe alert
    any profile raised. That contribution is additive, not weighted:
    high-severity EDR alerts are a strong runtime signal and should
    bump the score regardless of what static/dynamic found.

    Returns:
        (score, factors): score rounded to 2 decimals and clamped to
        0-100, plus prefixed human-readable factor strings.
    """
    score = 0
    factors = []

    # Driver analyses are scored purely from the BYOVD findings.
    if analysis_type == 'driver':
        if not byovd_results:
            return 0, ["No BYOVD analysis available"]
        byovd_score, byovd_notes = _calculate_byovd_risk(byovd_results)
        factors.extend(f"BYOVD: {note}" for note in byovd_notes)
        return round(min(max(byovd_score, 0), 100), 2), factors

    # Per-signal weights; an unknown analysis_type raises KeyError here.
    weights = {
        'file': {'pe_info': 0.10, 'static': 0.50, 'dynamic': 0.40},
        'process': {'dynamic': 1.0},
    }[analysis_type]

    is_file = analysis_type == 'file'
    if is_file and file_info and file_info.get('pe_info'):
        pe_score, pe_notes = RiskCalculator.calculate_pe_risk(file_info['pe_info'])
        factors.extend(pe_notes)
        score += (pe_score / 100) * weights['pe_info'] * 100
    if is_file and static_results:
        static_score, static_notes = _calculate_static_risk(static_results)
        factors.extend(f"Static: {note}" for note in static_notes)
        score += (static_score / 100) * weights['static'] * 100
    if analysis_type in ('file', 'process') and dynamic_results:
        dyn_score, dyn_notes = _calculate_dynamic_risk(dynamic_results, analysis_type)
        factors.extend(f"Dynamic: {note}" for note in dyn_notes)
        score += (dyn_score / 100) * weights['dynamic'] * 100
    # EDR runs (Elastic Defend, etc.) involve actually executing the binary
    # on a real EDR-instrumented host — stronger ground truth than any local
    # heuristic — so they feed in unweighted.
    if is_file and edr_results:
        edr_score, edr_notes = _calculate_edr_risk(edr_results)
        factors.extend(f"EDR: {note}" for note in edr_notes)
        score += edr_score

    score = _normalize_risk_score(score, analysis_type, dynamic_results, factors)
    return round(min(max(score, 0), 100), 2), factors
def _calculate_byovd_risk(byovd_results):
risk_score = 0
risk_factors = []
if not byovd_results:
return 0, []
findings = byovd_results.get('findings', {})
summary = findings.get('summary', {})
detailed = findings.get('detailed_analysis', {})
is_lol = summary.get('is_loldriver', False)
win10_blocked = summary.get('is_win10_blocked', False)
win11_blocked = summary.get('is_win11_blocked', False)
critical_imports = detailed.get('critical_imports', '')
has_terminate_process = detailed.get('has_terminate_process', False)
has_communication = detailed.get('has_communication', False)
has_dangerous_imports = detailed.get('has_dangerous_imports', False)
has_danger = bool(
has_dangerous_imports or
(isinstance(critical_imports, str) and critical_imports.strip()) or
has_terminate_process or
has_communication
)
if win11_blocked and win10_blocked:
risk_factors.append("Blocked on both Windows 10 and 11 - minimal exploitation potential")
return 0, risk_factors
if has_danger:
risk_score += 55
danger_factors = []
if has_dangerous_imports:
danger_factors.append("critical-import flag observed")
if critical_imports and critical_imports.strip():
danger_factors.append("critical imports listed")
if has_terminate_process:
danger_factors.append("process termination capability")
if has_communication:
danger_factors.append("communication mechanisms")
if danger_factors:
risk_factors.append(f"Critical capabilities: {', '.join(danger_factors)}")
if not win11_blocked:
risk_score += 25
risk_factors.append("Not blocked on Windows 11")
else:
risk_score -= 50
risk_factors.append("Blocked on Windows 11")
if not win10_blocked:
risk_score += 20
risk_factors.append("Not blocked on Windows 10")
else:
risk_score -= 20
risk_factors.append("Blocked on Windows 10")
if not is_lol:
risk_score += 10
risk_factors.append("Not listed in LOLDrivers database")
else:
risk_score -= 5
risk_factors.append("Listed in LOLDrivers database")
final_score = max(0, min(100, risk_score))
if detailed.get('win10_block_reason'):
risk_factors.append(f"Win10 block reason: {detailed['win10_block_reason']}")
if detailed.get('win11_block_reason'):
risk_factors.append(f"Win11 block reason: {detailed['win11_block_reason']}")
return final_score, risk_factors
def _calculate_static_risk(static_results):
    """Combine YARA, CheckPLZ, and whole-file entropy into a static score."""
    score = 0
    notes = []

    # YARA matches: scale with match count, capped at 1.5x.
    matches = static_results.get('yara', {}).get('matches', [])
    yara_score, yara_notes = calculate_yara_risk(matches)
    if yara_score > 0:
        multiplier = min(len(matches) * 0.15 + 1, 1.5)
        score += yara_score * multiplier
        notes.extend(yara_notes)

    # CheckPLZ AV-signature findings.
    checkplz = static_results.get('checkplz', {}).get('findings', {})
    if checkplz:
        threat_score = 0
        if checkplz.get('initial_threat'):
            threat_score += 50
            notes.append("Critical: CheckPLZ AV signature triggered")
        indicators = checkplz.get('threat_indicators', [])
        if indicators:
            threat_score += min(len(indicators) * 15, 40)
            notes.append(f"{len(indicators)} additional signature indicators observed")
        score += threat_score

    # Whole-file entropy as a packing/encryption signal.
    entropy = static_results.get('file_entropy')
    if entropy:
        if entropy > 7.5:
            score += 30
            notes.append(f"Critical overall file entropy: {entropy:.2f}")
        elif entropy > 7.0:
            score += 20
            notes.append(f"High overall file entropy: {entropy:.2f}")
    return score, notes
def _calculate_dynamic_risk(dynamic_results, analysis_type):
    """Aggregate runtime signals: YARA, PE-Sieve, memory, behavior, HSB, RedEdr."""
    score = 0
    notes = []

    matches = dynamic_results.get('yara', {}).get('matches', [])
    yara_score, yara_notes = calculate_yara_risk(matches)
    if yara_score > 0:
        score += yara_score
        notes.extend(yara_notes)

    pesieve = dynamic_results.get('pe_sieve', {}).get('findings', {})
    suspicious = int(pesieve.get('total_suspicious', 0))
    if suspicious > 0:
        # Critical PE-Sieve verdicts weigh 1.5x; file analyses use a
        # higher per-hit weight and cap than process analyses.
        multiplier = 1.5 if pesieve.get('severity') == 'critical' else 1.0
        per_hit = 20 if analysis_type == 'file' else 15
        cap = 45 if analysis_type == 'file' else 30
        score += min(suspicious * per_hit * multiplier, cap)
        notes.append(f"PE-Sieve observed {suspicious} memory modifications")

    score += _calculate_memory_anomaly_risk(dynamic_results, analysis_type, notes)
    score += _calculate_behavior_risk(dynamic_results, analysis_type, notes)
    score += _calculate_hsb_risk(dynamic_results, analysis_type, notes)
    score += _calculate_rededr_risk(dynamic_results, analysis_type, notes)
    return score, notes
_EDR_HIGH_SEVERITY = {'high', 'critical'}
def _calculate_edr_risk(edr_results):
"""Score contribution from Elastic-EDR (or similar) runs.
`edr_results` is a {profile_name: findings_dict} mapping. We aggregate
across profiles by taking the strongest signal — if any profile's EDR
raised a high/critical alert, that's the file's contribution. A
"blocked_by_av" status without any alerts still counts (the AV
intercepted the payload at write/spawn — a real-world detection).
Cap is +50, mirroring the Defender-at-runtime branch in
`_calculate_rededr_risk`. The two are intentionally similar in weight
because they describe the same kind of event from different vantage
points (local Defender ETW vs. EDR backend correlation).
"""
if not edr_results:
return 0, []
best_score = 0
factors = []
for profile_name, findings in edr_results.items():
if not isinstance(findings, dict):
continue
status = findings.get('status') or ''
alerts = findings.get('alerts') or []
summary = findings.get('summary') or {}
display = findings.get('display_name') or profile_name
high_severity_count = summary.get('high_severity_alerts')
if high_severity_count is None:
high_severity_count = sum(
1 for a in alerts
if isinstance(a, dict)
and str(a.get('severity', '')).lower() in _EDR_HIGH_SEVERITY
)
contribution = 0
if status == 'blocked_by_av':
# AV intercepted on write or spawn. The payload never ran, but
# this is itself a strong real-world detection.
contribution = 35
factors.append(
f"{display} blocked the binary at write/spawn time (AV intercept)"
)
elif high_severity_count > 0:
contribution = 50
factors.append(
f"Critical: {display} raised {high_severity_count} high/critical "
f"detection alert{'s' if high_severity_count != 1 else ''}"
)
elif len(alerts) > 0:
# Lower-severity alerts still count — but as a moderate signal.
contribution = 15
factors.append(
f"{display} raised {len(alerts)} low/medium detection alert"
f"{'s' if len(alerts) != 1 else ''}"
)
if contribution > best_score:
best_score = contribution
return best_score, factors
def _calculate_rededr_risk(dynamic_results, analysis_type, risk_factors):
"""Defender-only contribution from RedEdr telemetry.
The analyzer classifies every defender_events entry as one of:
threat — real detection (ThreatFound, non-empty verdict, etc.)
scan — Defender behavior monitor actively engaged with our process
(BmModuleLoad / BmNotificationHandle* / BmOpenProcess)
internal — Defender's own state plumbing (BmInternal / BmEtw)
other — anything else (e.g., msmpeng ThreadStop)
Only `threat` events bump the score. `scan` is descriptive (operator
knows Defender engaged but didn't flag — typically the win state). The
other RedEdr signals (network, audit-API, file ops, child processes)
stay descriptive too, per the design decision.
"""
rededr = dynamic_results.get('rededr', {}).get('findings', {})
defender = rededr.get('defender_events') or []
if not defender:
return 0
threat_hits = [e for e in defender if e.get('category') == 'threat']
if threat_hits:
# ThreatFound-class verdict at runtime is the strongest possible signal.
risk_factors.append(
f"Critical: Microsoft Defender flagged the binary at runtime "
f"({len(threat_hits)} threat verdict{'s' if len(threat_hits) != 1 else ''})"
)
return 50
return 0
def _calculate_memory_anomaly_risk(dynamic_results, analysis_type, risk_factors):
moneta_findings = dynamic_results.get('moneta', {}).get('findings', {})
if not moneta_findings:
return 0
memory_scores = {
'total_private_rwx': 15 if analysis_type == 'file' else 10,
'total_modified_code': 12 if analysis_type == 'file' else 10,
'total_heap_executable': 10,
'total_modified_pe_header': 10,
'total_private_rx': 8,
'total_inconsistent_x': 8,
'total_missing_peb': 5,
'total_mismatching_peb': 5,
}
total_score = 0
anomaly_count = 0
for key, weight in memory_scores.items():
count = int(moneta_findings.get(key, 0) or 0)
if count > 0:
total_score += min(count * weight, weight * 2)
anomaly_count += count
if anomaly_count > 0:
risk_factors.append(f"{anomaly_count} weighted memory anomalies observed")
return min(total_score, 40 if analysis_type == 'file' else 30)
return 0
def _calculate_behavior_risk(dynamic_results, analysis_type, risk_factors):
patriot_findings = dynamic_results.get('patriot', {}).get('findings', {})
if not patriot_findings:
return 0
behaviors = patriot_findings.get('findings', [])
behavior_count = len(behaviors)
if behavior_count == 0:
return 0
severity_scores = {
'critical': 25 if analysis_type == 'file' else 20,
'high': 15,
'medium': 10,
'low': 5,
}
behavior_score = 0
for behavior in behaviors:
severity = behavior.get('severity', 'low')
behavior_score += severity_scores.get(severity, 5)
risk_factors.append(f"{behavior_count} weighted runtime indicators observed")
return min(behavior_score, 35)
def _calculate_hsb_risk(dynamic_results, analysis_type, risk_factors):
hsb_findings = dynamic_results.get('hsb', {}).get('findings', {})
if not (hsb_findings and hsb_findings.get('detections')):
return 0
total_hsb_score = 0
for detection in hsb_findings['detections']:
if not detection.get('findings'):
continue
count = len(detection['findings'])
severity = detection.get('max_severity', 0)
if analysis_type == 'file':
severity_multiplier = 1 + (severity * 0.5)
detection_score = min(count * 15 * severity_multiplier, 40)
else:
severity_scores = {0: 10, 1: 15, 2: 20}
max_scores = {0: 20, 1: 25, 2: 35}
detection_score = min(
count * severity_scores.get(severity, 10),
max_scores.get(severity, 20),
)
total_hsb_score += detection_score
severity_text = ["LOW", "MID", "HIGH"][min(severity, 2)]
if severity >= 2:
risk_factors.append(f"Critical: {count} high-severity memory operations observed")
else:
risk_factors.append(f"{count} {severity_text} severity memory operations observed")
return min(total_hsb_score, 45 if analysis_type == 'file' else 35)
def _normalize_risk_score(risk_score, analysis_type, dynamic_results, risk_factors):
if analysis_type == 'file':
base_score = min(max(risk_score, 0), 100)
if base_score > 75:
risk_score = min(base_score * 1.15, 100)
else:
yara_matches = dynamic_results.get('yara', {}).get('matches', []) if dynamic_results else []
pesieve_findings = dynamic_results.get('pe_sieve', {}).get('findings', {}) if dynamic_results else {}
pesieve_suspicious = int(pesieve_findings.get('total_suspicious', 0))
if len(yara_matches) == 0 and pesieve_suspicious <= 1:
risk_score = min(risk_score, 65)
if all(f.lower().find('high') == -1 for f in risk_factors):
risk_score = min(risk_score, 75)
return risk_score