Files
phi-scanner/results/winrm_scan_impl.py
2026-05-08 17:44:26 -05:00

197 lines
6.7 KiB
Python

"""
Windows remote scanning via SMB upload + WinRM execution.
Architecture:
1. SMB put: upload phi_scan.ps1 to C$\\Windows\\Temp\\
2. WinRM run: powershell -File C:\\Windows\\Temp\\phi_scan.ps1
3. Script writes JSON to C:\\Windows\\Temp\\phi_results.json
4. SMB get: download results.json
5. Parse and return findings
"""
import subprocess
import structlog
import json
from pathlib import Path
# Module-level structlog logger used by the functions in this file.
logger = structlog.get_logger()
# PowerShell script that does the actual file scanning on the Windows target.
#
# The text below is uploaded verbatim and executed remotely, so everything
# inside the string (including its `#` lines) is PowerShell, not Python.
#
# What the script does:
#   * Parameters: $ScanPaths (defaults to C:\Users, C:\ProgramData, C:\inetpub)
#     and $ResultsPath (defaults to C:\Windows\Temp\phi_results.json).
#   * Recursively enumerates files under each existing scan path and keeps only
#     those whose extension is listed in $extensions.
#   * files_scanned is incremented before the >10MB and "phi_results.json"
#     skip checks, so it also counts files that are then skipped.
#   * Each file is read whole and matched against every regex in $patterns
#     (SSN, MRN, Phone, Email, DOB). Every match becomes a finding with keys
#     type, text, file, line (always the literal "RAW"), context (up to 40
#     characters on each side, line breaks flattened) and offset.
#   * Errors are suppressed ($ErrorActionPreference = "SilentlyContinue" and an
#     empty catch), so unreadable files are skipped silently.
#   * The results hashtable is serialized with ConvertTo-Json to $ResultsPath
#     and "SCAN_COMPLETE:<finding count>" is written to the host.
#
# NOTE(review): `Set-Content -Encoding UTF8` presumably emits a UTF-8 BOM under
# Windows PowerShell 5.x — confirm that whatever parses the results file
# tolerates a leading BOM.
PHI_WINRM_SCRIPT = r"""
# GreySec PHI Scanner — Windows Host Agent
# Scans target paths for PHI patterns, writes JSON results
param(
[string[]]$ScanPaths = @("C:\Users", "C:\ProgramData", "C:\inetpub"),
[string]$ResultsPath = "C:\Windows\Temp\phi_results.json"
)
$ErrorActionPreference = "SilentlyContinue"
# PHI regex patterns
$patterns = @{
SSN = '\b\d{3}[-\s]\d{2}[-\s]\d{4}\b'
MRN = '\b(MRN|Medical Record|Patient ID|PATID)[\s:#\-=]*\d{5,}\b'
Phone = '\b(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
Email = '\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b'
DOB = '\b(DOB|Date of Birth|Birthdate)[\s:#\-=]*(?:(?:0?[1-9]|1[0-2])[/.](?:0?[1-9]|[12]\d|3[01])[/.](?:19|20)\d{2})\b'
}
$extensions = @('.txt','.csv','.json','.xml','.sql','.log','.py','.md','.yaml','.yml','.tsv','.dat','.html','.htm')
$results = @{
hostname = $env:COMPUTERNAME
scanned_at = (Get-Date -Format "o")
files_scanned = 0
findings = @()
}
foreach ($basePath in $ScanPaths) {
if (-not (Test-Path $basePath)) { continue }
# Get files using Where-Object (NOT -Include, broken in PS 5.x)
$files = Get-ChildItem -Path $basePath -Recurse -File | Where-Object { $extensions -contains $_.Extension.ToLower() }
foreach ($file in $files) {
$results.files_scanned++
# Skip files > 10MB
if ($file.Length -gt 10MB) { continue }
# Skip the results file itself
if ($file.Name -eq "phi_results.json") { continue }
try {
$content = Get-Content $file.FullName -Raw -ErrorAction SilentlyContinue
if (-not $content) { continue }
foreach ($key in $patterns.Keys) {
$matches = [regex]::Matches($content, $patterns[$key])
foreach ($m in $matches) {
$radius = 40
$start = [Math]::Max(0, $m.Index - $radius)
$end = [Math]::Min($content.Length, $m.Index + $m.Length + $radius)
$context = $content.Substring($start, $end - $start).Replace("`n", " ").Replace("`r", "")
$results.findings += @{
type = $key
text = $m.Value
file = $file.FullName
line = "RAW"
context = $context
offset = $m.Index
}
}
}
} catch {}
}
}
$results | ConvertTo-Json -Depth 10 | Set-Content -Path $ResultsPath -Encoding UTF8
Write-Host "SCAN_COMPLETE:$($results.findings.Count)"
"""
def _smb_put(host: str, share: str, user: str, password: str, remote_path: str, content: bytes) -> None:
    """Upload ``content`` to ``remote_path`` on ``//host/share`` via smbclient.

    The payload is fed to smbclient on stdin using ``put - <remote_path>``.

    Args:
        host: Target host name or address.
        share: SMB share name on the target.
        user: Account used to authenticate.
        password: Password for ``user``.
        remote_path: Destination path, relative to the share root.
        content: Raw bytes to write to the remote file.

    Raises:
        RuntimeError: smbclient exited with a non-zero status.
        subprocess.TimeoutExpired: the transfer did not finish in 30 seconds.
    """
    # NOTE(review): the password is passed on the command line and so is
    # visible in the local process list while smbclient runs.
    command = [
        "smbclient",
        f"//{host}/{share}",
        "-U", f"{user}%{password}",
        "-c", f"put - {remote_path}",
    ]
    # subprocess.run() kills and reaps the child when the timeout expires;
    # a bare Popen.communicate(timeout=...) would leave smbclient running.
    completed = subprocess.run(
        command,
        input=content,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=30,
        check=False,
    )
    if completed.returncode != 0:
        # Fall back to stdout when stderr is empty so the failure reason is
        # not lost, and never let undecodable bytes mask the real error.
        detail = (completed.stderr or completed.stdout).decode("utf-8", errors="replace")
        raise RuntimeError(f"SMB put failed: {detail}")
