diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f8a2f9..c0545a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## [v3.0.0] - 2025-05-16 +### Added +- Python Clients to interact with LitterBox Server + * `grumpycat.py` - python standalone cli client + * `LitterBoxMCP.py` - MCP Server to intercat with LitterBox Server + ## [v2.5.2] - 2025-05-09 ### Fixed - Clinet Side: removed hard-coded size limit diff --git a/GrumpyCats/LitterBoxMCP.py b/GrumpyCats/LitterBoxMCP.py new file mode 100644 index 0000000..acdec4a --- /dev/null +++ b/GrumpyCats/LitterBoxMCP.py @@ -0,0 +1,232 @@ +from mcp.server.fastmcp import FastMCP +from grumpycat import LitterBoxClient, LitterBoxError, LitterBoxAPIError + +# Initialize MCP server focused on efficient payload OPSEC analysis +mcp = FastMCP( + name="LitterBoxMCP", + instructions=( + "You are an elite payload OPSEC specialist focused on detection evasion.\n" + "Analyze YARA signatures, behavioral patterns, and detection triggers.\n" + "Provide actionable tradecraft improvements for bypassing EDR and AV.\n" + "Focus on signature evasion, behavioral stealth, and attribution avoidance.\n" + "Emphasize practical improvements to enhance payload operational security." + ), +) + +# Initialize LitterBox client +client = LitterBoxClient(base_url="http://127.0.0.1:1337") + +def handle_api(callable_fn, *args, **kwargs): + """Clean error handling for API operations""" + try: + result = callable_fn(*args, **kwargs) + return {"status": "success", "data": result} + except LitterBoxAPIError as e: + return {"status": "api_error", "message": str(e), "http_code": e.status_code} + except LitterBoxError as e: + return {"status": "client_error", "message": str(e)} + except Exception as e: + return {"status": "error", "message": str(e)} + +# Core Analysis Tools +@mcp.tool(name="upload_payload", description="Upload payload for OPSEC testing") +def upload_payload(path: str, name: str = None): + """Upload payload and get hash for analysis""" + return handle_api(client.upload_file, path, file_name=name) + +@mcp.tool(name="analyze_static", description="Run static analysis - check YARA signatures and file characteristics") +def analyze_static(file_hash: str): + """Run static analysis to identify signature detections""" + return handle_api(client.analyze_file, file_hash, 'static') + +@mcp.tool(name="analyze_dynamic", description="Run dynamic analysis - test behavioral detection and runtime artifacts") +def analyze_dynamic(target: str, cmd_args: list = None): + """Run dynamic analysis to test behavioral evasion""" + return handle_api(client.analyze_file, target, 'dynamic', cmd_args=cmd_args) + +@mcp.tool(name="get_file_info", description="Get basic file information and characteristics") +def get_file_info(file_hash: str): + """Get file metadata, entropy, and PE information""" + return handle_api(client.get_results, file_hash, 'info') + +@mcp.tool(name="get_static_results", description="Get static analysis results - YARA hits and signature detections") +def get_static_results(file_hash: str): + """Get detailed static analysis results""" + return handle_api(client.get_results, file_hash, 'static') + +@mcp.tool(name="get_dynamic_results", description="Get dynamic analysis results - behavioral detections and runtime artifacts") +def get_dynamic_results(target: str): + """Get detailed dynamic analysis results""" + return handle_api(client.get_results, target, 'dynamic') + + +# Utility Tools +@mcp.tool(name="list_payloads", description="List all analyzed payloads with summaries") +def list_payloads(): + """Get summary of all tested payloads""" + return handle_api(client.get_files_summary) + +@mcp.tool(name="validate_pid", description="Validate process ID for dynamic analysis") +def validate_pid(pid: int): + """Validate PID before dynamic analysis""" + return handle_api(client.validate_process, pid) + +@mcp.tool(name="cleanup", description="Clean up all testing artifacts") +def cleanup(): + """Remove all testing artifacts from sandbox""" + return handle_api(client.cleanup) + +@mcp.tool(name="health_check", description="Check sandbox health status") +def health_check(): + """Verify sandbox tools are operational""" + return handle_api(client.check_health) + +@mcp.tool(name="delete_payload", description="Delete specific payload and results") +def delete_payload(file_hash: str): + """Remove payload and all analysis results""" + return handle_api(client.delete_file, file_hash) + +# OPSEC-Focused Prompts +@mcp.prompt() +def analyze_detection_patterns(file_hash: str = "") -> str: + """Analyze what's getting detected and why""" + return f"""Analyze detection patterns for {f'payload {file_hash}' if file_hash else 'the payload'}: + +## Static Detection Analysis +- YARA rule matches and triggered signatures +- File entropy and packing indicators +- Import table and string analysis findings +- PE structure anomalies + +## Dynamic Detection Analysis +- Process manipulation behaviors detected +- Memory artifacts flagged by Moneta +- Behavioral patterns triggering alerts +- Runtime API usage patterns + +## Detection Improvement Strategy +- Signature evasion techniques needed +- Behavioral modification recommendations +- Obfuscation and packing adjustments +- Alternative implementation approaches + +Focus on specific, actionable improvements to bypass detected patterns.""" + +@mcp.prompt() +def assess_evasion_effectiveness(file_hash: str = "") -> str: + """Assess payload evasion effectiveness and improvement areas""" + return f"""Evaluate evasion effectiveness for {f'payload {file_hash}' if file_hash else 'the payload'}: + +## Signature Evasion Assessment +- YARA rule bypass success/failure +- Anti-virus signature avoidance +- Static analysis resistance level +- String obfuscation effectiveness + +## Behavioral Evasion Assessment +- EDR behavioral detection bypass +- Process manipulation stealth +- Memory artifact minimization +- Runtime pattern camouflage + +## Improvement Recommendations +- Prioritized evasion enhancements +- Specific code modification suggestions +- Alternative technique recommendations +- Testing validation requirements + +Provide concrete steps to improve detection evasion rates.""" + +@mcp.prompt() +def analyze_opsec_violations(file_hash: str = "") -> str: + """Identify OPSEC violations and attribution risks""" + return f"""Identify OPSEC violations for {f'payload {file_hash}' if file_hash else 'the payload'}: + +## Attribution Risk Factors +- Similarity to known offensive tools +- Unique behavioral fingerprints +- Metadata and compilation artifacts +- Code pattern attributions + +## OPSEC Violation Analysis +- Signature patterns revealing tool origin +- Behavioral traits linking to frameworks +- File characteristics indicating toolset +- Communication patterns exposing infrastructure + +## Mitigation Strategies +- Attribution masking techniques +- Behavioral diversification methods +- Metadata sanitization requirements +- Fingerprint elimination approaches + +Focus on maintaining operational anonymity and avoiding tool attribution.""" + +@mcp.prompt() +def generate_improvement_plan(file_hash: str = "") -> str: + """Generate prioritized improvement plan for payload enhancement""" + return f"""Create improvement plan for {f'payload {file_hash}' if file_hash else 'the payload'}: + +## Detection Issues Identified +- Critical signature detections requiring immediate attention +- Behavioral patterns triggering EDR alerts +- File characteristics exposing payload nature +- Attribution risks from tool similarity + +## Improvement Priority Matrix +1. **CRITICAL** - Signature bypasses for deployment readiness +2. **HIGH** - Behavioral evasion improvements +3. **MEDIUM** - Attribution risk mitigation +4. **LOW** - General stealth enhancements + +## Implementation Roadmap +- Immediate fixes for critical detections +- Behavioral modification timeline +- Testing and validation checkpoints +- Deployment readiness criteria + +## Success Metrics +- Signature detection rate reduction +- Behavioral alert elimination +- Attribution risk minimization +- Overall stealth improvement + +Provide actionable, prioritized steps for payload enhancement.""" + +@mcp.prompt() +def evaluate_deployment_readiness(file_hash: str = "") -> str: + """Evaluate if payload is ready for operational deployment""" + return f"""Evaluate deployment readiness for {f'payload {file_hash}' if file_hash else 'the payload'}: + +## Readiness Assessment Criteria +- **Signature Evasion**: No YARA rule matches +- **Behavioral Stealth**: Clean dynamic analysis results +- **Attribution Risk**: Low similarity to known tools +- **Technical Functionality**: Proper execution and behavior + +## Risk Assessment +- Detection probability estimation +- Attribution risk evaluation +- Operational security threats +- Incident response impact + +## Deployment Decision Matrix +``` +Category | Status | Severity | Blocker +----------------|--------|----------|-------- +Signatures | P/F | H/M/L | Y/N +Behavior | P/F | H/M/L | Y/N +Attribution | P/F | H/M/L | Y/N +Functionality | P/F | H/M/L | Y/N +``` + +## Final Recommendation +- **GO/NO-GO/CONDITIONAL** deployment decision +- Required fixes before deployment +- Risk acceptance considerations +- Monitoring requirements post-deployment + +Provide clear deployment recommendation with supporting rationale.""" + +if __name__ == "__main__": + mcp.serve(host="0.0.0.0", port=50051) \ No newline at end of file diff --git a/GrumpyCats/README.md b/GrumpyCats/README.md new file mode 100644 index 0000000..5930aea --- /dev/null +++ b/GrumpyCats/README.md @@ -0,0 +1,151 @@ +# GrumpyCats - LitterBox Malware Analysis Clients + +[![Python](https://img.shields.io/badge/Python-3.11+-blue.svg)](https://www.python.org/downloads/) +[![License](https://img.shields.io/badge/license-GPL%20v3-green.svg)]() + +A comprehensive toolkit for interacting with LitterBox malware analysis sandbox, featuring a standalone Python client and an MCP server for LLM-assisted analysis. + +--- + +## grumpycat.py + +**A Python client for interacting with a LitterBox malware analysis sandbox API.** + +### Requirements + +```bash +pip install requests +``` +* NOTE: Install it globaly on the system + +### Usage + +```bash +python grumpycat.py [GLOBAL_OPTIONS] [COMMAND_OPTIONS] +``` + +``` +LitterBox Malware Analysis Client + +positional arguments: + {upload,analyze-pid,results,files,doppelganger-scan,doppelganger,doppelganger-db,cleanup,health,delete} + Command to execute + upload Upload file for analysis + analyze-pid Analyze running process + results Get analysis results + files Get summary of all analyzed files + doppelganger-scan Run doppelganger system scan + doppelganger Run doppelganger analysis + doppelganger-db Create doppelganger fuzzy database + cleanup Clean up analysis artifacts + health Check service health + delete Delete file and its results + +options: + -h, --help show this help message and exit + --debug Enable debug logging + --url URL LitterBox server URL + --timeout TIMEOUT Request timeout in seconds + --no-verify-ssl Disable SSL verification + --proxy PROXY Proxy URL (e.g., http://proxy:8080) + + +``` + +## Examples + +``` + # Upload and analyze a file + grumpycat.py upload malware.exe --analysis static dynamic + + # Analyze a running process + grumpycat.py analyze-pid 1234 --wait + + # Run Doppelganger scan + grumpycat.py doppelganger-scan --type blender + + # Run Doppelganger analysis + grumpycat.py doppelganger abc123def --type fuzzy + + # Create fuzzy hash database + grumpycat.py doppelganger-db --folder /path/to/files --extensions .exe .dll + + # Get analysis results + grumpycat.py results abc123def --type static + + # Clean up analysis artifacts + grumpycat.py cleanup --all +``` +--- + +## LitterBoxMCP.py + +**A MCP server that wrap grumpycat.py to intercat with LitterBox server.** + +### Requirements + +| Requirement | Installation | +|-------------|--------------| +| **Claude Desktop** | [Download](https://claude.ai/desktop) | +| **fastmcp** | `pip install fastmcp` | +| **mcp-server** | `pip install mcp-server` | +| **requests** | `pip install requests` | +| **uv** | `powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 \| iex"` | +| **grumpycat.py** | Place in same directory | + +### Setup + +1. **Install all requirements** +2. **Install LitterBoxMCP in Claude Desktop:** + +```bash +mcp install .\LitterBoxMCP.py +``` + +**Expected output:** +``` +[05/16/25 02:47:13] INFO Added server 'LitterBoxMCP' to Claude config claude.py:143 + INFO Successfully installed LitterBoxMCP in Claude app +``` + +### Core Analysis Tools + +| Tool | Description | +|------|-------------| +| `upload_payload(path, name=None)` | Upload payload and get hash for analysis | +| `analyze_static(file_hash)` | Run static analysis - check YARA signatures and file characteristics | +| `analyze_dynamic(target, cmd_args=None)` | Run dynamic analysis - test behavioral detection and runtime artifacts | +| `get_file_info(file_hash)` | Get file metadata, entropy, and PE information | +| `get_static_results(file_hash)` | Get detailed static analysis results | +| `get_dynamic_results(target)` | Get detailed dynamic analysis results | + +### Utility Tools + +| Tool | Description | +|------|-------------| +| `list_payloads()` | Get summary of all tested payloads | +| `validate_pid(pid)` | Validate process ID before dynamic analysis | +| `cleanup()` | Remove all testing artifacts from sandbox | +| `health_check()` | Verify sandbox tools are operational | +| `delete_payload(file_hash)` | Remove payload and all analysis results | + +### OPSEC-Focused Prompts + +| Prompt | Purpose | +|--------|---------| +| `analyze_detection_patterns(file_hash="")` | Analyze what's getting detected and why - YARA rules, entropy, behavioral patterns | +| `assess_evasion_effectiveness(file_hash="")` | Evaluate signature and behavioral evasion success rates | +| `analyze_opsec_violations(file_hash="")` | Identify attribution risks and operational security violations | +| `generate_improvement_plan(file_hash="")` | Create prioritized roadmap for payload enhancement | +| `evaluate_deployment_readiness(file_hash="")` | Assess if payload is ready for operational deployment | + +### Key Features + +- **Robust Error Handling** - Detailed status messages for API errors +- **OPSEC Focus** - Detection evasion, signature bypassing, and attribution avoidance +- **Actionable Intelligence** - Specific recommendations for improving payload stealth +- **Comprehensive Analysis** - Static signatures, dynamic behavior, and operational security + +## Claude prompts example + +![LitterBoxMCP](../Screenshots/LitterBoxMCP.mp4) \ No newline at end of file diff --git a/GrumpyCats/grumpycat.py b/GrumpyCats/grumpycat.py new file mode 100644 index 0000000..e02190d --- /dev/null +++ b/GrumpyCats/grumpycat.py @@ -0,0 +1,776 @@ +import argparse +import hashlib +import json +import logging +import os +import requests +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional, Dict, List, Union, BinaryIO, Any, Tuple +from requests.adapters import HTTPAdapter, Retry +from urllib.parse import urljoin + + +class LitterBoxError(Exception): + """Base exception for LitterBox client errors""" + pass + +class LitterBoxAPIError(LitterBoxError): + """Exception for API-related errors""" + def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Dict] = None): + super().__init__(message) + self.status_code = status_code + self.response = response + +class LitterBoxClient: + """A Python client for interacting with the LitterBox malware analysis sandbox API.""" + + def __init__(self, + base_url: str = "http://127.0.0.1:1337", + timeout: int = 120, + max_retries: int = 3, + verify_ssl: bool = True, + logger: Optional[logging.Logger] = None, + proxy_config: Optional[Dict] = None, + headers: Optional[Dict] = None): + """Initialize the LitterBox client. + + Args: + base_url: The base URL of the LitterBox server + timeout: Request timeout in seconds + max_retries: Maximum number of retries for failed requests + verify_ssl: Whether to verify SSL certificates + logger: Custom logger instance + proxy_config: Proxy configuration dictionary (e.g., {"http": "http://proxy:8080"}) + headers: Additional headers to include in requests + """ + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self.verify_ssl = verify_ssl + self.logger = logger or logging.getLogger(__name__) + self.proxy_config = proxy_config + self.headers = headers or {} + + # Configure session with retries + self.session = requests.Session() + retry_strategy = Retry( + total=max_retries, + backoff_factor=0.5, + status_forcelist=[500, 502, 503, 504], + allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS", "TRACE"] + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + if proxy_config: + self.session.proxies.update(proxy_config) + if not verify_ssl: + self.session.verify = False + self.session.headers.update(self.headers) + + def _make_request(self, + method: str, + endpoint: str, + **kwargs) -> requests.Response: + """Make HTTP request with error handling. + + Args: + method: HTTP method + endpoint: API endpoint + **kwargs: Additional arguments for requests + + Returns: + Response object + + Raises: + LitterBoxAPIError: If the API returns an error + LitterBoxError: For other client errors + """ + url = urljoin(self.base_url, endpoint) + try: + kwargs.setdefault('timeout', self.timeout) + response = self.session.request(method, url, **kwargs) + response.raise_for_status() + return response + except requests.exceptions.HTTPError as e: + try: + error_data = response.json() + except: + error_data = {'error': response.text} + raise LitterBoxAPIError( + f"API error: {error_data.get('error', 'Unknown error')}", + status_code=response.status_code, + response=error_data + ) + except requests.exceptions.RequestException as e: + raise LitterBoxError(f"Request failed: {str(e)}") + + def upload_file(self, + file_path: Union[str, Path, BinaryIO], + file_name: Optional[str] = None) -> Dict: + """Upload a file for analysis. + + Args: + file_path: Path to the file or file-like object + file_name: Optional name to use for the file + + Returns: + Dict containing upload status and file information + """ + if isinstance(file_path, (str, Path)): + path = Path(file_path) + if not path.exists(): + raise LitterBoxError(f"File not found: {path}") + files = {'file': (file_name or path.name, open(path, 'rb'), 'application/octet-stream')} + else: + if not file_name: + raise ValueError("file_name is required when uploading file-like objects") + files = {'file': (file_name, file_path, 'application/octet-stream')} + + response = self._make_request('POST', '/upload', files=files) + return response.json() + + def analyze_file(self, + target: str, + analysis_type: str, + cmd_args: Optional[List[str]] = None, + wait_for_completion: bool = True) -> Dict: + """Run analysis on a file or process. + + Args: + target: File hash or PID + analysis_type: 'static' or 'dynamic' + cmd_args: Optional command line arguments for dynamic analysis + wait_for_completion: Whether to wait for analysis completion + + Returns: + Dict containing analysis results + """ + # Initial validations + if analysis_type not in ['static', 'dynamic']: + raise ValueError("analysis_type must be either 'static' or 'dynamic'") + + # For dynamic analysis with PID, validate first + if analysis_type == 'dynamic' and target.isdigit(): + self.validate_process(target) + + # For static analysis, cannot use PID + if analysis_type == 'static' and target.isdigit(): + raise ValueError("Cannot perform static analysis on PID") + + # Prepare request + params = {'wait': '1' if wait_for_completion else '0'} + + # Validate and prepare command line arguments + data = {} + if cmd_args is not None: + if not isinstance(cmd_args, list): + raise ValueError("Arguments must be provided as a list") + if not all(isinstance(arg, str) for arg in cmd_args): + raise ValueError("All arguments must be strings") + if any(any(char in arg for char in [';', '&', '|']) for arg in cmd_args): + raise ValueError("Invalid argument characters detected") + data['args'] = cmd_args + + # Make the analysis request + response = self._make_request( + 'POST', + f'/analyze/{analysis_type}/{target}', + params=params, + json=data + ) + + # Handle early termination case (202 status) + if response.status_code == 202: + result = response.json() + return { + 'status': 'early_termination', + 'error': result.get('error', {}).get('message', 'Process terminated early'), + 'details': { + 'termination_time': result.get('error', {}).get('termination_time'), + 'init_time': result.get('error', {}).get('init_time'), + 'message': result.get('error', {}).get('details') + } + } + + # Handle normal response + result = response.json() + + # If error occurred during analysis + if result.get('status') == 'error': + return { + 'status': 'error', + 'error': result.get('error', {}).get('message', 'Analysis failed'), + 'details': result.get('error', {}).get('details') + } + + # Success case + return { + 'status': 'success', + 'results': result.get('results', {}) + } + """Run analysis on a file or process. + + Args: + target: File hash or PID + analysis_type: 'static' or 'dynamic' + cmd_args: Optional command line arguments for dynamic analysis + wait_for_completion: Whether to wait for analysis completion + + Returns: + Dict containing: + - status: 'success', 'error', or 'early_termination' + - results: Analysis results (on success) + - error: Error message (on error) + - details: Additional error details (on error/early termination) + + Raises: + ValueError: If analysis type is invalid + LitterBoxAPIError: If API returns an error + """ + if analysis_type not in ['static', 'dynamic']: + raise ValueError("analysis_type must be either 'static' or 'dynamic'") + + # Validate command line arguments if provided + data = {} + if cmd_args is not None: + if not isinstance(cmd_args, list): + raise ValueError("Arguments must be provided as a list") + + if not all(isinstance(arg, str) for arg in cmd_args): + raise ValueError("All arguments must be strings") + + if any(any(char in arg for char in [';', '&', '|']) for arg in cmd_args): + raise ValueError("Invalid argument characters detected") + + data['args'] = cmd_args + + params = {'wait': '1' if wait_for_completion else '0'} + + response = self._make_request( + 'POST', + f'/analyze/{analysis_type}/{target}', + params=params, + json=data + ) + + if response.status_code == 202: # Early termination + return response.json() + + result = response.json() + if result.get('status') == 'error': + error_info = result.get('error', {}) + if isinstance(error_info, str): + error_info = {'message': error_info} + return { + 'status': 'error', + 'error': error_info.get('message', 'Analysis failed'), + 'details': error_info.get('details') + } + + return result + """Run analysis on a file or process. + + Args: + target: File hash or PID + analysis_type: 'static' or 'dynamic' + cmd_args: Optional command line arguments for dynamic analysis + wait_for_completion: Whether to wait for analysis completion + + Returns: + Dict containing analysis results + """ + if analysis_type not in ['static', 'dynamic']: + raise ValueError("analysis_type must be either 'static' or 'dynamic'") + + # For non-PID targets, verify the file exists first + if not str(target).isdigit() and verify_file: + try: + # Check if file exists by attempting to get its info + self._make_request('GET', f'/api/results/{target}/info') + except LitterBoxAPIError as e: + if e.status_code == 404: + raise LitterBoxError(f"File {target} not found or not yet available") + + # Validate PID for dynamic analysis + if analysis_type == 'dynamic' and str(target).isdigit(): + self.validate_process(target) + elif analysis_type == 'static' and str(target).isdigit(): + raise ValueError("Cannot perform static analysis on PID") + + params = {'wait': '1' if wait_for_completion else '0'} + data = {} + + if cmd_args is not None: + # Validate command line arguments + if not isinstance(cmd_args, list): + raise ValueError("cmd_args must be provided as a list") + if not all(isinstance(arg, str) for arg in cmd_args): + raise ValueError("All arguments must be strings") + if any(any(char in arg for char in [';', '&', '|']) for arg in cmd_args): + raise ValueError("Invalid argument characters detected") + data['args'] = cmd_args + + response = self._make_request( + 'POST', + f'/analyze/{analysis_type}/{target}', + params=params, + json=data + ) + result = response.json() + + # Handle early termination case for dynamic analysis + if result.get('status') == 'early_termination': + return { + 'status': 'early_termination', + 'error': result.get('error', {}).get('message', 'Process terminated early'), + 'details': { + 'termination_time': result.get('error', {}).get('termination_time'), + 'init_time': result.get('error', {}).get('init_time'), + 'message': result.get('error', {}).get('details') + } + } + + return result + + def get_results(self, + target: str, + analysis_type: str) -> Dict: + """Get results for a specific analysis. + + Args: + target: File hash or PID + analysis_type: 'static', 'dynamic', or 'info' + + Returns: + Dict containing analysis results + """ + if analysis_type not in ['static', 'dynamic', 'info']: + raise ValueError("analysis_type must be one of: 'static', 'dynamic', 'info'") + + response = self._make_request( + 'GET', + f'/api/results/{target}/{analysis_type}' + ) + return response.json() + + def get_files_summary(self) -> Dict: + """Get summary of all analyzed files and processes. + + Returns: + Dict containing analysis summaries + """ + response = self._make_request('GET', '/files') + return response.json() + + def run_blender_scan(self) -> Dict: + """Run a system-wide Blender scan. + + Returns: + Dict containing scan results + """ + data = {"operation": "scan"} + response = self._make_request('POST', '/blender', json=data) + return response.json() + + def compare_with_blender(self, + file_hash: str) -> Dict: + """Compare a file's analysis results with current system state. + + Args: + file_hash: Hash of the file to compare + + Returns: + Dict containing comparison results + """ + params = {'hash': file_hash} + response = self._make_request('GET', '/blender', params=params) + return response.json() + + def cleanup(self, + include_uploads: bool = True, + include_results: bool = True, + include_analysis: bool = True) -> Dict: + """Clean up analysis artifacts and uploaded files. + + Args: + include_uploads: Whether to clean upload directory + include_results: Whether to clean results directory + include_analysis: Whether to clean analysis artifacts + + Returns: + Dict containing cleanup results + """ + data = { + 'cleanup_uploads': include_uploads, + 'cleanup_results': include_results, + 'cleanup_analysis': include_analysis + } + response = self._make_request('POST', '/cleanup', json=data) + return response.json() + + def check_health(self) -> Dict: + """Check the health status of the LitterBox service. + + Returns: + Dict containing health status information + """ + response = self._make_request('GET', '/health') + return response.json() + + def delete_file(self, + file_hash: str) -> Dict: + """Delete a file and its analysis results. + + Args: + file_hash: Hash of the file to delete + + Returns: + Dict containing deletion status + """ + response = self._make_request('DELETE', f'/file/{file_hash}') + return response.json() + + def validate_process(self, pid: Union[str, int]) -> Dict: + """Validate if a process ID exists and is accessible. + + Args: + pid: Process ID to validate + + Returns: + Dict containing validation status + """ + response = self._make_request('POST', f'/validate/{pid}') + return response.json() + + def analyze_with_doppelganger(self, + analysis_type: str, + operation: str, + file_hash: Optional[str] = None, + folder_path: Optional[str] = None, + extensions: Optional[List[str]] = None, + threshold: int = 1) -> Dict: + """Unified method for doppelganger analysis operations. + + Args: + analysis_type: 'blender' or 'fuzzy' + operation: 'scan', 'create_db', or 'analyze' + file_hash: Hash of file to analyze (for analyze operation) + folder_path: Path to folder (for create_db operation) + extensions: List of file extensions (for create_db operation) + threshold: Similarity threshold (for fuzzy analysis) + + Returns: + Dict containing analysis results + """ + if analysis_type not in ['blender', 'fuzzy']: + raise ValueError("analysis_type must be either 'blender' or 'fuzzy'") + + if operation == 'scan' and analysis_type != 'blender': + raise ValueError("scan operation is only available for blender analysis") + + # For GET requests (comparisons) + if file_hash and operation != 'analyze': + params = {'type': analysis_type, 'hash': file_hash} + response = self._make_request('GET', '/doppelganger', params=params) + return response.json() + + # For POST requests + data = { + 'type': analysis_type, + 'operation': operation + } + + if operation == 'create_db': + if not folder_path: + raise ValueError("folder_path is required for create_db operation") + data['folder_path'] = folder_path + if extensions: + data['extensions'] = extensions + + elif operation == 'analyze': + if not file_hash: + raise ValueError("file_hash is required for analyze operation") + data['hash'] = file_hash + data['threshold'] = threshold + + response = self._make_request('POST', '/doppelganger', json=data) + return response.json() + + def run_system_scan(self) -> Dict: + """Run a system-wide scan using doppelganger blender analysis.""" + return self.analyze_with_doppelganger('blender', 'scan') + + def compare_against_system(self, file_hash: str, analysis_type: str = 'blender') -> Dict: + """Compare a file against system state using doppelganger.""" + return self.analyze_with_doppelganger(analysis_type, 'compare', file_hash=file_hash) + + def create_fuzzy_db(self, folder_path: str, extensions: Optional[List[str]] = None) -> Dict: + """Create fuzzy hash database using doppelganger.""" + return self.analyze_with_doppelganger('fuzzy', 'create_db', + folder_path=folder_path, + extensions=extensions) + + def analyze_fuzzy(self, file_hash: str, threshold: int = 1) -> Dict: + """Analyze a file using fuzzy hash comparison.""" + return self.analyze_with_doppelganger('fuzzy', 'analyze', + file_hash=file_hash, + threshold=threshold) + + def __enter__(self): + """Support for context manager protocol.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Clean up resources when exiting context manager.""" + self.session.close() + +def create_arg_parser(): + """Create and configure the argument parser for command line usage.""" + parser = argparse.ArgumentParser( + description="LitterBox Malware Analysis Client", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Upload and analyze a file + %(prog)s upload malware.exe --analysis static dynamic + + # Analyze a running process + %(prog)s analyze-pid 1234 --wait + + # Run Doppelganger scan + %(prog)s doppelganger-scan --type blender + + # Run Doppelganger analysis + %(prog)s doppelganger abc123def --type fuzzy + + # Create fuzzy hash database + %(prog)s doppelganger-db --folder /path/to/files --extensions .exe .dll + + # Get analysis results + %(prog)s results abc123def --type static + + # Clean up analysis artifacts + %(prog)s cleanup --all +""" + ) + + # General options + parser.add_argument('--debug', action='store_true', help='Enable debug logging') + parser.add_argument('--url', default='http://127.0.0.1:1337', help='LitterBox server URL') + parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds') + parser.add_argument('--no-verify-ssl', action='store_true', help='Disable SSL verification') + parser.add_argument('--proxy', help='Proxy URL (e.g., http://proxy:8080)') + + subparsers = parser.add_subparsers(dest='command', help='Command to execute') + + # Upload command + upload_parser = subparsers.add_parser('upload', help='Upload file for analysis') + upload_parser.add_argument('file', help='File to upload') + upload_parser.add_argument('--name', help='Custom name for the file') + upload_parser.add_argument('--analysis', nargs='+', choices=['static', 'dynamic'], + help='Run analysis after upload') + upload_parser.add_argument('--args', nargs='+', help='Command line arguments for dynamic analysis') + + # Analyze PID command + analyze_pid_parser = subparsers.add_parser('analyze-pid', help='Analyze running process') + analyze_pid_parser.add_argument('pid', type=int, help='Process ID to analyze') + analyze_pid_parser.add_argument('--wait', action='store_true', + help='Wait for analysis completion') + analyze_pid_parser.add_argument('--args', nargs='+', help='Command line arguments') + + # Results command + results_parser = subparsers.add_parser('results', help='Get analysis results') + results_parser.add_argument('target', help='File hash or PID') + results_parser.add_argument('--type', choices=['static', 'dynamic', 'info'], + required=True, help='Type of results to retrieve') + + # Files summary command + subparsers.add_parser('files', help='Get summary of all analyzed files') + + # Doppelganger scan command + doppelganger_scan_parser = subparsers.add_parser('doppelganger-scan', help='Run doppelganger system scan') + doppelganger_scan_parser.add_argument('--type', choices=['blender', 'fuzzy'], default='blender', + help='Type of scan to perform') + + # Doppelganger analyze command + doppelganger_parser = subparsers.add_parser('doppelganger', help='Run doppelganger analysis') + doppelganger_parser.add_argument('hash', help='File hash to analyze') + doppelganger_parser.add_argument('--type', choices=['blender', 'fuzzy'], required=True, + help='Type of analysis to perform') + doppelganger_parser.add_argument('--threshold', type=int, default=1, + help='Similarity threshold for fuzzy analysis') + + # Doppelganger database command + db_parser = subparsers.add_parser('doppelganger-db', help='Create doppelganger fuzzy database') + db_parser.add_argument('--folder', required=True, help='Folder path to process') + db_parser.add_argument('--extensions', nargs='+', help='File extensions to include') + + # Cleanup command + cleanup_parser = subparsers.add_parser('cleanup', help='Clean up analysis artifacts') + cleanup_parser.add_argument('--all', action='store_true', + help='Clean all artifacts') + cleanup_parser.add_argument('--uploads', action='store_true', + help='Clean upload directory') + cleanup_parser.add_argument('--results', action='store_true', + help='Clean results directory') + cleanup_parser.add_argument('--analysis', action='store_true', + help='Clean analysis artifacts') + + # Health check command + subparsers.add_parser('health', help='Check service health') + + # Delete command + delete_parser = subparsers.add_parser('delete', help='Delete file and its results') + delete_parser.add_argument('hash', help='File hash to delete') + + return parser + +def main(): + parser = create_arg_parser() + args = parser.parse_args() + + # Configure logging + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s') + logger = logging.getLogger('litterbox') + + # Create client with CLI arguments + client_kwargs = { + 'base_url': args.url, + 'timeout': args.timeout, + 'verify_ssl': not args.no_verify_ssl, + 'logger': logger, + } + + if args.proxy: + client_kwargs['proxy_config'] = { + 'http': args.proxy, + 'https': args.proxy + } + + try: + client = LitterBoxClient(**client_kwargs) + + if args.command == 'upload': + # Upload file + result = client.upload_file(args.file, file_name=args.name) + file_hash = result['file_info']['md5'] + print(f"File uploaded successfully. Hash: {file_hash}") + + # Run requested analysis + if args.analysis: + for analysis_type in args.analysis: + print(f"Running {analysis_type} analysis...") + analysis_args = args.args if analysis_type == 'dynamic' else None + result = client.analyze_file( + file_hash, + analysis_type, + cmd_args=analysis_args, + wait_for_completion=True + ) + + if result.get('status') == 'early_termination': + print("Process terminated early:") + print(f"Error: {result.get('error')}") + print("Details:") + print(f" Termination time: {result['details'].get('termination_time')}") + print(f" Init time: {result['details'].get('init_time')}") + print(f" Message: {result['details'].get('message')}") + elif result.get('status') == 'error': + print(f"Analysis failed: {result.get('error')}") + if 'details' in result: + print(f"Details: {result['details']}") + else: + print(json.dumps(result, indent=2)) + + elif args.command == 'analyze-pid': + # Analyze process + print(f"Analyzing process {args.pid}...") + result = client.analyze_file( + str(args.pid), + 'dynamic', + cmd_args=args.args, + wait_for_completion=args.wait + ) + print(json.dumps(result, indent=2)) + + elif args.command == 'results': + result = client.get_results(args.target, args.type) + print(json.dumps(result, indent=2)) + + elif args.command == 'files': + result = client.get_files_summary() + print(json.dumps(result, indent=2)) + + elif args.command == 'doppelganger-scan': + print(f"Running doppelganger scan with type: {args.type}") + result = client.analyze_with_doppelganger(args.type, 'scan') + print(json.dumps(result, indent=2)) + + elif args.command == 'doppelganger': + print(f"Running doppelganger analysis with type: {args.type}") + result = client.analyze_with_doppelganger( + args.type, + 'analyze', + file_hash=args.hash, + threshold=args.threshold + ) + print(json.dumps(result, indent=2)) + + elif args.command == 'doppelganger-db': + print("Creating doppelganger fuzzy database...") + result = client.analyze_with_doppelganger( + 'fuzzy', + 'create_db', + folder_path=args.folder, + extensions=args.extensions + ) + print(json.dumps(result, indent=2)) + + elif args.command == 'cleanup': + if args.all: + args.uploads = args.results = args.analysis = True + result = client.cleanup( + include_uploads=args.uploads, + include_results=args.results, + include_analysis=args.analysis + ) + print(json.dumps(result, indent=2)) + + elif args.command == 'health': + result = client.check_health() + print(json.dumps(result, indent=2)) + + elif args.command == 'delete': + result = client.delete_file(args.hash) + print(json.dumps(result, indent=2)) + + else: + parser.print_help() + + except LitterBoxAPIError as e: + logger.error(f"API Error (Status {e.status_code}): {str(e)}") + if args.debug: + logger.debug(f"Response data: {e.response}") + sys.exit(1) + except LitterBoxError as e: + logger.error(f"Client Error: {str(e)}") + sys.exit(1) + except Exception as e: + logger.error(f"Unexpected Error: {str(e)}") + if args.debug: + logger.exception("Detailed error information:") + sys.exit(1) + finally: + client.session.close() + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nOperation cancelled by user") + sys.exit(130) \ No newline at end of file diff --git a/README.md b/README.md index 5521e2f..3977678 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,16 @@ The `config.yml` file controls: - YARA rule locations - Analysis timeouts and limits +## Python Clients + +For programmatic access to LitterBox APIs, use the **GrumpyCats** client library: + +**[GrumpyCats README](GrumpyCats/README.md)** + +**grumpycat.py** - Python client for LitterBox API interaction +**LitterBoxMCP.py** - MCP server for LLM-assisted malware analysis + +--- ## SECURITY WARNINGS diff --git a/Screenshots/LitterBoxMCP.mp4 b/Screenshots/LitterBoxMCP.mp4 new file mode 100644 index 0000000..d903e44 Binary files /dev/null and b/Screenshots/LitterBoxMCP.mp4 differ