Files
hermes-agent/ingestion_webhook_server.py

406 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ingestion_webhook_server.py — FastAPI webhook endpoints for gbrain ingestion.
Endpoints:
POST /webhook/discord - Discord webhook payloads
POST /webhook/kali - Kali agent callbacks
POST /webhook/generic - Generic JSON payloads
GET /health - Queue status and health check
Security:
All webhook endpoints require a Bearer token matching the INGEST_WEBHOOK_TOKEN
env var. If that variable is unset or empty, authentication is disabled
(development only).
Usage:
python3 ingestion_webhook_server.py # Run on default port 5002
python3 ingestion_webhook_server.py --port 5003 # Custom port
INGEST_WEBHOOK_TOKEN=secret python3 ingestion_webhook_server.py
"""
import argparse
import hashlib
import hmac
import json
import os
import sys
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn
# Add the script directory to path for imports, so the sibling module
# ingestion_orchestrator is importable regardless of the current working
# directory. This must run before the import below.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
    sys.path.insert(0, SCRIPT_DIR)
from ingestion_orchestrator import IngestQueue, IngestJob, JobStatus
# ─── Config ───────────────────────────────────────────────────────────────────
# Shared secret expected in "Authorization: Bearer <token>". When unset or
# empty, validate_token() lets every request through (development mode).
WEBHOOK_TOKEN = os.environ.get("INGEST_WEBHOOK_TOKEN")
# Defaults for the --port / --host CLI options. A non-integer
# INGEST_WEBHOOK_PORT raises ValueError at import time.
DEFAULT_PORT = int(os.environ.get("INGEST_WEBHOOK_PORT", 5002))
# NOTE(review): binds all interfaces by default; combined with auth being
# off when no token is set, consider 127.0.0.1 for development setups.
DEFAULT_HOST = os.environ.get("INGEST_WEBHOOK_HOST", "0.0.0.0")
# Initialize FastAPI app
app = FastAPI(
    title="gbrain Ingestion Webhook Server",
    description="Webhook endpoints for ingesting content into gbrain",
    version="1.0.0"
)
# Initialize queue: one IngestQueue instance, created at import time and
# shared by every endpoint in this module.
queue = IngestQueue()
# ─── Pydantic Models ───────────────────────────────────────────────────────────
class DiscordWebhookPayload(BaseModel):
    """Discord webhook payload structure.

    Every field has a default, so an empty JSON object validates.
    """
    # Message text; empty string when absent.
    content: str = ""
    # Sender name; used in the fallback page title and the job meta.
    username: Optional[str] = None
    # Copied into the job meta.
    avatar_url: Optional[str] = None
    # Untyped list, so items are kept as parsed JSON. Presumably dicts with
    # "title"/"description" keys (Discord embed objects); verify against senders.
    embeds: list = Field(default_factory=list)
    # Copied into the job meta when present.
    webhook_id: Optional[str] = None
    # NOTE(review): accepted but never read by webhook_discord in this file,
    # and not copied into the job meta.
    webhook_token: Optional[str] = None
class KaliCallbackPayload(BaseModel):
    """Kali agent callback payload structure.

    ``task_id``, ``agent_id`` and ``status`` are required; everything else
    is optional.
    """
    # Used in the report heading, page title and slug seed.
    task_id: str
    agent_id: str
    status: str # 'completed', 'failed', 'in_progress' (not validated: any string is accepted)
    # Rendered as a JSON code block in the report when non-empty.
    result: Optional[Dict[str, Any]] = None
    # Rendered as a plain code block in the report when non-empty.
    output: Optional[str] = None
    error: Optional[str] = None
    # Free-form string; when missing, webhook_kali uses the receive time.
    timestamp: Optional[str] = None
    # Extra key/values merged into the job meta; an explicit null is allowed.
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
class GenericWebhookPayload(BaseModel):
    """Generic JSON webhook payload.

    NOTE(review): not referenced by any endpoint in this file;
    webhook_generic parses the raw request JSON itself. Confirm whether this
    model is still needed or should be wired into that endpoint.
    """
    source: Optional[str] = "generic"
    slug: Optional[str] = None
    title: Optional[str] = None
    # The only required field.
    content: str
    meta: Optional[Dict[str, Any]] = Field(default_factory=dict)
class HealthResponse(BaseModel):
    """Health check response returned by GET /health."""
    # Always "healthy" when produced by health_check in this file.
    status: str
    # Counters from queue.get_stats(); the names are defined by IngestQueue.
    queue_stats: Dict[str, int]
    # ISO-8601 local time (naive datetime) at which the response was built.
    timestamp: str
# ─── Security ─────────────────────────────────────────────────────────────────
def validate_token(authorization: Optional[str]) -> bool:
    """Validate the Bearer token from an ``Authorization`` header.

    Args:
        authorization: Raw header value, expected as ``Bearer <token>``
            (the scheme is matched case-insensitively), or ``None`` when the
            header is absent.

    Returns:
        ``True`` when no token is configured (auth disabled for development)
        or when the presented token equals ``WEBHOOK_TOKEN``; ``False``
        otherwise.
    """
    if not WEBHOOK_TOKEN:
        # If no token is configured, allow all (for development)
        return True
    if not authorization:
        return False
    # Extract token from "Bearer <token>"
    parts = authorization.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return False
    # Constant-time comparison, so response timing does not reveal how many
    # leading characters of a guessed token were right. Compare UTF-8 bytes:
    # compare_digest raises TypeError for non-ASCII str arguments.
    return hmac.compare_digest(parts[1].encode("utf-8"), WEBHOOK_TOKEN.encode("utf-8"))
# ─── Helper Functions ─────────────────────────────────────────────────────────
def generate_slug(source: str, identifier: str) -> str:
    """Build a slug of the form ``<source>/<YYYYmmdd_HHMMSS>_<hash8>``.

    The 8-hex-digit suffix hashes ``identifier``, the local timestamp and a
    random UUID4, so repeated calls with the same arguments almost always
    yield distinct slugs.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    seed = f"{identifier}{stamp}{uuid.uuid4()}"
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return f"{source}/{stamp}_{digest[:8]}"
def hash_content(content: str) -> str:
    """Return the first 32 hex characters of SHA-256(UTF-8 *content*), used for deduplication."""
    hasher = hashlib.sha256()
    hasher.update(content.encode("utf-8"))
    full_hex = hasher.hexdigest()
    return full_hex[:32]
