Files
2026-05-08 17:45:30 -05:00

73 lines
2.6 KiB
Python

#!/usr/bin/env python3
"""
Remediation daemon.
Polls the findings DB for pending remediation actions and applies them.
"""
import argparse
import logging
import sys
import time
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from consumer.prowler_consumer import FindingsDB
from consumer.remediation.remediator import apply_action
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("remediation-daemon")
def run_daemon(db_path: str, poll_interval: int = 60, dry_run: bool = True):
logger.info(f"Remediation daemon starting — dry_run={dry_run}, poll_interval={poll_interval}s")
db = FindingsDB(db_path)
applied_count = 0
while True:
pending = db.get_pending_actions(dry_run=dry_run)
for row in pending:
action_id, finding_id, action_type, resource_id, dry_run_flag, reason, severity = row
logger.info(f"[{severity}] {action_type} on {resource_id}")
# Build a minimal action object
from consumer.models import RemediationAction
import json
action = RemediationAction(
action_type=action_type,
resource_id=resource_id,
dry_run=bool(dry_run_flag),
applicable=True,
reason=reason,
status="pending",
)
result = apply_action(action)
db.mark_action_applied(action_id, result.result_json or "{}", datetime.utcnow().isoformat())
if result.status == "applied":
db.mark_finding_remediated(finding_id)
applied_count += 1
logger.info(f" → Applied: {result.result_json}")
else:
logger.warning(f" → Failed: {result.result_json}")
time.sleep(poll_interval)
def main():
parser = argparse.ArgumentParser(description="Remediation action daemon")
parser.add_argument("--db", default=None, help="Path to findings SQLite DB")
parser.add_argument("--poll-interval", type=int, default=60)
parser.add_argument("--dry-run", action="store_true", default=True)
parser.add_argument("--apply", action="store_true", help="Actually apply changes (default is dry-run)")
args = parser.parse_args()
if args.db is None:
args.db = str(Path(__file__).parent.parent / "consumer" / "findings.db")
run_daemon(args.db, poll_interval=args.poll_interval, dry_run=not args.apply)
if __name__ == "__main__":
main()