fb52b1432e
Fibratus EDR profile (kind: fibratus). Pull-from-event-log model, same
shape DetonatorAgent's FibratusEdrPlugin.cs uses: operator configures
Fibratus on the EDR VM with alertsenders.eventlog: {enabled: true,
format: json}; rule matches land in the Application log. Whiskers gains
GET /api/alerts/fibratus/since which wevtutil-queries the log,
extracts <TimeCreated SystemTime> + <EventID> + <Data>, ships the raw
JSON blobs back. The new FibratusEdrAnalyzer mirrors Elastic's
two-phase shape — Phase 1 exec, Phase 2 polls Whiskers — and normalizes
Fibratus's actual schema (events[].proc.{name,exe,cmdline,parent_name,
parent_cmdline,ancestors} + bare tactic.id/technique.id/subtechnique.id
labels) into the saved-view renderer's dict.
Whiskers /api/info now reports telemetry_sources: ['fibratus'] when
fibratus.exe is at C:\Program Files\Fibratus\Bin\, so the
orchestrator can preflight before dispatching. wevtutil's single-quoted
attribute output is parsed correctly.
Dashboard reachability cache (services.edr_health). 30s TTL +
background poller every 15s. Per-probe timeouts dropped 4s/5s -> 2s.
First load post-boot waits at most one probe cycle; every subsequent
load <5ms (cache hit).
GrumpyCats package split: 1085-line monolith into:
grumpycat.py — orchestrator (14 lines)
cli/ — parser, handlers, runner
litterbox_client/ — base + per-domain mixins (files, analysis,
doppelganger, results, edr, reports, system)
composed into LitterBoxClient.
LitterBoxMCP.py rewires its one import. New CLI subcommand
fibratus-alerts and matching MCP tool fibratus_alerts_since pull
Fibratus alerts via a LitterBox passthrough endpoint
(/api/edr/fibratus/<profile>/alerts/since) for wire-checking the agent
without dispatching a payload.
CHANGELOG updated.
86 lines
3.5 KiB
Python
86 lines
3.5 KiB
Python
"""Doppelganger operations — Blender system snapshot + FuzzyHash similarity.
|
|
|
|
The single `/doppelganger` server endpoint multiplexes both engines and
|
|
the four operation kinds (`scan` / `compare` / `analyze` / `create_db`).
|
|
We expose one unified entry point plus four convenience wrappers.
|
|
"""
|
|
|
|
from typing import Dict, List, Optional
|
|
|
|
|
|
class DoppelgangerMixin:
|
|
def doppelganger_operation(
|
|
self,
|
|
analysis_type: str,
|
|
operation: str,
|
|
file_hash: Optional[str] = None,
|
|
folder_path: Optional[str] = None,
|
|
extensions: Optional[List[str]] = None,
|
|
threshold: int = 1,
|
|
) -> Dict:
|
|
"""Unified Doppelganger entry point. The four convenience wrappers
|
|
below cover the operations that actually have CLI shape."""
|
|
self._validate_doppelganger_params(analysis_type, operation, file_hash, folder_path)
|
|
|
|
# Comparison reads use GET so they're cheap to retry.
|
|
if file_hash and operation in ["compare", "analyze"]:
|
|
params = {"type": analysis_type, "hash": file_hash}
|
|
if operation == "analyze" and analysis_type == "fuzzy":
|
|
params["threshold"] = threshold
|
|
response = self._make_request("GET", "/doppelganger", params=params)
|
|
return response.json()
|
|
|
|
# Mutating ops (scan / create_db / analyze with extra args) go via POST.
|
|
data = {"type": analysis_type, "operation": operation}
|
|
|
|
if operation == "create_db":
|
|
data["folder_path"] = folder_path
|
|
if extensions:
|
|
data["extensions"] = extensions
|
|
elif operation == "analyze":
|
|
data["hash"] = file_hash
|
|
data["threshold"] = threshold
|
|
|
|
response = self._make_request("POST", "/doppelganger", json=data)
|
|
return response.json()
|
|
|
|
def run_blender_scan(self) -> Dict:
|
|
"""Run a system-wide Blender host snapshot."""
|
|
return self.doppelganger_operation("blender", "scan")
|
|
|
|
def compare_with_blender(self, file_hash: str) -> Dict:
|
|
"""Compare a file against the latest Blender host snapshot."""
|
|
return self.doppelganger_operation("blender", "compare", file_hash=file_hash)
|
|
|
|
def create_fuzzy_database(
|
|
self, folder_path: str, extensions: Optional[List[str]] = None,
|
|
) -> Dict:
|
|
"""(Re)build the FuzzyHash baseline DB from a folder of references."""
|
|
return self.doppelganger_operation(
|
|
"fuzzy", "create_db", folder_path=folder_path, extensions=extensions,
|
|
)
|
|
|
|
def analyze_with_fuzzy(self, file_hash: str, threshold: int = 1) -> Dict:
|
|
"""Score a payload's similarity to the baseline via fuzzy hashing."""
|
|
return self.doppelganger_operation(
|
|
"fuzzy", "analyze", file_hash=file_hash, threshold=threshold,
|
|
)
|
|
|
|
# ---- internal --------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _validate_doppelganger_params(
|
|
analysis_type: str,
|
|
operation: str,
|
|
file_hash: Optional[str],
|
|
folder_path: Optional[str],
|
|
):
|
|
if analysis_type not in ["blender", "fuzzy"]:
|
|
raise ValueError("analysis_type must be either 'blender' or 'fuzzy'")
|
|
if operation == "scan" and analysis_type != "blender":
|
|
raise ValueError("scan operation is only available for blender analysis")
|
|
if operation == "create_db" and not folder_path:
|
|
raise ValueError("folder_path is required for create_db operation")
|
|
if operation == "analyze" and not file_hash:
|
|
raise ValueError("file_hash is required for analyze operation")
|