Files
phi-scanner/prowler_csv_to_supabase.py
Hermes Agent bd6ba4ed1b Add greysec-tools pipeline scripts
- fibratus_rabbitmq_bridge.py
- variant_event_consumer.py
- start_malware_pipeline.sh
- pre-flight-vm-check.sh
- email_to_crm.py
- prowler_csv_to_supabase.py
2026-05-09 00:13:30 -05:00

237 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Load Prowler CSV findings into GreySec Supabase PostgreSQL.
Maps AWS account IDs to company UUIDs via config, then upserts findings.
Usage:
python3 prowler_csv_to_supabase.py # load all CSVs
python3 prowler_csv_to_supabase.py --dry-run # preview, no inserts
python3 prowler_csv_to_supabase.py --account 980352155236 # single account
python3 prowler_csv_to_supabase.py --csv /path/to/file.csv --company-id <uuid>
"""
import argparse
import csv
import json
import os
import sys
import urllib.request
import urllib.parse
import urllib.error
from datetime import datetime
from pathlib import Path
# ── Config ────────────────────────────────────────────────────────────────────
# Endpoint and credentials can be overridden through the environment so a real
# key never has to live in source control; the defaults keep the local dev
# setup working unchanged.
API_BASE = os.environ.get("GREYSEC_API_BASE", "http://localhost:3000").rstrip("/")
API_KEY = os.environ.get("GREYSEC_API_KEY", "greysec-dev-key-2026")
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
CSV_DIR = Path.home() / "greysec" / "prowler-ops" / "output"

# Map AWS Account UID → (company_id UUID, company_name)
# Maintained by hand: the companies table does not store AWS account IDs, so
# this dict is the authoritative account → company mapping.
ACCOUNT_MAP = {
    # Keystone Technologies (keystoneechnologies.com) — real AWS environment
    "980352155236": ("5f3ddb84-1b67-4d0f-854d-88dfb048b281", "Keystone Technologies"),
    # NOTE: 582826565025 (ASloggett-Prog) is Adam's dev account — NOT a client, skip
}
# ── API helpers ───────────────────────────────────────────────────────────────
def api_get(path: str, params: dict | None = None) -> list | dict | None:
    """GET ``API_BASE + path`` and return the decoded JSON body.

    Args:
        path: URL path appended to ``API_BASE`` (e.g. ``/rest/v1/companies``).
        params: Optional query parameters, URL-encoded onto the path.

    Returns:
        The parsed JSON response, or ``None`` when the request fails (HTTP
        error status, unreachable server, timeout) or the body is not valid
        JSON. Every failure is reported on stderr.
    """
    url = f"{API_BASE}{path}"
    if params:
        url += "?" + urllib.parse.urlencode(params)
    req = urllib.request.Request(url, headers=HEADERS)
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError, so it has to be handled first.
        body = e.read().decode(errors="replace")
        print(f" HTTP {e.code}: {body[:200]}", file=sys.stderr)
        return None
    except (urllib.error.URLError, TimeoutError) as e:
        # Connection refused, DNS failure, or a timeout while reading: report
        # it the same way as an HTTP error instead of crashing the run.
        print(f" Request to {url} failed: {e}", file=sys.stderr)
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        print(f" Invalid JSON from {url}: {e}", file=sys.stderr)
        return None
def api_post(path: str, payload: dict) -> dict | None:
    """POST *payload* as JSON to ``API_BASE + path``.

    Args:
        path: URL path appended to ``API_BASE``.
        payload: JSON-serialisable request body.

    Returns:
        The parsed JSON response on success (``{}`` when the server answers a
        successful request with an empty body). ``None`` when the row already
        exists (HTTP 409, silently) or when the request fails for any other
        reason (reported on stderr).
    """
    url = f"{API_BASE}{path}"
    req = urllib.request.Request(url, data=json.dumps(payload).encode(),
                                 headers=HEADERS, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        if e.code == 409:  # conflict = already exists, skip
            return None
        body = e.read().decode(errors="replace")
        print(f" HTTP {e.code}: {body[:200]}", file=sys.stderr)
        return None
    except (urllib.error.URLError, TimeoutError) as e:
        # Unreachable server or timeout: one bad request must not abort a
        # whole CSV load.
        print(f" Request to {url} failed: {e}", file=sys.stderr)
        return None
    # Some REST backends answer a successful write with no body at all
    # (PostgREST's default ``return=minimal``, for instance); feeding that to
    # json.loads would raise, so treat it as an empty success result.
    return json.loads(raw) if raw.strip() else {}
def api_patch(path: str, payload: dict) -> dict | None:
    """PATCH ``API_BASE + path`` with *payload* encoded as JSON.

    Args:
        path: URL path appended to ``API_BASE``.
        payload: JSON-serialisable request body.

    Returns:
        The parsed JSON response on success (``{}`` when the server answers a
        successful request with an empty body), or ``None`` when the request
        fails; failures are reported on stderr.
    """
    url = f"{API_BASE}{path}"
    req = urllib.request.Request(url, data=json.dumps(payload).encode(),
                                 headers=HEADERS, method="PATCH")
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError, so it has to be handled first.
        body = e.read().decode(errors="replace")
        print(f" HTTP {e.code}: {body[:200]}", file=sys.stderr)
        return None
    except (urllib.error.URLError, TimeoutError) as e:
        print(f" Request to {url} failed: {e}", file=sys.stderr)
        return None
    # A successful update may legitimately come back with an empty body
    # (e.g. 204 No Content); json.loads would raise on that.
    return json.loads(raw) if raw.strip() else {}
# ── CSV column indices (set once per file) ────────────────────────────────────
COLS = {}


def parse_cols(header_line: str) -> dict:
    """Map every ``;``-separated column name in *header_line* to its position.

    Names are stripped of surrounding whitespace. When a name occurs more than
    once, the position of its last occurrence is kept.
    """
    index_by_name = {}
    for position, raw_name in enumerate(header_line.split(";")):
        index_by_name[raw_name.strip()] = position
    return index_by_name
# ── Loader ────────────────────────────────────────────────────────────────────
def load_csv(csv_path: Path, company_id: str, company_name: str,
engagement_id: str = None, dry_run: bool = False) -> dict:
stats = {"total": 0, "fail": 0, "pass": 0, "muted": 0,
"inserted": 0, "skipped": 0, "errors": 0}
with open(csv_path, newline="", encoding="utf-8", errors="replace") as f:
reader = csv.reader(f, delimiter=";")
header = next(reader)
col_idx = {p.strip(): i for i, p in enumerate(header)}
def col(name, row, default=""):
try:
v = row[col_idx[name]].strip()
return v if v and v != "None" else default
except (KeyError, IndexError):
return default
for row in reader:
stats["total"] += 1
status = col("STATUS", row).upper()
severity = col("SEVERITY", row).lower()
check_id = col("CHECK_ID", row)
resource = col("RESOURCE_UID", row)
if status == "FAIL":
stats["fail"] += 1
elif status == "PASS":
stats["pass"] += 1
stats["skipped"] += 1
continue # only store FAIL/MUTED findings
elif status == "MUTED":
stats["muted"] += 1
else:
stats["skipped"] += 1
continue
# Build compliance list
compliance_str = col("COMPLIANCE", row)
compliance = [c.strip() for c in compliance_str.split("|") if c.strip()] if compliance_str else []
payload = {
"engagement_id": engagement_id,
"company_id": company_id,
"provider": col("PROVIDER", row) or "aws",
"region": col("REGION", row),
"finding_id": f"{check_id}::{resource}",
"check_result": status,
"severity": severity if severity in ("critical","high","medium","low","informational") else "informational",
"title": col("CHECK_TITLE", row),
"description": col("DESCRIPTION", row),
"compliance_framework": compliance,
"raw_output_path": str(csv_path),
}
if dry_run:
print(f" [DRY] {severity:10s} {status} {check_id}")
else:
result = api_post("/rest/v1/prowler_findings", payload)
if result is not None:
stats["inserted"] += 1
else:
stats["skipped"] += 1
if stats["total"] % 500 == 0:
print(f" ... processed {stats['total']} rows, {stats['fail']} FAILs so far")
return stats
def build_account_map():
    """Print the companies returned by ``/rest/v1/companies``.

    Despite the name, this does not modify ``ACCOUNT_MAP``: the companies table
    doesn't store AWS account IDs (they live in the Prowler CSV filenames), so
    ``ACCOUNT_MAP`` stays the authoritative source. The listing is only a
    reference for maintaining that map by hand.

    Prints a warning on stderr and returns when the companies cannot be
    fetched or the response is not a list.
    """
    companies = api_get("/rest/v1/companies", {"limit": 200})
    if not companies:
        print("Warning: could not fetch companies from API", file=sys.stderr)
        return
    if not isinstance(companies, list):
        # api_get may also return a dict; iterating it would yield bare keys.
        print("Warning: unexpected companies response from API", file=sys.stderr)
        return

    print(f"Companies loaded: {len(companies)}")
    for c in companies:
        if not isinstance(c, dict):
            continue
        # JSON nulls arrive as None; slicing or width-formatting None raises,
        # so normalise every field to a string first.
        company_id = str(c.get("id") or "")
        name = str(c.get("name") or "")
        domain = c.get("domain") or ""
        print(f" {company_id[:8]} {name:<30} domain={domain}")
def main():
    """Command-line entry point.

    Two modes:

    * ``--csv FILE --company-id UUID`` loads exactly that file for that company.
    * Otherwise every ``prowler-output-*.csv`` in ``CSV_DIR`` is loaded, using
      the account ID embedded in the filename to look up the company in
      ``ACCOUNT_MAP`` (optionally restricted with ``--account``).

    Exits with status 1 when the CSV file or directory is missing or no CSV
    files are found, and with an argparse usage error when only one of
    ``--csv`` / ``--company-id`` is given.
    """
    ap = argparse.ArgumentParser(description="Load Prowler CSV findings into Supabase")
    ap.add_argument("--csv", type=Path, help="Path to a specific CSV file")
    ap.add_argument("--account", type=str, help="AWS Account UID to process (default: all known)")
    ap.add_argument("--company-id", type=str, help="Company UUID (overrides ACCOUNT_MAP)")
    ap.add_argument("--dry-run", action="store_true", help="Print findings without inserting")
    ap.add_argument("--engagement", type=str, help="Optional engagement_id UUID to tag findings")
    ap.add_argument("--verbose", action="store_true")
    args = ap.parse_args()

    dry_tag = "[DRY RUN]" if args.dry_run else ""

    # The two options only make sense together. Silently ignoring one of them
    # would fall through to loading every CSV under the ACCOUNT_MAP companies.
    has_csv = args.csv is not None
    has_company = bool(args.company_id)
    if has_csv != has_company:
        ap.error("--csv and --company-id must be given together")

    if has_csv:
        # Direct single-file mode
        csv_path = args.csv
        if not csv_path.is_file():
            print(f"CSV file not found: {csv_path}", file=sys.stderr)
            sys.exit(1)
        company_id = args.company_id
        print(f"Loading {csv_path} → company {company_id} {dry_tag}")
        stats = load_csv(csv_path, company_id, "manual", args.engagement, args.dry_run)
        print(f"\nResults: {json.dumps(stats, indent=2)}")
        return

    # Discover CSVs
    if not CSV_DIR.exists():
        print(f"CSV directory not found: {CSV_DIR}", file=sys.stderr)
        sys.exit(1)
    csvs = sorted(CSV_DIR.glob("prowler-output-*.csv"))
    if not csvs:
        print(f"No prowler CSV files found in {CSV_DIR}", file=sys.stderr)
        sys.exit(1)
    print(f"Found {len(csvs)} CSV file(s) in {CSV_DIR}")
    print(f"Account map: {ACCOUNT_MAP}")
    print()

    grand = {"total": 0, "fail": 0, "inserted": 0, "skipped": 0, "errors": 0}
    processed = 0
    for csv_path in csvs:
        # Extract account UID from filename: prowler-output-980352155236-20260428195828.csv
        account_uid = csv_path.stem.replace("prowler-output-", "").split("-")[0]
        if args.account and account_uid != args.account:
            continue
        if account_uid not in ACCOUNT_MAP:
            print(f"Skipping {csv_path.name} — account {account_uid} not in ACCOUNT_MAP")
            continue
        company_id, company_name = ACCOUNT_MAP[account_uid]
        print(f"[{csv_path.name}]")
        print(f" account={account_uid} company={company_name} {dry_tag}")
        stats = load_csv(csv_path, company_id, company_name, args.engagement, args.dry_run)
        print(f"{stats['total']} rows, {stats['fail']} FAILs, {stats['inserted']} inserted, "
              f"{stats['skipped']} skipped")
        for k in grand:
            grand[k] += stats[k]
        processed += 1
        print()

    if processed == 0:
        # Every file was filtered out (--account mismatch or unmapped account);
        # say so, so an all-zero total is not mistaken for a clean load.
        print("Warning: no CSV file matched the requested account / ACCOUNT_MAP",
              file=sys.stderr)
    print(f"Grand total: {grand['total']} rows, {grand['fail']} FAILs, "
          f"{grand['inserted']} inserted, {grand['skipped']} skipped")
# Run the loader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()