406 lines
14 KiB
Python
Executable File
406 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ingestion_webhook_server.py — FastAPI webhook endpoints for gbrain ingestion.
|
|
|
|
Endpoints:
|
|
POST /webhook/discord - Discord webhook payloads
|
|
POST /webhook/kali - Kali agent callbacks
|
|
POST /webhook/generic - Generic JSON payloads
|
|
GET /health - Queue status and health check
|
|
|
|
Security:
|
|
All webhook endpoints require Bearer token from INGEST_WEBHOOK_TOKEN env var.
|
|
|
|
Usage:
|
|
python3 ingestion_webhook_server.py # Run on default port 5002
|
|
python3 ingestion_webhook_server.py --port 5003 # Custom port
|
|
INGEST_WEBHOOK_TOKEN=secret python3 ingestion_webhook_server.py
|
|
"""
|
|
|
|
import argparse
import hashlib
import hmac
import json
import os
import sys
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
|
|
|
|
# Add the script directory to path for imports
|
|
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
if SCRIPT_DIR not in sys.path:
|
|
sys.path.insert(0, SCRIPT_DIR)
|
|
|
|
from ingestion_orchestrator import IngestQueue, IngestJob, JobStatus
|
|
|
|
# ─── Config ───────────────────────────────────────────────────────────────────
|
|
# Shared secret expected in the "Authorization: Bearer <token>" header.
# None when the variable is not set.
WEBHOOK_TOKEN = os.getenv("INGEST_WEBHOOK_TOKEN")

# Bind address and port used by main() when no CLI override is given.
DEFAULT_HOST = os.getenv("INGEST_WEBHOOK_HOST", "0.0.0.0")
DEFAULT_PORT = int(os.getenv("INGEST_WEBHOOK_PORT", 5002))
|
|
|
|
# Initialize FastAPI app
|
|
# ASGI application object; the route decorators further down register on it.
app = FastAPI(
    version="1.0.0",
    title="gbrain Ingestion Webhook Server",
    description="Webhook endpoints for ingesting content into gbrain",
)

# Single module-level queue instance shared by every endpoint.
queue = IngestQueue()
|
|
|
|
# ─── Pydantic Models ───────────────────────────────────────────────────────────
|
|
|
|
class DiscordWebhookPayload(BaseModel):
    """Request body accepted by ``POST /webhook/discord``.

    No field is required; each one falls back to the default shown.
    """

    # Plain message text.
    content: str = Field(default="")
    # Display name of the sender; used to build a fallback title.
    username: Optional[str] = Field(default=None)
    avatar_url: Optional[str] = Field(default=None)
    # Embed objects; the handler reads their "title" and "description".
    embeds: list = Field(default_factory=list)
    # Copied into the job metadata when present.
    webhook_id: Optional[str] = Field(default=None)
    # Accepted for compatibility; the handler does not store it.
    webhook_token: Optional[str] = Field(default=None)
|
|
|
|
class KaliCallbackPayload(BaseModel):
    """Request body accepted by ``POST /webhook/kali``.

    ``task_id``, ``agent_id`` and ``status`` are required; everything else is
    optional.
    """

    task_id: str
    agent_id: str
    # Free-form string; expected values: 'completed', 'failed', 'in_progress'.
    status: str
    # Structured result, rendered as a JSON code block in the report.
    result: Optional[Dict[str, Any]] = Field(default=None)
    # Raw text output, rendered as a plain code block in the report.
    output: Optional[str] = Field(default=None)
    error: Optional[str] = Field(default=None)
    # Caller-supplied timestamp string; the handler substitutes "now" if absent.
    timestamp: Optional[str] = Field(default=None)
    # Extra key/value pairs merged into the job metadata.
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
|
|
|
class GenericWebhookPayload(BaseModel):
    """Shape of a generic JSON webhook payload.

    Only ``content`` is required.
    NOTE(review): the ``/webhook/generic`` handler in this file parses the raw
    request body itself and does not validate against this model.
    """

    source: Optional[str] = Field(default="generic")
    slug: Optional[str] = Field(default=None)
    title: Optional[str] = Field(default=None)
    content: str
    meta: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
|
|
|
class HealthResponse(BaseModel):
    """Response body returned by ``GET /health``."""

    # Overall service state; the endpoint always reports "healthy".
    status: str
    # Counters returned by the queue's get_stats() call.
    queue_stats: Dict[str, int]
    # ISO-8601 string of the moment the response was built.
    timestamp: str
|
|
|
|
# ─── Security ─────────────────────────────────────────────────────────────────
|
|
|
|
def validate_token(authorization: Optional[str]) -> bool:
    """Check an ``Authorization`` header against the configured webhook token.

    Args:
        authorization: Raw header value, expected in the form
            ``Bearer <token>`` (scheme is case-insensitive); ``None`` or an
            empty string when the header was not sent.

    Returns:
        ``True`` when ``WEBHOOK_TOKEN`` is unset/empty (authentication is off,
        intended for development) or when the header carries exactly the
        configured token; ``False`` for a missing, malformed or wrong header.
    """
    if not WEBHOOK_TOKEN:
        # No token configured: allow everything (development mode).
        return True

    if not authorization:
        return False

    # Expect exactly two whitespace-separated parts: "Bearer" and the token.
    parts = authorization.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return False

    # Constant-time comparison so response timing does not reveal how much of
    # a guessed token is correct. Compare bytes because compare_digest only
    # accepts ASCII-only str arguments.
    return hmac.compare_digest(
        parts[1].encode("utf-8"),
        WEBHOOK_TOKEN.encode("utf-8"),
    )
|
|
|
|
# ─── Helper Functions ─────────────────────────────────────────────────────────
|
|
|
|
def generate_slug(source: str, identifier: str) -> str:
    """Build a slug of the form ``<source>/<YYYYmmdd_HHMMSS>_<8 hex chars>``.

    The hex suffix is a SHA-256 prefix over the identifier, the timestamp and
    a fresh random UUID, so repeated calls with the same arguments yield
    different slugs.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    seed = f"{identifier}{stamp}{uuid.uuid4()}"
    digest = hashlib.sha256(seed.encode()).hexdigest()
    suffix = digest[:8]
    return f"{source}/{stamp}_{suffix}"
