Skip to content

Commit fb85784

Browse files
Snyk Issue Api Scan "sca" import implementation (#13263)
* implementation * docu update * removed unnecessary tests to ruff it out * deduplication for snyk api fields added * tests update * ruff --------- Co-authored-by: valentijnscholten <valentijnscholten@gmail.com>
1 parent 22d7ece commit fb85784

7 files changed

Lines changed: 1816 additions & 119 deletions

File tree

docs/content/en/connecting_your_tools/parsers/file/snyk_issue_api.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
title: "Snyk Issue API"
33
toc_hide: true
44
---
5-
The Snyk Issue API parser supports importing vulnerability data from the Snyk Issue API in JSON format. Currently only parsing issues of type `code` is supported. Samples of ther issue types are welcome.
5+
The Snyk Issue API parser supports importing vulnerability data from the Snyk Issue API in JSON format.
6+
7+
Currently parsing issues of type `code` (SAST) and `package_vulnerability` (SCA) are supported.
8+
9+
Samples of ther issue types are welcome.
610

711
For more information about the Snyk Issue API, refer to the [official Snyk API documentation](https://docs.snyk.io/snyk-api/reference/issues#get-orgs-org_id-issues).
812

dojo/settings/settings.dist.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,6 +1366,7 @@ def saml2_attrib_map_format(din):
13661366
"Cyberwatch scan (Galeax)": ["title", "description", "severity"],
13671367
"Cycognito Scan": ["title", "severity"],
13681368
"OpenVAS Parser v2": ["title", "severity", "vuln_id_from_tool", "endpoints"],
1369+
"Snyk Issue API Scan": ["vuln_id_from_tool", "file_path"],
13691370
}
13701371

13711372
# Override the hardcoded settings here via the env var
@@ -1625,6 +1626,7 @@ def saml2_attrib_map_format(din):
16251626
"Qualys Hacker Guardian Scan": DEDUPE_ALGO_HASH_CODE,
16261627
"Cyberwatch scan (Galeax)": DEDUPE_ALGO_HASH_CODE,
16271628
"OpenVAS Parser v2": DEDUPE_ALGO_HASH_CODE,
1629+
"Snyk Issue API Scan": DEDUPE_ALGO_HASH_CODE,
16281630
}
16291631

16301632
# Override the hardcoded settings here via the env var

dojo/tools/snyk_issue_api/parser.py

Lines changed: 184 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
from contextlib import suppress
32
from datetime import datetime
43

54
from dojo.models import Finding
@@ -43,42 +42,59 @@ def process_tree(self, tree, test):
4342
findings.append(finding)
4443
return findings
4544

46-
def get_finding(self, issue, test):
47-
# Check top-level type must be "issue" as "packages" have their own API it seems.
48-
if not issue or issue.get("type") != "issue":
49-
return None
50-
51-
attributes = issue.get("attributes", {})
52-
53-
# Check attributes-level type must be "code"
54-
# Other items are not supported yet due to a lack of samples and lack of documentation
55-
# package_vulnerability,license,cloud,code,customconfig
56-
if attributes.get("type") != "code":
57-
return None
58-
59-
# Extract CWE classes
45+
def extract_cwe_classes(self, attributes):
6046
cwes = []
6147
for class_info in attributes.get("classes", []):
6248
if class_info.get("source") == "CWE":
6349
cwe_id = class_info.get("id", "").replace("CWE-", "")
6450
if cwe_id.isdigit():
6551
cwes.append(int(cwe_id))
6652

67-
# Extract location information, fixability and collect all source locations for impact
53+
return cwes
54+
55+
def extract_if_fix_is_available(self, finding_type, coordinates):
56+
if coordinates is None:
57+
return False
58+
59+
for coordinate in coordinates:
60+
# Check if any fix is available
61+
if finding_type == "code":
62+
if coordinate.get("is_fixable_snyk") or \
63+
coordinate.get("is_fixable_upstream") or \
64+
coordinate.get("is_fixable_manually"):
65+
return True
66+
67+
if finding_type == "package_vulnerability":
68+
if coordinate.get("is_fixable_snyk") or \
69+
coordinate.get("is_fixable_upstream") or \
70+
coordinate.get("is_fixable_manually") or \
71+
coordinate.get("is_patchable") or \
72+
coordinate.get("is_pinnable") or \
73+
coordinate.get("is_upgradeable"):
74+
return True
75+
return False
76+
77+
def extract_coordinate_data(self, is_type_code, coordinates):
6878
file_path = None
69-
line = None
70-
fix_available = False
79+
line = None # Always None for SCA
80+
component_name = None
81+
component_version = None
82+
reachable = False # SCA only
7183
impact_locations = []
7284

