266 lines
11 KiB
Python
266 lines
11 KiB
Python
# app/utils/forensics.py
|
|
"""PE forensic analysis: entropy, runtime detection, MalAPI lookup.
|
|
|
|
Office / LNK / HTML-smuggling analyzers live in their own modules
|
|
(`utils/office.py`, `utils/lnk.py`, `utils/htmlsmuggle.py`) so each file-type
|
|
inspector is self-contained and easy to maintain. This module is now strictly
|
|
PE-focused.
|
|
"""
|
|
import json
|
|
import math
|
|
from collections import Counter
|
|
|
|
|
|
# Known runtime imports for compiled languages — used to flag PE imports as
|
|
# benign-runtime rather than suspicious.
|
|
RUNTIME_IMPORTS = {
|
|
'go': {
|
|
'kernel32.dll': {
|
|
'addvectoredexceptionhandler', 'closehandle', 'createeventa',
|
|
'createfilea', 'createiocompletionport', 'createthread',
|
|
'createwaitabletimerexw', 'duplicatehandle', 'exitprocess',
|
|
'freeenvironmentstringsw', 'getconsolemode', 'getenvironmentstringsw',
|
|
'getprocaddress', 'getprocessaffinitymask',
|
|
'getqueuedcompletionstatusex', 'getstdhandle', 'getsystemdirectorya',
|
|
'getsysteminfo', 'getthreadcontext', 'loadlibrarya', 'loadlibraryw',
|
|
'postqueuedcompletionstatus', 'resumethread', 'setconsolectrlhandler',
|
|
'seterrormode', 'setevent', 'setprocesspriorityboost',
|
|
'setthreadcontext', 'setunhandledexceptionfilter', 'setwaitabletimer',
|
|
'suspendthread', 'switchtothread', 'virtualalloc', 'virtualfree',
|
|
'virtualquery', 'waitformultipleobjects', 'waitforsingleobject',
|
|
'writeconsolew', 'writefile',
|
|
}
|
|
},
|
|
'rust': {
|
|
'kernel32.dll': {
|
|
'addvectoredexceptionhandler', 'closehandle', 'createmutexa',
|
|
'formatmessagew', 'getconsolemode', 'getcurrentdirectoryw',
|
|
'getcurrentprocess', 'getcurrentprocessid', 'getcurrentthread',
|
|
'getcurrentthreadid', 'getenvironmentvariablew', 'getlasterror',
|
|
'getmodulehandlea', 'getmodulehandlew', 'getprocaddress',
|
|
'getprocessheap', 'getstdhandle', 'getsystemtimeasfiletime',
|
|
'heapalloc', 'heapfree', 'heaprealloc', 'initializeslisthead',
|
|
'isdebuggerpresent', 'isprocessorfeaturepresent', 'loadlibrarya',
|
|
'multibytetowidechar', 'queryperformancecounter', 'releasemutex',
|
|
'rtlcapturecontext', 'rtllookupfunctionentry', 'rtlvirtualunwind',
|
|
'setlasterror', 'setthreadstackguarantee',
|
|
'setunhandledexceptionfilter', 'unhandledexceptionfilter',
|
|
'waitforsingleobject', 'waitforsingleobjectex', 'widechartomultibyte',
|
|
'writeconsolew', 'lstrlenw',
|
|
},
|
|
'ntdll.dll': {
|
|
'ntwritefile', 'rtlntstatustodoserror',
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
def calculate_entropy(data):
|
|
"""Compute Shannon entropy (rounded to 2 decimals) of a byte string.
|
|
|
|
Uses `collections.Counter` (C fastpath) instead of a Python loop —
|
|
~5x faster on multi-MB inputs, which matters for big PE uploads where
|
|
we run this once over the whole file plus once per section.
|
|
"""
|
|
n = len(data)
|
|
if n == 0:
|
|
return 0
|
|
|
|
if isinstance(data, str):
|
|
data = data.encode()
|
|
n = len(data)
|
|
|
|
counts = Counter(data)
|
|
inv_n = 1.0 / n
|
|
entropy = -sum((c * inv_n) * math.log2(c * inv_n) for c in counts.values())
|
|
return round(entropy, 2)
|
|
|
|
|
|
class SecurityAnalyzer:
|
|
"""PE import / Office macro analysis with MalAPI database lookup."""
|
|
|
|
def __init__(self, malapi_path):
|
|
self.malapi_data = self._load_malapi_data(malapi_path)
|
|
self.dll_function_map = self._build_function_map()
|
|
|
|
def _load_malapi_data(self, malapi_path):
|
|
try:
|
|
with open(malapi_path, "r", encoding="utf-8") as f:
|
|
return json.loads(f.read())
|
|
except Exception as e:
|
|
print(f"Error loading MalAPI database: {e}")
|
|
return {}
|
|
|
|
def _build_function_map(self):
|
|
dll_function_map = {}
|
|
|
|
for category, functions in self.malapi_data.items():
|
|
for function_name, function_info in functions.items():
|
|
if isinstance(function_info, dict):
|
|
description = function_info.get("description", "")
|
|
dll_name = function_info.get("dll", "Unknown").lower()
|
|
else:
|
|
description = function_info
|
|
dll_name = "unknown"
|
|
|
|
if dll_name not in dll_function_map:
|
|
dll_function_map[dll_name] = {}
|
|
|
|
dll_function_map[dll_name][function_name.lower()] = (category, description)
|
|
|
|
if "unknown" not in dll_function_map:
|
|
dll_function_map["unknown"] = {}
|
|
dll_function_map["unknown"][function_name.lower()] = (category, description)
|
|
|
|
return dll_function_map
|
|
|
|
def _detect_runtime_type(self, pe):
|
|
"""Return 'go', 'rust', or None based on PE section content."""
|
|
try:
|
|
rust_indicators = [
|
|
b'rustc', b'rust_begin_unwind', b'rust_panic', b'rust_oom',
|
|
b'__rust_', b'.rustc_info', b'cargo', b'rustup',
|
|
]
|
|
|
|
rust_found = False
|
|
for section in pe.sections:
|
|
try:
|
|
section_data = section.get_data()
|
|
for rust_indicator in rust_indicators:
|
|
if rust_indicator in section_data:
|
|
rust_found = True
|
|
break
|
|
if rust_found:
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
if rust_found:
|
|
return "rust"
|
|
|
|
go_sections = ['.go.buildinfo', '.go.plt']
|
|
for section in pe.sections:
|
|
section_name = section.Name.decode().rstrip('\x00')
|
|
if section_name in go_sections:
|
|
return "go"
|
|
|
|
high_confidence_indicators = [
|
|
b'go.buildinfo', b'runtime.main', b'runtime.goexit',
|
|
b'runtime.newproc', b'runtime.mallocgc', b'go.string.',
|
|
b'go.func.', b'go.itab.', b'go.mod', b'runtime.systemstack',
|
|
b'go:linkname', b'go:nosplit', b'go:noescape',
|
|
b'runtime.schedt', b'runtime.g', b'runtime.m',
|
|
]
|
|
|
|
go_indicator_count = 0
|
|
for section in pe.sections:
|
|
try:
|
|
section_data = section.get_data()
|
|
for indicator in high_confidence_indicators:
|
|
if indicator in section_data:
|
|
go_indicator_count += 1
|
|
if go_indicator_count >= 2:
|
|
return "go"
|
|
except Exception:
|
|
continue
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
def analyze_pe_imports(self, pe):
|
|
"""Return (suspicious_imports, build_with) for a parsed PE."""
|
|
suspicious_imports = []
|
|
build_with = self._detect_runtime_type(pe)
|
|
|
|
if not hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
|
|
return suspicious_imports, build_with
|
|
|
|
for entry in pe.DIRECTORY_ENTRY_IMPORT:
|
|
dll_name = entry.dll.decode().lower()
|
|
|
|
for imp in entry.imports:
|
|
if not imp.name:
|
|
continue
|
|
|
|
func_name = imp.name.decode().lower()
|
|
|
|
for lookup_dll in [dll_name, "unknown"]:
|
|
if lookup_dll in self.dll_function_map and func_name in self.dll_function_map[lookup_dll]:
|
|
category, description = self.dll_function_map[lookup_dll][func_name]
|
|
|
|
hint_value = None
|
|
if hasattr(imp, 'import_by_ordinal') and imp.import_by_ordinal:
|
|
hint_value = imp.ordinal if hasattr(imp, 'ordinal') and imp.ordinal is not None else None
|
|
else:
|
|
if hasattr(imp, 'hint') and imp.hint is not None:
|
|
if build_with in ['go', 'rust'] and imp.hint == 0:
|
|
hint_value = None
|
|
else:
|
|
hint_value = imp.hint
|
|
|
|
is_runtime_import = False
|
|
if build_with and build_with in RUNTIME_IMPORTS:
|
|
runtime_dlls = RUNTIME_IMPORTS[build_with]
|
|
is_runtime_import = (
|
|
dll_name in runtime_dlls and
|
|
func_name in runtime_dlls[dll_name]
|
|
)
|
|
|
|
suspicious_imports.append({
|
|
'dll': dll_name,
|
|
'function': func_name,
|
|
'category': category,
|
|
'note': description,
|
|
'hint': hint_value,
|
|
'is_runtime_import': is_runtime_import,
|
|
'runtime_type': build_with if is_runtime_import else None,
|
|
})
|
|
break
|
|
|
|
return suspicious_imports, build_with
|
|
|
|
def analyze_pe_sections(self, pe, entropy_calculator):
|
|
"""Build per-section info with entropy and detection notes."""
|
|
sections_info = []
|
|
standard_sections = [
|
|
'.text', '.data', '.bss', '.rdata', '.edata', '.idata',
|
|
'.pdata', '.reloc', '.rsrc', '.tls', '.debug',
|
|
]
|
|
|
|
for section in pe.sections:
|
|
section_name = section.Name.decode().rstrip('\x00')
|
|
section_data = section.get_data()
|
|
section_entropy = entropy_calculator(section_data)
|
|
|
|
is_standard = section_name in standard_sections
|
|
detection_notes = []
|
|
|
|
if section_entropy > 7.2:
|
|
detection_notes.append('High entropy may trigger detection')
|
|
if section_name == '.text' and section_entropy > 7.0:
|
|
detection_notes.append('Unusual entropy for code section')
|
|
if not is_standard:
|
|
detection_notes.append('Non-standard section name - may trigger detection')
|
|
|
|
sections_info.append({
|
|
'name': section_name,
|
|
'entropy': section_entropy,
|
|
'size': len(section_data),
|
|
'characteristics': section.Characteristics,
|
|
'is_standard': is_standard,
|
|
'detection_notes': detection_notes,
|
|
})
|
|
|
|
return sections_info
|
|
|
|
_security_analyzer_cache = {}
|
|
|
|
|
|
def get_security_analyzer(malapi_path):
|
|
"""Return a cached SecurityAnalyzer keyed by malapi_path."""
|
|
if malapi_path not in _security_analyzer_cache:
|
|
_security_analyzer_cache[malapi_path] = SecurityAnalyzer(malapi_path)
|
|
return _security_analyzer_cache[malapi_path]
|