Bug Fixed
This commit is contained in:
+2
-2
@@ -441,7 +441,7 @@ def register_routes(app):
|
||||
# Extract scanner-specific results and calculate risk score
|
||||
try:
|
||||
process_info = dynamic_results.get('moneta', {}).get('findings', {}).get('process_info', {})
|
||||
risk_score, risk_factors = utils.calculate_risk(dynamic_results)
|
||||
risk_score, risk_factors = utils.calculate_process_risk(dynamic_results)
|
||||
risk_level = utils.get_risk_level(risk_score)
|
||||
|
||||
yara_matches = dynamic_results.get('yara', {}).get('matches', [])
|
||||
@@ -527,7 +527,7 @@ def register_routes(app):
|
||||
|
||||
# Calculate risk score
|
||||
try:
|
||||
risk_score, risk_factors = utils.calculate_risk(file_info, static_results, dynamic_results)
|
||||
risk_score, risk_factors = utils.calculate_file_risk(file_info, static_results, dynamic_results)
|
||||
risk_level = utils.get_risk_level(risk_score)
|
||||
file_based_summary[item] = {
|
||||
'md5': file_info.get('md5', 'unknown'),
|
||||
|
||||
+265
-177
@@ -497,15 +497,14 @@ class Utils:
|
||||
|
||||
return normalized_score, risk_factors
|
||||
|
||||
def calculate_risk(self, analysis_type='file', file_info=None, static_results=None, dynamic_results=None):
|
||||
def calculate_file_risk(self, file_info, static_results=None, dynamic_results=None):
|
||||
"""
|
||||
Unified risk calculation for both file and process analysis.
|
||||
Calculate overall file risk score with enhanced static analysis impact.
|
||||
|
||||
Args:
|
||||
analysis_type (str): 'file' or 'process'
|
||||
file_info (dict, optional): Information about the file
|
||||
static_results (dict, optional): Static analysis results
|
||||
dynamic_results (dict, optional): Dynamic analysis results
|
||||
file_info (dict): Information about the file.
|
||||
static_results (dict, optional): Static analysis results.
|
||||
dynamic_results (dict, optional): Dynamic analysis results.
|
||||
|
||||
Returns:
|
||||
tuple: (risk_score, risk_factors)
|
||||
@@ -513,213 +512,302 @@ class Utils:
|
||||
risk_score = 0
|
||||
risk_factors = []
|
||||
|
||||
# File-specific analysis
|
||||
if analysis_type == 'file':
|
||||
WEIGHTS = {
|
||||
'pe_info': 0.10,
|
||||
'static': 0.50,
|
||||
'dynamic': 0.40
|
||||
}
|
||||
# Adjusted weights to minimize PE info impact
|
||||
WEIGHTS = {
|
||||
'pe_info': 0.10, # Minimal impact
|
||||
'static': 0.50, # Maintain high static analysis weight
|
||||
'dynamic': 0.40 # Slightly increased
|
||||
}
|
||||
|
||||
# 1. PE Information Risk Calculation
|
||||
if file_info.get('pe_info'):
|
||||
pe_risk = 0
|
||||
pe_info = file_info['pe_info']
|
||||
|
||||
# PE Information Risk
|
||||
if file_info and file_info.get('pe_info'):
|
||||
pe_risk = 0
|
||||
pe_info = file_info['pe_info']
|
||||
# Enhanced entropy detection
|
||||
high_entropy_sections = 0
|
||||
very_high_entropy_sections = 0
|
||||
for section in pe_info.get('sections', []):
|
||||
entropy = section.get('entropy', 0)
|
||||
if entropy > 7.5: # Very high entropy threshold
|
||||
very_high_entropy_sections += 1
|
||||
risk_factors.append(f"Critical entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}")
|
||||
elif entropy > 7.0:
|
||||
high_entropy_sections += 1
|
||||
risk_factors.append(f"High entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}")
|
||||
|
||||
pe_risk += min(high_entropy_sections * 10 + very_high_entropy_sections * 20, 40)
|
||||
|
||||
# Enhanced import analysis
|
||||
suspicious_imports = pe_info.get('suspicious_imports', [])
|
||||
if suspicious_imports:
|
||||
# Categorize imports based on their risk level
|
||||
critical_functions = {
|
||||
'createremotethread', 'virtualallocex', 'writeprocessmemory', # Process injection
|
||||
'ntmapviewofsection', 'zwmapviewofsection' # Memory mapping
|
||||
}
|
||||
high_risk_functions = {
|
||||
'loadlibrarya', 'loadlibraryw', 'getprocaddress', # Dynamic loading
|
||||
'openprocess', 'virtualallocexnuma' # Process manipulation
|
||||
}
|
||||
|
||||
# Entropy analysis
|
||||
high_entropy_sections = 0
|
||||
very_high_entropy_sections = 0
|
||||
for section in pe_info.get('sections', []):
|
||||
entropy = section.get('entropy', 0)
|
||||
if entropy > 7.5:
|
||||
very_high_entropy_sections += 1
|
||||
risk_factors.append(f"Critical entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}")
|
||||
elif entropy > 7.0:
|
||||
high_entropy_sections += 1
|
||||
risk_factors.append(f"High entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}")
|
||||
# Count imports by severity based on function names
|
||||
critical_imports = sum(1 for imp in suspicious_imports
|
||||
if imp.get('function', '').lower() in critical_functions)
|
||||
high_risk_imports = sum(1 for imp in suspicious_imports
|
||||
if imp.get('function', '').lower() in high_risk_functions)
|
||||
|
||||
pe_risk += min(high_entropy_sections * 10 + very_high_entropy_sections * 20, 40)
|
||||
|
||||
# Import analysis
|
||||
suspicious_imports = pe_info.get('suspicious_imports', [])
|
||||
if suspicious_imports:
|
||||
critical_functions = {
|
||||
'createremotethread', 'virtualallocex', 'writeprocessmemory',
|
||||
'ntmapviewofsection', 'zwmapviewofsection'
|
||||
}
|
||||
high_risk_functions = {
|
||||
'loadlibrarya', 'loadlibraryw', 'getprocaddress',
|
||||
'openprocess', 'virtualallocexnuma'
|
||||
}
|
||||
|
||||
critical_imports = sum(1 for imp in suspicious_imports
|
||||
if imp.get('function', '').lower() in critical_functions)
|
||||
high_risk_imports = sum(1 for imp in suspicious_imports
|
||||
if imp.get('function', '').lower() in high_risk_functions)
|
||||
|
||||
pe_risk += min(critical_imports * 15 + high_risk_imports * 8, 30)
|
||||
if critical_imports > 0 or high_risk_imports > 0:
|
||||
risk_factors.append(f"Found {critical_imports} critical process manipulation and {high_risk_imports} high-risk dynamic loading imports")
|
||||
|
||||
# Checksum analysis
|
||||
if pe_info.get('checksum_info'):
|
||||
checksum = pe_info['checksum_info']
|
||||
if checksum.get('stored_checksum') != checksum.get('calculated_checksum'):
|
||||
pe_risk += 25
|
||||
risk_factors.append("PE checksum mismatch detected")
|
||||
|
||||
risk_score += (pe_risk / 100) * WEIGHTS['pe_info'] * 100
|
||||
pe_risk += min(critical_imports * 15 + high_risk_imports * 8, 30)
|
||||
if critical_imports > 0 or high_risk_imports > 0:
|
||||
risk_factors.append(f"Found {critical_imports} critical process manipulation and {high_risk_imports} high-risk dynamic loading imports")
|
||||
|
||||
# Enhanced checksum analysis
|
||||
if pe_info.get('checksum_info'):
|
||||
checksum = pe_info['checksum_info']
|
||||
if checksum.get('stored_checksum') != checksum.get('calculated_checksum'):
|
||||
pe_risk += 25 # Reduced impact
|
||||
risk_factors.append("PE checksum mismatch detected")
|
||||
|
||||
risk_score += (pe_risk / 100) * WEIGHTS['pe_info'] * 100
|
||||
|
||||
# Static Analysis Risk
|
||||
if static_results:
|
||||
static_risk = self._calculate_static_risk(static_results, risk_factors)
|
||||
risk_score += (static_risk / 100) * WEIGHTS['static'] * 100
|
||||
# 2. Enhanced Static Analysis Risk Calculation
|
||||
if static_results:
|
||||
static_risk = 0
|
||||
|
||||
# Enhanced YARA detection scoring
|
||||
yara_matches = static_results.get('yara', {}).get('matches', [])
|
||||
yara_score, yara_factors = self.calculate_yara_risk(yara_matches)
|
||||
if yara_score > 0:
|
||||
# Apply multiplier for multiple matching rules
|
||||
match_multiplier = min(len(yara_matches) * 0.15 + 1, 1.5) # Up to 50% boost
|
||||
static_risk += yara_score * match_multiplier
|
||||
# Directly use the yara_factors which already include severity
|
||||
risk_factors.extend([f"Static: {factor}" for factor in yara_factors])
|
||||
|
||||
# Enhanced CheckPLZ analysis
|
||||
checkplz_findings = static_results.get('checkplz', {}).get('findings', {})
|
||||
if checkplz_findings:
|
||||
threat_score = 0
|
||||
if checkplz_findings.get('initial_threat'):
|
||||
threat_score += 50
|
||||
risk_factors.append("Critical: CheckPLZ detected initial threat indicators")
|
||||
|
||||
# Additional CheckPLZ indicators
|
||||
indicators = checkplz_findings.get('threat_indicators', [])
|
||||
if indicators:
|
||||
indicator_score = min(len(indicators) * 15, 40)
|
||||
threat_score += indicator_score
|
||||
risk_factors.append(f"Found {len(indicators)} additional threat indicators")
|
||||
|
||||
static_risk += threat_score
|
||||
|
||||
# Add file entropy analysis
|
||||
if static_results.get('file_entropy'):
|
||||
entropy = static_results['file_entropy']
|
||||
if entropy > 7.5:
|
||||
static_risk += 30
|
||||
risk_factors.append(f"Critical overall file entropy: {entropy:.2f}")
|
||||
elif entropy > 7.0:
|
||||
static_risk += 20
|
||||
risk_factors.append(f"High overall file entropy: {entropy:.2f}")
|
||||
|
||||
risk_score += (static_risk / 100) * WEIGHTS['static'] * 100
|
||||
|
||||
# Dynamic Analysis Risk (common for both file and process)
|
||||
|
||||
|
||||
# 3. Dynamic Analysis Risk Calculation
|
||||
if dynamic_results:
|
||||
dynamic_risk = 0
|
||||
|
||||
# YARA analysis
|
||||
# YARA dynamic detections
|
||||
yara_matches = dynamic_results.get('yara', {}).get('matches', [])
|
||||
yara_score, yara_factors = self.calculate_yara_risk(yara_matches)
|
||||
if yara_score > 0:
|
||||
dynamic_risk += yara_score
|
||||
# Similarly for dynamic, use the factors directly
|
||||
risk_factors.extend([f"Dynamic: {factor}" for factor in yara_factors])
|
||||
|
||||
# PE-Sieve analysis
|
||||
# Enhanced PE-Sieve scoring
|
||||
pesieve_findings = dynamic_results.get('pe_sieve', {}).get('findings', {})
|
||||
pesieve_suspicious = int(pesieve_findings.get('total_suspicious', 0))
|
||||
if pesieve_suspicious > 0:
|
||||
severity_multiplier = 1.5 if pesieve_findings.get('severity') == 'critical' else 1.0
|
||||
pe_sieve_score = min(pesieve_suspicious * (20 if analysis_type == 'file' else 15) * severity_multiplier,
|
||||
45 if analysis_type == 'file' else 30)
|
||||
severity_multiplier = 1.0
|
||||
if pesieve_findings.get('severity') == 'critical':
|
||||
severity_multiplier = 1.5
|
||||
|
||||
pe_sieve_score = min(pesieve_suspicious * 20 * severity_multiplier, 45)
|
||||
dynamic_risk += pe_sieve_score
|
||||
risk_factors.append(f"PE-Sieve found {pesieve_suspicious} suspicious indicators")
|
||||
|
||||
# Memory analysis
|
||||
dynamic_risk += self._calculate_memory_risk(dynamic_results, risk_factors, analysis_type)
|
||||
# Enhanced memory anomaly detection
|
||||
moneta_findings = dynamic_results.get('moneta', {}).get('findings', {})
|
||||
if moneta_findings:
|
||||
# Weight different types of anomalies
|
||||
memory_scores = {
|
||||
'total_private_rwx': 15, # Highest risk
|
||||
'total_modified_code': 12,
|
||||
'total_heap_executable': 10,
|
||||
'total_modified_pe_header': 10,
|
||||
'total_private_rx': 8,
|
||||
'total_inconsistent_x': 8,
|
||||
'total_missing_peb': 5,
|
||||
'total_mismatching_peb': 5
|
||||
}
|
||||
|
||||
total_score = 0
|
||||
anomaly_count = 0
|
||||
|
||||
for key, weight in memory_scores.items():
|
||||
count = int(moneta_findings.get(key, 0) or 0)
|
||||
if count > 0:
|
||||
total_score += min(count * weight, weight * 2) # Cap each type
|
||||
anomaly_count += count
|
||||
|
||||
if anomaly_count > 0:
|
||||
dynamic_risk += min(total_score, 40) # Overall cap
|
||||
risk_factors.append(f"Found {anomaly_count} weighted memory anomalies")
|
||||
|
||||
# Apply weights for file analysis
|
||||
if analysis_type == 'file':
|
||||
risk_score += (dynamic_risk / 100) * WEIGHTS['dynamic'] * 100
|
||||
else:
|
||||
risk_score += dynamic_risk
|
||||
# Enhanced behavior analysis
|
||||
patriot_findings = dynamic_results.get('patriot', {}).get('findings', {})
|
||||
if patriot_findings:
|
||||
behaviors = patriot_findings.get('findings', [])
|
||||
behavior_count = len(behaviors)
|
||||
|
||||
if behavior_count > 0:
|
||||
# Weight by severity
|
||||
severity_scores = {
|
||||
'critical': 25,
|
||||
'high': 15,
|
||||
'medium': 10,
|
||||
'low': 5
|
||||
}
|
||||
|
||||
behavior_score = 0
|
||||
for behavior in behaviors:
|
||||
severity = behavior.get('severity', 'low')
|
||||
behavior_score += severity_scores.get(severity, 5)
|
||||
|
||||
dynamic_risk += min(behavior_score, 35)
|
||||
risk_factors.append(f"Found {behavior_count} weighted suspicious behaviors")
|
||||
|
||||
# Enhanced HSB detection
|
||||
hsb_findings = dynamic_results.get('hsb', {}).get('findings', {})
|
||||
if hsb_findings and hsb_findings.get('detections'):
|
||||
total_hsb_score = 0
|
||||
for detection in hsb_findings['detections']:
|
||||
if detection.get('findings'):
|
||||
count = len(detection['findings'])
|
||||
severity = detection.get('max_severity', 1)
|
||||
|
||||
# Weight by severity
|
||||
severity_multiplier = 1 + (severity * 0.5) # 1.5x for severity 1, 2x for severity 2, etc.
|
||||
detection_score = min(count * 15 * severity_multiplier, 40)
|
||||
|
||||
total_hsb_score += detection_score
|
||||
|
||||
if severity >= 2:
|
||||
risk_factors.append(f"Critical: Found {count} high-severity memory operations")
|
||||
else:
|
||||
risk_factors.append(f"Found {count} suspicious memory operations")
|
||||
|
||||
dynamic_risk += min(total_hsb_score, 45)
|
||||
|
||||
risk_score += (dynamic_risk / 100) * WEIGHTS['dynamic'] * 100
|
||||
|
||||
# Final normalization
|
||||
if analysis_type == 'process':
|
||||
if max(yara_score if 'yara_score' in locals() else 0, 0) == 0 and pesieve_suspicious <= 1:
|
||||
risk_score = min(risk_score, 65)
|
||||
if all(f.lower().find('high') == -1 for f in risk_factors):
|
||||
risk_score = min(risk_score, 75)
|
||||
else:
|
||||
base_score = min(max(risk_score, 0), 100)
|
||||
if base_score > 75:
|
||||
risk_score = min(base_score * 1.15, 100)
|
||||
|
||||
return round(min(max(risk_score, 0), 100), 2), risk_factors
|
||||
|
||||
def _calculate_static_risk(self, static_results, risk_factors):
|
||||
"""Helper method for static analysis risk calculation"""
|
||||
static_risk = 0
|
||||
# Normalize final score and apply exponential weighting for high-risk factors
|
||||
base_score = min(max(risk_score, 0), 100)
|
||||
if base_score > 75: # High-risk threshold
|
||||
# Apply exponential scaling to high scores
|
||||
risk_score = min(base_score * 1.15, 100)
|
||||
|
||||
# YARA analysis
|
||||
yara_matches = static_results.get('yara', {}).get('matches', [])
|
||||
return round(risk_score, 2), risk_factors
|
||||
|
||||
def calculate_process_risk(self, dynamic_results):
|
||||
"""
|
||||
Calculate risk score for process-based analysis using only dynamic results.
|
||||
Improved to provide more accurate risk assessment.
|
||||
|
||||
Args:
|
||||
dynamic_results (dict): Dynamic analysis results from process scanning
|
||||
|
||||
Returns:
|
||||
tuple: (risk_score, risk_factors)
|
||||
"""
|
||||
risk_score = 0
|
||||
risk_factors = []
|
||||
|
||||
if not dynamic_results:
|
||||
return 0, []
|
||||
|
||||
# YARA detections (high impact)
|
||||
yara_matches = dynamic_results.get('yara', {}).get('matches', [])
|
||||
yara_score, yara_factors = self.calculate_yara_risk(yara_matches)
|
||||
if yara_score > 0:
|
||||
match_multiplier = min(len(yara_matches) * 0.15 + 1, 1.5)
|
||||
static_risk += yara_score * match_multiplier
|
||||
risk_factors.extend([f"Static: {factor}" for factor in yara_factors])
|
||||
risk_score += yara_score # Direct addition as YARA indicates high risk
|
||||
risk_factors.extend([f"Dynamic: {factor}" for factor in yara_factors])
|
||||
|
||||
# CheckPLZ analysis
|
||||
checkplz_findings = static_results.get('checkplz', {}).get('findings', {})
|
||||
if checkplz_findings:
|
||||
threat_score = 50 if checkplz_findings.get('initial_threat') else 0
|
||||
if threat_score:
|
||||
risk_factors.append("Critical: CheckPLZ detected initial threat indicators")
|
||||
|
||||
indicators = checkplz_findings.get('threat_indicators', [])
|
||||
if indicators:
|
||||
indicator_score = min(len(indicators) * 15, 40)
|
||||
threat_score += indicator_score
|
||||
risk_factors.append(f"Found {len(indicators)} additional threat indicators")
|
||||
|
||||
static_risk += threat_score
|
||||
# PE-Sieve detections (moderate impact)
|
||||
pesieve_findings = dynamic_results.get('pe_sieve', {}).get('findings', {})
|
||||
pesieve_suspicious = int(pesieve_findings.get('total_suspicious', 0))
|
||||
if pesieve_suspicious > 0:
|
||||
# Adjusted to give moderate weight - single suspicious item shouldn't trigger high risk
|
||||
risk_score += min(pesieve_suspicious * 15, 30) # Reduced from 25/50 to 15/30
|
||||
risk_factors.append(f"PE-Sieve found {pesieve_suspicious} suspicious modifications")
|
||||
|
||||
# File entropy analysis
|
||||
if static_results.get('file_entropy'):
|
||||
entropy = static_results['file_entropy']
|
||||
if entropy > 7.5:
|
||||
static_risk += 30
|
||||
risk_factors.append(f"Critical overall file entropy: {entropy:.2f}")
|
||||
elif entropy > 7.0:
|
||||
static_risk += 20
|
||||
risk_factors.append(f"High overall file entropy: {entropy:.2f}")
|
||||
|
||||
return static_risk
|
||||
|
||||
def _calculate_memory_risk(self, dynamic_results, risk_factors, analysis_type):
|
||||
"""Helper method for memory-related risk calculation"""
|
||||
memory_risk = 0
|
||||
|
||||
# Moneta analysis
|
||||
# Moneta memory anomalies
|
||||
moneta_findings = dynamic_results.get('moneta', {}).get('findings', {})
|
||||
if moneta_findings:
|
||||
memory_scores = {
|
||||
'total_private_rwx': 15,
|
||||
'total_modified_code': 12,
|
||||
'total_heap_executable': 10,
|
||||
'total_modified_pe_header': 10,
|
||||
'total_private_rx': 8,
|
||||
'total_inconsistent_x': 8,
|
||||
'total_missing_peb': 5,
|
||||
'total_mismatching_peb': 5
|
||||
}
|
||||
|
||||
total_score = 0
|
||||
anomaly_count = 0
|
||||
|
||||
for key, weight in memory_scores.items():
|
||||
count = int(moneta_findings.get(key, 0) or 0)
|
||||
if count > 0:
|
||||
total_score += min(count * weight, weight * 2)
|
||||
anomaly_count += count
|
||||
|
||||
if anomaly_count > 0:
|
||||
cap = 40 if analysis_type == 'file' else 30
|
||||
memory_risk += min(total_score, cap)
|
||||
risk_factors.append(f"Found {anomaly_count} weighted memory anomalies")
|
||||
memory_anomalies = sum([
|
||||
int(moneta_findings.get('total_private_rwx', 0) or 0),
|
||||
int(moneta_findings.get('total_private_rx', 0) or 0),
|
||||
int(moneta_findings.get('total_modified_code', 0) or 0),
|
||||
int(moneta_findings.get('total_heap_executable', 0) or 0),
|
||||
int(moneta_findings.get('total_modified_pe_header', 0) or 0),
|
||||
int(moneta_findings.get('total_inconsistent_x', 0) or 0),
|
||||
int(moneta_findings.get('total_missing_peb', 0) or 0),
|
||||
int(moneta_findings.get('total_mismatching_peb', 0) or 0)
|
||||
])
|
||||
if memory_anomalies > 0:
|
||||
risk_score += min(memory_anomalies * 10, 30) # Reduced from 15/40 to 10/30
|
||||
risk_factors.append(f"Found {memory_anomalies} memory anomalies")
|
||||
|
||||
# Behavior analysis
|
||||
if dynamic_results.get('patriot', {}).get('findings'):
|
||||
behaviors = dynamic_results['patriot']['findings'].get('findings', [])
|
||||
if behaviors:
|
||||
severity_scores = {'critical': 25, 'high': 15, 'medium': 10, 'low': 5}
|
||||
behavior_score = sum(severity_scores.get(b.get('severity', 'low'), 5) for b in behaviors)
|
||||
cap = 35 if analysis_type == 'file' else 30
|
||||
memory_risk += min(behavior_score, cap)
|
||||
risk_factors.append(f"Found {len(behaviors)} weighted suspicious behaviors")
|
||||
# Patriot detections
|
||||
patriot_findings = len(dynamic_results.get('patriot', {})
|
||||
.get('findings', {}).get('findings', []))
|
||||
if patriot_findings > 0:
|
||||
risk_score += min(patriot_findings * 15, 35) # Reduced from 20/40 to 15/35
|
||||
risk_factors.append(f"Found {patriot_findings} suspicious behaviors")
|
||||
|
||||
# HSB analysis
|
||||
# HSB detections with proper severity handling
|
||||
hsb_findings = dynamic_results.get('hsb', {}).get('findings', {})
|
||||
if hsb_findings and hsb_findings.get('detections'):
|
||||
total_hsb_score = 0
|
||||
for detection in hsb_findings['detections']:
|
||||
if detection.get('findings'):
|
||||
count = len(detection['findings'])
|
||||
severity = detection.get('max_severity', 1)
|
||||
severity_multiplier = 1 + (severity * 0.5)
|
||||
detection_score = min(count * (15 if analysis_type == 'file' else 10) * severity_multiplier,
|
||||
40 if analysis_type == 'file' else 25)
|
||||
total_hsb_score += detection_score
|
||||
hsb_detections = len(detection['findings'])
|
||||
max_severity = detection.get('max_severity', 0)
|
||||
|
||||
if severity >= 2:
|
||||
risk_factors.append(f"Critical: Found {count} high-severity memory operations")
|
||||
else:
|
||||
risk_factors.append(f"Found {count} suspicious memory operations")
|
||||
|
||||
memory_risk += min(total_hsb_score, 45 if analysis_type == 'file' else 35)
|
||||
# Adjust scoring based on severity
|
||||
if max_severity == 0: # LOW
|
||||
score = min(hsb_detections * 10, 20)
|
||||
elif max_severity == 1: # MID
|
||||
score = min(hsb_detections * 15, 25)
|
||||
else: # HIGH
|
||||
score = min(hsb_detections * 20, 35)
|
||||
|
||||
risk_score += score
|
||||
severity_text = "LOW" if max_severity == 0 else "MID" if max_severity == 1 else "HIGH"
|
||||
risk_factors.append(f"Found {hsb_detections} {severity_text} severity memory operations")
|
||||
|
||||
# Final normalization with more granular scaling
|
||||
if risk_score > 0:
|
||||
# Ensure single low/mid severity findings don't trigger high risk
|
||||
if max(yara_score, 0) == 0 and pesieve_suspicious <= 1:
|
||||
risk_score = min(risk_score, 65) # Cap at 65 if no YARA matches and only minor PE-Sieve findings
|
||||
|
||||
# Additional cap for low severity combinations
|
||||
if all(f.lower().find('high') == -1 for f in risk_factors):
|
||||
risk_score = min(risk_score, 75) # Cap at 75 if no high severity findings
|
||||
|
||||
return memory_risk
|
||||
return round(min(max(risk_score, 0), 100), 2), risk_factors
|
||||
|
||||
def get_risk_level(self, risk_score):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user