|
|
|
|
def hash_content(content: str) -> str:
    """Return the first 32 hex characters of the SHA-256 of ``content``.

    Used as the deduplication key stored on each ingest job.
    """
    hasher = hashlib.sha256()
    hasher.update(content.encode())
    full_hex = hasher.hexdigest()
    return full_hex[:32]
|
|
|
|
def create_ingest_job(source: str, slug: str, title: str, content: str,
                      meta: Dict[str, Any]) -> IngestJob:
    """Assemble a fresh ``IngestJob`` from already-extracted webhook fields.

    The job is created with ``id=None``, status ``JobStatus.PENDING``, zero
    retries, no error, and identical creation/update timestamps. Its
    ``content_hash`` is derived from ``content`` via ``hash_content``.
    """
    stamp = datetime.now()
    fields = {
        "id": None,
        "source": source,
        "content_hash": hash_content(content),
        "slug": slug,
        "title": title,
        "content": content,
        "meta": meta,
        "status": JobStatus.PENDING,
        "retries": 0,
        "error": None,
        "created_at": stamp,
        "updated_at": stamp,
    }
    return IngestJob(**fields)
|
|
|
|
# ─── Endpoints ────────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report service liveness together with the current queue counters.

    No authentication is required for this endpoint.
    """
    queue_snapshot = queue.get_stats()
    report = {
        "status": "healthy",
        "queue_stats": queue_snapshot,
        "timestamp": datetime.now().isoformat(),
    }
    return HealthResponse(**report)
|
|
|
|
@app.post("/webhook/discord")
async def webhook_discord(
    payload: DiscordWebhookPayload,
    authorization: Optional[str] = Header(None)
):
    """Accept a Discord webhook payload and enqueue it for ingestion.

    The job content is the message text followed by one section per embed
    that has a description. The job title is the first non-empty embed title,
    or ``"Discord message from <username>"`` when no embed supplies one.

    Args:
        payload: Parsed request body.
        authorization: Value of the ``Authorization`` header, if any.

    Returns:
        202 with ``status="accepted"`` and the job id when the queue returns
        an id; 200 with ``status="duplicate"`` when it returns a falsy value.

    Raises:
        HTTPException: 401 when the bearer token is rejected; 500 when
            building or enqueueing the job fails.
    """
    if not validate_token(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        content_parts = []
        title = None

        if payload.content:
            content_parts.append(payload.content)

        for embed in payload.embeds:
            # Embeds normally arrive as dicts; fall back to attribute access
            # for any other object type.
            if isinstance(embed, dict):
                embed_title = embed.get('title', '')
                embed_desc = embed.get('description', '')
            else:
                embed_title = getattr(embed, 'title', '')
                embed_desc = getattr(embed, 'description', '')

            if embed_title and not title:
                title = embed_title

            if embed_desc:
                if embed_title:
                    content_parts.append(f"**{embed_title}**\n{embed_desc}")
                else:
                    # Without a title the bold heading would render as a
                    # stray "****"; emit only the description.
                    content_parts.append(f"{embed_desc}")

        content = "\n\n".join(content_parts) if content_parts else "Discord webhook (no content)"

        if not title:
            title = f"Discord message from {payload.username or 'unknown'}"

        meta = {
            "webhook_source": "discord",
            "username": payload.username,
            "avatar_url": payload.avatar_url,
            "embeds_count": len(payload.embeds),
            "received_at": datetime.now().isoformat()
        }
        if payload.webhook_id:
            meta["webhook_id"] = payload.webhook_id

        slug = generate_slug("discord", f"{payload.username or 'unknown'}_{content[:50]}")

        job = create_ingest_job("discord", slug, title, content, meta)
        job_id = queue.enqueue(job)

        if job_id:
            return JSONResponse(
                status_code=202,
                content={
                    "status": "accepted",
                    "job_id": job_id,
                    "slug": slug,
                    "message": "Discord webhook queued for ingestion"
                }
            )

        # A falsy id from the queue is reported to the caller as a duplicate.
        return JSONResponse(
            status_code=200,
            content={
                "status": "duplicate",
                "slug": slug,
                "message": "Content already exists (duplicate)"
            }
        )

    except HTTPException:
        # Never rewrap an HTTP error as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing webhook: {str(e)}") from e
|
|
|
|
@app.post("/webhook/kali")
async def webhook_kali(
    payload: KaliCallbackPayload,
    authorization: Optional[str] = Header(None)
):
    """Accept a Kali agent callback and enqueue it as a Markdown report.

    The report lists the task id, agent id, status and timestamp, followed by
    optional "Output", "Result" (JSON) and "Error" sections.

    Args:
        payload: Parsed request body.
        authorization: Value of the ``Authorization`` header, if any.

    Returns:
        202 with ``status="accepted"`` and the job id when the queue returns
        an id; 200 with ``status="duplicate"`` when it returns a falsy value.

    Raises:
        HTTPException: 401 when the bearer token is rejected; 500 when
            building or enqueueing the job fails.
    """
    if not validate_token(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        content_parts = [
            f"# Kali Agent Report: {payload.task_id}",
            f"**Agent ID:** {payload.agent_id}",
            f"**Status:** {payload.status}",
            f"**Timestamp:** {payload.timestamp or datetime.now().isoformat()}",
        ]

        if payload.output:
            content_parts.append(f"\n## Output\n```\n{payload.output}\n```")

        if payload.result:
            content_parts.append(f"\n## Result\n```json\n{json.dumps(payload.result, indent=2)}\n```")

        if payload.error:
            content_parts.append(f"\n## Error\n```\n{payload.error}\n```")

        content = "\n\n".join(content_parts)
        title = f"Kali Agent Report: {payload.task_id}"

        # Caller-supplied metadata goes in first so it can add keys but can
        # never overwrite the server-set fields below.
        meta = {
            **(payload.metadata or {}),
            "webhook_source": "kali",
            "task_id": payload.task_id,
            "agent_id": payload.agent_id,
            "status": payload.status,
            "received_at": datetime.now().isoformat(),
        }

        slug = generate_slug("kali", f"{payload.agent_id}_{payload.task_id}")

        job = create_ingest_job("kali", slug, title, content, meta)
        job_id = queue.enqueue(job)

        if job_id:
            return JSONResponse(
                status_code=202,
                content={
                    "status": "accepted",
                    "job_id": job_id,
                    "slug": slug,
                    "message": "Kali callback queued for ingestion"
                }
            )

        # A falsy id from the queue is reported to the caller as a duplicate.
        return JSONResponse(
            status_code=200,
            content={
                "status": "duplicate",
                "slug": slug,
                "message": "Content already exists (duplicate)"
            }
        )

    except HTTPException:
        # Never rewrap an HTTP error as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing webhook: {str(e)}") from e
|
|
|
|
@app.post("/webhook/generic")
async def webhook_generic(
    request: Request,
    authorization: Optional[str] = Header(None)
):
    """Accept an arbitrary JSON object and enqueue it for ingestion.

    Recognised keys: ``content`` (required, non-empty string), ``source``
    (default ``"generic"``), ``title`` (default
    ``"Generic webhook from <source>"``), ``slug`` (used verbatim when given,
    otherwise generated) and ``meta`` (object, default ``{}``).

    Args:
        request: Raw request; the body is parsed here rather than by a model
            so that unknown keys are tolerated.
        authorization: Value of the ``Authorization`` header, if any.

    Returns:
        202 with ``status="accepted"`` and the job id when the queue returns
        an id; 200 with ``status="duplicate"`` when it returns a falsy value.

    Raises:
        HTTPException: 401 when the bearer token is rejected; 400 for a body
            that is not valid JSON, is not a JSON object, lacks ``content``,
            or has a wrongly typed ``content``/``meta``; 500 for any other
            failure.
    """
    if not validate_token(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        try:
            body = await request.json()
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError (undecodable body)
            # are both ValueError subclasses; both are client errors.
            raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") from e

        # Valid JSON may still be a list, string or number; the .get()
        # lookups below need an object.
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="Payload must be a JSON object")

        # "or" rather than a .get() default so an explicit null (or empty
        # string) also falls back to the default.
        source = body.get("source") or "generic"
        content = body.get("content")
        title = body.get("title") or f"Generic webhook from {source}"
        slug_override = body.get("slug")
        meta = body.get("meta")

        if not content:
            raise HTTPException(status_code=400, detail="Missing required field: content")
        if not isinstance(content, str):
            raise HTTPException(status_code=400, detail="Field 'content' must be a string")

        if meta is None:
            meta = {}
        elif not isinstance(meta, dict):
            raise HTTPException(status_code=400, detail="Field 'meta' must be a JSON object")
        else:
            # Work on a copy so the parsed request body is left untouched.
            meta = dict(meta)

        # Server-set keys; these intentionally override caller values.
        meta["webhook_source"] = "generic"
        meta["original_source"] = source
        meta["received_at"] = datetime.now().isoformat()

        if slug_override:
            slug = slug_override
        else:
            slug = generate_slug(source, f"{title}_{content[:50]}")

        job = create_ingest_job(source, slug, title, content, meta)
        job_id = queue.enqueue(job)

        if job_id:
            return JSONResponse(
                status_code=202,
                content={
                    "status": "accepted",
                    "job_id": job_id,
                    "slug": slug,
                    "message": "Generic webhook queued for ingestion"
                }
            )

        # A falsy id from the queue is reported to the caller as a duplicate.
        return JSONResponse(
            status_code=200,
            content={
                "status": "duplicate",
                "slug": slug,
                "message": "Content already exists (duplicate)"
            }
        )

    except HTTPException:
        # Let the 400s raised above through instead of turning them into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing webhook: {str(e)}") from e
|
|
|
|
# ─── Main Entry Point ─────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Parse CLI options, print a startup banner and run the server.

    Options:
        --host / --port: bind address, defaulting to ``DEFAULT_HOST`` /
            ``DEFAULT_PORT``.
        --reload: pass ``reload=True`` to uvicorn (development).
        --no-auth: disable bearer-token checking even when
            ``INGEST_WEBHOOK_TOKEN`` is set (development only).

    Blocks inside ``uvicorn.run`` until the server stops.
    """
    global WEBHOOK_TOKEN

    parser = argparse.ArgumentParser(description="gbrain Ingestion Webhook Server")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Host to bind to (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Port to bind to (default: {DEFAULT_PORT})")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
    parser.add_argument("--no-auth", action="store_true", help="Disable authentication (development only)")
    args = parser.parse_args()

    # No runtime check for fastapi/uvicorn here: both are imported at module
    # level, so a missing package fails before main() can be called.

    if args.no_auth:
        # Previously this flag only silenced the warning below. Clear the
        # token in this module and in the environment: uvicorn is given the
        # app as an import string, so a freshly imported copy of the module
        # (and any reload subprocess) reads the token from the environment.
        WEBHOOK_TOKEN = None
        os.environ.pop("INGEST_WEBHOOK_TOKEN", None)
        print("[WARNING] Authentication disabled by --no-auth (development only).")
    elif not WEBHOOK_TOKEN:
        print("[WARNING] INGEST_WEBHOOK_TOKEN not set. Authentication is disabled.")
        print("          Set INGEST_WEBHOOK_TOKEN environment variable to enable security.")

    base_url = f"http://{args.host}:{args.port}"

    print("=" * 60)
    print("gbrain Ingestion Webhook Server")
    print(f"  Host: {args.host}")
    print(f"  Port: {args.port}")
    print(f"  Auth: {'Enabled' if WEBHOOK_TOKEN else 'Disabled'}")
    print("=" * 60)
    print("\nEndpoints:")
    print(f"  GET  {base_url}/health")
    print(f"  POST {base_url}/webhook/discord")
    print(f"  POST {base_url}/webhook/kali")
    print(f"  POST {base_url}/webhook/generic")
    print("\nPress Ctrl+C to stop\n")

    uvicorn.run(
        "ingestion_webhook_server:app",
        host=args.host,
        port=args.port,
        reload=args.reload,
        log_level="info"
    )


if __name__ == "__main__":
    main()
|