Files
hermes-agent/gbrain_client.py

316 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
Lightweight gbrain client for Hermes pipeline integration.
Wraps gbrain CLI commands with proper arg handling and JSON parsing.
"""
import subprocess
import os
import json
import sys
import re
from pathlib import Path
from datetime import datetime, date
try:
import psycopg2
HAS_PG = True
except ImportError:
HAS_PG = False
# Absolute paths to the bun runtime and the globally installed gbrain CLI
# entry point; run() invokes `bun cli.ts <cmd> ...` with these.
GBRAIN_BUN = str(Path.home() / ".bun" / "bin" / "bun")
GBRAIN_CLI = str(Path.home() / ".bun" / "install" / "global" / "node_modules" / "gbrain" / "src" / "cli.ts")
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
# gbrain PostgreSQL connection (Docker container: gbrain-postgres on port 5433).
# Every field can be overridden through a GBRAIN_DB_* environment variable,
# mirroring OLLAMA_HOST above; the defaults are the original hard-coded values.
GBRAIN_DB = {
    "host": os.environ.get("GBRAIN_DB_HOST", "localhost"),
    "port": int(os.environ.get("GBRAIN_DB_PORT", "5433")),
    "user": os.environ.get("GBRAIN_DB_USER", "gbrain"),
    "dbname": os.environ.get("GBRAIN_DB_NAME", "gbrain"),
    # Default matches the Docker container's POSTGRES_PASSWORD.
    "password": os.environ.get("GBRAIN_DB_PASSWORD", "gbrain"),
}
def _pg_connect():
    """Open a new psycopg2 connection using the GBRAIN_DB settings.

    Raises:
        RuntimeError: if psycopg2 could not be imported (HAS_PG is False).
    """
    if HAS_PG:
        return psycopg2.connect(**GBRAIN_DB)
    raise RuntimeError("psycopg2 not installed — run: pip install psycopg2-binary")
def run(cmd: str, *args, input: str = None) -> dict:
    """Run a gbrain CLI command and return its parsed output.

    Args:
        cmd: gbrain sub-command name (e.g. "put", "search").
        *args: additional command-line arguments, passed through verbatim.
        input: optional text piped to the command's stdin. An empty string is
            piped as empty stdin; only ``None`` leaves stdin untouched.

    Returns:
        On exit code 0: ``{"ok": True}`` when stdout is empty, the decoded
        JSON when stdout is valid JSON (this may be a list, not only a dict),
        otherwise ``{"raw": <stdout text>}``.
        On a non-zero exit code: the decoded stdout JSON when there is any
        (dicts additionally get a ``"returncode"`` key if they lack one),
        otherwise ``{"error": <stderr text>, "returncode": <exit code>}``.
        If the process cannot be started or exceeds the 60 s timeout:
        ``{"error": <message>, "returncode": -1}``.
    """
    full_cmd = [GBRAIN_BUN, GBRAIN_CLI, cmd, *args]
    kw = {"capture_output": True, "timeout": 60}
    # `is not None` rather than truthiness: an empty body must still be piped,
    # otherwise the child inherits our stdin and may block waiting for input.
    if input is not None:
        kw["input"] = input.encode()
    try:
        result = subprocess.run(full_cmd, **kw)
    except subprocess.TimeoutExpired:
        return {"error": f"gbrain {cmd} timed out after {kw['timeout']}s", "returncode": -1}
    except OSError as exc:
        # e.g. the bun binary is missing or not executable
        return {"error": f"could not launch gbrain: {exc}", "returncode": -1}

    # errors="replace" so undecodable bytes cannot raise while reporting.
    out = result.stdout.decode(errors="replace").strip()

    if result.returncode != 0:
        err = result.stderr.decode(errors="replace").strip()
        failure = {"error": err, "returncode": result.returncode}
        # gbrain puts errors on stderr, stdout may have partial JSON.
        # With nothing on stdout, report the failure instead of an empty dict.
        if not out:
            return failure
        try:
            parsed = json.loads(out)
        except ValueError:
            return failure
        if isinstance(parsed, dict):
            # Let callers tell a failed run from a successful one.
            parsed.setdefault("returncode", result.returncode)
        return parsed

    if not out:
        return {"ok": True}
    try:
        return json.loads(out)
    except json.JSONDecodeError:
        return {"raw": out}
def ask(question: str, expand: bool = False) -> str:
    """Send a question to `gbrain ask` and return the answer text.

    Args:
        question: the question, passed after a ``--`` separator.
        expand: when False (the default) ``--no-expand`` is added.

    Returns:
        The ``"answer"`` field of the result if present, else its ``"raw"``
        field, else the string form of the whole result.
    """
    flags = [] if expand else ["--no-expand"]
    reply = run("ask", *flags, "--", question)
    fallback = reply.get("raw", str(reply))
    return reply.get("answer", fallback)
def put(slug: str, content: str) -> bool:
    """Write/update a page in gbrain. Returns True on success.

    Args:
        slug: gbrain page slug.
        content: page body, piped to `gbrain put` on stdin.

    Returns:
        True when gbrain reports a "created"/"created_or_updated" status, or,
        when the output carries no status, when the command exited cleanly.
    """
    r = run("put", slug, input=content)
    if not isinstance(r, dict):
        return False
    if "status" in r:
        return r["status"] in ("created", "created_or_updated", 0)
    # No status reported. run() only attaches "returncode"/"error" to failed
    # invocations and returns {} for a failure with empty stdout, so a
    # non-empty result without an error and without a non-zero return code
    # ({"ok": True}, {"raw": ...}) means the command exited with 0.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
def add_timeline_entry(slug: str, timeline_date: str = None, summary: str = None, source: str = "pipeline") -> bool:
    """
    Insert a timeline entry directly into gbrain timeline_entries table,
    bypassing the INNER JOIN constraint in addTimelineEntriesBatch.

    If no page with this slug exists yet, a placeholder page of type
    'concept' is inserted first, with a title derived from the last slug
    segment.

    Args:
        slug: gbrain page slug (e.g. 'discord/2026-05-02-abc123')
        timeline_date: ISO date string (YYYY-MM-DD). Defaults to today.
        summary: Timeline entry summary text. Defaults to 'Page created'.
        source: Source tag (default 'pipeline')
    Returns:
        True if inserted (or already exists), False on error or when
        psycopg2 is not available.
    """
    if not HAS_PG:
        return False
    conn = None
    try:
        conn = _pg_connect()
        cur = conn.cursor()
        try:
            # Resolve slug -> page_id
            cur.execute("SELECT id FROM pages WHERE slug = %s", (slug,))
            row = cur.fetchone()
            if not row:
                # page doesn't exist yet — insert page first
                title = slug.split("/")[-1].replace("-", " ").replace("_", " ").title()
                cur.execute(
                    "INSERT INTO pages (slug, title, type) VALUES (%s, %s, 'concept') ON CONFLICT DO NOTHING",
                    (slug, title)
                )
                cur.execute("SELECT id FROM pages WHERE slug = %s", (slug,))
                row = cur.fetchone()
            if not row:
                # Nothing is committed; closing the connection discards the
                # pending transaction.
                return False
            page_id = row[0]
            entry_date = timeline_date or datetime.now().strftime("%Y-%m-%d")
            entry_summary = summary or "Page created"
            cur.execute("""
                INSERT INTO timeline_entries (page_id, date, source, summary, detail)
                VALUES (%s, %s, %s, %s, '')
                ON CONFLICT (page_id, date, summary) DO NOTHING
            """, (page_id, entry_date, source, entry_summary))
            conn.commit()
            return True
        finally:
            cur.close()
    except Exception as e:
        # Best-effort: timeline insertion must never crash the pipeline, but
        # the reason for the failure should not be lost either.
        print(f"gbrain: add_timeline_entry failed for {slug!r}: {e}", file=sys.stderr)
        return False
    finally:
        # Always release the connection, including on early return or error.
        if conn is not None:
            conn.close()
def put_with_timeline(slug: str, content: str, timeline_date: str = None, summary: str = None) -> bool:
    """
    Write a page to gbrain and, if that succeeds, add a timeline entry for it.
    This is the preferred ingestion method for all pipeline scripts.

    Args:
        slug: gbrain page slug
        content: page body markdown
        timeline_date: ISO date (YYYY-MM-DD). Defaults to today.
        summary: timeline entry summary. Defaults to
            'Page created via pipeline: <slug>'.
    Returns:
        The result of the page write only. The timeline insert is attempted
        only after a successful write and its outcome does not affect the
        return value.
    """
    if not put(slug, content):
        return False
    add_timeline_entry(
        slug,
        timeline_date,
        summary or f"Page created via pipeline: {slug}",
    )
    return True
def get(slug: str) -> dict:
    """Fetch a page by slug via `gbrain get` and return run()'s result."""
    page = run("get", slug)
    return page
