316 lines
9.5 KiB
Python
316 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Lightweight gbrain client for Hermes pipeline integration.
|
|
Wraps gbrain CLI commands with proper arg handling and JSON parsing.
|
|
"""
|
|
import subprocess
|
|
import os
|
|
import json
|
|
import sys
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime, date
|
|
|
|
# psycopg2 is an optional dependency: record whether it could be imported so
# the database helpers can check HAS_PG instead of failing at import time.
try:
    import psycopg2
    HAS_PG = True
except ImportError:
    # Driver not installed; only the direct-PostgreSQL helpers are affected.
    HAS_PG = False
|
|
|
|
# Paths to the bun executable and the globally installed gbrain CLI script.
GBRAIN_BUN = str(Path.home() / ".bun" / "bin" / "bun")
GBRAIN_CLI = str(Path.home() / ".bun" / "install" / "global" / "node_modules" / "gbrain" / "src" / "cli.ts")
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://localhost:11434")

# gbrain PostgreSQL connection (Docker container: gbrain-postgres on port 5433).
# Each field can be overridden with a GBRAIN_DB_* environment variable, the
# same way OLLAMA_HOST is; the defaults below match the local Docker setup.
GBRAIN_DB = {
    "host": os.environ.get("GBRAIN_DB_HOST", "localhost"),
    # psycopg2 accepts the port as an int; keep the original type.
    "port": int(os.environ.get("GBRAIN_DB_PORT", "5433")),
    "user": os.environ.get("GBRAIN_DB_USER", "gbrain"),
    "dbname": os.environ.get("GBRAIN_DB_NAME", "gbrain"),
    # Default is the Docker container POSTGRES_PASSWORD; override via
    # GBRAIN_DB_PASSWORD for any non-local deployment.
    "password": os.environ.get("GBRAIN_DB_PASSWORD", "gbrain"),
}
|
|
|
|
|
|
def _pg_connect():
    """Open a new PostgreSQL connection using the GBRAIN_DB settings.

    Raises:
        RuntimeError: if psycopg2 could not be imported.
    """
    if HAS_PG:
        return psycopg2.connect(**GBRAIN_DB)
    raise RuntimeError("psycopg2 not installed — run: pip install psycopg2-binary")
|
|
|
|
|
|
def run(cmd: str, *args, input: str = None) -> dict:
    """Run a gbrain command, return parsed stdout or raw output.

    Args:
        cmd: gbrain subcommand name (e.g. "put", "search").
        *args: extra command-line arguments, passed through verbatim.
        input: optional text piped to the command's stdin (UTF-8 encoded).
            An empty string is still piped, so the child sees EOF immediately
            instead of inheriting this process's stdin.

    Returns:
        On success: the decoded JSON from stdout, ``{"ok": True}`` when stdout
        is empty, or ``{"raw": <text>}`` when stdout is not valid JSON.
        On a non-zero exit: a dict that always carries ``"returncode"`` (plus
        ``"error"`` holding the stderr text); if stdout held a JSON object,
        its keys are preserved alongside. Non-object JSON printed by a failing
        command is returned unchanged.

    Raises:
        subprocess.TimeoutExpired: if the command runs longer than 60 seconds.
        OSError: if the bun executable cannot be started.
    """
    full_cmd = [GBRAIN_BUN, GBRAIN_CLI, cmd, *args]
    kw = {"capture_output": True, "timeout": 60}
    # Compare against None: "" is a legitimate (empty) payload.
    if input is not None:
        kw["input"] = input.encode()
    result = subprocess.run(full_cmd, **kw)

    # errors="replace" keeps stray non-UTF-8 bytes from raising here.
    out = result.stdout.decode(errors="replace").strip()

    if result.returncode != 0:
        err = result.stderr.decode(errors="replace").strip()
        failure = {"error": err, "returncode": result.returncode}
        # gbrain puts errors on stderr, stdout may have partial JSON
        if not out:
            return failure
        try:
            parsed = json.loads(out)
        except ValueError:
            return failure
        if isinstance(parsed, dict):
            # Keep the command's own payload but make the failure detectable.
            parsed.setdefault("returncode", result.returncode)
            if err:
                parsed.setdefault("error", err)
        return parsed

    if not out:
        return {"ok": True}
    try:
        return json.loads(out)
    except json.JSONDecodeError:
        return {"raw": out}
|
|
|
|
|
|
def ask(question: str, expand: bool = False) -> str:
    """Send a question to ``gbrain ask`` and return the reply.

    Args:
        question: the question text, passed as the final positional argument.
        expand: when False (the default) the ``--no-expand`` flag is added.

    Returns:
        The ``"answer"`` field of the response if present, otherwise its
        ``"raw"`` field, otherwise the string form of the whole response.
    """
    flags = [] if expand else ["--no-expand"]
    # The "--" separator precedes the question; presumably so a question that
    # starts with "-" is not parsed as an option (confirm against the CLI).
    reply = run("ask", *flags, "--", question)
    fallback = reply.get("raw", str(reply))
    return reply.get("answer", fallback)
|
|
|
|
|
|
def put(slug: str, content: str) -> bool:
    """Create or update the gbrain page ``slug`` with ``content`` via stdin.

    Returns:
        True when the response's ``"status"`` is ``"created"`` or
        ``"created_or_updated"``; when there is no ``"status"`` key, the
        ``"returncode"`` value (default -1) is checked against 0 instead.
    """
    accepted = ("created", "created_or_updated", 0)
    outcome = run("put", slug, input=content)
    fallback = outcome.get("returncode", -1)
    status = outcome.get("status", fallback)
    return status in accepted
|
|
|
|
|
|
def add_timeline_entry(slug: str, timeline_date: str = None, summary: str = None, source: str = "pipeline") -> bool:
    """
    Insert a timeline entry directly into gbrain timeline_entries table,
    bypassing the INNER JOIN constraint in addTimelineEntriesBatch.

    If no page with the given slug exists yet, a page row of type 'concept'
    is inserted first, with a title derived from the last slug segment.

    Args:
        slug: gbrain page slug (e.g. 'discord/2026-05-02-abc123')
        timeline_date: ISO date string (YYYY-MM-DD). Defaults to today.
        summary: Timeline entry summary text. Defaults to 'Page created'.
        source: Source tag (default 'pipeline')

    Returns:
        True if inserted (or already exists), False on error or when
        psycopg2 is not available.
    """
    if not HAS_PG:
        return False
    conn = None
    try:
        conn = _pg_connect()
        with conn.cursor() as cur:
            # Resolve slug -> page_id
            cur.execute("SELECT id FROM pages WHERE slug = %s", (slug,))
            row = cur.fetchone()
            if not row:
                # page doesn't exist yet — insert page first
                title = slug.split("/")[-1].replace("-", " ").replace("_", " ").title()
                cur.execute(
                    "INSERT INTO pages (slug, title, type) VALUES (%s, %s, 'concept') ON CONFLICT DO NOTHING",
                    (slug, title)
                )
                cur.execute("SELECT id FROM pages WHERE slug = %s", (slug,))
                row = cur.fetchone()
                if not row:
                    # Nothing is committed; the finally block closes the
                    # connection, which discards the pending transaction.
                    return False

            page_id = row[0]
            entry_date = timeline_date or datetime.now().strftime("%Y-%m-%d")
            entry_summary = summary or "Page created"

            cur.execute("""
                INSERT INTO timeline_entries (page_id, date, source, summary, detail)
                VALUES (%s, %s, %s, %s, '')
                ON CONFLICT (page_id, date, summary) DO NOTHING
            """, (page_id, entry_date, source, entry_summary))
        conn.commit()
        return True
    except Exception as exc:
        # Best-effort helper: report the problem instead of raising into the
        # calling pipeline, but do not hide it completely.
        print(f"gbrain: add_timeline_entry({slug!r}) failed: {exc}", file=sys.stderr)
        return False
    finally:
        # Always release the connection, including on early return and error.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def put_with_timeline(slug: str, content: str, timeline_date: str = None, summary: str = None) -> bool:
    """
    Write a page to gbrain AND add a timeline entry in one call.
    This is the preferred ingestion method for all pipeline scripts.

    The timeline entry is only attempted when the page write succeeded.

    Args:
        slug: gbrain page slug
        content: page body markdown
        timeline_date: ISO date (YYYY-MM-DD). Defaults to today.
        summary: timeline entry summary. Defaults to
            'Page created via pipeline: <slug>'.

    Returns:
        True if both page write and timeline insert succeeded.
    """
    if not put(slug, content):
        return False
    entry_summary = summary or f"Page created via pipeline: {slug}"
    # Propagate the timeline result so the documented "both succeeded"
    # contract holds; previously a failed insert was silently ignored.
    return add_timeline_entry(slug, timeline_date, entry_summary)
|
|
|
|
|
|
def get(slug: str) -> dict:
    """Fetch a single page from gbrain, identified by its slug.

    Returns:
        Whatever ``run("get", slug)`` produces for that page.
    """
    page = run("get", slug)
    return page
|
|
|
|
|
|
def search(query: str, n: int = 5, json_out: bool = True) -> list:
    """Run a keyword search and return at most ``n`` results.

    Args:
        query: search text.
        n: maximum number of results kept (applied by slicing the response).
        json_out: when True, ``--json`` is passed to the command.

    Returns:
        The first ``n`` items of the response if it is a list, otherwise the
        first ``n`` items of its ``"results"`` entry (empty when absent).
    """
    cli_args = ["search", query] + (["--json"] if json_out else [])
    response = run(*cli_args)
    hits = response if isinstance(response, list) else response.get("results", [])
    return hits[:n]
|
|
|
|
|
|
def query(question: str, n: int = 5, json_out: bool = True) -> list:
    """Run a hybrid semantic search and return at most ``n`` ranked results.

    Args:
        question: query text.
        n: result count, sent to the command as ``-n=<n>`` and also used to
            slice the response.
        json_out: when True, ``--json`` is passed to the command.

    Returns:
        The first ``n`` items of the response if it is a list, otherwise the
        first ``n`` items of its ``"results"`` entry (empty when absent).
    """
    extra = ("--json",) if json_out else ()
    response = run("query", question, f"-n={n}", *extra)
    if not isinstance(response, list):
        response = response.get("results", [])
    return response[:n]
|
|
|
|
|
|
def list_pages(tag: str = None, limit: int = 20) -> list:
    """List pages, optionally restricted to those carrying ``tag``.

    Args:
        tag: when truthy, passed to the command as ``--tag <tag>``.
        limit: passed to the command as ``--limit=<limit>``.

    Returns:
        The response when it is a list; an empty list for anything else.
    """
    cli_args = ["list", f"--limit={limit}"]
    if tag:
        cli_args += ["--tag", tag]
    listing = run(*cli_args)
    return listing if isinstance(listing, list) else []
|
|
|
|
|
|
def delete(slug: str) -> bool:
    """Delete the page identified by ``slug``.

    Returns:
        True when the response's ``"status"`` is ``"deleted"`` or its
        ``"returncode"`` is 0; False otherwise.
    """
    outcome = run("delete", slug)
    if outcome.get("status") == "deleted":
        return True
    return outcome.get("returncode") == 0
|
|
|
|
|
|
def stats() -> dict:
    """Return the output of the ``gbrain stats`` command."""
    report = run("stats")
    return report
|
|
|
|
|
|
def doctor() -> dict:
    """Return the output of the ``gbrain doctor`` health check."""
    report = run("doctor")
    return report
|
|
|
|
|
|
def embed_target(slug: str) -> bool:
    """Embed a specific page.

    Returns:
        True when the command reported no failure, False otherwise.
    """
    r = run("embed", slug)
    # run() only attaches "returncode" when the command exits non-zero, so a
    # missing key means success and must default to 0 (the old default of -1
    # made this function return False even on success). An empty result
    # carries no success signal, so it is treated as a failure.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
|
|
|
|
|
|
def embed_all() -> bool:
    """Run embed --all.

    Returns:
        True when the command reported no failure, False otherwise.
    """
    r = run("embed", "--all")
    # run() only attaches "returncode" when the command exits non-zero, so a
    # missing key means success and must default to 0 (the old default of -1
    # made this function return False even on success). An empty result
    # carries no success signal, so it is treated as a failure.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
|
|
|
|
|
|
def embed_stale() -> bool:
    """Run embed --stale for missing embeddings.

    Returns:
        True when the command reported no failure, False otherwise.
    """
    r = run("embed", "--stale")
    # run() only attaches "returncode" when the command exits non-zero, so a
    # missing key means success and must default to 0 (the old default of -1
    # made this function return False even on success). An empty result
    # carries no success signal, so it is treated as a failure.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
|
|
|
|
|
|
def link_extract() -> bool:
    """Extract graph links from all pages.

    Returns:
        True when the command reported no failure, False otherwise.
    """
    r = run("link-extract")
    # run() only attaches "returncode" when the command exits non-zero, so a
    # missing key means success and must default to 0 (the old default of -1
    # made this function return False even on success). An empty result
    # carries no success signal, so it is treated as a failure.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
|
|
|
|
|
|
def timeline_extract() -> bool:
    """Extract timeline entries from all pages.

    Returns:
        True when the command reported no failure, False otherwise.
    """
    r = run("timeline-extract")
    # run() only attaches "returncode" when the command exits non-zero, so a
    # missing key means success and must default to 0 (the old default of -1
    # made this function return False even on success). An empty result
    # carries no success signal, so it is treated as a failure.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
|
|
|
|
|
|
# ---- CLI interface ----
|
|
|
|
if __name__ == "__main__":
    # Minimal command-line front end: argv[1] selects the command and the
    # remaining arguments are passed to the matching helper. Usage errors and
    # failed put/delete operations exit with status 1.
    if len(sys.argv) < 2:
        print("Usage: gbrain_client.py <cmd> [args]")
        print("Commands: ask, put, get, search, query, list, delete, stats, doctor")
        sys.exit(1)

    cmd = sys.argv[1]

    if cmd == "ask":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py ask <question>")
            sys.exit(1)
        print(ask(sys.argv[2]))

    elif cmd == "put":
        if len(sys.argv) < 4:
            print("Usage: gbrain_client.py put <slug> <content_file>")
            sys.exit(1)
        slug = sys.argv[2]
        with open(sys.argv[3]) as f:
            content = f.read()
        ok = put(slug, content)
        print("OK" if ok else "FAILED")
        if not ok:
            # Let calling scripts detect the failure from the exit status.
            sys.exit(1)

    elif cmd == "get":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py get <slug>")
            sys.exit(1)
        print(json.dumps(get(sys.argv[2]), indent=2))

    elif cmd == "search":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py search <query> [n=5]")
            sys.exit(1)
        n = int(sys.argv[3]) if len(sys.argv) > 3 else 5
        for r in search(sys.argv[2], n=n):
            # "or" fallbacks tolerate JSON nulls, which would otherwise break
            # the float format / slice below.
            score = r.get("score") or 0
            slug = r.get("slug", "?")
            excerpt = r.get("excerpt") or ""
            title = r.get("title", excerpt[:60])
            print(f" [{score:.4f}] {slug}")
            if title:
                print(f" {title}")

    elif cmd == "query":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py query <question> [n=5]")
            sys.exit(1)
        n = int(sys.argv[3]) if len(sys.argv) > 3 else 5
        for r in query(sys.argv[2], n=n):
            score = r.get("score") or 0
            slug = r.get("slug", "?")
            excerpt = (r.get("excerpt") or "")[:80]
            print(f" [{score:.4f}] {slug}")
            if excerpt:
                print(f" {excerpt}...")

    elif cmd == "list":
        tag = sys.argv[2] if len(sys.argv) > 2 else None
        limit = int(sys.argv[3]) if len(sys.argv) > 3 else 20
        for p in list_pages(tag=tag, limit=limit):
            print(f" {p}")

    elif cmd == "delete":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py delete <slug>")
            sys.exit(1)
        ok = delete(sys.argv[2])
        print("Deleted" if ok else "Failed")
        if not ok:
            sys.exit(1)

    elif cmd == "stats":
        print(json.dumps(stats(), indent=2))

    elif cmd == "doctor":
        print(json.dumps(doctor(), indent=2))

    else:
        print(f"Unknown command: {cmd}")
        sys.exit(1)
|