bd6ba4ed1b
- fibratus_rabbitmq_bridge.py - variant_event_consumer.py - start_malware_pipeline.sh - pre-flight-vm-check.sh - email_to_crm.py - prowler_csv_to_supabase.py
205 lines
6.5 KiB
Python
205 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
variant_event_consumer.py

Consumes 'whiskers.*' events (e.g. 'whiskers.fibratus') routed from the
'litterbox.events' RabbitMQ exchange and writes them to Supabase PostgreSQL:
each message upserts one variant_runs row and inserts its execution_events
rows (rows that hit an existing unique constraint are skipped).

Usage:
    python variant_event_consumer.py --db-host localhost --db-port 5432 \
        --db-name greysecthreat --db-user greysec --db-pass <pass> \
        --rabbitmq-pass <pass>

Dependencies: pip install pika psycopg2-binary
"""
|
|
|
|
import json
|
|
import argparse
|
|
import pika
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
# Upsert a single variant_runs row.
# - The lone ``VALUES %s`` placeholder is filled with one Python tuple, which
#   psycopg2 renders as a parenthesised row, e.g. ``(a, b, c, d, e)``.
# - ON CONFLICT (variant_id, vm_hostname, run_started_at) needs a matching
#   unique index/constraint in the schema (presumably defined there; verify).
# - On conflict only status/raw_meta are overwritten and updated_at is bumped.
# - RETURNING id yields the row id on both the insert and the update path.
UPSERT_VARIANT_RUN = """
INSERT INTO variant_runs (variant_id, vm_hostname, run_started_at, status, raw_meta)
VALUES %s
ON CONFLICT (variant_id, vm_hostname, run_started_at)
DO UPDATE SET
status = EXCLUDED.status,
raw_meta = EXCLUDED.raw_meta,
updated_at = NOW()
RETURNING id;
"""

# Bulk insert for execution_events, meant for psycopg2.extras.execute_values,
# which expands the single ``VALUES %s`` into one row per tuple.
# Despite the name this is not an upsert: ON CONFLICT DO NOTHING with no
# conflict target silently skips any row that would violate a unique or
# exclusion constraint, leaving the existing row untouched.
UPSERT_EVENTS = """
INSERT INTO execution_events
(variant_run_id, event_time, event_id, fibratus_data, recorded_at)
VALUES %s
ON CONFLICT DO NOTHING;
"""
|
|
|
|
|
|
def get_variant_run_id(conn, variant_id, vm_hostname, run_started_at, status,
                       raw_meta, *, commit=True):
    """Upsert a ``variant_runs`` row and return its database id.

    Executes ``UPSERT_VARIANT_RUN`` for the key
    ``(variant_id, vm_hostname, run_started_at)``; an existing row gets its
    ``status`` and ``raw_meta`` overwritten.

    Args:
        conn: Open psycopg2 connection.
        variant_id: Variant identifier.
        vm_hostname: Hostname of the VM that produced the events.
        run_started_at: Run start timestamp, passed to the database as given.
        status: Value stored in ``status``.
        raw_meta: Metadata serialised with ``json.dumps`` into ``raw_meta``.
        commit: When true (the default, and the historical behaviour) commit
            immediately. Pass ``False`` when the caller owns the transaction,
            so the run and its events can be committed together.

    Returns:
        The ``id`` returned by the statement, or ``None`` if no row came back.
    """
    serialized_meta = json.dumps(raw_meta)
    with conn.cursor() as cur:
        # The statement has a single ``VALUES %s`` placeholder; psycopg2
        # renders the one-tuple argument as a parenthesised row.
        cur.execute(UPSERT_VARIANT_RUN, [
            (variant_id, vm_hostname, run_started_at, status, serialized_meta),
        ])
        row = cur.fetchone()
    if commit:
        conn.commit()
    return row[0] if row else None


def insert_events(conn, variant_run_id, events, recorded_at, *, commit=True):
    """Bulk-insert ``execution_events`` rows for one variant run.

    Uses ``UPSERT_EVENTS`` (``ON CONFLICT DO NOTHING``), so rows that collide
    with an existing unique/exclusion constraint are skipped, not updated.
    Non-dict entries in ``events`` are logged and skipped.

    Args:
        conn: Open psycopg2 connection.
        variant_run_id: ``variant_runs.id`` the events belong to; nothing is
            written when it is ``None``.
        events: Sequence of event dicts; ``time_created``, ``event_id`` and
            ``data`` are read from each.
        recorded_at: Value stored in ``recorded_at`` for every row.
        commit: When true (the default) commit after the insert. Pass
            ``False`` when the caller owns the transaction.
    """
    if not events or variant_run_id is None:
        return

    rows = []
    for event in events:
        if not isinstance(event, dict):
            print(f"[consumer] skipping non-dict event: {type(event).__name__}")
            continue
        rows.append((
            variant_run_id,
            event.get("time_created"),
            event.get("event_id"),
            json.dumps(event.get("data", "")),
            recorded_at,
        ))
    if not rows:
        return

    with conn.cursor() as cur:
        execute_values(cur, UPSERT_EVENTS, rows)
    if commit:
        conn.commit()


def process_message(conn, body):
    """Parse one message body and persist its variant run and events.

    The message is expected to be a UTF-8 JSON object; the keys read are
    ``_meta`` (``variant_id``, ``hostname``, ``fetched_at``) and ``fibratus``
    (a list of event dicts, or a single event dict). Bodies that are not
    valid JSON objects, or whose ``fibratus`` field is neither a list nor a
    dict, are logged and skipped; an empty event list is skipped silently.

    The ``variant_runs`` upsert and the ``execution_events`` insert run in a
    single transaction (``with conn:``): both are committed on success, and
    both are rolled back if either raises, so a database error part-way
    through never leaves a committed run row whose events were not written.

    Raises:
        Any exception from the database writes, after the transaction has
        been rolled back.
    """
    try:
        msg = json.loads(body.decode("utf-8"))
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"[consumer] JSON parse error: {e}")
        return

    if not isinstance(msg, dict):
        print(f"[consumer] skipping non-dict message type={type(msg).__name__}")
        return

    meta = msg.get("_meta", {})
    fibratus_raw = msg.get("fibratus", [])

    # Accept a list of events or a single event dict; reject anything else.
    if isinstance(fibratus_raw, list):
        fibratus_events = fibratus_raw
    elif isinstance(fibratus_raw, dict):
        fibratus_events = [fibratus_raw]
    else:
        print(f"[consumer] skipping message with fibratus type={type(fibratus_raw).__name__}")
        return

    if not fibratus_events:
        return

    # Missing, null or empty metadata values fall back to defaults. A NULL
    # variant_id/hostname would either violate a NOT NULL column or, since
    # NULLs are distinct in unique constraints, never match the ON CONFLICT
    # key and create a duplicate variant_runs row on every redelivery.
    fields = meta if isinstance(meta, dict) else {}
    variant_id = fields.get("variant_id") or "unknown"
    hostname = fields.get("hostname") or "unknown"
    fetched_at = fields.get("fetched_at") or datetime.now(timezone.utc).isoformat()

    # run_started_at is the smallest non-empty string time_created among the
    # events (lexicographic min; assumes one uniform ISO-8601 format, TODO
    # confirm), falling back to fetched_at when no event carries one.
    candidate_times = (e.get("time_created") for e in fibratus_events if isinstance(e, dict))
    timestamps = [ts for ts in candidate_times if isinstance(ts, str) and ts]
    run_started = min(timestamps) if timestamps else fetched_at

    # One transaction for the run and its events: ``with conn`` commits on
    # success and rolls back on exception. The helpers must not commit on
    # their own, or the run row would survive a failed event insert.
    with conn:
        vid = get_variant_run_id(
            conn,
            variant_id=variant_id,
            vm_hostname=hostname,
            run_started_at=run_started,
            status="completed",
            raw_meta=meta,
            commit=False,
        )
        insert_events(conn, vid, fibratus_events, fetched_at, commit=False)

    print(f"[consumer] variant={variant_id} host={hostname} "
          f"events={len(fibratus_events)}")
|
|
|
|
|
|
def main():
    """Run the consumer: parse CLI args, connect to PostgreSQL and RabbitMQ, consume.

    The queue (``--queue``, default ``whiskers.events``) is declared durable
    and bound to the ``litterbox.events`` exchange with routing key
    ``whiskers.*``. Messages are handled one at a time (``prefetch_count=1``)
    by ``process_message``.

    Acknowledgement policy:
      * success -> ``basic_ack``;
      * processing error while the PostgreSQL connection is still usable ->
        ``basic_nack(requeue=False)``, so a malformed message cannot loop;
      * processing error after the PostgreSQL connection was lost ->
        ``basic_nack(requeue=True)`` and the error is re-raised, stopping the
        consumer instead of rejecting every following message while the
        database is unreachable.

    Both connections are closed on exit, including on KeyboardInterrupt.
    """
    parser = argparse.ArgumentParser(description="Variant event consumer")
    parser.add_argument("--db-host", default="localhost")
    parser.add_argument("--db-port", type=int, default=5432)
    parser.add_argument("--db-name", default="greysecthreat")
    parser.add_argument("--db-user", default="greysec")
    parser.add_argument("--db-pass", required=True)
    parser.add_argument("--rabbitmq-host", default="localhost")
    parser.add_argument("--rabbitmq-user", default="litterbox_pipeline")
    parser.add_argument("--rabbitmq-pass", required=True)
    parser.add_argument("--rabbitmq-vhost", default="litterbox")
    parser.add_argument("--queue", default="whiskers.events")
    args = parser.parse_args()

    # DB connection
    conn = psycopg2.connect(
        host=args.db_host,
        port=args.db_port,
        dbname=args.db_name,
        user=args.db_user,
        password=args.db_pass,
    )
    conn.autocommit = False
    print(f"[consumer] Connected to PostgreSQL {args.db_host}/{args.db_name}")

    try:
        # RabbitMQ connection
        credentials = pika.PlainCredentials(args.rabbitmq_user, args.rabbitmq_pass)
        parameters = pika.ConnectionParameters(
            host=args.rabbitmq_host,
            virtual_host=args.rabbitmq_vhost,
            credentials=credentials,
            heartbeat=60,
        )

        def callback(ch, method, properties, body):
            try:
                process_message(conn, body)
            except Exception as e:
                print(f"[consumer] processing error: {e}")
                if conn.closed:
                    # The database connection is gone: the message is not at
                    # fault. Requeue it and stop consuming so a supervisor can
                    # restart the process with a fresh connection.
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                    raise
                # requeue=False breaks the crash-requeue loop for malformed messages
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            else:
                ch.basic_ack(delivery_tag=method.delivery_tag)

        connection = pika.BlockingConnection(parameters)
        try:
            channel = connection.channel()

            # Ensure queue exists and is bound
            channel.queue_declare(queue=args.queue, durable=True)
            # Bind to whiskers.* routing key
            channel.queue_bind(exchange="litterbox.events",
                               queue=args.queue,
                               routing_key="whiskers.*")
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue=args.queue, on_message_callback=callback)

            print(f"[consumer] Listening on queue '{args.queue}'...")
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
        finally:
            # Closing an already-closed pika connection raises, which would
            # mask whatever error took the broker connection down.
            if connection.is_open:
                connection.close()
    finally:
        conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point. main() returns None, so a clean shutdown exits with
    # status 0; any exception from main() propagates as before.
    raise SystemExit(main())
|