LitterBox v3.0.0

This commit is contained in:
BlackSnufkin
2025-05-16 07:26:48 -07:00
parent e23126daab
commit a3a3f697ad
6 changed files with 1175 additions and 0 deletions
+6
View File
@@ -2,6 +2,12 @@
All notable changes to this project will be documented in this file.
## [v3.0.0] - 2025-05-16
### Added
- Python Clients to interact with LitterBox Server
* `grumpycat.py` - python standalone cli client
* `LitterBoxMCP.py` - MCP Server to intercat with LitterBox Server
## [v2.5.2] - 2025-05-09
### Fixed
- Clinet Side: removed hard-coded size limit
+232
View File
@@ -0,0 +1,232 @@
from mcp.server.fastmcp import FastMCP
from grumpycat import LitterBoxClient, LitterBoxError, LitterBoxAPIError
# Initialize MCP server focused on efficient payload OPSEC analysis
mcp = FastMCP(
name="LitterBoxMCP",
instructions=(
"You are an elite payload OPSEC specialist focused on detection evasion.\n"
"Analyze YARA signatures, behavioral patterns, and detection triggers.\n"
"Provide actionable tradecraft improvements for bypassing EDR and AV.\n"
"Focus on signature evasion, behavioral stealth, and attribution avoidance.\n"
"Emphasize practical improvements to enhance payload operational security."
),
)
# Initialize LitterBox client
client = LitterBoxClient(base_url="http://127.0.0.1:1337")
def handle_api(callable_fn, *args, **kwargs):
"""Clean error handling for API operations"""
try:
result = callable_fn(*args, **kwargs)
return {"status": "success", "data": result}
except LitterBoxAPIError as e:
return {"status": "api_error", "message": str(e), "http_code": e.status_code}
except LitterBoxError as e:
return {"status": "client_error", "message": str(e)}
except Exception as e:
return {"status": "error", "message": str(e)}
# Core Analysis Tools
@mcp.tool(name="upload_payload", description="Upload payload for OPSEC testing")
def upload_payload(path: str, name: str = None):
"""Upload payload and get hash for analysis"""
return handle_api(client.upload_file, path, file_name=name)
@mcp.tool(name="analyze_static", description="Run static analysis - check YARA signatures and file characteristics")
def analyze_static(file_hash: str):
"""Run static analysis to identify signature detections"""
return handle_api(client.analyze_file, file_hash, 'static')
@mcp.tool(name="analyze_dynamic", description="Run dynamic analysis - test behavioral detection and runtime artifacts")
def analyze_dynamic(target: str, cmd_args: list = None):
"""Run dynamic analysis to test behavioral evasion"""
return handle_api(client.analyze_file, target, 'dynamic', cmd_args=cmd_args)
@mcp.tool(name="get_file_info", description="Get basic file information and characteristics")
def get_file_info(file_hash: str):
"""Get file metadata, entropy, and PE information"""
return handle_api(client.get_results, file_hash, 'info')
@mcp.tool(name="get_static_results", description="Get static analysis results - YARA hits and signature detections")
def get_static_results(file_hash: str):
"""Get detailed static analysis results"""
return handle_api(client.get_results, file_hash, 'static')
@mcp.tool(name="get_dynamic_results", description="Get dynamic analysis results - behavioral detections and runtime artifacts")
def get_dynamic_results(target: str):
"""Get detailed dynamic analysis results"""
return handle_api(client.get_results, target, 'dynamic')
# Utility Tools
@mcp.tool(name="list_payloads", description="List all analyzed payloads with summaries")
def list_payloads():
"""Get summary of all tested payloads"""
return handle_api(client.get_files_summary)
@mcp.tool(name="validate_pid", description="Validate process ID for dynamic analysis")
def validate_pid(pid: int):
"""Validate PID before dynamic analysis"""
return handle_api(client.validate_process, pid)
@mcp.tool(name="cleanup", description="Clean up all testing artifacts")
def cleanup():
"""Remove all testing artifacts from sandbox"""
return handle_api(client.cleanup)
@mcp.tool(name="health_check", description="Check sandbox health status")
def health_check():
"""Verify sandbox tools are operational"""
return handle_api(client.check_health)
@mcp.tool(name="delete_payload", description="Delete specific payload and results")
def delete_payload(file_hash: str):
"""Remove payload and all analysis results"""
return handle_api(client.delete_file, file_hash)
# OPSEC-Focused Prompts
@mcp.prompt()
def analyze_detection_patterns(file_hash: str = "") -> str:
"""Analyze what's getting detected and why"""
return f"""Analyze detection patterns for {f'payload {file_hash}' if file_hash else 'the payload'}:
## Static Detection Analysis
- YARA rule matches and triggered signatures
- File entropy and packing indicators
- Import table and string analysis findings
- PE structure anomalies
## Dynamic Detection Analysis
- Process manipulation behaviors detected
- Memory artifacts flagged by Moneta
- Behavioral patterns triggering alerts
- Runtime API usage patterns
## Detection Improvement Strategy
- Signature evasion techniques needed
- Behavioral modification recommendations
- Obfuscation and packing adjustments
- Alternative implementation approaches
Focus on specific, actionable improvements to bypass detected patterns."""
@mcp.prompt()
def assess_evasion_effectiveness(file_hash: str = "") -> str:
"""Assess payload evasion effectiveness and improvement areas"""
return f"""Evaluate evasion effectiveness for {f'payload {file_hash}' if file_hash else 'the payload'}:
## Signature Evasion Assessment
- YARA rule bypass success/failure
- Anti-virus signature avoidance
- Static analysis resistance level
- String obfuscation effectiveness
## Behavioral Evasion Assessment
- EDR behavioral detection bypass
- Process manipulation stealth
- Memory artifact minimization
- Runtime pattern camouflage
## Improvement Recommendations
- Prioritized evasion enhancements
- Specific code modification suggestions
- Alternative technique recommendations
- Testing validation requirements
Provide concrete steps to improve detection evasion rates."""
@mcp.prompt()
def analyze_opsec_violations(file_hash: str = "") -> str:
"""Identify OPSEC violations and attribution risks"""
return f"""Identify OPSEC violations for {f'payload {file_hash}' if file_hash else 'the payload'}:
## Attribution Risk Factors
- Similarity to known offensive tools
- Unique behavioral fingerprints
- Metadata and compilation artifacts
- Code pattern attributions
## OPSEC Violation Analysis
- Signature patterns revealing tool origin
- Behavioral traits linking to frameworks
- File characteristics indicating toolset
- Communication patterns exposing infrastructure
## Mitigation Strategies
- Attribution masking techniques
- Behavioral diversification methods
- Metadata sanitization requirements
- Fingerprint elimination approaches
Focus on maintaining operational anonymity and avoiding tool attribution."""
@mcp.prompt()
def generate_improvement_plan(file_hash: str = "") -> str:
"""Generate prioritized improvement plan for payload enhancement"""
return f"""Create improvement plan for {f'payload {file_hash}' if file_hash else 'the payload'}:
## Detection Issues Identified
- Critical signature detections requiring immediate attention
- Behavioral patterns triggering EDR alerts
- File characteristics exposing payload nature
- Attribution risks from tool similarity
## Improvement Priority Matrix
1. **CRITICAL** - Signature bypasses for deployment readiness
2. **HIGH** - Behavioral evasion improvements
3. **MEDIUM** - Attribution risk mitigation
4. **LOW** - General stealth enhancements
## Implementation Roadmap
- Immediate fixes for critical detections
- Behavioral modification timeline
- Testing and validation checkpoints
- Deployment readiness criteria
## Success Metrics
- Signature detection rate reduction
- Behavioral alert elimination
- Attribution risk minimization
- Overall stealth improvement
Provide actionable, prioritized steps for payload enhancement."""
@mcp.prompt()
def evaluate_deployment_readiness(file_hash: str = "") -> str:
"""Evaluate if payload is ready for operational deployment"""
return f"""Evaluate deployment readiness for {f'payload {file_hash}' if file_hash else 'the payload'}:
## Readiness Assessment Criteria
- **Signature Evasion**: No YARA rule matches
- **Behavioral Stealth**: Clean dynamic analysis results
- **Attribution Risk**: Low similarity to known tools
- **Technical Functionality**: Proper execution and behavior
## Risk Assessment
- Detection probability estimation
- Attribution risk evaluation
- Operational security threats
- Incident response impact
## Deployment Decision Matrix
```
Category | Status | Severity | Blocker
----------------|--------|----------|--------
Signatures | P/F | H/M/L | Y/N
Behavior | P/F | H/M/L | Y/N
Attribution | P/F | H/M/L | Y/N
Functionality | P/F | H/M/L | Y/N
```
## Final Recommendation
- **GO/NO-GO/CONDITIONAL** deployment decision
- Required fixes before deployment
- Risk acceptance considerations
- Monitoring requirements post-deployment
Provide clear deployment recommendation with supporting rationale."""
if __name__ == "__main__":
mcp.serve(host="0.0.0.0", port=50051)
+151
View File
@@ -0,0 +1,151 @@
# GrumpyCats - LitterBox Malware Analysis Clients
[![Python](https://img.shields.io/badge/Python-3.11+-blue.svg)](https://www.python.org/downloads/)
[![License](https://img.shields.io/badge/license-GPL%20v3-green.svg)]()
A comprehensive toolkit for interacting with LitterBox malware analysis sandbox, featuring a standalone Python client and an MCP server for LLM-assisted analysis.
---
## grumpycat.py
**A Python client for interacting with a LitterBox malware analysis sandbox API.**
### Requirements
```bash
pip install requests
```
* NOTE: Install it globaly on the system
### Usage
```bash
python grumpycat.py [GLOBAL_OPTIONS] <command> [COMMAND_OPTIONS]
```
```
LitterBox Malware Analysis Client
positional arguments:
{upload,analyze-pid,results,files,doppelganger-scan,doppelganger,doppelganger-db,cleanup,health,delete}
Command to execute
upload Upload file for analysis
analyze-pid Analyze running process
results Get analysis results
files Get summary of all analyzed files
doppelganger-scan Run doppelganger system scan
doppelganger Run doppelganger analysis
doppelganger-db Create doppelganger fuzzy database
cleanup Clean up analysis artifacts
health Check service health
delete Delete file and its results
options:
-h, --help show this help message and exit
--debug Enable debug logging
--url URL LitterBox server URL
--timeout TIMEOUT Request timeout in seconds
--no-verify-ssl Disable SSL verification
--proxy PROXY Proxy URL (e.g., http://proxy:8080)
```
## Examples
```
# Upload and analyze a file
grumpycat.py upload malware.exe --analysis static dynamic
# Analyze a running process
grumpycat.py analyze-pid 1234 --wait
# Run Doppelganger scan
grumpycat.py doppelganger-scan --type blender
# Run Doppelganger analysis
grumpycat.py doppelganger abc123def --type fuzzy
# Create fuzzy hash database
grumpycat.py doppelganger-db --folder /path/to/files --extensions .exe .dll
# Get analysis results
grumpycat.py results abc123def --type static
# Clean up analysis artifacts
grumpycat.py cleanup --all
```
---
## LitterBoxMCP.py
**A MCP server that wrap grumpycat.py to intercat with LitterBox server.**
### Requirements
| Requirement | Installation |
|-------------|--------------|
| **Claude Desktop** | [Download](https://claude.ai/desktop) |
| **fastmcp** | `pip install fastmcp` |
| **mcp-server** | `pip install mcp-server` |
| **requests** | `pip install requests` |
| **uv** | `powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 \| iex"` |
| **grumpycat.py** | Place in same directory |
### Setup
1. **Install all requirements**
2. **Install LitterBoxMCP in Claude Desktop:**
```bash
mcp install .\LitterBoxMCP.py
```
**Expected output:**
```
[05/16/25 02:47:13] INFO Added server 'LitterBoxMCP' to Claude config claude.py:143
INFO Successfully installed LitterBoxMCP in Claude app
```
### Core Analysis Tools
| Tool | Description |
|------|-------------|
| `upload_payload(path, name=None)` | Upload payload and get hash for analysis |
| `analyze_static(file_hash)` | Run static analysis - check YARA signatures and file characteristics |
| `analyze_dynamic(target, cmd_args=None)` | Run dynamic analysis - test behavioral detection and runtime artifacts |
| `get_file_info(file_hash)` | Get file metadata, entropy, and PE information |
| `get_static_results(file_hash)` | Get detailed static analysis results |
| `get_dynamic_results(target)` | Get detailed dynamic analysis results |
### Utility Tools
| Tool | Description |
|------|-------------|
| `list_payloads()` | Get summary of all tested payloads |
| `validate_pid(pid)` | Validate process ID before dynamic analysis |
| `cleanup()` | Remove all testing artifacts from sandbox |
| `health_check()` | Verify sandbox tools are operational |
| `delete_payload(file_hash)` | Remove payload and all analysis results |
### OPSEC-Focused Prompts
| Prompt | Purpose |
|--------|---------|
| `analyze_detection_patterns(file_hash="")` | Analyze what's getting detected and why - YARA rules, entropy, behavioral patterns |
| `assess_evasion_effectiveness(file_hash="")` | Evaluate signature and behavioral evasion success rates |
| `analyze_opsec_violations(file_hash="")` | Identify attribution risks and operational security violations |
| `generate_improvement_plan(file_hash="")` | Create prioritized roadmap for payload enhancement |
| `evaluate_deployment_readiness(file_hash="")` | Assess if payload is ready for operational deployment |
### Key Features
- **Robust Error Handling** - Detailed status messages for API errors
- **OPSEC Focus** - Detection evasion, signature bypassing, and attribution avoidance
- **Actionable Intelligence** - Specific recommendations for improving payload stealth
- **Comprehensive Analysis** - Static signatures, dynamic behavior, and operational security
## Claude prompts example
![LitterBoxMCP](../Screenshots/LitterBoxMCP.mp4)
+776
View File
@@ -0,0 +1,776 @@
import argparse
import hashlib
import json
import logging
import os
import requests
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Union, BinaryIO, Any, Tuple
from requests.adapters import HTTPAdapter, Retry
from urllib.parse import urljoin
class LitterBoxError(Exception):
"""Base exception for LitterBox client errors"""
pass
class LitterBoxAPIError(LitterBoxError):
"""Exception for API-related errors"""
def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Dict] = None):
super().__init__(message)
self.status_code = status_code
self.response = response
class LitterBoxClient:
"""A Python client for interacting with the LitterBox malware analysis sandbox API."""
def __init__(self,
base_url: str = "http://127.0.0.1:1337",
timeout: int = 120,
max_retries: int = 3,
verify_ssl: bool = True,
logger: Optional[logging.Logger] = None,
proxy_config: Optional[Dict] = None,
headers: Optional[Dict] = None):
"""Initialize the LitterBox client.
Args:
base_url: The base URL of the LitterBox server
timeout: Request timeout in seconds
max_retries: Maximum number of retries for failed requests
verify_ssl: Whether to verify SSL certificates
logger: Custom logger instance
proxy_config: Proxy configuration dictionary (e.g., {"http": "http://proxy:8080"})
headers: Additional headers to include in requests
"""
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.verify_ssl = verify_ssl
self.logger = logger or logging.getLogger(__name__)
self.proxy_config = proxy_config
self.headers = headers or {}
# Configure session with retries
self.session = requests.Session()
retry_strategy = Retry(
total=max_retries,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS", "TRACE"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
if proxy_config:
self.session.proxies.update(proxy_config)
if not verify_ssl:
self.session.verify = False
self.session.headers.update(self.headers)
def _make_request(self,
method: str,
endpoint: str,
**kwargs) -> requests.Response:
"""Make HTTP request with error handling.
Args:
method: HTTP method
endpoint: API endpoint
**kwargs: Additional arguments for requests
Returns:
Response object
Raises:
LitterBoxAPIError: If the API returns an error
LitterBoxError: For other client errors
"""
url = urljoin(self.base_url, endpoint)
try:
kwargs.setdefault('timeout', self.timeout)
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.HTTPError as e:
try:
error_data = response.json()
except:
error_data = {'error': response.text}
raise LitterBoxAPIError(
f"API error: {error_data.get('error', 'Unknown error')}",
status_code=response.status_code,
response=error_data
)
except requests.exceptions.RequestException as e:
raise LitterBoxError(f"Request failed: {str(e)}")
def upload_file(self,
file_path: Union[str, Path, BinaryIO],
file_name: Optional[str] = None) -> Dict:
"""Upload a file for analysis.
Args:
file_path: Path to the file or file-like object
file_name: Optional name to use for the file
Returns:
Dict containing upload status and file information
"""
if isinstance(file_path, (str, Path)):
path = Path(file_path)
if not path.exists():
raise LitterBoxError(f"File not found: {path}")
files = {'file': (file_name or path.name, open(path, 'rb'), 'application/octet-stream')}
else:
if not file_name:
raise ValueError("file_name is required when uploading file-like objects")
files = {'file': (file_name, file_path, 'application/octet-stream')}
response = self._make_request('POST', '/upload', files=files)
return response.json()
def analyze_file(self,
target: str,
analysis_type: str,
cmd_args: Optional[List[str]] = None,
wait_for_completion: bool = True) -> Dict:
"""Run analysis on a file or process.
Args:
target: File hash or PID
analysis_type: 'static' or 'dynamic'
cmd_args: Optional command line arguments for dynamic analysis
wait_for_completion: Whether to wait for analysis completion
Returns:
Dict containing analysis results
"""
# Initial validations
if analysis_type not in ['static', 'dynamic']:
raise ValueError("analysis_type must be either 'static' or 'dynamic'")
# For dynamic analysis with PID, validate first
if analysis_type == 'dynamic' and target.isdigit():
self.validate_process(target)
# For static analysis, cannot use PID
if analysis_type == 'static' and target.isdigit():
raise ValueError("Cannot perform static analysis on PID")
# Prepare request
params = {'wait': '1' if wait_for_completion else '0'}
# Validate and prepare command line arguments
data = {}
if cmd_args is not None:
if not isinstance(cmd_args, list):
raise ValueError("Arguments must be provided as a list")
if not all(isinstance(arg, str) for arg in cmd_args):
raise ValueError("All arguments must be strings")
if any(any(char in arg for char in [';', '&', '|']) for arg in cmd_args):
raise ValueError("Invalid argument characters detected")
data['args'] = cmd_args
# Make the analysis request
response = self._make_request(
'POST',
f'/analyze/{analysis_type}/{target}',
params=params,
json=data
)
# Handle early termination case (202 status)
if response.status_code == 202:
result = response.json()
return {
'status': 'early_termination',
'error': result.get('error', {}).get('message', 'Process terminated early'),
'details': {
'termination_time': result.get('error', {}).get('termination_time'),
'init_time': result.get('error', {}).get('init_time'),
'message': result.get('error', {}).get('details')
}
}
# Handle normal response
result = response.json()
# If error occurred during analysis
if result.get('status') == 'error':
return {
'status': 'error',
'error': result.get('error', {}).get('message', 'Analysis failed'),
'details': result.get('error', {}).get('details')
}
# Success case
return {
'status': 'success',
'results': result.get('results', {})
}
"""Run analysis on a file or process.
Args:
target: File hash or PID
analysis_type: 'static' or 'dynamic'
cmd_args: Optional command line arguments for dynamic analysis
wait_for_completion: Whether to wait for analysis completion
Returns:
Dict containing:
- status: 'success', 'error', or 'early_termination'
- results: Analysis results (on success)
- error: Error message (on error)
- details: Additional error details (on error/early termination)
Raises:
ValueError: If analysis type is invalid
LitterBoxAPIError: If API returns an error
"""
if analysis_type not in ['static', 'dynamic']:
raise ValueError("analysis_type must be either 'static' or 'dynamic'")
# Validate command line arguments if provided
data = {}
if cmd_args is not None:
if not isinstance(cmd_args, list):
raise ValueError("Arguments must be provided as a list")
if not all(isinstance(arg, str) for arg in cmd_args):
raise ValueError("All arguments must be strings")
if any(any(char in arg for char in [';', '&', '|']) for arg in cmd_args):
raise ValueError("Invalid argument characters detected")
data['args'] = cmd_args
params = {'wait': '1' if wait_for_completion else '0'}
response = self._make_request(
'POST',
f'/analyze/{analysis_type}/{target}',
params=params,
json=data
)
if response.status_code == 202: # Early termination
return response.json()
result = response.json()
if result.get('status') == 'error':
error_info = result.get('error', {})
if isinstance(error_info, str):
error_info = {'message': error_info}
return {
'status': 'error',
'error': error_info.get('message', 'Analysis failed'),
'details': error_info.get('details')
}
return result
"""Run analysis on a file or process.
Args:
target: File hash or PID
analysis_type: 'static' or 'dynamic'
cmd_args: Optional command line arguments for dynamic analysis
wait_for_completion: Whether to wait for analysis completion
Returns:
Dict containing analysis results
"""
if analysis_type not in ['static', 'dynamic']:
raise ValueError("analysis_type must be either 'static' or 'dynamic'")
# For non-PID targets, verify the file exists first
if not str(target).isdigit() and verify_file:
try:
# Check if file exists by attempting to get its info
self._make_request('GET', f'/api/results/{target}/info')
except LitterBoxAPIError as e:
if e.status_code == 404:
raise LitterBoxError(f"File {target} not found or not yet available")
# Validate PID for dynamic analysis
if analysis_type == 'dynamic' and str(target).isdigit():
self.validate_process(target)
elif analysis_type == 'static' and str(target).isdigit():
raise ValueError("Cannot perform static analysis on PID")
params = {'wait': '1' if wait_for_completion else '0'}
data = {}
if cmd_args is not None:
# Validate command line arguments
if not isinstance(cmd_args, list):
raise ValueError("cmd_args must be provided as a list")
if not all(isinstance(arg, str) for arg in cmd_args):
raise ValueError("All arguments must be strings")
if any(any(char in arg for char in [';', '&', '|']) for arg in cmd_args):
raise ValueError("Invalid argument characters detected")
data['args'] = cmd_args
response = self._make_request(
'POST',
f'/analyze/{analysis_type}/{target}',
params=params,
json=data
)
result = response.json()
# Handle early termination case for dynamic analysis
if result.get('status') == 'early_termination':
return {
'status': 'early_termination',
'error': result.get('error', {}).get('message', 'Process terminated early'),
'details': {
'termination_time': result.get('error', {}).get('termination_time'),
'init_time': result.get('error', {}).get('init_time'),
'message': result.get('error', {}).get('details')
}
}
return result
def get_results(self,
target: str,
analysis_type: str) -> Dict:
"""Get results for a specific analysis.
Args:
target: File hash or PID
analysis_type: 'static', 'dynamic', or 'info'
Returns:
Dict containing analysis results
"""
if analysis_type not in ['static', 'dynamic', 'info']:
raise ValueError("analysis_type must be one of: 'static', 'dynamic', 'info'")
response = self._make_request(
'GET',
f'/api/results/{target}/{analysis_type}'
)
return response.json()
def get_files_summary(self) -> Dict:
"""Get summary of all analyzed files and processes.
Returns:
Dict containing analysis summaries
"""
response = self._make_request('GET', '/files')
return response.json()
def run_blender_scan(self) -> Dict:
"""Run a system-wide Blender scan.
Returns:
Dict containing scan results
"""
data = {"operation": "scan"}
response = self._make_request('POST', '/blender', json=data)
return response.json()
def compare_with_blender(self,
file_hash: str) -> Dict:
"""Compare a file's analysis results with current system state.
Args:
file_hash: Hash of the file to compare
Returns:
Dict containing comparison results
"""
params = {'hash': file_hash}
response = self._make_request('GET', '/blender', params=params)
return response.json()
def cleanup(self,
include_uploads: bool = True,
include_results: bool = True,
include_analysis: bool = True) -> Dict:
"""Clean up analysis artifacts and uploaded files.
Args:
include_uploads: Whether to clean upload directory
include_results: Whether to clean results directory
include_analysis: Whether to clean analysis artifacts
Returns:
Dict containing cleanup results
"""
data = {
'cleanup_uploads': include_uploads,
'cleanup_results': include_results,
'cleanup_analysis': include_analysis
}
response = self._make_request('POST', '/cleanup', json=data)
return response.json()
def check_health(self) -> Dict:
"""Check the health status of the LitterBox service.
Returns:
Dict containing health status information
"""
response = self._make_request('GET', '/health')
return response.json()
def delete_file(self,
file_hash: str) -> Dict:
"""Delete a file and its analysis results.
Args:
file_hash: Hash of the file to delete
Returns:
Dict containing deletion status
"""
response = self._make_request('DELETE', f'/file/{file_hash}')
return response.json()
def validate_process(self, pid: Union[str, int]) -> Dict:
"""Validate if a process ID exists and is accessible.
Args:
pid: Process ID to validate
Returns:
Dict containing validation status
"""
response = self._make_request('POST', f'/validate/{pid}')
return response.json()
def analyze_with_doppelganger(self,
analysis_type: str,
operation: str,
file_hash: Optional[str] = None,
folder_path: Optional[str] = None,
extensions: Optional[List[str]] = None,
threshold: int = 1) -> Dict:
"""Unified method for doppelganger analysis operations.
Args:
analysis_type: 'blender' or 'fuzzy'
operation: 'scan', 'create_db', or 'analyze'
file_hash: Hash of file to analyze (for analyze operation)
folder_path: Path to folder (for create_db operation)
extensions: List of file extensions (for create_db operation)
threshold: Similarity threshold (for fuzzy analysis)
Returns:
Dict containing analysis results
"""
if analysis_type not in ['blender', 'fuzzy']:
raise ValueError("analysis_type must be either 'blender' or 'fuzzy'")
if operation == 'scan' and analysis_type != 'blender':
raise ValueError("scan operation is only available for blender analysis")
# For GET requests (comparisons)
if file_hash and operation != 'analyze':
params = {'type': analysis_type, 'hash': file_hash}
response = self._make_request('GET', '/doppelganger', params=params)
return response.json()
# For POST requests
data = {
'type': analysis_type,
'operation': operation
}
if operation == 'create_db':
if not folder_path:
raise ValueError("folder_path is required for create_db operation")
data['folder_path'] = folder_path
if extensions:
data['extensions'] = extensions
elif operation == 'analyze':
if not file_hash:
raise ValueError("file_hash is required for analyze operation")
data['hash'] = file_hash
data['threshold'] = threshold
response = self._make_request('POST', '/doppelganger', json=data)
return response.json()
def run_system_scan(self) -> Dict:
"""Run a system-wide scan using doppelganger blender analysis."""
return self.analyze_with_doppelganger('blender', 'scan')
def compare_against_system(self, file_hash: str, analysis_type: str = 'blender') -> Dict:
"""Compare a file against system state using doppelganger."""
return self.analyze_with_doppelganger(analysis_type, 'compare', file_hash=file_hash)
def create_fuzzy_db(self, folder_path: str, extensions: Optional[List[str]] = None) -> Dict:
"""Create fuzzy hash database using doppelganger."""
return self.analyze_with_doppelganger('fuzzy', 'create_db',
folder_path=folder_path,
extensions=extensions)
def analyze_fuzzy(self, file_hash: str, threshold: int = 1) -> Dict:
"""Analyze a file using fuzzy hash comparison."""
return self.analyze_with_doppelganger('fuzzy', 'analyze',
file_hash=file_hash,
threshold=threshold)
def __enter__(self):
"""Support for context manager protocol."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Clean up resources when exiting context manager."""
self.session.close()
def create_arg_parser():
"""Create and configure the argument parser for command line usage."""
parser = argparse.ArgumentParser(
description="LitterBox Malware Analysis Client",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Upload and analyze a file
%(prog)s upload malware.exe --analysis static dynamic
# Analyze a running process
%(prog)s analyze-pid 1234 --wait
# Run Doppelganger scan
%(prog)s doppelganger-scan --type blender
# Run Doppelganger analysis
%(prog)s doppelganger abc123def --type fuzzy
# Create fuzzy hash database
%(prog)s doppelganger-db --folder /path/to/files --extensions .exe .dll
# Get analysis results
%(prog)s results abc123def --type static
# Clean up analysis artifacts
%(prog)s cleanup --all
"""
)
# General options
parser.add_argument('--debug', action='store_true', help='Enable debug logging')
parser.add_argument('--url', default='http://127.0.0.1:1337', help='LitterBox server URL')
parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds')
parser.add_argument('--no-verify-ssl', action='store_true', help='Disable SSL verification')
parser.add_argument('--proxy', help='Proxy URL (e.g., http://proxy:8080)')
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
# Upload command
upload_parser = subparsers.add_parser('upload', help='Upload file for analysis')
upload_parser.add_argument('file', help='File to upload')
upload_parser.add_argument('--name', help='Custom name for the file')
upload_parser.add_argument('--analysis', nargs='+', choices=['static', 'dynamic'],
help='Run analysis after upload')
upload_parser.add_argument('--args', nargs='+', help='Command line arguments for dynamic analysis')
# Analyze PID command
analyze_pid_parser = subparsers.add_parser('analyze-pid', help='Analyze running process')
analyze_pid_parser.add_argument('pid', type=int, help='Process ID to analyze')
analyze_pid_parser.add_argument('--wait', action='store_true',
help='Wait for analysis completion')
analyze_pid_parser.add_argument('--args', nargs='+', help='Command line arguments')
# Results command
results_parser = subparsers.add_parser('results', help='Get analysis results')
results_parser.add_argument('target', help='File hash or PID')
results_parser.add_argument('--type', choices=['static', 'dynamic', 'info'],
required=True, help='Type of results to retrieve')
# Files summary command
subparsers.add_parser('files', help='Get summary of all analyzed files')
# Doppelganger scan command
doppelganger_scan_parser = subparsers.add_parser('doppelganger-scan', help='Run doppelganger system scan')
doppelganger_scan_parser.add_argument('--type', choices=['blender', 'fuzzy'], default='blender',
help='Type of scan to perform')
# Doppelganger analyze command
doppelganger_parser = subparsers.add_parser('doppelganger', help='Run doppelganger analysis')
doppelganger_parser.add_argument('hash', help='File hash to analyze')
doppelganger_parser.add_argument('--type', choices=['blender', 'fuzzy'], required=True,
help='Type of analysis to perform')
doppelganger_parser.add_argument('--threshold', type=int, default=1,
help='Similarity threshold for fuzzy analysis')
# Doppelganger database command
db_parser = subparsers.add_parser('doppelganger-db', help='Create doppelganger fuzzy database')
db_parser.add_argument('--folder', required=True, help='Folder path to process')
db_parser.add_argument('--extensions', nargs='+', help='File extensions to include')
# Cleanup command
cleanup_parser = subparsers.add_parser('cleanup', help='Clean up analysis artifacts')
cleanup_parser.add_argument('--all', action='store_true',
help='Clean all artifacts')
cleanup_parser.add_argument('--uploads', action='store_true',
help='Clean upload directory')
cleanup_parser.add_argument('--results', action='store_true',
help='Clean results directory')
cleanup_parser.add_argument('--analysis', action='store_true',
help='Clean analysis artifacts')
# Health check command
subparsers.add_parser('health', help='Check service health')
# Delete command
delete_parser = subparsers.add_parser('delete', help='Delete file and its results')
delete_parser.add_argument('hash', help='File hash to delete')
return parser
def main():
parser = create_arg_parser()
args = parser.parse_args()
# Configure logging
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('litterbox')
# Create client with CLI arguments
client_kwargs = {
'base_url': args.url,
'timeout': args.timeout,
'verify_ssl': not args.no_verify_ssl,
'logger': logger,
}
if args.proxy:
client_kwargs['proxy_config'] = {
'http': args.proxy,
'https': args.proxy
}
try:
client = LitterBoxClient(**client_kwargs)
if args.command == 'upload':
# Upload file
result = client.upload_file(args.file, file_name=args.name)
file_hash = result['file_info']['md5']
print(f"File uploaded successfully. Hash: {file_hash}")
# Run requested analysis
if args.analysis:
for analysis_type in args.analysis:
print(f"Running {analysis_type} analysis...")
analysis_args = args.args if analysis_type == 'dynamic' else None
result = client.analyze_file(
file_hash,
analysis_type,
cmd_args=analysis_args,
wait_for_completion=True
)
if result.get('status') == 'early_termination':
print("Process terminated early:")
print(f"Error: {result.get('error')}")
print("Details:")
print(f" Termination time: {result['details'].get('termination_time')}")
print(f" Init time: {result['details'].get('init_time')}")
print(f" Message: {result['details'].get('message')}")
elif result.get('status') == 'error':
print(f"Analysis failed: {result.get('error')}")
if 'details' in result:
print(f"Details: {result['details']}")
else:
print(json.dumps(result, indent=2))
elif args.command == 'analyze-pid':
# Analyze process
print(f"Analyzing process {args.pid}...")
result = client.analyze_file(
str(args.pid),
'dynamic',
cmd_args=args.args,
wait_for_completion=args.wait
)
print(json.dumps(result, indent=2))
elif args.command == 'results':
result = client.get_results(args.target, args.type)
print(json.dumps(result, indent=2))
elif args.command == 'files':
result = client.get_files_summary()
print(json.dumps(result, indent=2))
elif args.command == 'doppelganger-scan':
print(f"Running doppelganger scan with type: {args.type}")
result = client.analyze_with_doppelganger(args.type, 'scan')
print(json.dumps(result, indent=2))
elif args.command == 'doppelganger':
print(f"Running doppelganger analysis with type: {args.type}")
result = client.analyze_with_doppelganger(
args.type,
'analyze',
file_hash=args.hash,
threshold=args.threshold
)
print(json.dumps(result, indent=2))
elif args.command == 'doppelganger-db':
print("Creating doppelganger fuzzy database...")
result = client.analyze_with_doppelganger(
'fuzzy',
'create_db',
folder_path=args.folder,
extensions=args.extensions
)
print(json.dumps(result, indent=2))
elif args.command == 'cleanup':
if args.all:
args.uploads = args.results = args.analysis = True
result = client.cleanup(
include_uploads=args.uploads,
include_results=args.results,
include_analysis=args.analysis
)
print(json.dumps(result, indent=2))
elif args.command == 'health':
result = client.check_health()
print(json.dumps(result, indent=2))
elif args.command == 'delete':
result = client.delete_file(args.hash)
print(json.dumps(result, indent=2))
else:
parser.print_help()
except LitterBoxAPIError as e:
logger.error(f"API Error (Status {e.status_code}): {str(e)}")
if args.debug:
logger.debug(f"Response data: {e.response}")
sys.exit(1)
except LitterBoxError as e:
logger.error(f"Client Error: {str(e)}")
sys.exit(1)
except Exception as e:
logger.error(f"Unexpected Error: {str(e)}")
if args.debug:
logger.exception("Detailed error information:")
sys.exit(1)
finally:
client.session.close()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nOperation cancelled by user")
sys.exit(130)
+10
View File
@@ -189,6 +189,16 @@ The `config.yml` file controls:
- YARA rule locations
- Analysis timeouts and limits
## Python Clients
For programmatic access to LitterBox APIs, use the **GrumpyCats** client library:
**[GrumpyCats README](GrumpyCats/README.md)**
**grumpycat.py** - Python client for LitterBox API interaction
**LitterBoxMCP.py** - MCP server for LLM-assisted malware analysis
---
## SECURITY WARNINGS
Binary file not shown.