197 lines
6.7 KiB
Python
197 lines
6.7 KiB
Python
"""
Windows remote scanning via SMB upload + WinRM execution.

Architecture:
1. SMB put: upload greysec_phi.ps1 to C$\\Windows\\Temp\\
2. WinRM run: powershell -File C:\\Windows\\Temp\\greysec_phi.ps1
3. Script writes JSON to C:\\Windows\\Temp\\phi_results.json
4. SMB get: download phi_results.json
5. Parse and return findings
"""
|
|
import subprocess
|
|
import structlog
|
|
import json
|
|
from pathlib import Path
|
|
|
|
# Module-level structlog logger; events are emitted with key/value context.
logger = structlog.get_logger()
|
|
|
|
# PowerShell script that does the actual file scanning on the Windows target
|
|
PHI_WINRM_SCRIPT = r"""
|
|
# GreySec PHI Scanner — Windows Host Agent
|
|
# Scans target paths for PHI patterns, writes JSON results
|
|
|
|
param(
|
|
[string[]]$ScanPaths = @("C:\Users", "C:\ProgramData", "C:\inetpub"),
|
|
[string]$ResultsPath = "C:\Windows\Temp\phi_results.json"
|
|
)
|
|
|
|
$ErrorActionPreference = "SilentlyContinue"
|
|
|
|
# PHI regex patterns
|
|
$patterns = @{
|
|
SSN = '\b\d{3}[-\s]\d{2}[-\s]\d{4}\b'
|
|
MRN = '\b(MRN|Medical Record|Patient ID|PATID)[\s:#\-=]*\d{5,}\b'
|
|
Phone = '\b(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
|
|
Email = '\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b'
|
|
DOB = '\b(DOB|Date of Birth|Birthdate)[\s:#\-=]*(?:(?:0?[1-9]|1[0-2])[/.](?:0?[1-9]|[12]\d|3[01])[/.](?:19|20)\d{2})\b'
|
|
}
|
|
|
|
$extensions = @('.txt','.csv','.json','.xml','.sql','.log','.py','.md','.yaml','.yml','.tsv','.dat','.html','.htm')
|
|
|
|
$results = @{
|
|
hostname = $env:COMPUTERNAME
|
|
scanned_at = (Get-Date -Format "o")
|
|
files_scanned = 0
|
|
findings = @()
|
|
}
|
|
|
|
foreach ($basePath in $ScanPaths) {
|
|
if (-not (Test-Path $basePath)) { continue }
|
|
|
|
# Get files using Where-Object (NOT -Include, broken in PS 5.x)
|
|
$files = Get-ChildItem -Path $basePath -Recurse -File | Where-Object { $extensions -contains $_.Extension.ToLower() }
|
|
|
|
foreach ($file in $files) {
|
|
$results.files_scanned++
|
|
|
|
# Skip files > 10MB
|
|
if ($file.Length -gt 10MB) { continue }
|
|
|
|
# Skip the results file itself
|
|
if ($file.Name -eq "phi_results.json") { continue }
|
|
|
|
try {
|
|
$content = Get-Content $file.FullName -Raw -ErrorAction SilentlyContinue
|
|
if (-not $content) { continue }
|
|
|
|
foreach ($key in $patterns.Keys) {
|
|
$matches = [regex]::Matches($content, $patterns[$key])
|
|
foreach ($m in $matches) {
|
|
$radius = 40
|
|
$start = [Math]::Max(0, $m.Index - $radius)
|
|
$end = [Math]::Min($content.Length, $m.Index + $m.Length + $radius)
|
|
$context = $content.Substring($start, $end - $start).Replace("`n", " ").Replace("`r", "")
|
|
|
|
$results.findings += @{
|
|
type = $key
|
|
text = $m.Value
|
|
file = $file.FullName
|
|
line = "RAW"
|
|
context = $context
|
|
offset = $m.Index
|
|
}
|
|
}
|
|
}
|
|
} catch {}
|
|
}
|
|
}
|
|
|
|
$results | ConvertTo-Json -Depth 10 | Set-Content -Path $ResultsPath -Encoding UTF8
|
|
Write-Host "SCAN_COMPLETE:$($results.findings.Count)"
|
|
"""
|
|
|
|
|
|
def _smb_put(host: str, share: str, user: str, password: str, remote_path: str, content: bytes) -> None:
    """Upload ``content`` to ``remote_path`` on ``//host/share`` using smbclient.

    The payload is fed to ``smbclient`` on stdin (``put - <remote_path>``), so
    no temporary file is written locally.

    Args:
        host: Hostname or IP of the SMB server.
        share: Share name on the server.
        user: Account name used to authenticate.
        password: Password for ``user``.
        remote_path: Destination path, relative to the share root.
        content: Raw bytes to upload.

    Raises:
        RuntimeError: If ``smbclient`` exits with a non-zero status.
        subprocess.TimeoutExpired: If the transfer does not finish in 30 seconds.
    """
    # NOTE(review): the credentials are passed on the command line and are
    # therefore visible in the local process list while smbclient runs.
    #
    # subprocess.run (unlike a bare Popen.communicate with a timeout) kills and
    # reaps the child when the timeout fires, so a hung transfer cannot leave an
    # orphaned smbclient process behind.
    completed = subprocess.run(
        [
            "smbclient",
            f"//{host}/{share}",
            "-U", f"{user}%{password}",
            "-c", f"put - {remote_path}",
        ],
        input=content,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=30,
    )
    if completed.returncode != 0:
        # errors="replace": the diagnostics are not guaranteed to be valid UTF-8
        # and a decode failure must not mask the real error.
        raise RuntimeError(f"SMB put failed: {completed.stderr.decode(errors='replace')}")
