diff --git a/app/routes.py b/app/routes.py index b1a2a8c..fc2f6be 100644 --- a/app/routes.py +++ b/app/routes.py @@ -441,7 +441,7 @@ def register_routes(app): # Extract scanner-specific results and calculate risk score try: process_info = dynamic_results.get('moneta', {}).get('findings', {}).get('process_info', {}) - risk_score, risk_factors = utils.calculate_risk(dynamic_results) + risk_score, risk_factors = utils.calculate_process_risk(dynamic_results) risk_level = utils.get_risk_level(risk_score) yara_matches = dynamic_results.get('yara', {}).get('matches', []) @@ -527,7 +527,7 @@ def register_routes(app): # Calculate risk score try: - risk_score, risk_factors = utils.calculate_risk(file_info, static_results, dynamic_results) + risk_score, risk_factors = utils.calculate_file_risk(file_info, static_results, dynamic_results) risk_level = utils.get_risk_level(risk_score) file_based_summary[item] = { 'md5': file_info.get('md5', 'unknown'), diff --git a/app/utils.py b/app/utils.py index 4de03cc..affcde9 100644 --- a/app/utils.py +++ b/app/utils.py @@ -497,15 +497,14 @@ class Utils: return normalized_score, risk_factors - def calculate_risk(self, analysis_type='file', file_info=None, static_results=None, dynamic_results=None): + def calculate_file_risk(self, file_info, static_results=None, dynamic_results=None): """ - Unified risk calculation for both file and process analysis. + Calculate overall file risk score with enhanced static analysis impact. Args: - analysis_type (str): 'file' or 'process' - file_info (dict, optional): Information about the file - static_results (dict, optional): Static analysis results - dynamic_results (dict, optional): Dynamic analysis results + file_info (dict): Information about the file. + static_results (dict, optional): Static analysis results. + dynamic_results (dict, optional): Dynamic analysis results. Returns: tuple: (risk_score, risk_factors) @@ -513,213 +512,302 @@ class Utils: risk_score = 0 risk_factors = [] - # File-specific analysis - if analysis_type == 'file': - WEIGHTS = { - 'pe_info': 0.10, - 'static': 0.50, - 'dynamic': 0.40 - } + # Adjusted weights to minimize PE info impact + WEIGHTS = { + 'pe_info': 0.10, # Minimal impact + 'static': 0.50, # Maintain high static analysis weight + 'dynamic': 0.40 # Slightly increased + } + + # 1. PE Information Risk Calculation + if file_info.get('pe_info'): + pe_risk = 0 + pe_info = file_info['pe_info'] - # PE Information Risk - if file_info and file_info.get('pe_info'): - pe_risk = 0 - pe_info = file_info['pe_info'] + # Enhanced entropy detection + high_entropy_sections = 0 + very_high_entropy_sections = 0 + for section in pe_info.get('sections', []): + entropy = section.get('entropy', 0) + if entropy > 7.5: # Very high entropy threshold + very_high_entropy_sections += 1 + risk_factors.append(f"Critical entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}") + elif entropy > 7.0: + high_entropy_sections += 1 + risk_factors.append(f"High entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}") + + pe_risk += min(high_entropy_sections * 10 + very_high_entropy_sections * 20, 40) + + # Enhanced import analysis + suspicious_imports = pe_info.get('suspicious_imports', []) + if suspicious_imports: + # Categorize imports based on their risk level + critical_functions = { + 'createremotethread', 'virtualallocex', 'writeprocessmemory', # Process injection + 'ntmapviewofsection', 'zwmapviewofsection' # Memory mapping + } + high_risk_functions = { + 'loadlibrarya', 'loadlibraryw', 'getprocaddress', # Dynamic loading + 'openprocess', 'virtualallocexnuma' # Process manipulation + } - # Entropy analysis - high_entropy_sections = 0 - very_high_entropy_sections = 0 - for section in pe_info.get('sections', []): - entropy = section.get('entropy', 0) - if entropy > 7.5: - very_high_entropy_sections += 1 - risk_factors.append(f"Critical entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}") - elif entropy > 7.0: - high_entropy_sections += 1 - risk_factors.append(f"High entropy in section {section.get('name', 'UNKNOWN')}: {entropy:.2f}") + # Count imports by severity based on function names + critical_imports = sum(1 for imp in suspicious_imports + if imp.get('function', '').lower() in critical_functions) + high_risk_imports = sum(1 for imp in suspicious_imports + if imp.get('function', '').lower() in high_risk_functions) - pe_risk += min(high_entropy_sections * 10 + very_high_entropy_sections * 20, 40) - - # Import analysis - suspicious_imports = pe_info.get('suspicious_imports', []) - if suspicious_imports: - critical_functions = { - 'createremotethread', 'virtualallocex', 'writeprocessmemory', - 'ntmapviewofsection', 'zwmapviewofsection' - } - high_risk_functions = { - 'loadlibrarya', 'loadlibraryw', 'getprocaddress', - 'openprocess', 'virtualallocexnuma' - } - - critical_imports = sum(1 for imp in suspicious_imports - if imp.get('function', '').lower() in critical_functions) - high_risk_imports = sum(1 for imp in suspicious_imports - if imp.get('function', '').lower() in high_risk_functions) - - pe_risk += min(critical_imports * 15 + high_risk_imports * 8, 30) - if critical_imports > 0 or high_risk_imports > 0: - risk_factors.append(f"Found {critical_imports} critical process manipulation and {high_risk_imports} high-risk dynamic loading imports") - - # Checksum analysis - if pe_info.get('checksum_info'): - checksum = pe_info['checksum_info'] - if checksum.get('stored_checksum') != checksum.get('calculated_checksum'): - pe_risk += 25 - risk_factors.append("PE checksum mismatch detected") - - risk_score += (pe_risk / 100) * WEIGHTS['pe_info'] * 100 + pe_risk += min(critical_imports * 15 + high_risk_imports * 8, 30) + if critical_imports > 0 or high_risk_imports > 0: + risk_factors.append(f"Found {critical_imports} critical process manipulation and {high_risk_imports} high-risk dynamic loading imports") + + # Enhanced checksum analysis + if pe_info.get('checksum_info'): + checksum = pe_info['checksum_info'] + if checksum.get('stored_checksum') != checksum.get('calculated_checksum'): + pe_risk += 25 # Reduced impact + risk_factors.append("PE checksum mismatch detected") + + risk_score += (pe_risk / 100) * WEIGHTS['pe_info'] * 100 - # Static Analysis Risk - if static_results: - static_risk = self._calculate_static_risk(static_results, risk_factors) - risk_score += (static_risk / 100) * WEIGHTS['static'] * 100 + # 2. Enhanced Static Analysis Risk Calculation + if static_results: + static_risk = 0 + + # Enhanced YARA detection scoring + yara_matches = static_results.get('yara', {}).get('matches', []) + yara_score, yara_factors = self.calculate_yara_risk(yara_matches) + if yara_score > 0: + # Apply multiplier for multiple matching rules + match_multiplier = min(len(yara_matches) * 0.15 + 1, 1.5) # Up to 50% boost + static_risk += yara_score * match_multiplier + # Directly use the yara_factors which already include severity + risk_factors.extend([f"Static: {factor}" for factor in yara_factors]) + + # Enhanced CheckPLZ analysis + checkplz_findings = static_results.get('checkplz', {}).get('findings', {}) + if checkplz_findings: + threat_score = 0 + if checkplz_findings.get('initial_threat'): + threat_score += 50 + risk_factors.append("Critical: CheckPLZ detected initial threat indicators") + + # Additional CheckPLZ indicators + indicators = checkplz_findings.get('threat_indicators', []) + if indicators: + indicator_score = min(len(indicators) * 15, 40) + threat_score += indicator_score + risk_factors.append(f"Found {len(indicators)} additional threat indicators") + + static_risk += threat_score + + # Add file entropy analysis + if static_results.get('file_entropy'): + entropy = static_results['file_entropy'] + if entropy > 7.5: + static_risk += 30 + risk_factors.append(f"Critical overall file entropy: {entropy:.2f}") + elif entropy > 7.0: + static_risk += 20 + risk_factors.append(f"High overall file entropy: {entropy:.2f}") + + risk_score += (static_risk / 100) * WEIGHTS['static'] * 100 - # Dynamic Analysis Risk (common for both file and process) + + + # 3. Dynamic Analysis Risk Calculation if dynamic_results: dynamic_risk = 0 - # YARA analysis + # YARA dynamic detections yara_matches = dynamic_results.get('yara', {}).get('matches', []) yara_score, yara_factors = self.calculate_yara_risk(yara_matches) if yara_score > 0: dynamic_risk += yara_score + # Similarly for dynamic, use the factors directly risk_factors.extend([f"Dynamic: {factor}" for factor in yara_factors]) - # PE-Sieve analysis + # Enhanced PE-Sieve scoring pesieve_findings = dynamic_results.get('pe_sieve', {}).get('findings', {}) pesieve_suspicious = int(pesieve_findings.get('total_suspicious', 0)) if pesieve_suspicious > 0: - severity_multiplier = 1.5 if pesieve_findings.get('severity') == 'critical' else 1.0 - pe_sieve_score = min(pesieve_suspicious * (20 if analysis_type == 'file' else 15) * severity_multiplier, - 45 if analysis_type == 'file' else 30) + severity_multiplier = 1.0 + if pesieve_findings.get('severity') == 'critical': + severity_multiplier = 1.5 + + pe_sieve_score = min(pesieve_suspicious * 20 * severity_multiplier, 45) dynamic_risk += pe_sieve_score risk_factors.append(f"PE-Sieve found {pesieve_suspicious} suspicious indicators") - # Memory analysis - dynamic_risk += self._calculate_memory_risk(dynamic_results, risk_factors, analysis_type) + # Enhanced memory anomaly detection + moneta_findings = dynamic_results.get('moneta', {}).get('findings', {}) + if moneta_findings: + # Weight different types of anomalies + memory_scores = { + 'total_private_rwx': 15, # Highest risk + 'total_modified_code': 12, + 'total_heap_executable': 10, + 'total_modified_pe_header': 10, + 'total_private_rx': 8, + 'total_inconsistent_x': 8, + 'total_missing_peb': 5, + 'total_mismatching_peb': 5 + } + + total_score = 0 + anomaly_count = 0 + + for key, weight in memory_scores.items(): + count = int(moneta_findings.get(key, 0) or 0) + if count > 0: + total_score += min(count * weight, weight * 2) # Cap each type + anomaly_count += count + + if anomaly_count > 0: + dynamic_risk += min(total_score, 40) # Overall cap + risk_factors.append(f"Found {anomaly_count} weighted memory anomalies") - # Apply weights for file analysis - if analysis_type == 'file': - risk_score += (dynamic_risk / 100) * WEIGHTS['dynamic'] * 100 - else: - risk_score += dynamic_risk + # Enhanced behavior analysis + patriot_findings = dynamic_results.get('patriot', {}).get('findings', {}) + if patriot_findings: + behaviors = patriot_findings.get('findings', []) + behavior_count = len(behaviors) + + if behavior_count > 0: + # Weight by severity + severity_scores = { + 'critical': 25, + 'high': 15, + 'medium': 10, + 'low': 5 + } + + behavior_score = 0 + for behavior in behaviors: + severity = behavior.get('severity', 'low') + behavior_score += severity_scores.get(severity, 5) + + dynamic_risk += min(behavior_score, 35) + risk_factors.append(f"Found {behavior_count} weighted suspicious behaviors") + + # Enhanced HSB detection + hsb_findings = dynamic_results.get('hsb', {}).get('findings', {}) + if hsb_findings and hsb_findings.get('detections'): + total_hsb_score = 0 + for detection in hsb_findings['detections']: + if detection.get('findings'): + count = len(detection['findings']) + severity = detection.get('max_severity', 1) + + # Weight by severity + severity_multiplier = 1 + (severity * 0.5) # 1.5x for severity 1, 2x for severity 2, etc. + detection_score = min(count * 15 * severity_multiplier, 40) + + total_hsb_score += detection_score + + if severity >= 2: + risk_factors.append(f"Critical: Found {count} high-severity memory operations") + else: + risk_factors.append(f"Found {count} suspicious memory operations") + + dynamic_risk += min(total_hsb_score, 45) + + risk_score += (dynamic_risk / 100) * WEIGHTS['dynamic'] * 100 - # Final normalization - if analysis_type == 'process': - if max(yara_score if 'yara_score' in locals() else 0, 0) == 0 and pesieve_suspicious <= 1: - risk_score = min(risk_score, 65) - if all(f.lower().find('high') == -1 for f in risk_factors): - risk_score = min(risk_score, 75) - else: - base_score = min(max(risk_score, 0), 100) - if base_score > 75: - risk_score = min(base_score * 1.15, 100) - - return round(min(max(risk_score, 0), 100), 2), risk_factors - - def _calculate_static_risk(self, static_results, risk_factors): - """Helper method for static analysis risk calculation""" - static_risk = 0 + # Normalize final score and apply exponential weighting for high-risk factors + base_score = min(max(risk_score, 0), 100) + if base_score > 75: # High-risk threshold + # Apply exponential scaling to high scores + risk_score = min(base_score * 1.15, 100) - # YARA analysis - yara_matches = static_results.get('yara', {}).get('matches', []) + return round(risk_score, 2), risk_factors + + def calculate_process_risk(self, dynamic_results): + """ + Calculate risk score for process-based analysis using only dynamic results. + Improved to provide more accurate risk assessment. + + Args: + dynamic_results (dict): Dynamic analysis results from process scanning + + Returns: + tuple: (risk_score, risk_factors) + """ + risk_score = 0 + risk_factors = [] + + if not dynamic_results: + return 0, [] + + # YARA detections (high impact) + yara_matches = dynamic_results.get('yara', {}).get('matches', []) yara_score, yara_factors = self.calculate_yara_risk(yara_matches) if yara_score > 0: - match_multiplier = min(len(yara_matches) * 0.15 + 1, 1.5) - static_risk += yara_score * match_multiplier - risk_factors.extend([f"Static: {factor}" for factor in yara_factors]) + risk_score += yara_score # Direct addition as YARA indicates high risk + risk_factors.extend([f"Dynamic: {factor}" for factor in yara_factors]) - # CheckPLZ analysis - checkplz_findings = static_results.get('checkplz', {}).get('findings', {}) - if checkplz_findings: - threat_score = 50 if checkplz_findings.get('initial_threat') else 0 - if threat_score: - risk_factors.append("Critical: CheckPLZ detected initial threat indicators") - - indicators = checkplz_findings.get('threat_indicators', []) - if indicators: - indicator_score = min(len(indicators) * 15, 40) - threat_score += indicator_score - risk_factors.append(f"Found {len(indicators)} additional threat indicators") - - static_risk += threat_score + # PE-Sieve detections (moderate impact) + pesieve_findings = dynamic_results.get('pe_sieve', {}).get('findings', {}) + pesieve_suspicious = int(pesieve_findings.get('total_suspicious', 0)) + if pesieve_suspicious > 0: + # Adjusted to give moderate weight - single suspicious item shouldn't trigger high risk + risk_score += min(pesieve_suspicious * 15, 30) # Reduced from 25/50 to 15/30 + risk_factors.append(f"PE-Sieve found {pesieve_suspicious} suspicious modifications") - # File entropy analysis - if static_results.get('file_entropy'): - entropy = static_results['file_entropy'] - if entropy > 7.5: - static_risk += 30 - risk_factors.append(f"Critical overall file entropy: {entropy:.2f}") - elif entropy > 7.0: - static_risk += 20 - risk_factors.append(f"High overall file entropy: {entropy:.2f}") - - return static_risk - - def _calculate_memory_risk(self, dynamic_results, risk_factors, analysis_type): - """Helper method for memory-related risk calculation""" - memory_risk = 0 - - # Moneta analysis + # Moneta memory anomalies moneta_findings = dynamic_results.get('moneta', {}).get('findings', {}) - if moneta_findings: - memory_scores = { - 'total_private_rwx': 15, - 'total_modified_code': 12, - 'total_heap_executable': 10, - 'total_modified_pe_header': 10, - 'total_private_rx': 8, - 'total_inconsistent_x': 8, - 'total_missing_peb': 5, - 'total_mismatching_peb': 5 - } - - total_score = 0 - anomaly_count = 0 - - for key, weight in memory_scores.items(): - count = int(moneta_findings.get(key, 0) or 0) - if count > 0: - total_score += min(count * weight, weight * 2) - anomaly_count += count - - if anomaly_count > 0: - cap = 40 if analysis_type == 'file' else 30 - memory_risk += min(total_score, cap) - risk_factors.append(f"Found {anomaly_count} weighted memory anomalies") + memory_anomalies = sum([ + int(moneta_findings.get('total_private_rwx', 0) or 0), + int(moneta_findings.get('total_private_rx', 0) or 0), + int(moneta_findings.get('total_modified_code', 0) or 0), + int(moneta_findings.get('total_heap_executable', 0) or 0), + int(moneta_findings.get('total_modified_pe_header', 0) or 0), + int(moneta_findings.get('total_inconsistent_x', 0) or 0), + int(moneta_findings.get('total_missing_peb', 0) or 0), + int(moneta_findings.get('total_mismatching_peb', 0) or 0) + ]) + if memory_anomalies > 0: + risk_score += min(memory_anomalies * 10, 30) # Reduced from 15/40 to 10/30 + risk_factors.append(f"Found {memory_anomalies} memory anomalies") - # Behavior analysis - if dynamic_results.get('patriot', {}).get('findings'): - behaviors = dynamic_results['patriot']['findings'].get('findings', []) - if behaviors: - severity_scores = {'critical': 25, 'high': 15, 'medium': 10, 'low': 5} - behavior_score = sum(severity_scores.get(b.get('severity', 'low'), 5) for b in behaviors) - cap = 35 if analysis_type == 'file' else 30 - memory_risk += min(behavior_score, cap) - risk_factors.append(f"Found {len(behaviors)} weighted suspicious behaviors") + # Patriot detections + patriot_findings = len(dynamic_results.get('patriot', {}) + .get('findings', {}).get('findings', [])) + if patriot_findings > 0: + risk_score += min(patriot_findings * 15, 35) # Reduced from 20/40 to 15/35 + risk_factors.append(f"Found {patriot_findings} suspicious behaviors") - # HSB analysis + # HSB detections with proper severity handling hsb_findings = dynamic_results.get('hsb', {}).get('findings', {}) if hsb_findings and hsb_findings.get('detections'): - total_hsb_score = 0 for detection in hsb_findings['detections']: if detection.get('findings'): - count = len(detection['findings']) - severity = detection.get('max_severity', 1) - severity_multiplier = 1 + (severity * 0.5) - detection_score = min(count * (15 if analysis_type == 'file' else 10) * severity_multiplier, - 40 if analysis_type == 'file' else 25) - total_hsb_score += detection_score + hsb_detections = len(detection['findings']) + max_severity = detection.get('max_severity', 0) - if severity >= 2: - risk_factors.append(f"Critical: Found {count} high-severity memory operations") - else: - risk_factors.append(f"Found {count} suspicious memory operations") - - memory_risk += min(total_hsb_score, 45 if analysis_type == 'file' else 35) + # Adjust scoring based on severity + if max_severity == 0: # LOW + score = min(hsb_detections * 10, 20) + elif max_severity == 1: # MID + score = min(hsb_detections * 15, 25) + else: # HIGH + score = min(hsb_detections * 20, 35) + + risk_score += score + severity_text = "LOW" if max_severity == 0 else "MID" if max_severity == 1 else "HIGH" + risk_factors.append(f"Found {hsb_detections} {severity_text} severity memory operations") + + # Final normalization with more granular scaling + if risk_score > 0: + # Ensure single low/mid severity findings don't trigger high risk + if max(yara_score, 0) == 0 and pesieve_suspicious <= 1: + risk_score = min(risk_score, 65) # Cap at 65 if no YARA matches and only minor PE-Sieve findings + + # Additional cap for low severity combinations + if all(f.lower().find('high') == -1 for f in risk_factors): + risk_score = min(risk_score, 75) # Cap at 75 if no high severity findings - return memory_risk + return round(min(max(risk_score, 0), 100), 2), risk_factors def get_risk_level(self, risk_score): """