96 lines
3.1 KiB
Python
96 lines
3.1 KiB
Python
"""Dataclass models for Prowler findings and remediation actions."""
|
|
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import json
|
|
|
|
|
|
@dataclass
|
|
class Finding:
|
|
"""Represents a single Prowler security finding."""
|
|
check_id: str
|
|
result: str # PASS, FAIL, MUTED
|
|
severity: str # critical, high, medium, low, informational
|
|
resource_id: str
|
|
resource_arn: str
|
|
region: str
|
|
compliance: dict # {framework: [controls]}
|
|
description: str
|
|
check_title: str
|
|
service: str
|
|
# Optional fields that may not be in older Prowler output
|
|
status: str = "new" # new, remediated, ignored
|
|
discovered_at: Optional[str] = None
|
|
remediated_at: Optional[str] = None
|
|
|
|
def __post_init__(self):
|
|
if self.discovered_at is None:
|
|
self.discovered_at = datetime.utcnow().isoformat()
|
|
# Normalize severity to lowercase
|
|
if self.severity:
|
|
self.severity = self.severity.lower()
|
|
# Normalize result to uppercase
|
|
if self.result:
|
|
self.result = self.result.upper()
|
|
|
|
def to_dict(self) -> dict:
|
|
return asdict(self)
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> "Finding":
|
|
"""Parse a Prowler JSON finding into a Finding dataclass."""
|
|
# Prowler v5 JSON fields
|
|
return cls(
|
|
check_id=data.get("check_id", ""),
|
|
result=data.get("result", "UNKNOWN"),
|
|
severity=data.get("severity", "informational"),
|
|
resource_id=data.get("resource_id", ""),
|
|
resource_arn=data.get("resource_arn", ""),
|
|
region=data.get("region", ""),
|
|
compliance=data.get("compliance", {}),
|
|
description=data.get("description", ""),
|
|
check_title=data.get("check_title", ""),
|
|
service=data.get("service", ""),
|
|
)
|
|
|
|
@staticmethod
|
|
def severity_order() -> list:
|
|
"""Return severity levels in order of priority for filtering."""
|
|
return ["critical", "high", "medium", "low", "informational"]
|
|
|
|
def severity_index(self) -> int:
|
|
"""Return numeric index for severity (lower = more severe)."""
|
|
try:
|
|
return self.severity_order().index(self.severity.lower())
|
|
except ValueError:
|
|
return len(self.severity_order())
|
|
|
|
|
|
@dataclass
|
|
class RemediationAction:
|
|
"""Represents a remediation action to be applied (or already applied) to a finding."""
|
|
action_type: str # s3_block_public, iam_enforce_mfa, rds_encrypt, etc.
|
|
resource_id: str
|
|
dry_run: bool = True
|
|
applicable: bool = True
|
|
reason: str = ""
|
|
# Tracking fields
|
|
status: str = "pending" # pending, applied, failed, skipped
|
|
created_at: Optional[str] = None
|
|
applied_at: Optional[str] = None
|
|
result_json: Optional[str] = None
|
|
|
|
def __post_init__(self):
|
|
if self.created_at is None:
|
|
self.created_at = datetime.utcnow().isoformat()
|
|
if self.result_json is None:
|
|
self.result_json = "{}"
|
|
|
|
def to_dict(self) -> dict:
|
|
return asdict(self)
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> "RemediationAction":
|
|
return cls(**data)
|