Files
2026-05-08 17:45:30 -05:00

96 lines
3.1 KiB
Python

"""Dataclass models for Prowler findings and remediation actions."""
import json
from dataclasses import asdict, dataclass, field, fields
from datetime import datetime, timezone
from typing import Optional
@dataclass
class Finding:
    """Represents a single Prowler security finding.

    ``severity`` is normalized to lowercase and ``result`` to uppercase on
    construction. ``discovered_at`` defaults to the current UTC time as a
    naive ISO-8601 string when it is not supplied.
    """

    check_id: str
    result: str  # PASS, FAIL, MUTED
    severity: str  # critical, high, medium, low, informational
    resource_id: str
    resource_arn: str
    region: str
    compliance: dict  # {framework: [controls]}
    description: str
    check_title: str
    service: str
    # Optional fields that may not be in older Prowler output
    status: str = "new"  # new, remediated, ignored
    discovered_at: Optional[str] = None
    remediated_at: Optional[str] = None

    def __post_init__(self):
        """Fill in the discovery timestamp and normalize severity/result casing."""
        if self.discovered_at is None:
            # Naive UTC timestamp: same string format datetime.utcnow() produced,
            # without relying on the deprecated utcnow() call.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            self.discovered_at = now_utc.isoformat()
        # Normalize severity to lowercase
        if self.severity:
            self.severity = self.severity.lower()
        # Normalize result to uppercase
        if self.result:
            self.result = self.result.upper()

    def to_dict(self) -> dict:
        """Return all fields of this finding as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "Finding":
        """Parse a Prowler JSON finding into a Finding dataclass.

        Missing keys fall back to empty/neutral defaults and unknown keys are
        ignored. The tracking keys ``status``, ``discovered_at`` and
        ``remediated_at`` are honored when present, so a dict produced by
        ``to_dict()`` round-trips without losing state.
        """
        # Prowler v5 JSON fields
        return cls(
            check_id=data.get("check_id", ""),
            result=data.get("result", "UNKNOWN"),
            severity=data.get("severity", "informational"),
            resource_id=data.get("resource_id", ""),
            resource_arn=data.get("resource_arn", ""),
            region=data.get("region", ""),
            compliance=data.get("compliance", {}),
            description=data.get("description", ""),
            check_title=data.get("check_title", ""),
            service=data.get("service", ""),
            # Tracking fields: absent in raw scanner output, present in
            # dicts written by to_dict().
            status=data.get("status", "new"),
            discovered_at=data.get("discovered_at"),
            remediated_at=data.get("remediated_at"),
        )

    @staticmethod
    def severity_order() -> list:
        """Return severity levels in order of priority for filtering."""
        return ["critical", "high", "medium", "low", "informational"]

    def severity_index(self) -> int:
        """Return numeric index for severity (lower = more severe).

        An unrecognized or missing severity returns ``len(severity_order())``,
        i.e. it sorts after every known level.
        """
        order = self.severity_order()
        # __post_init__ tolerates a falsy severity, so guard against None here.
        level = (self.severity or "").lower()
        try:
            return order.index(level)
        except ValueError:
            return len(order)
@dataclass
class RemediationAction:
    """Represents a remediation action to be applied (or already applied) to a finding.

    ``created_at`` defaults to the current UTC time as a naive ISO-8601
    string, and ``result_json`` defaults to the string ``"{}"``.
    """

    action_type: str  # s3_block_public, iam_enforce_mfa, rds_encrypt, etc.
    resource_id: str
    dry_run: bool = True
    applicable: bool = True
    reason: str = ""
    # Tracking fields
    status: str = "pending"  # pending, applied, failed, skipped
    created_at: Optional[str] = None
    applied_at: Optional[str] = None
    result_json: Optional[str] = None

    def __post_init__(self):
        """Fill in the creation timestamp and an empty JSON result if unset."""
        if self.created_at is None:
            # Naive UTC timestamp: same string format datetime.utcnow() produced,
            # without relying on the deprecated utcnow() call.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            self.created_at = now_utc.isoformat()
        if self.result_json is None:
            self.result_json = "{}"

    def to_dict(self) -> dict:
        """Return all fields of this action as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "RemediationAction":
        """Build an action from a dict, ignoring keys that are not fields.

        Raises:
            TypeError: if a required field (``action_type`` or
                ``resource_id``) is missing from ``data``.
        """
        # Drop unknown keys so extra entries do not make construction fail.
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})