#!/usr/bin/env python3
"""
Prowler consumer daemon.
Watches the output directory for new Prowler JSON files, parses findings,
stores them in SQLite, and enqueues remediation actions.
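
Typical invocations (flags as defined in main() below):
    python prowler_consumer.py --output-dir ./output          # one-shot run
    python prowler_consumer.py --daemon --poll-interval 60    # watch mode
    python prowler_consumer.py --apply                        # apply remediations
    python prowler_consumer.py --stats                        # print counts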
"""
import argparse
import glob
import json
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Add consumer to path
sys.path.insert(0, str(Path(__file__).parent))
from models import Finding, RemediationAction
from remediation.remediator import determine_action, apply_action

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("consumer")


class FindingsDB:
    """SQLite-backed finding store."""

    def __init__(self, db_path: Optional[str] = None):
        if db_path is None:
            db_path = Path(__file__).parent / "findings.db"
        self.db_path = str(db_path)
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS findings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                check_id TEXT, result TEXT, severity TEXT,
                resource_id TEXT, resource_arn TEXT, region TEXT,
                compliance_json TEXT, description TEXT,
                check_title TEXT, service TEXT,
                status TEXT DEFAULT 'new',
                discovered_at TEXT, remediated_at TEXT,
                UNIQUE(check_id, resource_id, discovered_at)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS remediation_actions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                finding_id INTEGER, action_type TEXT, resource_id TEXT,
                dry_run INTEGER, applicable INTEGER, reason TEXT,
                status TEXT DEFAULT 'pending',
                created_at TEXT, applied_at TEXT, result_json TEXT,
                FOREIGN KEY (finding_id) REFERENCES findings(id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_finding_status ON findings(status)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_action_status ON remediation_actions(status)")
        conn.commit()
        conn.close()
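
    # Note: the UNIQUE(check_id, resource_id, discovered_at) constraint only
    # dedups exact re-inserts; the same finding rediscovered at a later
    # timestamp is stored again as a new row, preserving history.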
    def insert_finding(self, finding: Finding) -> int:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                INSERT OR IGNORE INTO findings
                (check_id, result, severity, resource_id, resource_arn, region,
                 compliance_json, description, check_title, service, status, discovered_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                finding.check_id, finding.result, finding.severity,
                finding.resource_id, finding.resource_arn, finding.region,
                json.dumps(finding.compliance), finding.description,
                finding.check_title, finding.service, finding.status,
                finding.discovered_at,
            ))
            conn.commit()
            # rowcount is 0 when the UNIQUE constraint caused the insert to
            # be ignored; lastrowid is not reliable in that case.
            if cur.rowcount == 0:
                return 0
            return cur.lastrowid or 0
        finally:
            conn.close()

    def insert_action(self, action: RemediationAction, finding_id: int) -> int:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                INSERT INTO remediation_actions
                (finding_id, action_type, resource_id, dry_run, applicable, reason, status, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                finding_id, action.action_type, action.resource_id,
                int(action.dry_run), int(action.applicable), action.reason,
                action.status, action.created_at,
            ))
            conn.commit()
            return cur.lastrowid or 0
        finally:
            conn.close()

    def get_pending_actions(self, dry_run: bool = True) -> list:
        conn = sqlite3.connect(self.db_path)
        try:
            # severity is stored as text, so a bare ORDER BY would sort it
            # alphabetically; rank the Prowler severity levels explicitly.
            rows = conn.execute("""
                SELECT ra.id, ra.finding_id, ra.action_type, ra.resource_id,
                       ra.dry_run, ra.reason, f.severity
                FROM remediation_actions ra
                JOIN findings f ON ra.finding_id = f.id
                WHERE ra.status = 'pending' AND ra.dry_run = ?
                ORDER BY CASE LOWER(f.severity)
                    WHEN 'critical' THEN 0
                    WHEN 'high' THEN 1
                    WHEN 'medium' THEN 2
                    WHEN 'low' THEN 3
                    ELSE 4
                END, ra.id ASC
            """, (int(dry_run),)).fetchall()
            return rows
        finally:
            conn.close()
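
    # These pending rows are the natural input for apply_action (imported
    # above); the apply loop itself does not live in this file.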
    def mark_action_applied(self, action_id: int, result_json: str, applied_at: str):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            UPDATE remediation_actions
            SET status = 'applied', result_json = ?, applied_at = ?
            WHERE id = ?
        """, (result_json, applied_at, action_id))
        conn.commit()
        conn.close()

    def mark_finding_remediated(self, finding_id: int):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            UPDATE findings SET status = 'remediated', remediated_at = ?
            WHERE id = ?
        """, (datetime.now(timezone.utc).isoformat(), finding_id))
        conn.commit()
        conn.close()

    def count_by_status(self) -> dict:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                SELECT status, COUNT(*) FROM findings GROUP BY status
            """)
            return dict(cur.fetchall())
        finally:
            conn.close()

    def count_by_severity(self) -> dict:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                SELECT severity, result, COUNT(*)
                FROM findings
                GROUP BY severity, result
                ORDER BY severity
            """)
            return {(row[0], row[1]): row[2] for row in cur.fetchall()}
        finally:
            conn.close()
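

# Minimal usage sketch (hypothetical paths, for illustration only):
#     db = FindingsDB("/tmp/findings.db")
#     process_file("./output/prowler-findings-2026-05-08.json", db, dry_run=True)
#     print(db.count_by_status())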


def process_file(json_path: str, db: FindingsDB, dry_run: bool = True) -> int:
    """Parse a Prowler JSON file and store findings + enqueue actions."""
    logger.info(f"Processing {json_path}")
    count = 0
    with open(json_path) as f:
        data = json.load(f)
    # Prowler v5 JSON is a list of findings
    findings_list = data if isinstance(data, list) else data.get("findings", [])
    for item in findings_list:
        finding = Finding.from_dict(item)
        if finding.result != "FAIL":
            continue
        finding_id = db.insert_finding(finding)
        if finding_id == 0:
            # Already exists, skip
            continue
        count += 1
        action = determine_action(finding, dry_run=dry_run)
        if action:
            action_id = db.insert_action(action, finding_id)
            logger.info(
                f"  Finding {finding.check_id} on {finding.resource_id} "
                f"→ action {action.action_type} (id={action_id})"
            )
    return count


def run_daemon(output_dir: str, db: FindingsDB, poll_interval: int = 30, dry_run: bool = True):
    """Watch output_dir for new JSON files and process them."""
    logger.info(f"Starting daemon — watching {output_dir} every {poll_interval}s, dry_run={dry_run}")
    processed = set()
    while True:
        for path in glob.glob(os.path.join(output_dir, "prowler-findings-*.json")):
            if path not in processed:
                try:
                    n = process_file(path, db, dry_run=dry_run)
                    if n > 0:
                        logger.info(f"{n} new FAIL findings stored from {os.path.basename(path)}")
                    processed.add(path)
                except Exception as e:
                    # The path is not marked processed on failure, so it is
                    # retried on the next poll.
                    logger.error(f"Error processing {path}: {e}")
        time.sleep(poll_interval)


def run_once(output_dir: str, db: FindingsDB, dry_run: bool = True) -> int:
    """Process all existing files in output_dir once."""
    total = 0
    for path in sorted(glob.glob(os.path.join(output_dir, "prowler-findings-*.json"))):
        try:
            n = process_file(path, db, dry_run=dry_run)
            total += n
        except Exception as e:
            logger.error(f"Error processing {path}: {e}")
    return total


def main():
    parser = argparse.ArgumentParser(description="Prowler findings consumer")
    parser.add_argument("--output-dir", "-o", default="./output",
                        help="Directory containing Prowler JSON output files")
    parser.add_argument("--db", default=None,
                        help="Path to SQLite findings database")
    parser.add_argument("--daemon", "-d", action="store_true",
                        help="Run as daemon (poll for new files)")
    parser.add_argument("--poll-interval", type=int, default=30,
                        help="Seconds between polls in daemon mode")
    # --dry-run is already the default; the flag exists so the intent can be
    # stated explicitly on the command line. --apply flips the behaviour.
    parser.add_argument("--dry-run", action="store_true", default=True,
                        help="Only log remediation actions (default)")
    parser.add_argument("--apply", action="store_true",
                        help="Actually apply remediation changes to AWS")
    parser.add_argument("--stats", "-s", action="store_true",
                        help="Print finding statistics and exit")
    args = parser.parse_args()

    output_dir = Path(args.output_dir).expanduser()
    output_dir.mkdir(parents=True, exist_ok=True)
    db = FindingsDB(args.db)
    dry_run = not args.apply

    if args.stats:
        counts = db.count_by_status()
        print("Findings by status:", counts)
        sev = db.count_by_severity()
        print("Findings by severity/result:", sev)
        return

    if args.daemon:
        run_daemon(str(output_dir), db, poll_interval=args.poll_interval, dry_run=dry_run)
    else:
        n = run_once(str(output_dir), db, dry_run=dry_run)
        logger.info(f"Done — processed {n} new FAIL findings")


if __name__ == "__main__":
    main()