def create_ingest_job(source: str, slug: str, title: str, content: str,
                      meta: Dict[str, Any]) -> IngestJob:
    """Build a fresh, not-yet-enqueued IngestJob from webhook data.

    The job has no id, ``JobStatus.PENDING`` status, zero retries and no
    error. ``created_at`` and ``updated_at`` share a single local timestamp,
    and ``content_hash`` is ``hash_content(content)``. ``meta`` is stored as
    given (not copied).
    """
    timestamp = datetime.now()
    digest = hash_content(content)
    return IngestJob(
        id=None,
        status=JobStatus.PENDING,
        retries=0,
        error=None,
        source=source,
        slug=slug,
        title=title,
        content=content,
        content_hash=digest,
        meta=meta,
        created_at=timestamp,
        updated_at=timestamp,
    )
# ─── Endpoints ────────────────────────────────────────────────────────────────
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report liveness plus the current ingestion-queue statistics."""
    current_stats = queue.get_stats()
    response = HealthResponse(
        status="healthy",
        queue_stats=current_stats,
        timestamp=datetime.now().isoformat(),
    )
    return response
def _discord_embed_text(embed: Any, key: str) -> str:
    """Return the text stored under *key* in one Discord embed, or "" if absent.

    Dict embeds (what JSON payloads produce) are read with ``.get``; any
    other object via ``getattr``. ``None`` and callables are treated as
    absent. Callables appear when, for example, a bare string is sent as an
    embed and ``getattr("x", "title")`` returns the ``str.title`` method.
    """
    if isinstance(embed, dict):
        value = embed.get(key)
    else:
        value = getattr(embed, key, None)
    if value is None or callable(value):
        return ""
    return str(value)


@app.post("/webhook/discord")
async def webhook_discord(
    payload: DiscordWebhookPayload,
    authorization: Optional[str] = Header(None)
):
    """Accept Discord webhook payloads and enqueue them for ingestion.

    The message ``content`` and each embed's title/description are joined
    into one Markdown document. The first non-empty embed title becomes the
    page title; otherwise the title is derived from the username.

    Args:
        payload: Parsed Discord webhook body.
        authorization: ``Authorization`` header value (``Bearer <token>``).

    Returns:
        202 with the job id and slug when enqueued, or 200 with
        ``status: "duplicate"`` when ``queue.enqueue`` returned a falsy id.

    Raises:
        HTTPException: 401 on a bad/missing token, 500 on any processing error.
    """
    if not validate_token(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")
    try:
        # Extract content from Discord payload
        content_parts = []
        title = None
        if payload.content:
            content_parts.append(payload.content)
        # Process embeds
        for embed in payload.embeds:
            embed_title = _discord_embed_text(embed, "title")
            embed_desc = _discord_embed_text(embed, "description")
            if embed_title and not title:
                title = embed_title
            if embed_desc:
                # Emit the bold heading only when there is a title; an empty
                # one would render as a stray "****".
                content_parts.append(f"**{embed_title}**\n{embed_desc}" if embed_title else embed_desc)
        content = "\n\n".join(content_parts) if content_parts else "Discord webhook (no content)"
        if not title:
            title = f"Discord message from {payload.username or 'unknown'}"
        # Generate meta
        meta = {
            "webhook_source": "discord",
            "username": payload.username,
            "avatar_url": payload.avatar_url,
            "embeds_count": len(payload.embeds),
            "received_at": datetime.now().isoformat()
        }
        if payload.webhook_id:
            meta["webhook_id"] = payload.webhook_id
        # Create slug
        slug = generate_slug("discord", f"{payload.username or 'unknown'}_{content[:50]}")
        # Create and enqueue job
        job = create_ingest_job("discord", slug, title, content, meta)
        job_id = queue.enqueue(job)
        if job_id:
            return JSONResponse(
                status_code=202,
                content={
                    "status": "accepted",
                    "job_id": job_id,
                    "slug": slug,
                    "message": "Discord webhook queued for ingestion"
                }
            )
        return JSONResponse(
            status_code=200,
            content={
                "status": "duplicate",
                "slug": slug,
                "message": "Content already exists (duplicate)"
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing webhook: {str(e)}") from e
@app.post("/webhook/kali")
async def webhook_kali(
    payload: KaliCallbackPayload,
    authorization: Optional[str] = Header(None)
):
    """Accept Kali agent callback payloads and enqueue them for ingestion.

    The callback is rendered as a Markdown report: agent id, status and
    timestamp, plus optional Output / Result (JSON) / Error sections.

    Args:
        payload: Parsed Kali callback body.
        authorization: ``Authorization`` header value (``Bearer <token>``).

    Returns:
        202 with the job id and slug when enqueued, or 200 with
        ``status: "duplicate"`` when ``queue.enqueue`` returned a falsy id.

    Raises:
        HTTPException: 401 on a bad/missing token, 500 on any processing error.
    """
    if not validate_token(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")
    try:
        # Build content from Kali callback
        content_parts = [
            f"# Kali Agent Report: {payload.task_id}",
            f"**Agent ID:** {payload.agent_id}",
            f"**Status:** {payload.status}",
            f"**Timestamp:** {payload.timestamp or datetime.now().isoformat()}",
        ]
        if payload.output:
            content_parts.append(f"\n## Output\n```\n{payload.output}\n```")
        if payload.result:
            content_parts.append(f"\n## Result\n```json\n{json.dumps(payload.result, indent=2)}\n```")
        if payload.error:
            content_parts.append(f"\n## Error\n```\n{payload.error}\n```")
        content = "\n\n".join(content_parts)
        title = f"Kali Agent Report: {payload.task_id}"
        # Generate meta. Caller-supplied metadata is spread FIRST so the
        # server-controlled keys below always win; otherwise an agent could
        # spoof e.g. "webhook_source", "status" or "received_at".
        meta = {
            **(payload.metadata or {}),
            "webhook_source": "kali",
            "task_id": payload.task_id,
            "agent_id": payload.agent_id,
            "status": payload.status,
            "received_at": datetime.now().isoformat(),
        }
        # Create slug
        slug = generate_slug("kali", f"{payload.agent_id}_{payload.task_id}")
        # Create and enqueue job
        job = create_ingest_job("kali", slug, title, content, meta)
        job_id = queue.enqueue(job)
        if job_id:
            return JSONResponse(
                status_code=202,
                content={
                    "status": "accepted",
                    "job_id": job_id,
                    "slug": slug,
                    "message": "Kali callback queued for ingestion"
                }
            )
        return JSONResponse(
            status_code=200,
            content={
                "status": "duplicate",
                "slug": slug,
                "message": "Content already exists (duplicate)"
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing webhook: {str(e)}") from e
@app.post("/webhook/generic")
async def webhook_generic(
    request: Request,
    authorization: Optional[str] = Header(None)
):
    """Accept a free-form JSON object and enqueue it for ingestion.

    Recognised keys:
    - ``content``: required non-empty string.
    - ``source``: defaults to ``"generic"``.
    - ``title``: defaults to ``"Generic webhook from <source>"``.
    - ``slug``: overrides the generated slug.
    - ``meta``: a JSON object copied into the job metadata. The server-set
      keys ``webhook_source``, ``original_source`` and ``received_at``
      override same-named caller keys.

    Args:
        request: Raw request; its body is parsed as JSON.
        authorization: ``Authorization`` header value (``Bearer <token>``).

    Returns:
        202 with the job id and slug when enqueued, or 200 with
        ``status: "duplicate"`` when ``queue.enqueue`` returned a falsy id.

    Raises:
        HTTPException:
            401 on a bad/missing token.
            400 when the body is not valid JSON, is not an object, lacks
            ``content``, or has wrongly-typed fields.
            500 on any error while building or enqueueing the job.
    """
    if not validate_token(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Parse raw JSON to handle flexible payloads. Malformed JSON raises
    # json.JSONDecodeError and a non-UTF-8 body raises UnicodeDecodeError;
    # both are ValueError subclasses.
    try:
        body = await request.json()
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") from e
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Invalid payload: expected a JSON object")

    source = body.get("source") or "generic"
    content = body.get("content", "")
    title = body.get("title") or f"Generic webhook from {source}"
    slug_override = body.get("slug")
    raw_meta = body.get("meta") or {}

    if not content:
        raise HTTPException(status_code=400, detail="Missing required field: content")
    # Reject wrongly-typed fields with a 400 here, instead of letting them
    # fail later as 500s (str.encode on a number, item assignment on a list).
    for field_name, value in (("content", content), ("source", source), ("title", title)):
        if not isinstance(value, str):
            raise HTTPException(status_code=400, detail=f"Field '{field_name}' must be a string")
    if slug_override is not None and not isinstance(slug_override, str):
        raise HTTPException(status_code=400, detail="Field 'slug' must be a string")
    if not isinstance(raw_meta, dict):
        raise HTTPException(status_code=400, detail="Field 'meta' must be a JSON object")

    try:
        # Enrich a copy of the caller's meta; server-set keys override theirs.
        meta = dict(raw_meta)
        meta["webhook_source"] = "generic"
        meta["original_source"] = source
        meta["received_at"] = datetime.now().isoformat()
        # NOTE(review): `slug` and `source` are caller-controlled and used
        # verbatim in the slug. If slugs map to file paths downstream, they
        # should be sanitised (e.g. reject ".." segments) — confirm.
        if slug_override:
            slug = slug_override
        else:
            slug = generate_slug(source, f"{title}_{content[:50]}")
        # Create and enqueue job
        job = create_ingest_job(source, slug, title, content, meta)
        job_id = queue.enqueue(job)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing webhook: {str(e)}") from e

    if job_id:
        return JSONResponse(
            status_code=202,
            content={
                "status": "accepted",
                "job_id": job_id,
                "slug": slug,
                "message": "Generic webhook queued for ingestion"
            }
        )
    return JSONResponse(
        status_code=200,
        content={
            "status": "duplicate",
            "slug": slug,
            "message": "Content already exists (duplicate)"
        }
    )
# ─── Main Entry Point ─────────────────────────────────────────────────────────
def main():
    """Parse command-line options and serve the webhook app with uvicorn.

    Options:
    - ``--host`` / ``--port``: default to INGEST_WEBHOOK_HOST /
      INGEST_WEBHOOK_PORT.
    - ``--reload``: development auto-reload.
    - ``--no-auth``: turns Bearer-token checks off even when
      INGEST_WEBHOOK_TOKEN is set.

    Without a token and without ``--no-auth`` the server still starts, but
    prints a warning because every request will be accepted.
    """
    global WEBHOOK_TOKEN

    parser = argparse.ArgumentParser(description="gbrain Ingestion Webhook Server")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Host to bind to (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Port to bind to (default: {DEFAULT_PORT})")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
    parser.add_argument("--no-auth", action="store_true", help="Disable authentication (development only)")
    args = parser.parse_args()
    # fastapi/uvicorn are imported at module level, so a missing package
    # already fails before main() runs; no runtime availability check needed.

    # Security settings
    if args.no_auth:
        # Clear the token in this module (used when serving `app` directly)
        # and in the environment (re-read when --reload makes uvicorn import
        # the module again by name in a worker process).
        WEBHOOK_TOKEN = None
        os.environ.pop("INGEST_WEBHOOK_TOKEN", None)
        print("[WARNING] --no-auth given. Authentication is disabled.")
    elif not WEBHOOK_TOKEN:
        print("[WARNING] INGEST_WEBHOOK_TOKEN not set. Authentication is disabled.")
        print(" Set INGEST_WEBHOOK_TOKEN environment variable to enable security.")

    print("=" * 60)
    print("gbrain Ingestion Webhook Server")
    print(f" Host: {args.host}")
    print(f" Port: {args.port}")
    print(f" Auth: {'Enabled' if WEBHOOK_TOKEN else 'Disabled'}")
    print("=" * 60)
    print("\nEndpoints:")
    print(f" GET http://{args.host}:{args.port}/health")
    print(f" POST http://{args.host}:{args.port}/webhook/discord")
    print(f" POST http://{args.host}:{args.port}/webhook/kali")
    print(f" POST http://{args.host}:{args.port}/webhook/generic")
    print("\nPress Ctrl+C to stop\n")

    # Auto-reload needs an import string so uvicorn can re-import the module.
    # Otherwise serve the already-built app: when this file runs as a script
    # (module name "__main__"), an import string would import it a second
    # time under "ingestion_webhook_server", building a second app and
    # IngestQueue.
    app_target = "ingestion_webhook_server:app" if args.reload else app
    uvicorn.run(
        app_target,
        host=args.host,
        port=args.port,
        reload=args.reload,
        log_level="info"
    )


if __name__ == "__main__":
    main()