def search(query: str, n: int = 5, json_out: bool = True) -> list:
    """Run a keyword search and return at most ``n`` results.

    Args:
        query: search text handed to `gbrain search`.
        n: maximum number of results to return (applied by slicing here).
        json_out: when True, ``--json`` is appended to the command.

    Returns:
        The first ``n`` items of the result list, or of the result's
        ``"results"`` entry when the output is a dict (empty if absent).
    """
    cli_args = ["search", query] + (["--json"] if json_out else [])
    found = run(*cli_args)
    hits = found if isinstance(found, list) else found.get("results", [])
    return hits[:n]
def query(question: str, n: int = 5, json_out: bool = True) -> list:
    """Run a hybrid semantic search and return at most ``n`` ranked results.

    Args:
        question: query text handed to `gbrain query`.
        n: result count, passed to the CLI as ``-n=<n>`` and also used to
            cap the returned list.
        json_out: when True, ``--json`` is appended to the command.

    Returns:
        The first ``n`` items of the result list, or of the result's
        ``"results"`` entry when the output is a dict (empty if absent).
    """
    cli_args = ["query", question, f"-n={n}"] + (["--json"] if json_out else [])
    found = run(*cli_args)
    hits = found if isinstance(found, list) else found.get("results", [])
    return hits[:n]
