fb52b1432e
Fibratus EDR profile (kind: fibratus). Pull-from-event-log model, same
shape DetonatorAgent's FibratusEdrPlugin.cs uses: operator configures
Fibratus on the EDR VM with alertsenders.eventlog: {enabled: true,
format: json}; rule matches land in the Application log. Whiskers gains
GET /api/alerts/fibratus/since which wevtutil-queries the log,
extracts <TimeCreated SystemTime> + <EventID> + <Data>, ships the raw
JSON blobs back. The new FibratusEdrAnalyzer mirrors Elastic's
two-phase shape — Phase 1 exec, Phase 2 polls Whiskers — and normalizes
Fibratus's actual schema (events[].proc.{name,exe,cmdline,parent_name,
parent_cmdline,ancestors} + bare tactic.id/technique.id/subtechnique.id
labels) into the saved-view renderer's dict.
Whiskers /api/info now reports telemetry_sources: ['fibratus'] when
fibratus.exe is at C:\Program Files\Fibratus\Bin\, so the
orchestrator can preflight before dispatching. wevtutil's single-quoted
attribute output is parsed correctly.
Dashboard reachability cache (services.edr_health). 30s TTL +
background poller every 15s. Per-probe timeouts dropped 4s/5s -> 2s.
First load post-boot waits at most one probe cycle; every subsequent
load <5ms (cache hit).
GrumpyCats package split: 1085-line monolith into:
grumpycat.py — orchestrator (14 lines)
cli/ — parser, handlers, runner
litterbox_client/ — base + per-domain mixins (files, analysis,
doppelganger, results, edr, reports, system)
composed into LitterBoxClient.
LitterBoxMCP.py rewires its one import. New CLI subcommand
fibratus-alerts and matching MCP tool fibratus_alerts_since pull
Fibratus alerts via a LitterBox passthrough endpoint
(/api/edr/fibratus/<profile>/alerts/since) for wire-checking the agent
without dispatching a payload.
CHANGELOG updated.
91 lines
3.4 KiB
Python
91 lines
3.4 KiB
Python
"""Run static / dynamic / HolyGrail analyses.
|
|
|
|
Static + dynamic both go through `analyze_file`; BYOVD analyses use the
|
|
dedicated `/holygrail` endpoint. `upload_and_analyze_driver` is a
|
|
convenience wrapper for the typical "upload .sys then run HolyGrail"
|
|
flow.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import BinaryIO, Dict, List, Optional, Union
|
|
|
|
from .exceptions import LitterBoxAPIError, LitterBoxError
|
|
|
|
|
|
class AnalysisMixin:
    """Analysis endpoints of the LitterBox client: static, dynamic, HolyGrail.

    The methods here call helpers that this class does not define and that
    must be supplied by the object it is mixed into: ``_make_request``,
    ``_validate_analysis_type``, ``_validate_command_args``,
    ``validate_process``, ``get_file_info``, ``upload_file`` and ``logger``.
    """

    def analyze_file(
        self,
        target: str,
        analysis_type: str,
        cmd_args: Optional[List[str]] = None,
        wait_for_completion: bool = True,
        verify_file: bool = False,
    ) -> Dict:
        """Run a static or dynamic analysis on a file or a running process.

        Args:
            target: A file MD5, or a numeric PID (dynamic only). A target made
                up solely of digits is always treated as a PID.
            analysis_type: ``"static"`` or ``"dynamic"``; checked through
                ``_validate_analysis_type``.
            cmd_args: Optional command-line arguments. They are passed through
                ``_validate_command_args`` and the result is sent as the JSON
                body of the request.
            wait_for_completion: Sent to the server as ``wait=1`` when true,
                ``wait=0`` otherwise.
            verify_file: When true and ``target`` is not a PID, look the file
                up with ``get_file_info`` before starting the analysis.

        Returns:
            The decoded JSON body of the ``POST /analyze/...`` response. A
            ``status`` of ``"early_termination"`` or ``"error"`` is logged but
            still returned to the caller.

        Raises:
            ValueError: Static analysis was requested for a PID.
            LitterBoxError: The PID or file pre-check answered with HTTP 404.
            LitterBoxAPIError: Any other API error raised by a pre-check is
                propagated unchanged.
        """
        self._validate_analysis_type(analysis_type, ["static", "dynamic"])

        is_pid = target.isdigit()

        # Pre-validate the PID for dynamic-on-pid analysis so the caller gets
        # a clean LitterBoxError rather than a server-side 404.
        if analysis_type == "dynamic" and is_pid:
            try:
                self.validate_process(target)
            except LitterBoxAPIError as e:
                if e.status_code == 404:
                    raise LitterBoxError(
                        f"Process with PID {target} not found or not accessible"
                    ) from e
                raise
        elif analysis_type == "static" and is_pid:
            raise ValueError("Cannot perform static analysis on PID")

        # Optional file existence check before the (potentially expensive)
        # analysis.
        if not is_pid and verify_file:
            try:
                self.get_file_info(target)
            except LitterBoxAPIError as e:
                if e.status_code == 404:
                    raise LitterBoxError(
                        f"File {target} not found or not yet available"
                    ) from e
                # Anything other than "not found" is a real failure of the
                # pre-check; surface it instead of silently carrying on,
                # matching the PID branch above.
                raise

        params = {"wait": "1" if wait_for_completion else "0"}
        data = self._validate_command_args(cmd_args)

        response = self._make_request(
            "POST",
            f"/analyze/{analysis_type}/{target}",
            params=params,
            json=data,
        )

        result = response.json()
        status = result.get("status")
        if status == "early_termination":
            self.logger.warning(f"Analysis terminated early: {result.get('error')}")
        elif status == "error":
            self.logger.error(f"Analysis failed: {result.get('error')}")
        return result

    def analyze_holygrail(self, file_hash: str, wait_for_completion: bool = True) -> Dict:
        """Run HolyGrail BYOVD analysis on a kernel driver.

        Args:
            file_hash: Hash of the driver, sent as the ``hash`` query
                parameter.
            wait_for_completion: When true, ``wait=1`` is added to the query;
                when false the ``wait`` parameter is omitted entirely.

        Returns:
            The decoded JSON body of the ``GET /holygrail`` response.
        """
        params = {"hash": file_hash}
        if wait_for_completion:
            params["wait"] = "1"
        response = self._make_request("GET", "/holygrail", params=params)
        return response.json()

    def upload_and_analyze_driver(
        self,
        file_path: Union[str, Path, BinaryIO],
        file_name: Optional[str] = None,
        run_holygrail: bool = True,
    ) -> Dict:
        """Upload a kernel driver and (by default) immediately run HolyGrail.

        Args:
            file_path: Path or open binary stream, forwarded to
                ``upload_file``.
            file_name: Optional name, forwarded to ``upload_file``.
            run_holygrail: When false, only the upload is performed.

        Returns:
            ``{"upload": <upload result>, "holygrail": <value>}`` where the
            ``holygrail`` value is ``None`` if the analysis was skipped, the
            analysis result on success, or ``{"error": "<message>"}`` if the
            analysis raised ``LitterBoxError``.

        Upload failures are not caught here; only the HolyGrail step is
        best-effort. The upload result must contain ``file_info.md5``.
        """
        upload_result = self.upload_file(file_path, file_name)
        file_hash = upload_result["file_info"]["md5"]

        results = {"upload": upload_result, "holygrail": None}

        if run_holygrail:
            try:
                results["holygrail"] = self.analyze_holygrail(file_hash)
            except LitterBoxError as e:
                # Keep the successful upload visible to the caller even when
                # the follow-up analysis fails.
                self.logger.error(f"HolyGrail analysis failed: {e}")
                results["holygrail"] = {"error": str(e)}

        return results
|