Files
BlackSnufkin fb52b1432e Add Fibratus EDR profile + dashboard cache + GrumpyCats package split
Fibratus EDR profile (kind: fibratus). Pull-from-event-log model, same
shape DetonatorAgent's FibratusEdrPlugin.cs uses: operator configures
Fibratus on the EDR VM with alertsenders.eventlog: {enabled: true,
format: json}; rule matches land in the Application log. Whiskers gains
GET /api/alerts/fibratus/since which wevtutil-queries the log,
extracts <TimeCreated SystemTime> + <EventID> + <Data>, ships the raw
JSON blobs back. The new FibratusEdrAnalyzer mirrors Elastic's
two-phase shape — Phase 1 exec, Phase 2 polls Whiskers — and normalizes
Fibratus's actual schema (events[].proc.{name,exe,cmdline,parent_name,
parent_cmdline,ancestors} + bare tactic.id/technique.id/subtechnique.id
labels) into the saved-view renderer's dict.

Whiskers /api/info now reports telemetry_sources: ['fibratus'] when
fibratus.exe is at C:\Program Files\Fibratus\Bin\, so the
orchestrator can preflight before dispatching. wevtutil's single-quoted
attribute output is parsed correctly.

Dashboard reachability cache (services.edr_health). 30s TTL +
background poller every 15s. Per-probe timeouts dropped 4s/5s -> 2s.
First load post-boot waits at most one probe cycle; every subsequent
load <5ms (cache hit).

GrumpyCats package split: 1085-line monolith into:
  grumpycat.py      — orchestrator (14 lines)
  cli/              — parser, handlers, runner
  litterbox_client/ — base + per-domain mixins (files, analysis,
                       doppelganger, results, edr, reports, system)
                       composed into LitterBoxClient.
LitterBoxMCP.py rewires its one import. New CLI subcommand
fibratus-alerts and matching MCP tool fibratus_alerts_since pull
Fibratus alerts via a LitterBox passthrough endpoint
(/api/edr/fibratus/<profile>/alerts/since) for wire-checking the agent
without dispatching a payload.

CHANGELOG updated.
2026-04-30 05:28:54 -07:00

91 lines
3.4 KiB
Python

"""Run static / dynamic / HolyGrail analyses.
Static + dynamic both go through `analyze_file`; BYOVD analyses use the
dedicated `/holygrail` endpoint. `upload_and_analyze_driver` is a
convenience wrapper for the typical "upload .sys then run HolyGrail"
flow.
"""
from pathlib import Path
from typing import BinaryIO, Dict, List, Optional, Union
from .exceptions import LitterBoxAPIError, LitterBoxError
class AnalysisMixin:
def analyze_file(
self,
target: str,
analysis_type: str,
cmd_args: Optional[List[str]] = None,
wait_for_completion: bool = True,
verify_file: bool = False,
) -> Dict:
"""Run analysis on a file or PID. `target` is either a file MD5
or a numeric PID (dynamic only)."""
self._validate_analysis_type(analysis_type, ["static", "dynamic"])
# Pre-validate the PID for dynamic-on-pid analysis so the caller
# gets a clean ValueError rather than a server-side 404.
if analysis_type == "dynamic" and target.isdigit():
try:
self.validate_process(target)
except LitterBoxAPIError as e:
if e.status_code == 404:
raise LitterBoxError(f"Process with PID {target} not found or not accessible")
raise
elif analysis_type == "static" and target.isdigit():
raise ValueError("Cannot perform static analysis on PID")
# Optional file existence check before the (potentially expensive) analysis.
if not target.isdigit() and verify_file:
try:
self.get_file_info(target)
except LitterBoxAPIError as e:
if e.status_code == 404:
raise LitterBoxError(f"File {target} not found or not yet available")
params = {"wait": "1" if wait_for_completion else "0"}
data = self._validate_command_args(cmd_args)
response = self._make_request(
"POST", f"/analyze/{analysis_type}/{target}",
params=params, json=data,
)
result = response.json()
if result.get("status") == "early_termination":
self.logger.warning(f"Analysis terminated early: {result.get('error')}")
elif result.get("status") == "error":
self.logger.error(f"Analysis failed: {result.get('error')}")
return result
def analyze_holygrail(self, file_hash: str, wait_for_completion: bool = True) -> Dict:
"""Run HolyGrail BYOVD analysis on a kernel driver."""
params = {"hash": file_hash}
if wait_for_completion:
params["wait"] = "1"
response = self._make_request("GET", "/holygrail", params=params)
return response.json()
def upload_and_analyze_driver(
self,
file_path: Union[str, Path, BinaryIO],
file_name: Optional[str] = None,
run_holygrail: bool = True,
) -> Dict:
"""Upload a kernel driver and (by default) immediately run HolyGrail."""
upload_result = self.upload_file(file_path, file_name)
file_hash = upload_result["file_info"]["md5"]
results = {"upload": upload_result, "holygrail": None}
if run_holygrail:
try:
results["holygrail"] = self.analyze_holygrail(file_hash)
except LitterBoxError as e:
self.logger.error(f"HolyGrail analysis failed: {e}")
results["holygrail"] = {"error": str(e)}
return results