def _smb_get(host: str, share: str, user: str, password: str, remote_path: str) -> bytes:
    """Download ``remote_path`` from ``//host/share`` via smbclient.

    The file is written to smbclient's stdout using ``get <remote_path> -``
    and returned as raw bytes.

    Args:
        host: Target host name or address.
        share: SMB share name on the target.
        user: Account used to authenticate.
        password: Password for ``user``.
        remote_path: Source path, relative to the share root.

    Returns:
        Everything smbclient wrote to stdout.

    Raises:
        RuntimeError: smbclient exited with a non-zero status.
        subprocess.TimeoutExpired: the transfer did not finish in 30 seconds.
    """
    # NOTE(review): the password is passed on the command line and so is
    # visible in the local process list while smbclient runs.
    command = [
        "smbclient",
        f"//{host}/{share}",
        "-U", f"{user}%{password}",
        "-c", f"get {remote_path} -",
    ]
    # subprocess.run() kills and reaps the child when the timeout expires;
    # a bare Popen.communicate(timeout=...) would leave smbclient running.
    completed = subprocess.run(
        command,
        stdin=subprocess.DEVNULL,  # nothing to send; give smbclient an immediate EOF
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=30,
        check=False,
    )
    if completed.returncode != 0:
        # stdout may hold partial file data, so only stderr goes into the
        # message; decode leniently so bad bytes cannot mask the real error.
        detail = completed.stderr.decode("utf-8", errors="replace")
        raise RuntimeError(f"SMB get failed: {detail}")
    return completed.stdout
