litterbox/GrumpyCats/LitterBoxMCP.py
BlackSnufkin fb52b1432e Add Fibratus EDR profile + dashboard cache + GrumpyCats package split
Fibratus EDR profile (kind: fibratus). Pull-from-event-log model, the same
shape DetonatorAgent's FibratusEdrPlugin.cs uses: the operator configures
Fibratus on the EDR VM with alertsenders.eventlog: {enabled: true,
format: json}; rule matches land in the Application log. Whiskers gains
GET /api/alerts/fibratus/since which wevtutil-queries the log,
extracts <TimeCreated SystemTime> + <EventID> + <Data>, ships the raw
JSON blobs back. The new FibratusEdrAnalyzer mirrors Elastic's
two-phase shape — Phase 1 exec, Phase 2 polls Whiskers — and normalizes
Fibratus's actual schema (events[].proc.{name,exe,cmdline,parent_name,
parent_cmdline,ancestors} + bare tactic.id/technique.id/subtechnique.id
labels) into the saved-view renderer's dict.
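The extraction step can be sketched as follows. `parse_event_xml` and the sample record are illustrative, not Whiskers' actual code, but the three fields are exactly the ones named above, and `xml.etree.ElementTree` accepts wevtutil's single-quoted attributes as-is:

```python
import json
import xml.etree.ElementTree as ET

EVT_NS = {"e": "http://schemas.microsoft.com/win/2004/08/events/event"}

def parse_event_xml(xml_text: str) -> dict:
    """Pull TimeCreated/@SystemTime, EventID, and the raw JSON <Data> blob
    out of a single wevtutil-rendered <Event>."""
    root = ET.fromstring(xml_text)  # single-quoted attributes parse fine
    time_created = root.find("e:System/e:TimeCreated", EVT_NS)
    event_id = root.find("e:System/e:EventID", EVT_NS)
    data = root.find("e:EventData/e:Data", EVT_NS)
    return {
        "time_created": time_created.get("SystemTime") if time_created is not None else None,
        "event_id": int(event_id.text) if event_id is not None else None,
        # ship the blob onward untouched; json.loads here only sanity-checks it
        "data": json.loads(data.text) if data is not None and data.text else None,
    }
```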

Whiskers /api/info now reports telemetry_sources: ['fibratus'] when
fibratus.exe is at C:\Program Files\Fibratus\Bin\, so the
orchestrator can preflight before dispatching. wevtutil's single-quoted
attribute output is parsed correctly.
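The preflight gate can be as small as this; the function name and error type are invented for illustration, but the `telemetry_sources` key and the `'fibratus'` value are what /api/info reports:

```python
def preflight_fibratus(info: dict, profile: str) -> None:
    """Fail fast before dispatch when the agent's /api/info response does not
    advertise Fibratus telemetry (i.e. fibratus.exe was not found on the VM)."""
    sources = info.get("telemetry_sources") or []
    if "fibratus" not in sources:
        raise RuntimeError(
            f"EDR profile {profile!r} reports no Fibratus telemetry "
            f"(telemetry_sources={sources!r}); skipping dispatch"
        )
```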

Dashboard reachability cache (services.edr_health). 30s TTL +
background poller every 15s. Per-probe timeouts dropped 4s/5s -> 2s.
First load post-boot waits at most one probe cycle; every subsequent
load <5ms (cache hit).
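The cache above reduces to a guarded read plus a daemon refresher. A minimal sketch with invented names (`ReachabilityCache`, `probe`), not the actual services.edr_health implementation:

```python
import threading
import time

class ReachabilityCache:
    """TTL cache in front of a slow health probe, refreshed by a background poller."""

    def __init__(self, probe, ttl=30.0, poll_interval=15.0):
        self._probe = probe            # zero-arg callable returning the health dict
        self._ttl = ttl
        self._poll_interval = poll_interval
        self._lock = threading.Lock()
        self._value = None
        self._stamp = 0.0

    def _refresh(self):
        value = self._probe()          # per-probe timeouts live inside probe()
        with self._lock:
            self._value, self._stamp = value, time.monotonic()
        return value

    def get(self):
        with self._lock:
            if self._value is not None and time.monotonic() - self._stamp < self._ttl:
                return self._value     # cache hit: no network round-trip
        return self._refresh()         # first post-boot load pays one probe cycle

    def start_background_poller(self):
        def loop():
            while True:
                time.sleep(self._poll_interval)
                self._refresh()
        threading.Thread(target=loop, daemon=True, name="edr-health-poller").start()
```

With the poller running, `get()` almost always lands inside the TTL window, which is what turns every dashboard load after the first into a cache hit.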

GrumpyCats package split: the 1085-line monolith becomes:
  grumpycat.py      — orchestrator (14 lines)
  cli/              — parser, handlers, runner
  litterbox_client/ — base + per-domain mixins (files, analysis,
                       doppelganger, results, edr, reports, system)
                       composed into LitterBoxClient.
LitterBoxMCP.py rewires its one import. New CLI subcommand
fibratus-alerts and matching MCP tool fibratus_alerts_since pull
Fibratus alerts via a LitterBox passthrough endpoint
(/api/edr/fibratus/<profile>/alerts/since) for wire-checking the agent
without dispatching a payload.
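The mixin composition can be sketched like this. `BaseClient._get` and the method bodies are stand-ins, but the composed class name, the mixin method names, and the passthrough route mirror the ones named above:

```python
class BaseClient:
    """Shared plumbing every mixin builds on (the real base wraps a
    requests.Session; this stub just echoes the resolved URL)."""

    def __init__(self, base_url: str):
        self.base_url = base_url

    def _get(self, path: str) -> dict:
        return {"url": self.base_url + path}


class FilesMixin:
    def upload_file(self, path: str) -> dict:
        return self._get("/api/upload")  # illustrative route


class EdrMixin:
    def fibratus_alerts_since(self, profile: str, since_iso: str, until_iso=None) -> dict:
        # passthrough endpoint named in the commit message
        return self._get(f"/api/edr/fibratus/{profile}/alerts/since")


class LitterBoxClient(FilesMixin, EdrMixin, BaseClient):
    """Mixins first, shared base last: each domain stays a small file while
    the composed class keeps the old one-import surface for LitterBoxMCP.py."""
```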

CHANGELOG updated.
2026-04-30 05:28:54 -07:00

445 lines
18 KiB
Python

"""LitterBox MCP server.
Exposes the LitterBox payload-analysis sandbox to MCP clients (Claude Desktop,
Cursor, etc.) so an LLM can drive uploads, analysis, result retrieval, and
report generation, plus a small set of OPSEC-review prompts.
Defaults to stdio transport (which is what Claude Desktop and most local
clients use). For a remote MCP server, run with `--transport streamable-http`.
"""
import argparse
import asyncio
import logging
import sys
from pathlib import Path
from typing import Annotated, List, Optional
# Make the sibling grumpycat module importable regardless of CWD.
sys.path.insert(0, str(Path(__file__).parent))
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from litterbox_client import LitterBoxClient
# CRITICAL: stdio transport speaks JSON-RPC over stdout. Logs MUST go to
# stderr or they corrupt the protocol stream and break the connection.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stderr,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("litterbox-mcp")

mcp = FastMCP(
    name="LitterBox",
    instructions=(
        "Tools for the LitterBox payload-analysis sandbox: upload payloads / drivers, "
        "run static / dynamic / EDR (Whiskers + Elastic Defend) analysis, retrieve "
        "results, and generate reports. Also exposes system health (registered EDR "
        "agent reachability, configured scanner inventory). Use the prompts for "
        "OPSEC review of analysis output. Tool exceptions are surfaced to the client "
        "by FastMCP automatically — do not wrap returns in success / error envelopes."
    ),
)

# One client for the lifetime of the server. The grumpycat client maintains
# its own connection pool via requests.Session.
client = LitterBoxClient(base_url="http://127.0.0.1:1337", logger=logger)


async def _call(fn, *args, **kwargs):
    """Run a sync grumpycat call in a worker thread without blocking the event loop."""
    return await asyncio.to_thread(fn, *args, **kwargs)
# =============================================================================
# Intake — upload files and kick off analysis
# =============================================================================
@mcp.tool()
async def upload_payload(
    path: Annotated[str, Field(description="Local path to the payload file (visible to the LitterBox server's filesystem).")],
    name: Annotated[Optional[str], Field(description="Optional override for the uploaded filename.")] = None,
) -> dict:
    """Upload a payload (.exe / .dll / .bin / .lnk / .docx / .xlsx) for later analysis.

    Returns the file's MD5 hash and basic metadata. Use the hash with the
    `analyze_*` and `get_*` tools.
    """
    return await _call(client.upload_file, path, file_name=name)


@mcp.tool()
async def upload_driver(
    path: Annotated[str, Field(description="Local path to a .sys kernel driver.")],
    name: Annotated[Optional[str], Field(description="Optional override for the uploaded filename.")] = None,
    run_holygrail: Annotated[bool, Field(description="Run HolyGrail BYOVD analysis right after upload.")] = True,
) -> dict:
    """Upload a kernel driver and (by default) immediately run BYOVD analysis."""
    return await _call(
        client.upload_and_analyze_driver, path, file_name=name, run_holygrail=run_holygrail
    )


@mcp.tool()
async def analyze_static(
    file_hash: Annotated[str, Field(description="MD5 hash returned by an earlier upload.")],
    wait: Annotated[bool, Field(description="Block until the scan finishes (recommended).")] = True,
) -> dict:
    """Run static analysis on an uploaded file (YARA / CheckPlz / Stringnalyzer)."""
    return await _call(client.analyze_file, file_hash, "static", wait_for_completion=wait)


@mcp.tool()
async def analyze_dynamic(
    target: Annotated[str, Field(description="MD5 hash of an uploaded file OR a numeric PID for a running process.")],
    cmd_args: Annotated[Optional[List[str]], Field(description="Command-line arguments passed to the payload.")] = None,
    wait: Annotated[bool, Field(description="Block until the scan finishes (recommended).")] = True,
) -> dict:
    """Run dynamic analysis (in-memory YARA, PE-Sieve, Moneta, Patriot, HSB, RedEdr).

    Note: dynamic analysis EXECUTES the payload. Confirm with the user first.
    """
    return await _call(
        client.analyze_file, target, "dynamic", cmd_args=cmd_args, wait_for_completion=wait
    )


@mcp.tool()
async def analyze_holygrail(
    file_hash: Annotated[str, Field(description="MD5 of an uploaded .sys driver.")],
    wait: Annotated[bool, Field(description="Block until the scan finishes (recommended).")] = True,
) -> dict:
    """Run HolyGrail BYOVD analysis on a kernel driver."""
    return await _call(client.analyze_holygrail, file_hash, wait_for_completion=wait)


@mcp.tool()
async def validate_pid(
    pid: Annotated[int, Field(description="OS-level process ID.")],
) -> dict:
    """Confirm a PID exists and is accessible before targeting it for dynamic analysis."""
    return await _call(client.validate_process, pid)
# =============================================================================
# Retrieval — fetch analysis results and reports
# =============================================================================
@mcp.tool()
async def get_file_info(file_hash: str) -> dict:
    """File metadata: type, size, hashes, entropy, PE structure, sensitive imports."""
    return await _call(client.get_file_info, file_hash)


@mcp.tool()
async def get_static_results(file_hash: str) -> dict:
    """Static analysis output (YARA matches, CheckPlz findings, Stringnalyzer indicators)."""
    return await _call(client.get_static_results, file_hash)


@mcp.tool()
async def get_dynamic_results(target: str) -> dict:
    """Dynamic analysis output (memory scanners, behavioral telemetry, process output)."""
    return await _call(client.get_dynamic_results, target)


@mcp.tool()
async def get_holygrail_results(file_hash: str) -> dict:
    """HolyGrail BYOVD output for a driver (LOLDrivers / block status / critical imports)."""
    return await _call(client.get_holygrail_results, file_hash)


@mcp.tool()
async def get_risk_assessment(target: str) -> dict:
    """Computed detection assessment: numerical score, level (Low / Medium / High / Critical), triggering indicators."""
    return await _call(client.get_risk_assessment, target)


@mcp.tool()
async def get_comprehensive_results(target: str) -> dict:
    """All available results in one parallel call (file_info + static + dynamic + holygrail)."""
    return await _call(client.get_comprehensive_results, target)


@mcp.tool()
async def get_report(target: str) -> str:
    """Render the full HTML analysis report and return it inline as a string."""
    return await _call(client.get_report, target)


@mcp.tool()
async def download_report(
    target: str,
    output_path: Annotated[Optional[str], Field(description="Directory or full path to save the .html. Defaults to current directory.")] = None,
) -> dict:
    """Download the HTML report to disk and return the saved path."""
    saved = await _call(client.download_report, target, output_path)
    return {"saved_to": saved}
# =============================================================================
# EDR — Whiskers agent + Elastic Defend correlation
# =============================================================================
@mcp.tool()
async def list_edr_profiles() -> dict:
    """List EDR profiles registered under Config/edr_profiles/."""
    return await _call(client.list_edr_profiles)


@mcp.tool()
async def get_edr_agents_status() -> dict:
    """Live probe of every EDR profile (Whiskers agent + Elastic stack reachability,
    hostname, agent version, lock state, cluster info)."""
    return await _call(client.get_edr_agents_status)


@mcp.tool()
async def analyze_edr(
    file_hash: Annotated[str, Field(description="MD5 hash of an uploaded file.")],
    profile: Annotated[str, Field(description="EDR profile name (matches Config/edr_profiles/<name>.yml).")],
    cmd_args: Annotated[Optional[List[str]], Field(description="Command-line arguments passed to the payload.")] = None,
    xor_key: Annotated[Optional[int], Field(description="Single byte (0-255) to XOR-encode the payload in transit (anti-AV).", ge=0, le=255)] = None,
    wait: Annotated[bool, Field(description="Block until Phase-2 (Elastic alert correlation) settles.")] = True,
    timeout: Annotated[float, Field(description="Phase-2 wait timeout in seconds.", ge=10, le=600)] = 180.0,
) -> dict:
    """Dispatch a payload to a registered EDR profile (Whiskers agent + Elastic Defend).

    Note: this EXECUTES the payload on the EDR VM. Confirm with the user first.

    Two-phase: Phase-1 (synchronous) handles agent dispatch + lock + execution; Phase-2
    (server-side daemon) correlates Elastic alerts. With wait=True, the server returns
    once Phase-2 settles or `timeout` elapses.
    """
    phase1 = await _call(client.analyze_edr, file_hash, profile, cmd_args=cmd_args, xor_key=xor_key)
    if not wait or (phase1 or {}).get("status") != "polling_alerts":
        return {"phase_1": phase1, "phase_2": None}
    phase2 = await _call(client.wait_for_edr_completion, file_hash, profile, 3.0, timeout)
    return {"phase_1": phase1, "phase_2": phase2}


@mcp.tool()
async def get_edr_results(
    file_hash: Annotated[str, Field(description="MD5 hash of the analyzed payload.")],
    profile: Annotated[str, Field(description="EDR profile name.")],
) -> dict:
    """Read saved EDR findings (alerts + execution logs + summary) for one profile."""
    return await _call(client.get_edr_results, file_hash, profile)


@mcp.tool()
async def get_edr_index(
    file_hash: Annotated[str, Field(description="MD5 hash of the analyzed payload.")],
) -> dict:
    """Index of every saved EDR run for a target (one entry per profile that has data)."""
    return await _call(client.get_edr_index, file_hash)


@mcp.tool()
async def fibratus_alerts_since(
    profile: Annotated[str, Field(description="Fibratus profile name (must be kind=fibratus).")],
    since_iso: Annotated[str, Field(description="ISO8601 lower bound in UTC, e.g. '2026-04-30T00:00:00Z'.")],
    until_iso: Annotated[Optional[str], Field(description="ISO8601 upper bound in UTC; defaults to now.")] = None,
) -> dict:
    """Test/debug: pull Fibratus rule-match alerts via Whiskers without dispatching a payload.

    LitterBox proxies the request to the registered profile's Whiskers agent,
    which `wevtutil`-queries the EDR VM's Application event log for
    `Provider=Fibratus` records inside the time window. Useful right after
    Fibratus is set up on a new VM to verify `alertsenders.eventlog` with
    `format: json` is actually emitting alerts before running real payloads.

    Returns `{supported, events: [{time_created, event_id, data}]}` —
    `data` is the raw JSON Fibratus produced.
    """
    return await _call(client.fibratus_alerts_since, profile, since_iso, until_iso)
# =============================================================================
# System health — local scanner inventory
# =============================================================================
@mcp.tool()
async def get_scanners_status() -> dict:
    """Inventory of configured local analyzers (static + dynamic + holygrail) and
    whether their binaries are present on disk. Drives the dashboard panel and
    is the right thing to call before a run if a scanner has been failing."""
    return await _call(client.get_scanners_status)
# =============================================================================
# Doppelganger — comparison against host snapshot or fuzzy-hash baseline
# =============================================================================
@mcp.tool()
async def run_blender_scan() -> dict:
    """Snapshot the live host so Blender can compare payload runtime indicators against it."""
    return await _call(client.run_blender_scan)


@mcp.tool()
async def compare_with_blender(file_hash: str) -> dict:
    """Compare a payload's runtime indicators against the host snapshot."""
    return await _call(client.compare_with_blender, file_hash)


@mcp.tool()
async def analyze_fuzzy_similarity(
    file_hash: str,
    threshold: Annotated[int, Field(description="ssdeep similarity cutoff 0-100.", ge=0, le=100)] = 85,
) -> dict:
    """Score a payload's similarity to known offensive tools via fuzzy hashing."""
    return await _call(client.analyze_with_fuzzy, file_hash, threshold)


@mcp.tool()
async def create_fuzzy_database(
    folder_path: Annotated[str, Field(description="Directory of known-tool binaries to fingerprint.")],
    extensions: Annotated[Optional[List[str]], Field(description="File extensions to include (e.g. ['.exe', '.dll']).")] = None,
) -> dict:
    """(Re)build the FuzzyHash baseline DB from a folder of reference binaries."""
    return await _call(client.create_fuzzy_database, folder_path, extensions)
# =============================================================================
# Fleet — list / status / cleanup
# =============================================================================
@mcp.tool()
async def list_payloads() -> dict:
    """List every analyzed payload, driver, and process in the sandbox with detection summary."""
    return await _call(client.get_files_summary)


@mcp.tool()
async def sandbox_status() -> dict:
    """Health, tool readiness, and fleet summary for the LitterBox server."""
    return await _call(client.get_system_status)


@mcp.tool()
async def cleanup_sandbox(
    include_uploads: bool = True,
    include_results: bool = True,
    include_analysis: bool = True,
) -> dict:
    """Wipe analysis artifacts. DESTRUCTIVE — confirm with the user before calling."""
    return await _call(
        client.cleanup,
        include_uploads=include_uploads,
        include_results=include_results,
        include_analysis=include_analysis,
    )


@mcp.tool()
async def delete_payload(file_hash: str) -> dict:
    """Delete one payload and its results. DESTRUCTIVE — confirm with the user before calling."""
    return await _call(client.delete_file, file_hash)
# =============================================================================
# Prompts — OPSEC review templates
# Short, focused, data-first. Each prompt tells the LLM which tools to call,
# then asks targeted questions instead of dumping a wall of categories.
# =============================================================================
@mcp.prompt()
def detection_summary(file_hash: str) -> str:
    """Summarize what triggered detection in the analysis."""
    return f"""Load `get_comprehensive_results("{file_hash}")` and summarize:
1. **YARA matches** — for each rule that fired, name it and the string / pattern that triggered it.
2. **Memory anomalies** — for each PE-Sieve / Moneta / Patriot / HSB indicator, identify the technique it maps to (e.g. private RWX → manual injection, modified PE header → unhooking).
3. **Behavioral telemetry** — flag anything in the RedEdr timeline atypical for an unmodified process (notable DLL loads, child processes, IOCTL traffic).
4. **Static signals** — entropy, packing, sensitive imports, attribution-bearing strings.
Be specific: cite rule names, region addresses, API names. Do not speculate beyond the data.
"""


@mcp.prompt()
def evasion_recommendations(file_hash: str) -> str:
    """Suggest concrete evasion improvements based on what the sandbox detected."""
    return f"""Use `get_comprehensive_results("{file_hash}")` to read the detection output, then propose specific changes that would reduce each detection.
For each recommendation:
- **What was detected** — one line, citing the actual finding.
- **Why it triggers** — the underlying technique or signal.
- **Concrete change** — actionable (e.g. "encrypt the C2 URL string and decrypt at use site"), not generic ("obfuscate strings").
- **Trade-off** — operational cost or downside.
Order by detection impact, highest first. Skip generic advice.
"""


@mcp.prompt()
def attribution_check(file_hash: str) -> str:
    """Identify tool / framework attribution risk."""
    return f"""Pull `analyze_fuzzy_similarity("{file_hash}")` and `compare_with_blender("{file_hash}")`, then assess:
1. **Tool similarity** — which known offensive tools is this closest to? Score + name.
2. **Framework fingerprints** — Cobalt Strike / Sliver / Havoc / Metasploit-style indicators in strings, behaviors, or memory artifacts.
3. **Compilation artifacts** — RDIs, build paths, debug info that ties this binary to a specific developer or environment.
Output 3-5 attribution vectors ranked by confidence, plus one mitigation per vector.
"""


@mcp.prompt()
def deployment_readiness(file_hash: str) -> str:
    """Go / Conditional / No-Go decision based on detection state."""
    return f"""Read `get_comprehensive_results("{file_hash}")` and `get_risk_assessment("{file_hash}")`.
Render a single-line verdict: **GO** / **CONDITIONAL** / **NO-GO**.
Then for each of these criteria, state PASS / FAIL with a one-sentence reason cited from the data:
- Zero YARA matches.
- No memory anomalies (private RWX, modified PE headers, threads in non-image memory).
- Risk score below 40.
- No high-confidence attribution (fuzzy similarity < 70 to known tools).
If anything fails, list the specific blockers and what would have to change to convert each to a pass.
"""
# =============================================================================
# Entry point
# =============================================================================
def main():
    parser = argparse.ArgumentParser(description="LitterBox MCP server.")
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http"],
        default="stdio",
        help="MCP transport. 'stdio' for Claude Desktop / Cursor (default); "
        "'streamable-http' for a remote MCP server.",
    )
    parser.add_argument(
        "--host",
        default="127.0.0.1",
        help="Bind address for streamable-http (default 127.0.0.1; only set 0.0.0.0 deliberately).",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8765,
        help="Port for streamable-http (default 8765).",
    )
    args = parser.parse_args()

    if args.transport == "stdio":
        logger.info("Starting LitterBox MCP on stdio")
        mcp.run(transport="stdio")
    else:
        mcp.settings.host = args.host
        mcp.settings.port = args.port
        logger.info("Starting LitterBox MCP on http://%s:%d/mcp", args.host, args.port)
        mcp.run(transport="streamable-http")


if __name__ == "__main__":
    main()