Files
hermes-agent/kali_agent_callback_receiver.py

87 lines
3.1 KiB
Python

#!/usr/bin/env python3
"""
Kali Learning Agent Callback Receiver
Listens on :5001 by default (override with the first command-line argument) for task completion callbacks from the Kali Docker learning agent.
Logs to ~/.hermes/kali-agent/notifications.log and optionally notifies Hermes.
"""
import json, datetime, pathlib, sys
from flask import Flask, request, jsonify
# Flask application that exposes the /callback, /health and /status routes.
app = Flask(__name__)

# Directory holding the log files written by this receiver.
LOG_DIR = pathlib.Path.home() / ".hermes" / "kali-agent"
# parents=True: ~/.hermes may not exist yet on a fresh machine; without it
# the import of this module fails with FileNotFoundError.
LOG_DIR.mkdir(parents=True, exist_ok=True)

# Log of every callback received (also echoed to stdout by log()).
NOTIF_LOG = LOG_DIR / "notifications.log"
# Engagement-specific log; the engagement id is hard-coded to eng-test-001.
ENGAGEMENT_LOG = LOG_DIR / "engagement_eng-test-001.log"
# Root directory under which per-phase evidence files are stored.
ENGAGEMENT_EVIDENCE = pathlib.Path.home() / "greysec" / "engagements" / "eng-test-001"
def log(msg):
    """Print *msg* with a UTC timestamp and append it to NOTIF_LOG.

    Args:
        msg: Text to record; it is written as a single ``[timestamp] msg`` line.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the rendered timestamp keeps the same naive
    # ISO-8601 form existing log lines already use.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    ts = now_utc.isoformat()
    line = f"[{ts}] {msg}"
    print(line, flush=True)
    with open(NOTIF_LOG, "a") as f:
        f.write(line + "\n")
def log_engagement(msg):
    """Append *msg* with a UTC timestamp to ENGAGEMENT_LOG (no stdout echo).

    Args:
        msg: Text to record; it is written as a single ``[timestamp] msg`` line.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the rendered timestamp keeps the same naive
    # ISO-8601 form existing log lines already use.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    ts = now_utc.isoformat()
    line = f"[{ts}] {msg}"
    with open(ENGAGEMENT_LOG, "a") as f:
        f.write(line + "\n")
def _text_field(payload, key, default=""):
    """Return ``payload[key]`` as a string, or *default* if missing or null."""
    value = payload.get(key)
    return default if value is None else str(value)


def _safe_filename_part(value):
    """Replace every character that is not alphanumeric, '-', '_' or '.'.

    Path separators are among the replaced characters, so the result is
    always a single path component and cannot escape the target directory.
    """
    return "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in value)


def _select_phase(status, command):
    """Pick the evidence sub-directory name for a task; first match wins."""
    cmd = command.lower()
    # nmap is checked first so it stays "recon" even when a later rule
    # (e.g. an "exploit" status) would also match.
    if "nmap" in cmd:
        return "recon"
    if "exploit" in status or "msf" in cmd:
        return "exploitation"
    if "priv" in status or "escalate" in cmd:
        return "privesc"
    if "lateral" in cmd:
        return "lateral"
    return "recon"


@app.route("/callback", methods=["POST"])
def callback():
    """Record a task-completion callback and store its output as evidence.

    Reads the JSON body (keys: task_id, status, command, result, error,
    exit_code, tool, completed_at; all optional), writes one line to each
    log, and, when the task produced stdout or stderr, saves a text report
    under the phase directory chosen by _select_phase().

    Returns:
        A ``(json, 200)`` pair echoing the received task_id.
    """
    data = request.json or {}
    # A JSON array/string/number has no .get(); treat it as an empty payload
    # instead of failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        data = {}

    # Echoed back unchanged in the response, exactly as received.
    raw_task_id = data.get("task_id", "unknown")

    # Coerce to text: null or non-string values would otherwise break the
    # slicing, .lower() and substring checks below.
    task_id = _text_field(data, "task_id", "unknown")
    status = _text_field(data, "status", "unknown")
    command = _text_field(data, "command")
    result = _text_field(data, "result")
    error = _text_field(data, "error")
    tool = _text_field(data, "tool")
    completed_at = _text_field(data, "completed_at")
    exit_code = data.get("exit_code", -1)

    summary = result[:200].replace("\n", " ")
    # Separate the summary from exit_code; previously the two were fused
    # together (e.g. "exit_code=0Starting Nmap ...").
    summary_suffix = f" | {summary}" if summary else ""
    log(f"CALLBACK task_id={task_id} status={status} tool={tool} exit_code={exit_code}{summary_suffix}")
    log_engagement(f"TASK_COMPLETE | {task_id} | {tool} | {status} | exit={exit_code} | cmd: {command[:80]}")

    # Store result in evidence directory
    if result or error:
        phase_dir = ENGAGEMENT_EVIDENCE / _select_phase(status, command)
        # parents=True: the engagement tree may not exist on the first callback.
        phase_dir.mkdir(parents=True, exist_ok=True)
        # task_id and tool come from the network; sanitize them so values such
        # as "../../x" or "/etc/x" cannot redirect the write outside phase_dir.
        out_file = phase_dir / f"{_safe_filename_part(task_id)}_{_safe_filename_part(tool)}.txt"
        content = f"Command: {command}\nExit Code: {exit_code}\nCompleted: {completed_at}\nStatus: {status}\n\nSTDOUT:\n{result}\n\nSTDERR:\n{error}"
        # Explicit encoding: tool output may contain non-ASCII characters.
        with open(out_file, "w", encoding="utf-8") as f:
            f.write(content)
        log(f"Saved result to {out_file}")

    return jsonify({"received": True, "task_id": raw_task_id}), 200
@app.route("/health")
def health():
    """Liveness endpoint: return a fixed JSON body naming this service."""
    payload = {"status": "ok", "service": "kali-callback-receiver"}
    return jsonify(payload)
@app.route("/status")
def status():
    """Return the most recent (up to 20) notification log lines as JSON.

    Returns:
        JSON object with ``log`` (list of lines, oldest first) and ``count``.
        Both are empty/zero when the log file is missing or empty.
    """
    # EAFP: the file may disappear between an exists() check and the read.
    try:
        text = NOTIF_LOG.read_text().strip()
    except FileNotFoundError:
        text = ""
    # "".split("\n") yields [""], which would report one phantom entry for an
    # empty log file, so only split when there is content.
    lines = text.split("\n")[-20:] if text else []
    return jsonify({"log": lines, "count": len(lines)})
if __name__ == "__main__":
    # The first command-line argument, when given, selects the listening
    # port; otherwise the receiver uses 5001.
    cli_args = sys.argv[1:]
    port = int(cli_args[0]) if cli_args else 5001
    log(f"Starting Kali agent callback receiver on port {port}")
    # Bind on all interfaces with Flask debug mode off.
    app.run(host="0.0.0.0", port=port, debug=False)