def _winrm_run(host: str, port: int, user: str, password: str, script_path: str) -> tuple[int, str]:
    """Execute a PowerShell script on ``host`` over WinRM.

    Args:
        host: Target host name or address.
        port: WinRM port on the target.
        user: Account used to authenticate.
        password: Password for ``user``.
        script_path: Path of the script as seen on the target.

    Returns:
        ``(status_code, stderr)`` where ``stderr`` is the decoded standard
        error of the remote ``powershell`` process. Standard output is not
        returned.
    """
    # Imported here so the module can be loaded without pywinrm installed.
    import winrm

    # The endpoint is plain HTTP with NTLM authentication.
    session = winrm.Session(
        f"http://{host}:{port}/wsman",
        auth=(user, password),
        transport="ntlm",
        read_timeout_sec=300,
        operation_timeout_sec=180,
    )
    powershell_args = [
        "-ExecutionPolicy", "Bypass",
        "-NoProfile",
        "-NonInteractive",
        "-File", script_path,
    ]
    response = session.run_cmd("powershell", powershell_args)
    # Remote stderr is not guaranteed to be valid UTF-8; replace undecodable
    # bytes instead of raising after the script has already run.
    return response.status_code, response.std_err.decode("utf-8", errors="replace")
def scan_windows_host(config) -> dict:
    """Execute a full PHI scan against a remote Windows host via SMB + WinRM.

    Steps: upload the PowerShell scan script over SMB, run it over WinRM,
    download the JSON results file over SMB, and parse it.

    Args:
        config: Object providing ``hostname``, ``ip``, ``share``,
            ``username``, ``password`` and ``winrm_port``.

    Returns:
        The parsed results document (``hostname``, ``scanned_at``,
        ``files_scanned``, ``findings``) on success. If the downloaded file
        cannot be decoded or is not a JSON object, a dict with empty
        ``findings`` and ``"error": "JSON parse failed"`` is returned instead.

    Raises:
        RuntimeError: an SMB upload or download failed.
    """
    logger.info("Starting Windows host scan", hostname=config.hostname, ip=config.ip)

    # SMB paths are relative to the share root; the WinRM path is the same
    # script as seen from the target (presumably the share is C$ — confirm).
    remote_script = "Windows/Temp/greysec_phi.ps1"
    remote_results = "Windows/Temp/phi_results.json"
    winrm_script_path = r"C:\Windows\Temp\greysec_phi.ps1"

    # Step 1: SMB put — upload scan script
    _smb_put(
        config.ip,
        config.share,
        config.username,
        config.password,
        remote_script,
        PHI_WINRM_SCRIPT.encode("utf-8"),
    )
    logger.debug("Uploaded scan script via SMB")

    # Step 2: WinRM run — execute the script
    status, err = _winrm_run(
        config.ip, config.winrm_port, config.username, config.password, winrm_script_path
    )
    if status != 0:
        # Best effort: still try to fetch whatever results were written.
        logger.error("WinRM execution failed", status=status, error=err)

    # Step 3: SMB get — download results
    raw = _smb_get(config.ip, config.share, config.username, config.password, remote_results)

    # Windows PowerShell's `Set-Content -Encoding UTF8` prefixes the file with
    # a BOM, which json.loads() rejects; "utf-8-sig" strips it when present
    # and is a no-op otherwise.
    try:
        result = json.loads(raw.decode("utf-8-sig"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        result = None

    if not isinstance(result, dict):
        logger.error("Failed to parse results JSON from Windows host", hostname=config.hostname)
        return {
            "hostname": config.hostname,
            "ip": config.ip,
            "files_scanned": 0,
            "findings": [],
            "error": "JSON parse failed",
        }

    logger.info(
        "Windows scan complete",
        hostname=config.hostname,
        files_scanned=result.get("files_scanned", 0),
        findings=len(result.get("findings", [])),
    )
    return result
def scan_windows_hosts(configs: list) -> list[dict]:
    """Run ``scan_windows_host`` on each config, in order, and collect the results."""
    return [scan_windows_host(host_config) for host_config in configs]