#!/usr/bin/env python3
"""
Prowler consumer daemon.

Watches the output directory for new Prowler JSON files, parses findings,
stores them in SQLite, and enqueues remediation actions.
"""

import argparse
import glob
import json
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

# Make the sibling modules (models, remediation) importable when run as a script
sys.path.insert(0, str(Path(__file__).parent))

from models import Finding, RemediationAction
from remediation.remediator import determine_action, apply_action

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("consumer")


class FindingsDB:
    """SQLite-backed store for findings and their remediation actions."""

    def __init__(self, db_path: str | None = None):
        if db_path is None:
            db_path = Path(__file__).parent / "findings.db"
        self.db_path = str(db_path)
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        # UNIQUE(check_id, resource_id, discovered_at) lets insert_finding use
        # INSERT OR IGNORE to dedupe findings re-read from the same file.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS findings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                check_id TEXT, result TEXT, severity TEXT,
                resource_id TEXT, resource_arn TEXT, region TEXT,
                compliance_json TEXT, description TEXT,
                check_title TEXT, service TEXT,
                status TEXT DEFAULT 'new',
                discovered_at TEXT, remediated_at TEXT,
                UNIQUE(check_id, resource_id, discovered_at)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS remediation_actions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                finding_id INTEGER, action_type TEXT, resource_id TEXT,
                dry_run INTEGER, applicable INTEGER, reason TEXT,
                status TEXT DEFAULT 'pending',
                created_at TEXT, applied_at TEXT, result_json TEXT,
                FOREIGN KEY (finding_id) REFERENCES findings(id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_finding_status ON findings(status)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_action_status ON remediation_actions(status)")
        conn.commit()
        conn.close()

    def insert_finding(self, finding: Finding) -> int:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                INSERT OR IGNORE INTO findings
                (check_id, result, severity, resource_id, resource_arn, region,
                 compliance_json, description, check_title, service, status, discovered_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                finding.check_id, finding.result, finding.severity,
                finding.resource_id, finding.resource_arn, finding.region,
                json.dumps(finding.compliance), finding.description,
                finding.check_title, finding.service, finding.status,
                finding.discovered_at,
            ))
            conn.commit()
            # On a fresh connection an ignored duplicate leaves lastrowid
            # falsy, so 0 signals "already stored" to callers.
            return cur.lastrowid or 0
        finally:
            conn.close()

    def insert_action(self, action: RemediationAction, finding_id: int) -> int:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                INSERT INTO remediation_actions
                (finding_id, action_type, resource_id, dry_run, applicable, reason, status, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                finding_id, action.action_type, action.resource_id,
                int(action.dry_run), int(action.applicable), action.reason,
                action.status, action.created_at,
            ))
            conn.commit()
            return cur.lastrowid or 0
        finally:
            conn.close()

    def get_pending_actions(self, dry_run: bool = True) -> list:
        conn = sqlite3.connect(self.db_path)
        try:
            # severity is TEXT, so a bare ORDER BY would sort alphabetically
            # (putting 'medium' before 'critical'); rank the usual levels
            # explicitly so the most severe findings come first.
            rows = conn.execute("""
                SELECT ra.id, ra.finding_id, ra.action_type, ra.resource_id,
                       ra.dry_run, ra.reason, f.severity
                FROM remediation_actions ra
                JOIN findings f ON ra.finding_id = f.id
                WHERE ra.status = 'pending' AND ra.dry_run = ?
                ORDER BY CASE lower(f.severity)
                             WHEN 'critical' THEN 0
                             WHEN 'high' THEN 1
                             WHEN 'medium' THEN 2
                             WHEN 'low' THEN 3
                             ELSE 4
                         END, ra.id ASC
            """, (int(dry_run),)).fetchall()
            return rows
        finally:
            conn.close()

    def mark_action_applied(self, action_id: int, result_json: str, applied_at: str):
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                UPDATE remediation_actions
                SET status = 'applied', result_json = ?, applied_at = ?
                WHERE id = ?
            """, (result_json, applied_at, action_id))
            conn.commit()
        finally:
            conn.close()

    def mark_finding_remediated(self, finding_id: int):
        conn = sqlite3.connect(self.db_path)
        try:
            # datetime.utcnow() is deprecated; use an aware UTC timestamp.
            conn.execute("""
                UPDATE findings SET status = 'remediated', remediated_at = ?
                WHERE id = ?
            """, (datetime.now(timezone.utc).isoformat(), finding_id))
            conn.commit()
        finally:
            conn.close()

    def count_by_status(self) -> dict:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                SELECT status, COUNT(*) FROM findings GROUP BY status
            """)
            return dict(cur.fetchall())
        finally:
            conn.close()

    def count_by_severity(self) -> dict:
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute("""
                SELECT severity, result, COUNT(*)
                FROM findings
                GROUP BY severity, result
                ORDER BY severity
            """)
            return {(row[0], row[1]): row[2] for row in cur.fetchall()}
        finally:
            conn.close()
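

# Sketch of how a separate worker might drain the queue. apply_action is
# imported above but never called in this file, and the call shape below is
# an assumption, not its real signature:
#
#   for (action_id, finding_id, action_type, resource_id,
#        dry_run, reason, severity) in db.get_pending_actions(dry_run=False):
#       result = apply_action(action_type, resource_id)  # hypothetical call
#       db.mark_action_applied(action_id, json.dumps(result),
#                              datetime.now(timezone.utc).isoformat())
#       db.mark_finding_remediated(finding_id)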


def process_file(json_path: str, db: FindingsDB, dry_run: bool = True) -> int:
    """Parse a Prowler JSON file and store findings + enqueue actions."""
    logger.info(f"Processing {json_path}")
    count = 0
    with open(json_path) as f:
        data = json.load(f)

    # Prowler v5 JSON is a list of findings
    findings_list = data if isinstance(data, list) else data.get("findings", [])
    for item in findings_list:
        finding = Finding.from_dict(item)
        if finding.result != "FAIL":
            continue
        finding_id = db.insert_finding(finding)
        if finding_id == 0:
            # Already exists, skip
            continue
        count += 1
        action = determine_action(finding, dry_run=dry_run)
        if action:
            action_id = db.insert_action(action, finding_id)
            logger.info(f"  Finding {finding.check_id} on {finding.resource_id} → action {action.action_type} (id={action_id})")
    return count
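
# Illustrative (hypothetical) minimal input for process_file. The real key
# names are whatever models.Finding.from_dict maps from Prowler's output,
# which carries many more fields than shown here:
#
#   [
#     {"check_id": "s3_bucket_public_access", "result": "FAIL",
#      "severity": "high", "resource_id": "my-bucket", "region": "us-east-1"}
#   ]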


def run_daemon(output_dir: str, db: FindingsDB, poll_interval: int = 30, dry_run: bool = True):
    """Watch output_dir for new JSON files and process them."""
    logger.info(f"Starting daemon — watching {output_dir} every {poll_interval}s, dry_run={dry_run}")
    processed = set()
    while True:
        for path in glob.glob(os.path.join(output_dir, "prowler-findings-*.json")):
            if path not in processed:
                try:
                    n = process_file(path, db, dry_run=dry_run)
                    if n > 0:
                        logger.info(f"  → {n} new FAIL findings stored from {os.path.basename(path)}")
                    processed.add(path)
                except Exception as e:
                    # The file stays out of `processed`, so it is retried next poll
                    logger.error(f"Error processing {path}: {e}")
        time.sleep(poll_interval)
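
# run_daemon above and run_once below only pick up files matching
# prowler-findings-*.json; the timestamp in this example name is illustrative:
#   output/prowler-findings-20250131T120000Z.json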


def run_once(output_dir: str, db: FindingsDB, dry_run: bool = True) -> int:
    """Process all existing files in output_dir once."""
    total = 0
    for path in sorted(glob.glob(os.path.join(output_dir, "prowler-findings-*.json"))):
        try:
            n = process_file(path, db, dry_run=dry_run)
            total += n
        except Exception as e:
            logger.error(f"Error processing {path}: {e}")
    return total


def main():
    parser = argparse.ArgumentParser(description="Prowler findings consumer")
    parser.add_argument("--output-dir", "-o", default="./output",
                        help="Directory containing Prowler JSON output files")
    parser.add_argument("--db", default=None,
                        help="Path to SQLite findings database")
    parser.add_argument("--daemon", "-d", action="store_true",
                        help="Run as daemon (poll for new files)")
    parser.add_argument("--poll-interval", type=int, default=30,
                        help="Seconds between polls in daemon mode")
    parser.add_argument("--dry-run", action="store_true", default=True,
                        help="Only log remediation actions (default; overridden by --apply)")
    parser.add_argument("--apply", action="store_true",
                        help="Actually apply remediation changes to AWS")
    parser.add_argument("--stats", "-s", action="store_true",
                        help="Print finding statistics and exit")
    args = parser.parse_args()

    output_dir = Path(args.output_dir).expanduser()
    output_dir.mkdir(parents=True, exist_ok=True)

    db = FindingsDB(args.db)
    # --apply is the single switch that disables dry-run; args.dry_run is
    # always True and only documents the default.
    dry_run = not args.apply

    if args.stats:
        counts = db.count_by_status()
        print("Findings by status:", counts)
        sev = db.count_by_severity()
        print("Findings by severity/result:", sev)
        return

    if args.daemon:
        run_daemon(str(output_dir), db, poll_interval=args.poll_interval, dry_run=dry_run)
    else:
        n = run_once(str(output_dir), db, dry_run=dry_run)
        logger.info(f"Done — processed {n} new FAIL findings")


if __name__ == "__main__":
    main()
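
# Minimal programmatic usage sketch (paths are illustrative):
#   db = FindingsDB("/tmp/findings.db")
#   stored = run_once("./output", db, dry_run=True)
#   print(stored, db.count_by_status())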