73-
for coordinate in attributes.get("coordinates", []):
74-
# Check if any fix is available
75-
if coordinate.get("is_fixable_snyk") or \
76-
coordinate.get("is_fixable_upstream") or \
77-
coordinate.get("is_fixable_manually"):
78-
fix_available = True
85+
for coordinate in coordinates:
86+
if not is_type_code:
87+
if coordinate.get("reachability") != "not-applicable":
88+
reachable = True
7989

8090
for representation in coordinate.get("representations", []):
81-
if "sourceLocation" in representation:
91+
if not is_type_code:
92+
if "dependency" in representation:
93+
dependency = representation["dependency"]
94+
component_name = dependency.get("package_name")
95+
component_version = dependency.get("package_version")
96+
file_path = component_name
97+
elif "sourceLocation" in representation:
8298
location = representation["sourceLocation"]
8399
region = location.get("region", {})
84100
start = region.get("start", {})
@@ -100,6 +116,94 @@ def get_finding(self, issue, test):
100116
if region:
101117
line = start.get("line")
102118

119+
return file_path, line, component_name, component_version, reachable, impact_locations
120+
121+
def get_exploit_details(self, exploit_details):
122+
if exploit_details:
123+
sources = exploit_details.get("sources", [])
124+
if sources:
125+
return [f"Exploit Sources: {', '.join(sources)}", ""]
126+
127+
return None
128+
129+
def extract_problems(self, problems):
130+
if problems:
131+
problem = problems[0] # Take the first problem
132+
return [
133+
f"id: {problem.get('id', 'Unknown')}",
134+
f"Source: {problem.get('source', 'Unknown')}",
135+
f"Type: {problem.get('type', 'Unknown')}",
136+
f"URL: {problem.get('url', 'Unknown')}" if problem.get("url") else "",
137+
f"Last Updated: {problem.get('updated_at', 'Unknown')}",
138+
"", # Empty line before locations
139+
]
140+
return None
141+
142+
def extract_problem_ids(self, problems):
143+
ids = []
144+
if problems:
145+
for problem in problems:
146+
if "id" in problem:
147+
# using .extend here adds character by character to the array
148+
ids.append(problem["id"]) # noqa: PERF401
149+
return ids
150+
151+
def extract_risk_score(self, risk):
152+
if risk and "score" in risk:
153+
score = risk["score"]
154+
if isinstance(score, dict):
155+
return (
156+
f"Risk Score: {score.get('value', 'N/A')} "
157+
f"(Model: {score.get('model', 'N/A')})"
158+
)
159+
return None
160+
161+
def extract_cvss_severities(self, severities, version):
162+
for severity in severities:
163+
if version in severity.get("version"):
164+
# returning first matching severity
165+
return severity.get("vector"), severity.get("score")
166+
167+
return None, None
168+
169+
def extract_convert_created_date(self, created_at):
170+
if created_at:
171+
created_str = created_at
172+
# Parse the date string and convert to yyyy-mm-dd format
173+
try:
174+
created_date = datetime.fromisoformat(created_str)
175+
return created_date.strftime("%Y-%m-%d")
176+
except (ValueError, AttributeError):
177+
return None
178+
179+
return None
180+
181+
def get_finding(self, issue, test):
182+
# Check top-level type must be "issue" as "packages" have their own API it seems.
183+
if not issue or issue.get("type") != "issue":
184+
return None
185+
186+
attributes = issue.get("attributes", {})
187+
188+
# Check attributes-level type - support both code and package_vulnerability
189+
issue_type = attributes.get("type")
190+
191+
if issue_type not in {"code", "package_vulnerability"}:
192+
return None
193+
194+
cwes = self.extract_cwe_classes(attributes)
195+
196+
impact_details = []
197+
198+
problem = self.extract_problems(attributes.get("problems", []))
199+
if problem:
200+
impact_details.extend(problem)
201+
202+
# Add exploit details if available, SCA only
203+
exploit_details = self.get_exploit_details(attributes.get("exploit_details", {}))
204+
if exploit_details:
205+
impact_details.extend(exploit_details)
206+
103207
# Map severity levels
104208
severity_map = {
105209
"critical": "Critical",
@@ -108,16 +212,30 @@ def get_finding(self, issue, test):
108212
"low": "Low",
109213
"info": "Info",
110214
}
215+
111216
severity = severity_map.get(attributes.get("effective_severity_level", "").lower(), "Info")
112217

