#!/usr/bin/env python3
"""
variant_event_consumer.py
Consumes 'whiskers.fibratus' events from RabbitMQ,
upserts variant_run + execution_events into Supabase PostgreSQL.
Usage:
python variant_event_consumer.py --db-host localhost --db-port 5432 \
--db-name greysecthreat --db-user greysec --db-pass <pass>
Dependencies: pip install pika psycopg2-binary
"""
import json
import argparse
import pika
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timezone
UPSERT_VARIANT_RUN = """
INSERT INTO variant_runs (variant_id, vm_hostname, run_started_at, status, raw_meta)
VALUES %s
ON CONFLICT (variant_id, vm_hostname, run_started_at)
DO UPDATE SET
status = EXCLUDED.status,
raw_meta = EXCLUDED.raw_meta,
updated_at = NOW()
RETURNING id;
"""
UPSERT_EVENTS = """
INSERT INTO execution_events
(variant_run_id, event_time, event_id, fibratus_data, recorded_at)
VALUES %s
ON CONFLICT DO NOTHING;
"""
def get_variant_run_id(conn, variant_id, vm_hostname, run_started_at, status, raw_meta):
    """Upsert variant_runs and return the DB id."""
    with conn.cursor() as cur:
        cur.execute(UPSERT_VARIANT_RUN, [
            (variant_id, vm_hostname, run_started_at, status, json.dumps(raw_meta))
        ])
        row = cur.fetchone()
    # No commit here: process_message wraps the run row and its events in a
    # single transaction (with conn:) so they persist atomically.
    return row[0] if row else None
def insert_events(conn, variant_run_id, events, recorded_at):
"""Bulk upsert execution_events. Skips events that aren't dicts."""
if not events or variant_run_id is None:
return
rows = []
for e in events:
if not isinstance(e, dict):
print(f"[consumer] skipping non-dict event: {type(e).__name__}")
continue
rows.append((
variant_run_id,
e.get("time_created"),
e.get("event_id"),
json.dumps(e.get("data", "")),
recorded_at,
))
if not rows:
return
    with conn.cursor() as cur:
        execute_values(cur, UPSERT_EVENTS, rows)
    # Commit is likewise deferred to the caller's transaction.
def process_message(conn, body):
"""Parse JSON, upsert variant_run, insert events."""
    try:
        msg = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f"[consumer] JSON parse error: {e}")
        return
if not isinstance(msg, dict):
print(f"[consumer] skipping non-dict message type={type(msg).__name__}")
return
meta = msg.get("_meta", {})
fibratus_raw = msg.get("fibratus", [])
    # Normalize the fibratus field: list of events, single event dict, or invalid
if isinstance(fibratus_raw, list):
fibratus_events = fibratus_raw
elif isinstance(fibratus_raw, dict):
# Single event wrapped as dict
fibratus_events = [fibratus_raw]
else:
# Bare string or other type — not a valid event list
print(f"[consumer] skipping message with fibratus type={type(fibratus_raw).__name__}")
return
if not fibratus_events:
return
# Extract metadata with full isinstance guards
if isinstance(meta, dict):
variant_id = meta.get("variant_id", "unknown")
hostname = meta.get("hostname", "unknown")
fetched_at = meta.get("fetched_at", datetime.now(timezone.utc).isoformat())
else:
variant_id = "unknown"
hostname = "unknown"
fetched_at = datetime.now(timezone.utc).isoformat()
# Get run_started_at from first event timestamp
timestamps = []
for e in fibratus_events:
if isinstance(e, dict) and e.get("time_created"):
ts = e.get("time_created")
if isinstance(ts, str):
timestamps.append(ts)
    run_started = min(timestamps) if timestamps else fetched_at  # ISO-8601 strings sort lexicographically
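    # One transaction per message: psycopg2's connection context manager
    # commits on success and rolls back if anything below raises, so the
    # run row and its events persist (or fail) together.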
with conn:
vid = get_variant_run_id(
conn,
variant_id=variant_id,
vm_hostname=hostname,
run_started_at=run_started,
status="completed",
raw_meta=meta,
)
insert_events(conn, vid, fibratus_events, fetched_at)
print(f"[consumer] variant={variant_id} host={hostname} "
f"events={len(fibratus_events)}")
def main():
parser = argparse.ArgumentParser(description="Variant event consumer")
parser.add_argument("--db-host", default="localhost")
parser.add_argument("--db-port", type=int, default=5432)
parser.add_argument("--db-name", default="greysecthreat")
parser.add_argument("--db-user", default="greysec")
parser.add_argument("--db-pass", required=True)
parser.add_argument("--rabbitmq-host", default="localhost")
parser.add_argument("--rabbitmq-user", default="litterbox_pipeline")
parser.add_argument("--rabbitmq-pass", required=True)
parser.add_argument("--rabbitmq-vhost", default="litterbox")
parser.add_argument("--queue", default="whiskers.events")
args = parser.parse_args()
# DB connection
conn = psycopg2.connect(
host=args.db_host,
port=args.db_port,
dbname=args.db_name,
user=args.db_user,
password=args.db_pass,
)
conn.autocommit = False
print(f"[consumer] Connected to PostgreSQL {args.db_host}/{args.db_name}")
# RabbitMQ connection
credentials = pika.PlainCredentials(args.rabbitmq_user, args.rabbitmq_pass)
parameters = pika.ConnectionParameters(
host=args.rabbitmq_host,
virtual_host=args.rabbitmq_vhost,
credentials=credentials,
heartbeat=60,
)
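    # Caveat: with BlockingConnection, heartbeat frames are not serviced
    # while a message callback is running, so processing that takes much
    # longer than the 60 s heartbeat can get the connection dropped.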
def callback(ch, method, properties, body):
try:
process_message(conn, body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[consumer] processing error: {e}")
# requeue=False breaks the crash-requeue loop for malformed messages
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
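            # If the queue has a dead-letter exchange configured (a broker-side
            # assumption, not set up here), rejected messages are parked there
            # for inspection rather than discarded.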
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Ensure queue exists and is bound
channel.queue_declare(queue=args.queue, durable=True)
# Bind to whiskers.* routing key
channel.queue_bind(exchange="litterbox.events",
queue=args.queue,
routing_key="whiskers.*")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=args.queue, on_message_callback=callback)
print(f"[consumer] Listening on queue '{args.queue}'...")
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
finally:
conn.close()
connection.close()
if __name__ == "__main__":
main()