Skip to content

Commit 676fc54

Browse files
🎉 Implement pingcastle vulnerability parser (#13933)
* 🎉 Implement pingcastle vulnerability parser * udpate * update severity calculation * fix * Update unittests/tools/test_pingcastle_parser.py Co-authored-by: valentijnscholten <valentijnscholten@gmail.com> * Update dojo/tools/pingcastle/parser.py Co-authored-by: valentijnscholten <valentijnscholten@gmail.com> * Update dojo/tools/pingcastle/parser.py Co-authored-by: valentijnscholten <valentijnscholten@gmail.com> * fix * Update docs/content/supported_tools/parsers/file/pingcastle.md --------- Co-authored-by: valentijnscholten <valentijnscholten@gmail.com>
1 parent 2741ed2 commit 676fc54

7 files changed

Lines changed: 1323 additions & 0 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
title: "PingCastle"
2+
toc_hide: true
3+
---
4+
Import results from the [PingCastle](https://www.pingcastle.com/documentation/).
5+
6+
### Sample Scan Data
7+
Sample PingCastle scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/pingcastle).

dojo/tools/pingcastle/__init__.py

Whitespace-only changes.

dojo/tools/pingcastle/parser.py

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
2+
import contextlib
3+
import datetime
4+
import re
5+
6+
from defusedxml.ElementTree import parse
7+
8+
from dojo.models import Endpoint, Finding
9+
10+
11+
class PingCastleParser:
12+
13+
CVE_REGEX = re.compile(r"(CVE-\d{4}-\d{4,7})", re.IGNORECASE)
14+
15+
_SEVERITY_ORDER = ["Info", "Low", "Medium", "High", "Critical"]
16+
17+
def get_scan_types(self):
18+
return ["PingCastle"]
19+
20+
def get_label_for_scan_types(self, scan_type):
21+
return scan_type
22+
23+
def get_description_for_scan_types(self, scan_type):
24+
return "PingCastle XML export"
25+
26+
def get_findings(self, file, test):
27+
try:
28+
tree = parse(file)
29+
root = tree.getroot()
30+
except Exception as e:
31+
exception = f"Invalid PingCastle XML format: {e}"
32+
raise ValueError(exception)
33+
dupes = {}
34+
report_date = self._parse_datetime(root.findtext("GenerationDate"))
35+
domain_fqdn = root.findtext("DomainFQDN") or ""
36+
dc_infos, dc_endpoints = self._collect_domain_controllers(root)
37+
findings = []
38+
for rr in root.findall("RiskRules/HealthcheckRiskRule"):
39+
points = self._safe_int(rr.findtext("Points"))
40+
category = rr.findtext("Category") or ""
41+
model = rr.findtext("Model") or ""
42+
risk_id = rr.findtext("RiskId") or ""
43+
rationale = rr.findtext("Rationale") or ""
44+
severity = self._map_points_to_severity(points)
45+
severity = self._apply_contextual_bump(
46+
severity=severity,
47+
category=category,
48+
model=model,
49+
risk_id=risk_id,
50+
rationale=rationale,
51+
)
52+
if not severity or severity not in self._SEVERITY_ORDER:
53+
severity = "Info"
54+
title = f"[PingCastle] {risk_id} ({category}/{model})"
55+
description = self._compose_risk_rule_description(
56+
domain_fqdn=domain_fqdn,
57+
risk_id=risk_id,
58+
points=points,
59+
category=category,
60+
model=model,
61+
rationale=rationale,
62+
dc_infos=dc_infos,
63+
root=root,
64+
)
65+
finding = Finding(
66+
title=title,
67+
test=test,
68+
description=description,
69+
severity=severity,
70+
mitigation="Review and remediate according to PingCastle recommendations.",
71+
impact="Risk identified by PingCastle HealthCheck.",
72+
vuln_id_from_tool=risk_id,
73+
)
74+
if report_date:
75+
finding.date = report_date
76+
cves = list(self.CVE_REGEX.findall(rationale or ""))
77+
if cves:
78+
finding.unsaved_vulnerability_ids = cves
79+
finding.unsaved_endpoints = []
80+
if self._is_dc_specific_risk(risk_id, model, rationale):
81+
finding.unsaved_endpoints.extend(dc_endpoints)
82+
elif domain_fqdn:
83+
finding.unsaved_endpoints.append(Endpoint(host=domain_fqdn))
84+
if risk_id == "A-DC-Coerce":
85+
self._enrich_coerce_with_rpc_interfaces(finding, dc_infos)
86+
if risk_id == "A-DC-Spooler":
87+
self._enrich_spooler_status(finding, dc_infos)
88+
if risk_id == "A-MinPwdLen":
89+
self._enrich_password_policy(finding, root)
90+
dupe_key = risk_id
91+
if dupe_key in dupes:
92+
existing = dupes[dupe_key]
93+
existing.description += "\n\n-----\n\n" + finding.description
94+
existing.unsaved_endpoints.extend(finding.unsaved_endpoints)
95+
else:
96+
dupes[dupe_key] = finding
97+
findings.extend(list(dupes.values()))
98+
return findings
99+
100+
def _compose_risk_rule_description(
101+
self,
102+
domain_fqdn,
103+
risk_id,
104+
points,
105+
category,
106+
model,
107+
rationale,
108+
dc_infos,
109+
root,
110+
):
111+
lines = []
112+
lines.append("### PingCastle Risk Rule") # noqa: FURB113
113+
lines.append(f"**Domain**: `{domain_fqdn}`")
114+
lines.append(f"**RiskId**: `{risk_id}`")
115+
lines.append(f"**Category/Model**: `{category}` / `{model}`")
116+
lines.append(f"**Points**: `{points}`")
117+
if rationale:
118+
lines.append(f"**Rationale**: {rationale}")
119+
if risk_id.startswith("A-DC-") or "DomainControllers" in root.tag:
120+
if dc_infos:
121+
lines.append("\n#### Domain Controllers")
122+
for dc in dc_infos:
123+
ips = ", ".join(dc.get("ips", []))
124+
lines.append(
125+
f"- **{dc['name']}** (OS: {dc.get('os', '?')}, IPs: {ips}, "
126+
f"SpoolerRemote: {dc.get('remote_spooler', 'false')})",
127+
)
128+
return "\n".join(lines)
129+
130+
def _collect_domain_controllers(self, root):
131+
dc_infos = []
132+
endpoints = []
133+
for dc in root.findall("DomainControllers/HealthcheckDomainController"):
134+
name = dc.findtext("DCName") or ""
135+
os = dc.findtext("OperatingSystem") or ""
136+
remote_spooler = dc.findtext("RemoteSpoolerDetected") or "false"
137+
ip_elems = dc.findall("IP/string")
138+
ips = [ip_elem.text for ip_elem in ip_elems if ip_elem is not None and ip_elem.text]
139+
dc_info = {
140+
"name": name,
141+
"os": os,
142+
"remote_spooler": remote_spooler.lower() == "true",
143+
"ips": ips,
144+
"rpc_interfaces": [],
145+
}
146+
for rpc in dc.findall("RPCInterfacesOpen/HealthcheckDCRPCInterface"):
147+
dc_info["rpc_interfaces"].append({
148+
"ip": rpc.attrib.get("IP", ""),
149+
"interface": rpc.attrib.get("Interface", ""),
150+
"opnum": rpc.attrib.get("OpNum", ""),
151+
"function": rpc.attrib.get("Function", ""),
152+
})
153+
dc_infos.append(dc_info)
154+
if name:
155+
endpoints.append(Endpoint(host=name))
156+
endpoints.extend(Endpoint(host=ip) for ip in ips)
157+
return dc_infos, endpoints
158+
159+
def _enrich_coerce_with_rpc_interfaces(self, finding, dc_infos):
160+
added_any = False
161+
for dc in dc_infos:
162+
if dc.get("rpc_interfaces"):
163+
if not added_any:
164+
finding.description += "\n\n#### RPC Interfaces (potential coercion surface)\n"
165+
added_any = True
166+
finding.description += f"\n**{dc['name']}**:\n"
167+
for ri in dc["rpc_interfaces"]:
168+
finding.description += (
169+
f"- IP: `{ri['ip']}` | Interface: `{ri['interface']}` | "
170+
f"OpNum: `{ri['opnum']}` | Function: `{ri['function']}`\n"
171+
)
172+
173+
def _enrich_spooler_status(self, finding, dc_infos):
174+
any_remote_spooler = any(dc.get("remote_spooler") for dc in dc_infos)
175+
finding.description += (
176+
f"\n\n**Remote spooler exposure detected**: `{any_remote_spooler}`"
177+
)
178+
179+
def _enrich_password_policy(self, finding, root):
180+
min_len = None
181+
complexity = None
182+
for prop in root.findall("GPPPasswordPolicy/GPPSecurityPolicy/Properties/GPPSecurityPolicyProperty"):
183+
key = (prop.findtext("Property") or "").strip()
184+
val = (prop.findtext("Value") or "").strip()
185+
if key == "MinimumPasswordLength":
186+
min_len = val
187+
elif key == "PasswordComplexity":
188+
complexity = val
189+
if min_len is not None or complexity is not None:
190+
finding.description += "\n\n#### Observed Password Policy from GPO\n"
191+
if min_len is not None:
192+
finding.description += f"- MinimumPasswordLength: `{min_len}`\n"
193+
if complexity is not None:
194+
friendly = {"0": "disabled", "1": "enabled"}.get(complexity, complexity)
195+
finding.description += f"- PasswordComplexity: `{friendly}`\n"
196+
197+
@staticmethod
198+
def _parse_datetime(text):
199+
if not text:
200+
return None
201+
with contextlib.suppress(ValueError):
202+
return datetime.datetime.fromisoformat(text)
203+
return None
204+
205+
@staticmethod
206+
def _safe_int(text):
207+
try:
208+
return int(text)
209+
except (TypeError, ValueError):
210+
return 0
211+
212+
@staticmethod
213+
def _map_points_to_severity(points):
214+
if points <= 0:
215+
return "Info"
216+
if points <= 5:
217+
return "Low"
218+
if points <= 10:
219+
return "Medium"
220+
if points <= 15:
221+
return "High"
222+
return "Critical"
223+
224+
@staticmethod
225+
def _is_dc_specific_risk(risk_id: str, model: str = "", rationale: str = "") -> bool:
226+
"""
227+
Best effort classification: return True if the risk targets Domain Controllers specifically.
228+
Signals:
229+
- RiskId prefixes for DC: "A-DC-" (anomalies on DC), "S-DC-" (stale/DC subnet), and known IDs.
230+
- Model contains DC-specific notions (e.g., "Audit" with RiskId A-AuditDC).
231+
- Rationale text mentions DC count/context ("from X DC", "on domain controllers").
232+
"""
233+
rid = (risk_id or "").strip()
234+
mod = (model or "").strip()
235+
rat = (rationale or "").strip().lower()
236+
dc_prefixes = ("A-DC-", "S-DC-")
237+
if rid.startswith(dc_prefixes):
238+
return True
239+
dc_specific_ids = {
240+
"A-DC-Spooler",
241+
"A-DC-Coerce",
242+
"A-AuditDC",
243+
"S-DC-SubnetMissing",
244+
}
245+
if rid in dc_specific_ids:
246+
return True
247+
if mod == "Audit" and rid.endswith("DC"):
248+
return True
249+
dc_markers = (
250+
" from ",
251+
" dc",
252+
" dcs",
253+
" domain controller",
254+
" domain controllers",
255+
)
256+
return bool(any(marker in rat for marker in dc_markers))
257+
258+
def _apply_contextual_bump(self, severity: str, category: str = "", model: str = "",
259+
risk_id: str = "", rationale: str = "") -> str:
260+
"""
261+
Minimal additive logic on top of points-based severity:
262+
- If a CVE is mentioned -> bump by 1 level (at least Low).
263+
If rationale indicates missing/not enabled mitigation -> ensure at least Medium.
264+
- If DC-specific -> bump by 1 level.
265+
- If category is 'Exposure' -> bump by 1 level.
266+
"""
267+
if not severity or severity not in self._SEVERITY_ORDER:
268+
severity = "Info"
269+
idx = self._SEVERITY_ORDER.index(severity)
270+
rat = (rationale or "").lower()
271+
cat = (category or "").strip().lower()
272+
273+
if self.CVE_REGEX.search(rationale or ""):
274+
idx = min(idx + 1, len(self._SEVERITY_ORDER) - 1)
275+
mitigation_markers = ("mitigation", "not set", "disabled", "missing", "not enabled", "enable")
276+
if any(m in rat for m in mitigation_markers):
277+
idx = max(idx, self._SEVERITY_ORDER.index("Medium"))
278+
279+
if self._is_dc_specific_risk(risk_id, model, rationale):
280+
idx = min(idx + 1, len(self._SEVERITY_ORDER) - 1)
281+
282+
if cat == "exposure":
283+
idx = min(idx + 1, len(self._SEVERITY_ORDER) - 1)
284+
285+
return self._SEVERITY_ORDER[idx]

0 commit comments

Comments
 (0)