def list_pages(tag: str = None, limit: int = 20) -> list:
    """List pages via `gbrain list`, optionally restricted to one tag.

    Args:
        tag: when given (and non-empty), passed as ``--tag <tag>``.
        limit: passed to the CLI as ``--limit=<limit>``.

    Returns:
        The command output when it is a list; an empty list for any other
        result shape.
    """
    cli_args = ["list", f"--limit={limit}"]
    if tag:
        cli_args += ["--tag", tag]
    listing = run(*cli_args)
    return listing if isinstance(listing, list) else []
def delete(slug: str) -> bool:
    """Delete a page by slug.

    Returns:
        True when gbrain reports status "deleted" or the command exited
        cleanly; False otherwise.
    """
    r = run("delete", slug)
    if not isinstance(r, dict):
        return False
    if r.get("status") == "deleted":
        return True
    # run() only attaches "returncode"/"error" to failed invocations and
    # returns {} for a failure with empty stdout, so checking for
    # returncode == 0 alone could never succeed. A non-empty result without
    # an error and without a non-zero return code means the command exited 0.
    return bool(r) and "error" not in r and r.get("returncode", 0) == 0
def stats() -> dict:
    """Return the result of `gbrain stats` as produced by run()."""
    report = run("stats")
    return report
def doctor() -> dict:
    """Run the `gbrain doctor` health check and return run()'s result."""
    report = run("doctor")
    return report