|
|
|
|
|
|
def _smb_get(host: str, share: str, user: str, password: str, remote_path: str) -> bytes:
    """Download ``remote_path`` from ``//host/share`` using smbclient.

    The file is streamed to smbclient's stdout (``get <remote_path> -``) and
    returned without touching local disk.

    Args:
        host: Hostname or IP of the SMB server.
        share: Share name on the server.
        user: Account name used to authenticate.
        password: Password for ``user``.
        remote_path: Source path, relative to the share root.

    Returns:
        Everything smbclient wrote to stdout, as raw bytes.

    Raises:
        RuntimeError: If ``smbclient`` exits with a non-zero status.
        subprocess.TimeoutExpired: If the transfer does not finish in 30 seconds.
    """
    # NOTE(review): the credentials are passed on the command line and are
    # therefore visible in the local process list while smbclient runs.
    #
    # subprocess.run kills and reaps the child when the timeout fires; the
    # previous Popen.communicate(timeout=...) left it running on timeout.
    completed = subprocess.run(
        [
            "smbclient",
            f"//{host}/{share}",
            "-U", f"{user}%{password}",
            "-c", f"get {remote_path} -",
        ],
        # Nothing is sent to the child; give it an immediate EOF on stdin.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=30,
    )
    if completed.returncode != 0:
        # errors="replace": the diagnostics are not guaranteed to be valid UTF-8
        # and a decode failure must not mask the real error.
        raise RuntimeError(f"SMB get failed: {completed.stderr.decode(errors='replace')}")
    return completed.stdout