113-
# Parse created_at date
114-
created = None
115-
if attributes.get("created_at"):
116-
with suppress(ValueError):
117-
created = datetime.strptime(attributes["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
118-
if not created:
119-
with suppress(ValueError):
120-
created = datetime.strptime(attributes["created_at"], "%Y-%m-%dT%H:%M:%SZ")
218+
created = self.extract_convert_created_date(attributes.get("created_at"))
219+
220+
is_out_of_scope = False # attributes.get("is_out_of_scope", False)
221+
222+
file_path, line, component_name, component_version, reachable, impact_locations = self.extract_coordinate_data(issue_type == "code", attributes.get("coordinates", []))
223+
224+
# Locations (Code only)
225+
if impact_locations:
226+
for location in impact_locations:
227+
impact_details.extend(location)
228+
229+
# Add package details (SCA only)
230+
if component_name:
231+
impact_details.extend([
232+
"Package Details:",
233+
f"Package: {component_name}",
234+
f"Version: {component_version or 'Unknown'}",
235+
"",
236+
])
237+
238+
impact_details.append(f"Reachable: {'Yes' if reachable else 'No'}")
121239

122240
# Create finding
123241
finding = Finding(
@@ -130,50 +248,51 @@ def get_finding(self, issue, test):
130248
unique_id_from_tool=issue.get("id"),
131249
file_path=file_path,
132250
line=line,
133-
out_of_scope=attributes.get("ignored", False),
251+
out_of_scope=is_out_of_scope,
134252
active=attributes.get("status") == "open" and not attributes.get("ignored", False),
135-
verified=True,
253+
# not all open issues are verified, only fixed and ignored
254+
verified=attributes.get("ignored", True) or attributes.get("status") == "resolved",
255+
false_p=attributes.get("ignored"),
256+
# mitigated is type "date", not "boolean"
257+
is_mitigated=attributes.get("status") == "resolved",
136258
cwe=cwes[0] if cwes else None,
137259
date=created,
260+
component_name=component_name,
261+
component_version=component_version,
262+
risk_accepted=False,
138263
)
139264

140-
# Set fix_available if the field exists in the model
141-
if hasattr(finding, "fix_available"):
142-
finding.fix_available = fix_available
265+
# sca only
266+
if attributes.get("key"):
267+
finding.vuln_id_from_tool = attributes.get("key")
268+
269+
if attributes.get("severities"):
270+
v3vector, v3score = self.extract_cvss_severities(attributes.get("severities", {}), "3")
271+
v4vector, v4score = self.extract_cvss_severities(attributes.get("severities", {}), "4")
272+
273+
if v3vector and v3score:
274+
finding.cvssv3 = v3vector
275+
finding.cvssv3_score = v3score
276+
277+
if v4vector and v4score:
278+
finding.cvssv4 = v4vector
279+
finding.cvssv4_score = v4score
280+
281+
finding.unsaved_vulnerability_ids = self.extract_problem_ids(attributes.get("problems", []))
282+
283+
finding.fix_available = self.extract_if_fix_is_available(issue_type, attributes.get("coordinates", []))
143284

144285
# Add risk score if available
145-
risk = attributes.get("risk", {})
146-
if risk and "score" in risk:
147-
score = risk["score"]
148-
if isinstance(score, dict):
149-
finding.severity_justification = (
150-
f"Risk Score: {score.get('value', 'N/A')} "
151-
f"(Model: {score.get('model', 'N/A')})"
152-
)
286+
risk = self.extract_risk_score(attributes.get("risk", {}))
287+
288+
if risk:
289+
finding.severity_justification = risk
153290

154291
# Add additional CWEs as references
155292
if len(cwes) > 1:
156293
finding.references = "Additional CWEs: " + ", ".join(f"CWE-{cwe}" for cwe in cwes[1:])
157294

158-
# Add problem details and all source locations to impact
159-
impact_details = []
160-
161-
# Add problem information
162-
problems = attributes.get("problems", [])
163-
if problems:
164-
problem = problems[0] # Take the first problem
165-
impact_details.extend([
166-
f"Source: {problem.get('source', 'Unknown')}",
167-
f"Type: {problem.get('type', 'Unknown')}",
168-
f"Last Updated: {problem.get('updated_at', 'Unknown')}",
169-
f"Severity: {severity}",
170-
"", # Empty line before locations
171-
])
172-
173-
# Add all source locations
174-
for location in impact_locations:
175-
impact_details.extend(location)
176-
295+
# Set impact with details
177296
if impact_details:
178297
finding.impact = "\n".join(impact_details).rstrip()
179298

0 commit comments

Comments
 (0)