Files
BlackSnufkin fb52b1432e Add Fibratus EDR profile + dashboard cache + GrumpyCats package split
Fibratus EDR profile (kind: fibratus). Pull-from-event-log model, same
shape DetonatorAgent's FibratusEdrPlugin.cs uses: operator configures
Fibratus on the EDR VM with alertsenders.eventlog: {enabled: true,
format: json}; rule matches land in the Application log. Whiskers gains
GET /api/alerts/fibratus/since which wevtutil-queries the log,
extracts <TimeCreated SystemTime> + <EventID> + <Data>, ships the raw
JSON blobs back. The new FibratusEdrAnalyzer mirrors Elastic's
two-phase shape — Phase 1 exec, Phase 2 polls Whiskers — and normalizes
Fibratus's actual schema (events[].proc.{name,exe,cmdline,parent_name,
parent_cmdline,ancestors} + bare tactic.id/technique.id/subtechnique.id
labels) into the saved-view renderer's dict.

Whiskers /api/info now reports telemetry_sources: ['fibratus'] when
fibratus.exe is at C:\Program Files\Fibratus\Bin\, so the
orchestrator can preflight before dispatching. wevtutil's single-quoted
attribute output is parsed correctly.

Dashboard reachability cache (services.edr_health). 30s TTL +
background poller every 15s. Per-probe timeouts dropped 4s/5s -> 2s.
First load post-boot waits at most one probe cycle; every subsequent
load <5ms (cache hit).

GrumpyCats package split: 1085-line monolith into:
  grumpycat.py      — orchestrator (14 lines)
  cli/              — parser, handlers, runner
  litterbox_client/ — base + per-domain mixins (files, analysis,
                       doppelganger, results, edr, reports, system)
                       composed into LitterBoxClient.
LitterBoxMCP.py rewires its one import. New CLI subcommand
fibratus-alerts and matching MCP tool fibratus_alerts_since pull
Fibratus alerts via a LitterBox passthrough endpoint
(/api/edr/fibratus/<profile>/alerts/since) for wire-checking the agent
without dispatching a payload.

CHANGELOG updated.
2026-04-30 05:28:54 -07:00

120 lines
4.7 KiB
Python

"""EDR operations — Whiskers + Elastic Defend + Fibratus profiles.
Two split-phase analyzer flavours live behind one set of endpoints:
* `kind: elastic` — LitterBox queries an Elastic stack for alerts.
* `kind: fibratus` — LitterBox polls Whiskers's event-log endpoint for
Fibratus rule matches (DetonatorAgent shape).
The CLI / MCP helpers don't need to care about the kind for dispatch
(`analyze_edr` works for both); they only diverge for the
`fibratus_alerts_since` test helper.
"""
import time
from typing import Dict, List, Optional
from .exceptions import LitterBoxAPIError
class EdrMixin:
def list_edr_profiles(self) -> Dict:
"""List EDR profiles registered under Config/edr_profiles/."""
response = self._make_request("GET", "/api/edr/profiles")
return response.json()
def get_edr_agents_status(self) -> Dict:
"""Latest reachability snapshot for every registered EDR profile.
Server-side TTL-cached + pre-warmed by a background poller, so
this is effectively instant under steady-state operation."""
response = self._make_request("GET", "/api/edr/agents/status")
return response.json()
def analyze_edr(
self,
file_hash: str,
profile: str,
cmd_args: Optional[List[str]] = None,
xor_key: Optional[int] = None,
) -> Dict:
"""Dispatch a payload to a registered EDR profile.
Returns the Phase-1 result immediately (status='polling_alerts'
on a successful exec; 'blocked_by_av' / 'agent_unreachable' /
'busy' / 'error' otherwise). Phase-2 (alert correlation) runs in
a server-side daemon thread; poll
`get_edr_results(file_hash, profile)` until status is no longer
'polling_alerts', or use `wait_for_edr_completion` below.
"""
data: Dict = {}
if cmd_args:
data.update(self._validate_command_args(cmd_args))
if xor_key is not None:
if not 0 <= xor_key <= 255:
raise ValueError(f"xor_key must be 0-255, got {xor_key}")
data["xor_key"] = xor_key
response = self._make_request(
"POST", f"/analyze/edr/{profile}/{file_hash}", json=data,
)
return response.json()
def get_edr_results(self, file_hash: str, profile: str) -> Dict:
"""Fetch the saved findings for a specific EDR profile run."""
response = self._make_request(
"GET", f"/api/results/edr/{profile}/{file_hash}",
)
return response.json()
def get_edr_index(self, file_hash: str) -> Dict:
"""Fetch every saved EDR run for a target (one entry per profile)."""
response = self._make_request("GET", f"/api/results/edr/{file_hash}")
return response.json()
def wait_for_edr_completion(
self,
file_hash: str,
profile: str,
interval: float = 3.0,
timeout: float = 180.0,
) -> Dict:
"""Block until Phase-2 settles, the saved JSON appears for the
first time, or `timeout` elapses. Returns the last-seen findings
dict (may still be 'polling_alerts' on timeout — caller decides)."""
deadline = time.monotonic() + timeout
last: Optional[Dict] = None
while time.monotonic() < deadline:
try:
last = self.get_edr_results(file_hash, profile)
if (last or {}).get("status") and last.get("status") != "polling_alerts":
return last
except LitterBoxAPIError as e:
# 404 just means Phase-1 hasn't kicked off yet — keep polling.
if e.status_code != 404:
raise
time.sleep(interval)
return last or {"status": "timeout", "error": f"Phase-2 timeout after {timeout}s"}
def fibratus_alerts_since(
self,
profile: str,
since_iso: str,
until_iso: Optional[str] = None,
) -> Dict:
"""Test/debug helper: ask LitterBox to passthrough-query the
Whiskers agent's `/api/alerts/fibratus/since` for `profile`.
Useful right after Fibratus is installed on a new VM — you can
verify the Fibratus → Application event log → Whiskers wire end
to end without dispatching a payload. Returns the agent's raw
`{supported, events: [...]}` shape; `data` strings inside each
event are unparsed JSON the caller can deserialize.
"""
params = {"from": since_iso}
if until_iso:
params["until"] = until_iso
response = self._make_request(
"GET", f"/api/edr/fibratus/{profile}/alerts/since", params=params,
)
return response.json()