|
|
|
|
|
|
def _winrm_run(host: str, port: int, user: str, password: str, script_path: str) -> tuple[int, str]:
    """Execute a PowerShell script on a remote host via WinRM.

    Opens an NTLM-authenticated session to ``http://host:port/wsman`` and runs
    ``powershell -ExecutionPolicy Bypass -NoProfile -NonInteractive -File
    <script_path>``.

    Args:
        host: Hostname or IP of the Windows target.
        port: WinRM listener port.
        user: Account name used to authenticate.
        password: Password for ``user``.
        script_path: Path of the script as seen by the remote host.

    Returns:
        ``(status_code, stderr)`` of the remote command, with stderr decoded
        to text.
    """
    # Imported lazily so the module can be imported without pywinrm installed.
    import winrm

    session = winrm.Session(
        f"http://{host}:{port}/wsman",
        auth=(user, password),
        transport="ntlm",
        read_timeout_sec=300,
        operation_timeout_sec=180,
    )
    response = session.run_cmd("powershell", [
        "-ExecutionPolicy", "Bypass",
        "-NoProfile",
        "-NonInteractive",
        "-File", script_path,
    ])
    # errors="replace": remote stderr is not guaranteed to be valid UTF-8, and a
    # decode failure here would hide the status code from the caller.
    return response.status_code, response.std_err.decode(errors="replace")
|
|
|
|
|
|
def scan_windows_host(config) -> dict:
    """
    Execute a full PHI scan against a remote Windows host via SMB + WinRM.

    Uploads the embedded PowerShell scanner over SMB, runs it through WinRM,
    downloads the JSON results file it writes, and returns the parsed content.

    Args:
        config: Object exposing ``hostname``, ``ip``, ``share``, ``username``,
            ``password`` and ``winrm_port`` attributes.
            NOTE(review): the hard-coded paths assume ``share`` is the root of
            the C: drive (e.g. ``C$``) - confirm against callers.

    Returns:
        The parsed results (the script writes ``hostname``, ``scanned_at``,
        ``files_scanned`` and ``findings``). If the downloaded file cannot be
        decoded or parsed, a dict with ``hostname``, ``ip``, ``files_scanned``
        = 0, empty ``findings`` and an ``error`` message is returned instead.

    Raises:
        RuntimeError: If the SMB upload or download fails.
    """
    logger.info("Starting Windows host scan", hostname=config.hostname, ip=config.ip)

    script_content = PHI_WINRM_SCRIPT.encode("utf-8")
    # smbclient takes share-relative paths; WinRM needs the absolute Windows path.
    remote_script = "Windows/Temp/greysec_phi.ps1"
    remote_results = "Windows/Temp/phi_results.json"
    winrm_script_path = r"C:\Windows\Temp\greysec_phi.ps1"

    # Step 1: SMB put - upload scan script
    _smb_put(config.ip, config.share, config.username, config.password, remote_script, script_content)
    logger.debug("Uploaded scan script via SMB")

    # Step 2: WinRM run - execute the script
    status, err = _winrm_run(config.ip, config.winrm_port, config.username, config.password, winrm_script_path)
    if status != 0:
        # Best effort: the failure is logged but the results file is still
        # fetched below.
        # NOTE(review): nothing removes the results file between runs, so a
        # failed execution may return results left over from an earlier scan.
        logger.error("WinRM execution failed", status=status, error=err)

    # Step 3: SMB get - download results
    raw = _smb_get(config.ip, config.share, config.username, config.password, remote_results)

    try:
        # utf-8-sig: Windows PowerShell 5.x writes UTF-8 files with a BOM, which
        # json.loads rejects when it is left at the start of a str. The codec
        # also accepts BOM-less input.
        result = json.loads(raw.decode("utf-8-sig"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        logger.error("Failed to parse results JSON from Windows host", hostname=config.hostname)
        return {"hostname": config.hostname, "ip": config.ip, "files_scanned": 0, "findings": [], "error": "JSON parse failed"}

    logger.info(
        "Windows scan complete",
        hostname=config.hostname,
        files_scanned=result.get("files_scanned", 0),
        findings=len(result.get("findings", [])),
    )

    return result
|
|
|
|
|
|
def scan_windows_hosts(configs: list) -> list[dict]:
    """Scan each Windows host in ``configs`` in order.

    Returns one result dict per config, in the same order as the input. Any
    exception raised while scanning a host propagates to the caller.
    """
    return [scan_windows_host(host_config) for host_config in configs]
|