fb52b1432e
Fibratus EDR profile (kind: fibratus). Pull-from-event-log model, the same
shape DetonatorAgent's FibratusEdrPlugin.cs uses: the operator configures
Fibratus on the EDR VM with alertsenders.eventlog: {enabled: true,
format: json}, so rule matches land in the Application log. Whiskers gains
GET /api/alerts/fibratus/since, which queries the log via wevtutil,
extracts <TimeCreated SystemTime> + <EventID> + <Data>, and ships the raw
JSON blobs back. The new FibratusEdrAnalyzer mirrors Elastic's
two-phase shape (Phase 1 exec, Phase 2 polls Whiskers) and normalizes
Fibratus's actual schema (events[].proc.{name,exe,cmdline,parent_name,
parent_cmdline,ancestors} + bare tactic.id/technique.id/subtechnique.id
labels) into the saved-view renderer's dict.
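The normalization step described above can be sketched roughly as follows. This is a minimal illustration, not the FibratusEdrAnalyzer code: the `proc` field names and the bare `tactic.id` / `technique.id` / `subtechnique.id` label keys come from the description, while the top-level `labels` key and the output keys (`mitre`, `processes`) are assumptions made for the example.

```python
from typing import Any, Dict, List

# Process fields named in the description above.
PROC_FIELDS = ("name", "exe", "cmdline", "parent_name", "parent_cmdline")


def normalize_fibratus_alert(alert: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one decoded Fibratus alert into a renderer-friendly dict."""
    # Assumption: the bare MITRE labels live under a top-level "labels" key.
    labels = alert.get("labels") or {}

    processes: List[Dict[str, Any]] = []
    for event in alert.get("events") or []:
        proc = event.get("proc") or {}
        entry = {field: proc.get(field) for field in PROC_FIELDS}
        # Missing ancestors become an empty list so renderers can iterate.
        entry["ancestors"] = proc.get("ancestors") or []
        processes.append(entry)

    return {
        # Hypothetical output shape; the real renderer dict may differ.
        "mitre": {
            "tactic": labels.get("tactic.id"),
            "technique": labels.get("technique.id"),
            "subtechnique": labels.get("subtechnique.id"),
        },
        "processes": processes,
    }
```

Using `.get()` throughout keeps the sketch tolerant of alerts that omit a field, which matters when the input is whatever JSON the event log happened to hold.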

Whiskers /api/info now reports telemetry_sources: ['fibratus'] when
fibratus.exe is present under C:\Program Files\Fibratus\Bin\, so the
orchestrator can preflight before dispatching. wevtutil's single-quoted
attribute output is parsed correctly.
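To illustrate the single-quote point: wevtutil's XML output wraps attribute values in single quotes, so a pattern that only expects double quotes silently misses `SystemTime`. The sketch below is a Python approximation of that extraction and is not the Whiskers implementation; the function name and the returned keys are hypothetical.

```python
import html
import re
from typing import Dict, List

# One <Event>...</Event> record; [\s>] avoids matching <EventID> or <EventData>.
_EVENT_RE = re.compile(r"<Event[\s>].*?</Event>", re.DOTALL)
# Accept either quote style; \1 forces the closing quote to match the opener.
_TIME_RE = re.compile(r"<TimeCreated\s+SystemTime=(['\"])(.*?)\1")
_EVENT_ID_RE = re.compile(r"<EventID[^>]*>(\d+)</EventID>")
_DATA_RE = re.compile(r"<Data[^>]*>(.*?)</Data>", re.DOTALL)


def parse_wevtutil_xml(xml_text: str) -> List[Dict[str, str]]:
    """Extract time, event ID and raw data payload from wevtutil XML output."""
    events: List[Dict[str, str]] = []
    for match in _EVENT_RE.finditer(xml_text):
        block = match.group(0)
        time_m = _TIME_RE.search(block)
        id_m = _EVENT_ID_RE.search(block)
        data_m = _DATA_RE.search(block)
        if not (time_m and id_m and data_m):
            continue  # skip records that lack any of the three fields
        events.append({
            "time": time_m.group(2),
            "event_id": id_m.group(1),
            # XML entities (&amp;, &lt;, ...) are decoded; the JSON stays a string.
            "data": html.unescape(data_m.group(1)),
        })
    return events
```

A real XML parser handles both quote styles natively; the regex form is shown only because it makes the quoting pitfall visible.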

Dashboard reachability cache (services.edr_health). 30s TTL +
background poller every 15s. Per-probe timeouts dropped from 4s/5s to 2s.
First load post-boot waits at most one probe cycle; every subsequent
load returns in <5ms (cache hit).
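The TTL-plus-poller arrangement can be sketched as below. This is an assumed shape rather than the services.edr_health code: the class name, method names and the injected `probe` callable are hypothetical, and only the 30s TTL and 15s poll interval come from the text.

```python
import threading
import time
from typing import Callable, Dict, Iterable, Tuple


class ReachabilityCache:
    """TTL cache for per-profile reachability, refreshable by a poller."""

    def __init__(
        self,
        probe: Callable[[str], bool],
        ttl: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._probe = probe
        self._ttl = ttl
        self._clock = clock
        self._lock = threading.Lock()
        self._entries: Dict[str, Tuple[float, bool]] = {}

    def refresh(self, profile: str) -> bool:
        # Probe outside the lock so a slow agent never blocks cache readers.
        reachable = self._probe(profile)
        with self._lock:
            self._entries[profile] = (self._clock(), reachable)
        return reachable

    def get(self, profile: str) -> bool:
        with self._lock:
            entry = self._entries.get(profile)
        if entry is not None and self._clock() - entry[0] < self._ttl:
            return entry[1]  # cache hit: no network round trip
        return self.refresh(profile)  # cold or expired: probe once

    def start_poller(self, profiles: Iterable[str], interval: float = 15.0) -> threading.Event:
        """Refresh every profile each `interval` seconds until the event is set."""
        stop = threading.Event()
        names = list(profiles)

        def loop() -> None:
            while not stop.is_set():
                for name in names:
                    self.refresh(name)
                stop.wait(interval)

        threading.Thread(target=loop, daemon=True).start()
        return stop
```

Polling at half the TTL means entries are normally refreshed before they expire, so dashboard requests only pay for a probe on the very first load.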

GrumpyCats package split: 1085-line monolith into:

  grumpycat.py       - orchestrator (14 lines)
  cli/               - parser, handlers, runner
  litterbox_client/  - base + per-domain mixins (files, analysis,
                       doppelganger, results, edr, reports, system)
                       composed into LitterBoxClient.
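The base-plus-mixins layout might look roughly like this. It is a simplified sketch, not the package's code: the stand-in `_make_request` echoes the call as a dict instead of performing HTTP (the real method returns a response object), `SystemMixin.health` and its `/health` path are hypothetical, and the composed class is named `SketchLitterBoxClient` to keep it distinct from the real `LitterBoxClient`.

```python
from typing import Any, Dict


class BaseClient:
    """Owns the transport; mixins only ever call self._make_request."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url.rstrip("/")

    def _make_request(self, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
        # Stand-in transport: echo the call instead of doing HTTP.
        return {"method": method, "url": f"{self.base_url}{path}", **kwargs}


class EdrMixin:
    """EDR-domain endpoints."""

    def list_edr_profiles(self) -> Dict[str, Any]:
        return self._make_request("GET", "/api/edr/profiles")


class SystemMixin:
    """System-domain endpoints (the path below is hypothetical)."""

    def health(self) -> Dict[str, Any]:
        return self._make_request("GET", "/health")


class SketchLitterBoxClient(EdrMixin, SystemMixin, BaseClient):
    """Composition point: mixins first, transport base last in the MRO."""
```

Listing the mixins before `BaseClient` keeps the base at the end of the method resolution order, so every mixin resolves `_make_request` to the single shared transport.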

LitterBoxMCP.py rewires its one import. New CLI subcommand
fibratus-alerts and matching MCP tool fibratus_alerts_since pull
Fibratus alerts via a LitterBox passthrough endpoint
(/api/edr/fibratus/<profile>/alerts/since) for wire-checking the agent
without dispatching a payload.

CHANGELOG updated.
120 lines
4.7 KiB
Python
"""EDR operations: Whiskers + Elastic Defend + Fibratus profiles.

Two split-phase analyzer flavours live behind one set of endpoints:
  * `kind: elastic` - LitterBox queries an Elastic stack for alerts.
  * `kind: fibratus` - LitterBox polls Whiskers's event-log endpoint for
    Fibratus rule matches (DetonatorAgent shape).

The CLI / MCP helpers don't need to care about the kind for dispatch
(`analyze_edr` works for both); they only diverge for the
`fibratus_alerts_since` test helper.
"""

import time
from typing import Dict, List, Optional

from .exceptions import LitterBoxAPIError


class EdrMixin:
    def list_edr_profiles(self) -> Dict:
        """List EDR profiles registered under Config/edr_profiles/."""
        response = self._make_request("GET", "/api/edr/profiles")
        return response.json()

    def get_edr_agents_status(self) -> Dict:
        """Latest reachability snapshot for every registered EDR profile.

        Server-side TTL-cached + pre-warmed by a background poller, so
        this is effectively instant under steady-state operation."""
        response = self._make_request("GET", "/api/edr/agents/status")
        return response.json()

    def analyze_edr(
        self,
        file_hash: str,
        profile: str,
        cmd_args: Optional[List[str]] = None,
        xor_key: Optional[int] = None,
    ) -> Dict:
        """Dispatch a payload to a registered EDR profile.

        Returns the Phase-1 result immediately (status='polling_alerts'
        on a successful exec; 'blocked_by_av' / 'agent_unreachable' /
        'busy' / 'error' otherwise). Phase-2 (alert correlation) runs in
        a server-side daemon thread; poll
        `get_edr_results(file_hash, profile)` until status is no longer
        'polling_alerts', or use `wait_for_edr_completion` below.
        """
        data: Dict = {}
        if cmd_args:
            data.update(self._validate_command_args(cmd_args))
        if xor_key is not None:
            if not 0 <= xor_key <= 255:
                raise ValueError(f"xor_key must be 0-255, got {xor_key}")
            data["xor_key"] = xor_key

        response = self._make_request(
            "POST", f"/analyze/edr/{profile}/{file_hash}", json=data,
        )
        return response.json()

    def get_edr_results(self, file_hash: str, profile: str) -> Dict:
        """Fetch the saved findings for a specific EDR profile run."""
        response = self._make_request(
            "GET", f"/api/results/edr/{profile}/{file_hash}",
        )
        return response.json()

    def get_edr_index(self, file_hash: str) -> Dict:
        """Fetch every saved EDR run for a target (one entry per profile)."""
        response = self._make_request("GET", f"/api/results/edr/{file_hash}")
        return response.json()

    def wait_for_edr_completion(
        self,
        file_hash: str,
        profile: str,
        interval: float = 3.0,
        timeout: float = 180.0,
    ) -> Dict:
        """Block until Phase-2 settles, the saved JSON appears for the
        first time, or `timeout` elapses. Returns the last-seen findings
        dict (may still be 'polling_alerts' on timeout; caller decides)."""
        deadline = time.monotonic() + timeout
        last: Optional[Dict] = None
        while time.monotonic() < deadline:
            try:
                last = self.get_edr_results(file_hash, profile)
                status = (last or {}).get("status")
                if status and status != "polling_alerts":
                    return last
            except LitterBoxAPIError as e:
                # 404 just means Phase-1 hasn't kicked off yet; keep polling.
                if e.status_code != 404:
                    raise
            time.sleep(interval)
        return last or {
            "status": "timeout",
            "error": f"Phase-2 timeout after {timeout}s",
        }

    def fibratus_alerts_since(
        self,
        profile: str,
        since_iso: str,
        until_iso: Optional[str] = None,
    ) -> Dict:
        """Test/debug helper: ask LitterBox to passthrough-query the
        Whiskers agent's `/api/alerts/fibratus/since` for `profile`.

        Useful right after Fibratus is installed on a new VM: you can
        verify the Fibratus → Application event log → Whiskers wire end
        to end without dispatching a payload. Returns the agent's raw
        `{supported, events: [...]}` shape; `data` strings inside each
        event are unparsed JSON the caller can deserialize.
        """
        params = {"from": since_iso}
        if until_iso:
            params["until"] = until_iso
        response = self._make_request(
            "GET", f"/api/edr/fibratus/{profile}/alerts/since", params=params,
        )
        return response.json()
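Because `fibratus_alerts_since` returns each event's `data` as an unparsed JSON string, callers usually want a small decoding step. The helper below is one possible sketch: the function name and the added `alert` key are hypothetical, and only the `{supported, events: [...]}` shape and the `data` field come from the docstring above.

```python
import json
from typing import Any, Dict, List


def decode_fibratus_events(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the response's events with each `data` string parsed as JSON."""
    if not response.get("supported"):
        return []  # agent reports no Fibratus support: nothing to decode

    decoded: List[Dict[str, Any]] = []
    for event in response.get("events") or []:
        raw = event.get("data")
        try:
            payload = json.loads(raw) if isinstance(raw, str) else None
        except json.JSONDecodeError:
            payload = None  # keep the event, flag the payload as undecodable
        # "alert" is a key added by this sketch; the original fields pass through.
        decoded.append({**event, "alert": payload})
    return decoded
```

Keeping undecodable events in the result, with `alert` set to `None`, makes a broken event-log entry visible during a wire check instead of silently dropping it.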