11import json
2- from contextlib import suppress
32from datetime import datetime
43
54from dojo .models import Finding
@@ -43,42 +42,59 @@ def process_tree(self, tree, test):
4342 findings .append (finding )
4443 return findings
4544
46- def get_finding (self , issue , test ):
47- # Check top-level type must be "issue" as "packages" have their own API it seems.
48- if not issue or issue .get ("type" ) != "issue" :
49- return None
50-
51- attributes = issue .get ("attributes" , {})
52-
53- # Check attributes-level type must be "code"
54- # Other items are not supported yet due to a lack of samples and lack of documentation
55- # package_vulnerability,license,cloud,code,customconfig
56- if attributes .get ("type" ) != "code" :
57- return None
58-
59- # Extract CWE classes
45+ def extract_cwe_classes (self , attributes ):
6046 cwes = []
6147 for class_info in attributes .get ("classes" , []):
6248 if class_info .get ("source" ) == "CWE" :
6349 cwe_id = class_info .get ("id" , "" ).replace ("CWE-" , "" )
6450 if cwe_id .isdigit ():
6551 cwes .append (int (cwe_id ))
6652
67- # Extract location information, fixability and collect all source locations for impact
53+ return cwes
54+
55+ def extract_if_fix_is_available (self , finding_type , coordinates ):
56+ if coordinates is None :
57+ return False
58+
59+ for coordinate in coordinates :
60+ # Check if any fix is available
61+ if finding_type == "code" :
62+ if coordinate .get ("is_fixable_snyk" ) or \
63+ coordinate .get ("is_fixable_upstream" ) or \
64+ coordinate .get ("is_fixable_manually" ):
65+ return True
66+
67+ if finding_type == "package_vulnerability" :
68+ if coordinate .get ("is_fixable_snyk" ) or \
69+ coordinate .get ("is_fixable_upstream" ) or \
70+ coordinate .get ("is_fixable_manually" ) or \
71+ coordinate .get ("is_patchable" ) or \
72+ coordinate .get ("is_pinnable" ) or \
73+ coordinate .get ("is_upgradeable" ):
74+ return True
75+ return False
76+
77+ def extract_coordinate_data (self , is_type_code , coordinates ):
6878 file_path = None
69- line = None
70- fix_available = False
79+ line = None # Always None for SCA
80+ component_name = None
81+ component_version = None
82+ reachable = False # SCA only
7183 impact_locations = []
7284
73- for coordinate in attributes .get ("coordinates" , []):
74- # Check if any fix is available
75- if coordinate .get ("is_fixable_snyk" ) or \
76- coordinate .get ("is_fixable_upstream" ) or \
77- coordinate .get ("is_fixable_manually" ):
78- fix_available = True
85+ for coordinate in coordinates :
86+ if not is_type_code :
87+ if coordinate .get ("reachability" ) != "not-applicable" :
88+ reachable = True
7989
8090 for representation in coordinate .get ("representations" , []):
81- if "sourceLocation" in representation :
91+ if not is_type_code :
92+ if "dependency" in representation :
93+ dependency = representation ["dependency" ]
94+ component_name = dependency .get ("package_name" )
95+ component_version = dependency .get ("package_version" )
96+ file_path = component_name
97+ elif "sourceLocation" in representation :
8298 location = representation ["sourceLocation" ]
8399 region = location .get ("region" , {})
84100 start = region .get ("start" , {})
@@ -100,6 +116,94 @@ def get_finding(self, issue, test):
100116 if region :
101117 line = start .get ("line" )
102118
119+ return file_path , line , component_name , component_version , reachable , impact_locations
120+
121+ def get_exploit_details (self , exploit_details ):
122+ if exploit_details :
123+ sources = exploit_details .get ("sources" , [])
124+ if sources :
125+ return [f"Exploit Sources: { ', ' .join (sources )} " , "" ]
126+
127+ return None
128+
129+ def extract_problems (self , problems ):
130+ if problems :
131+ problem = problems [0 ] # Take the first problem
132+ return [
133+ f"id: { problem .get ('id' , 'Unknown' )} " ,
134+ f"Source: { problem .get ('source' , 'Unknown' )} " ,
135+ f"Type: { problem .get ('type' , 'Unknown' )} " ,
136+ f"URL: { problem .get ('url' , 'Unknown' )} " if problem .get ("url" ) else "" ,
137+ f"Last Updated: { problem .get ('updated_at' , 'Unknown' )} " ,
138+ "" , # Empty line before locations
139+ ]
140+ return None
141+
142+ def extract_problem_ids (self , problems ):
143+ ids = []
144+ if problems :
145+ for problem in problems :
146+ if "id" in problem :
147+ # using .extend here adds character by character to the array
148+ ids .append (problem ["id" ]) # noqa: PERF401
149+ return ids
150+
151+ def extract_risk_score (self , risk ):
152+ if risk and "score" in risk :
153+ score = risk ["score" ]
154+ if isinstance (score , dict ):
155+ return (
156+ f"Risk Score: { score .get ('value' , 'N/A' )} "
157+ f"(Model: { score .get ('model' , 'N/A' )} )"
158+ )
159+ return None
160+
161+ def extract_cvss_severities (self , severities , version ):
162+ for severity in severities :
163+ if version in severity .get ("version" ):
164+ # returning first matching severity
165+ return severity .get ("vector" ), severity .get ("score" )
166+
167+ return None , None
168+
169+ def extract_convert_created_date (self , created_at ):
170+ if created_at :
171+ created_str = created_at
172+ # Parse the date string and convert to yyyy-mm-dd format
173+ try :
174+ created_date = datetime .fromisoformat (created_str )
175+ return created_date .strftime ("%Y-%m-%d" )
176+ except (ValueError , AttributeError ):
177+ return None
178+
179+ return None
180+
181+ def get_finding (self , issue , test ):
182+ # Check top-level type must be "issue" as "packages" have their own API it seems.
183+ if not issue or issue .get ("type" ) != "issue" :
184+ return None
185+
186+ attributes = issue .get ("attributes" , {})
187+
188+ # Check attributes-level type - support both code and package_vulnerability
189+ issue_type = attributes .get ("type" )
190+
191+ if issue_type not in {"code" , "package_vulnerability" }:
192+ return None
193+
194+ cwes = self .extract_cwe_classes (attributes )
195+
196+ impact_details = []
197+
198+ problem = self .extract_problems (attributes .get ("problems" , []))
199+ if problem :
200+ impact_details .extend (problem )
201+
202+ # Add exploit details if available, SCA only
203+ exploit_details = self .get_exploit_details (attributes .get ("exploit_details" , {}))
204+ if exploit_details :
205+ impact_details .extend (exploit_details )
206+
103207 # Map severity levels
104208 severity_map = {
105209 "critical" : "Critical" ,
@@ -108,16 +212,30 @@ def get_finding(self, issue, test):
108212 "low" : "Low" ,
109213 "info" : "Info" ,
110214 }
215+
111216 severity = severity_map .get (attributes .get ("effective_severity_level" , "" ).lower (), "Info" )
112217
113- # Parse created_at date
114- created = None
115- if attributes .get ("created_at" ):
116- with suppress (ValueError ):
117- created = datetime .strptime (attributes ["created_at" ], "%Y-%m-%dT%H:%M:%S.%fZ" )
118- if not created :
119- with suppress (ValueError ):
120- created = datetime .strptime (attributes ["created_at" ], "%Y-%m-%dT%H:%M:%SZ" )
218+ created = self .extract_convert_created_date (attributes .get ("created_at" ))
219+
220+ is_out_of_scope = False # attributes.get("is_out_of_scope", False)
221+
222+ file_path , line , component_name , component_version , reachable , impact_locations = self .extract_coordinate_data (issue_type == "code" , attributes .get ("coordinates" , []))
223+
224+ # Locations (Code only)
225+ if impact_locations :
226+ for location in impact_locations :
227+ impact_details .extend (location )
228+
229+ # Add package details (SCA only)
230+ if component_name :
231+ impact_details .extend ([
232+ "Package Details:" ,
233+ f"Package: { component_name } " ,
234+ f"Version: { component_version or 'Unknown' } " ,
235+ "" ,
236+ ])
237+
238+ impact_details .append (f"Reachable: { 'Yes' if reachable else 'No' } " )
121239
122240 # Create finding
123241 finding = Finding (
@@ -130,50 +248,51 @@ def get_finding(self, issue, test):
130248 unique_id_from_tool = issue .get ("id" ),
131249 file_path = file_path ,
132250 line = line ,
133- out_of_scope = attributes . get ( "ignored" , False ) ,
251+ out_of_scope = is_out_of_scope ,
134252 active = attributes .get ("status" ) == "open" and not attributes .get ("ignored" , False ),
135- verified = True ,
253+ # not all open issues are verified, only fixed and ignored
254+ verified = attributes .get ("ignored" , True ) or attributes .get ("status" ) == "resolved" ,
255+ false_p = attributes .get ("ignored" ),
256+ # mitigated is type "date", not "boolean"
257+ is_mitigated = attributes .get ("status" ) == "resolved" ,
136258 cwe = cwes [0 ] if cwes else None ,
137259 date = created ,
260+ component_name = component_name ,
261+ component_version = component_version ,
262+ risk_accepted = False ,
138263 )
139264
140- # Set fix_available if the field exists in the model
141- if hasattr (finding , "fix_available" ):
142- finding .fix_available = fix_available
265+ # sca only
266+ if attributes .get ("key" ):
267+ finding .vuln_id_from_tool = attributes .get ("key" )
268+
269+ if attributes .get ("severities" ):
270+ v3vector , v3score = self .extract_cvss_severities (attributes .get ("severities" , {}), "3" )
271+ v4vector , v4score = self .extract_cvss_severities (attributes .get ("severities" , {}), "4" )
272+
273+ if v3vector and v3score :
274+ finding .cvssv3 = v3vector
275+ finding .cvssv3_score = v3score
276+
277+ if v4vector and v4score :
278+ finding .cvssv4 = v4vector
279+ finding .cvssv4_score = v4score
280+
281+ finding .unsaved_vulnerability_ids = self .extract_problem_ids (attributes .get ("problems" , []))
282+
283+ finding .fix_available = self .extract_if_fix_is_available (issue_type , attributes .get ("coordinates" , []))
143284
144285 # Add risk score if available
145- risk = attributes .get ("risk" , {})
146- if risk and "score" in risk :
147- score = risk ["score" ]
148- if isinstance (score , dict ):
149- finding .severity_justification = (
150- f"Risk Score: { score .get ('value' , 'N/A' )} "
151- f"(Model: { score .get ('model' , 'N/A' )} )"
152- )
286+ risk = self .extract_risk_score (attributes .get ("risk" , {}))
287+
288+ if risk :
289+ finding .severity_justification = risk
153290
154291 # Add additional CWEs as references
155292 if len (cwes ) > 1 :
156293 finding .references = "Additional CWEs: " + ", " .join (f"CWE-{ cwe } " for cwe in cwes [1 :])
157294
158- # Add problem details and all source locations to impact
159- impact_details = []
160-
161- # Add problem information
162- problems = attributes .get ("problems" , [])
163- if problems :
164- problem = problems [0 ] # Take the first problem
165- impact_details .extend ([
166- f"Source: { problem .get ('source' , 'Unknown' )} " ,
167- f"Type: { problem .get ('type' , 'Unknown' )} " ,
168- f"Last Updated: { problem .get ('updated_at' , 'Unknown' )} " ,
169- f"Severity: { severity } " ,
170- "" , # Empty line before locations
171- ])
172-
173- # Add all source locations
174- for location in impact_locations :
175- impact_details .extend (location )
176-
295+ # Set impact with details
177296 if impact_details :
178297 finding .impact = "\n " .join (impact_details ).rstrip ()
179298
0 commit comments