def _command_succeeded(r) -> bool:
    """Tell whether a run() result describes a command that exited cleanly.

    run() attaches "returncode"/"error" only to failed invocations and
    returns an empty dict for a failure with empty stdout; successful runs
    yield {"ok": True}, {"raw": ...} or the command's own JSON. Testing
    ``returncode == 0`` alone therefore never matches a successful run.
    """
    if isinstance(r, list):
        # A bare JSON array carries no error information.
        return True
    if not isinstance(r, dict) or not r:
        return False
    return "error" not in r and r.get("returncode", 0) == 0


def embed_target(slug: str) -> bool:
    """Embed a specific page. Returns True when the command succeeded."""
    return _command_succeeded(run("embed", slug))


def embed_all() -> bool:
    """Run embed --all. Returns True when the command succeeded."""
    return _command_succeeded(run("embed", "--all"))


def embed_stale() -> bool:
    """Run embed --stale for missing embeddings. Returns True on success."""
    return _command_succeeded(run("embed", "--stale"))


def link_extract() -> bool:
    """Extract graph links from all pages. Returns True on success."""
    return _command_succeeded(run("link-extract"))


def timeline_extract() -> bool:
    """Extract timeline entries from all pages. Returns True on success."""
    return _command_succeeded(run("timeline-extract"))
# ---- CLI interface ----
# Usage: gbrain_client.py <cmd> [args]. Exits with status 1 on a usage error,
# an unknown command, or a failed put/delete.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: gbrain_client.py <cmd> [args]")
        print("Commands: ask, put, get, search, query, list, delete, stats, doctor")
        sys.exit(1)
    cmd = sys.argv[1]
    if cmd == "ask":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py ask <question>")
            sys.exit(1)
        print(ask(sys.argv[2]))
    elif cmd == "put":
        if len(sys.argv) < 4:
            print("Usage: gbrain_client.py put <slug> <content_file>")
            sys.exit(1)
        slug = sys.argv[2]
        with open(sys.argv[3]) as f:
            content = f.read()
        ok = put(slug, content)
        print("OK" if ok else "FAILED")
        if not ok:
            # Let calling scripts detect the failure from the exit status.
            sys.exit(1)
    elif cmd == "get":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py get <slug>")
            sys.exit(1)
        print(json.dumps(get(sys.argv[2]), indent=2))
    elif cmd == "search":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py search <query> [n=5]")
            sys.exit(1)
        n = int(sys.argv[3]) if len(sys.argv) > 3 else 5
        for r in search(sys.argv[2], n=n):
            # `or` fallbacks: a JSON null would otherwise break the float
            # format spec / the slice below.
            score = r.get("score") or 0
            slug = r.get("slug", "?")
            title = r.get("title", (r.get("excerpt") or "")[:60])
            print(f" [{score:.4f}] {slug}")
            if title:
                print(f" {title}")
    elif cmd == "query":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py query <question> [n=5]")
            sys.exit(1)
        n = int(sys.argv[3]) if len(sys.argv) > 3 else 5
        for r in query(sys.argv[2], n=n):
            score = r.get("score") or 0
            slug = r.get("slug", "?")
            excerpt = (r.get("excerpt") or "")[:80]
            print(f" [{score:.4f}] {slug}")
            if excerpt:
                print(f" {excerpt}...")
    elif cmd == "list":
        tag = sys.argv[2] if len(sys.argv) > 2 else None
        limit = int(sys.argv[3]) if len(sys.argv) > 3 else 20
        for p in list_pages(tag=tag, limit=limit):
            print(f" {p}")
    elif cmd == "delete":
        if len(sys.argv) < 3:
            print("Usage: gbrain_client.py delete <slug>")
            sys.exit(1)
        ok = delete(sys.argv[2])
        print("Deleted" if ok else "Failed")
        if not ok:
            sys.exit(1)
    elif cmd == "stats":
        print(json.dumps(stats(), indent=2))
    elif cmd == "doctor":
        print(json.dumps(doctor(), indent=2))
    else:
        print(f"Unknown command: {cmd}")
        sys.exit(1)