1+ import csv
12import hashlib
3+ import io
24import json
5+ from contextlib import suppress
36from datetime import datetime
7+ from typing import ClassVar
48
5- from dojo .models import Finding
9+ from dateutil import parser as date_parser
10+ from django .core .files .uploadedfile import TemporaryUploadedFile
11+
12+ from dojo .models import Finding , Test
613
714__author__ = "Kirill Gotsman"
815
916
10- class H1Parser :
17+ class HackerOneVulnerabilityDisclosureProgram :
1118 """
12- A class that can be used to parse the Get All Reports JSON export from HackerOne API.
19+ Vulnerability Disclosure Program HackerOne reports
1320 """
1421
15- def get_scan_types (self ):
16- return ["HackerOne Cases" ]
17-
18- def get_label_for_scan_types (self , scan_type ):
19- return scan_type
20-
21- def get_description_for_scan_types (self , scan_type ):
22- return "Import HackerOne cases findings in JSON format."
23-
24- def get_findings (self , file , test ):
22+ def get_vulnerability_disclosure_json_findings (self , tree , test ):
2523 """
2624 Converts a HackerOne reports to a DefectDojo finding
2725 """
28-
29- # Load the contents of the JSON file into a dictionary
30- data = file .read ()
31- try :
32- tree = json .loads (str (data , "utf-8" ))
33- except Exception :
34- tree = json .loads (data )
3526 # Convert JSON report to DefectDojo format
3627 dupes = {}
3728 for content in tree ["data" ]:
3829 # Get all relevant data
3930 date = content ["attributes" ]["created_at" ]
4031 date = datetime .strftime (
41- datetime .strptime (date , "%Y-%m-%dT%H:%M:%S.%fZ" ), "%Y-%m-%d" ,
32+ datetime .strptime (date , "%Y-%m-%dT%H:%M:%S.%fZ" ),
33+ "%Y-%m-%d" ,
4234 )
4335 # Build the title of the Dojo finding
4436 title = "#" + content ["id" ] + " " + content ["attributes" ]["title" ]
@@ -47,21 +39,15 @@ def get_findings(self, file, test):
4739
4840 # References
4941 try :
50- issue_tracker_id = content ["attributes" ][
51- "issue_tracker_reference_id"
52- ]
53- issue_tracker_url = content ["attributes" ][
54- "issue_tracker_reference_url"
55- ]
42+ issue_tracker_id = content ["attributes" ]["issue_tracker_reference_id" ]
43+ issue_tracker_url = content ["attributes" ]["issue_tracker_reference_url" ]
5644 references = f"[{ issue_tracker_id } ]({ issue_tracker_url } )\n "
5745 except Exception :
5846 references = ""
5947
6048 # Build the severity of the Dojo finding
6149 try :
62- severity = content ["relationships" ]["severity" ]["data" ][
63- "attributes"
64- ]["rating" ].capitalize ()
50+ severity = content ["relationships" ]["severity" ]["data" ]["attributes" ]["rating" ].capitalize ()
6551 if severity not in ["Low" , "Medium" , "High" , "Critical" ]:
6652 severity = "Info"
6753 except Exception :
@@ -81,9 +67,7 @@ def get_findings(self, file, test):
8167 # Set CWE of the Dojo finding
8268 try :
8369 cwe = int (
84- content ["relationships" ]["weakness" ]["data" ]["attributes" ][
85- "external_id"
86- ][4 :],
70+ content ["relationships" ]["weakness" ]["data" ]["attributes" ]["external_id" ][4 :],
8771 )
8872 except Exception :
8973 cwe = 0
@@ -121,11 +105,10 @@ def get_findings(self, file, test):
121105 def build_description (self , content ):
122106 date = content ["attributes" ]["created_at" ]
123107 date = datetime .strftime (
124- datetime .strptime (date , "%Y-%m-%dT%H:%M:%S.%fZ" ), "%Y-%m-%d" ,
108+ datetime .strptime (date , "%Y-%m-%dT%H:%M:%S.%fZ" ),
109+ "%Y-%m-%d" ,
125110 )
126- reporter = content ["relationships" ]["reporter" ]["data" ]["attributes" ][
127- "username"
128- ]
111+ reporter = content ["relationships" ]["reporter" ]["data" ]["attributes" ]["username" ]
129112 triaged_date = content ["attributes" ]["triaged_at" ]
130113
131114 # Build the description of the Dojo finding
@@ -142,9 +125,7 @@ def build_description(self, content):
142125
143126 # Try to grab CVSS
144127 try :
145- cvss = content ["relationships" ]["severity" ]["data" ]["attributes" ][
146- "score"
147- ]
128+ cvss = content ["relationships" ]["severity" ]["data" ]["attributes" ]["score" ]
148129 description += f"CVSS: { cvss } \n "
149130 except Exception :
150131 pass
@@ -156,14 +137,186 @@ def build_description(self, content):
156137
157138 # Try to grab weakness if it's there
158139 try :
159- weakness_title = content ["relationships" ]["weakness" ]["data" ][
160- "attributes"
161- ]["name" ]
162- weakness_desc = content ["relationships" ]["weakness" ]["data" ][
163- "attributes"
164- ]["description" ]
140+ weakness_title = content ["relationships" ]["weakness" ]["data" ]["attributes" ]["name" ]
141+ weakness_desc = content ["relationships" ]["weakness" ]["data" ]["attributes" ]["description" ]
165142 description += f"\n ##Weakness: { weakness_title } \n { weakness_desc } "
166143 except Exception :
167144 pass
168145
169146 return description
147+
148+
149+ class HackerOneBugBountyProgram :
150+ """Bug Bounty Program HackerOne reports."""
151+
152+ fields_to_label : ClassVar [dict [str , str ]] = {
153+ "id" : "ID" ,
154+ "weakness" : "Weakness Category" ,
155+ "substate" : "Substate" ,
156+ "reporter" : "Reporter" ,
157+ "assigned" : "Assigned To" ,
158+ "public" : "Public" ,
159+ "triageted_at" : "Triaged On" ,
160+ "closed_at" : "Closed On" ,
161+ "awarded_at" : "Awarded On" ,
162+ "bounty" : "Bounty Price" ,
163+ "bonus" : "Bonus" ,
164+ "first_response_at" : "First Response On" ,
165+ "source" : "Source" ,
166+ "reference" : "Reference" ,
167+ "reference_url" : "Reference URL" ,
168+ "structured_scope" : "Structured Scope" ,
169+ "structured_scope_reference" : "Structured Scope Reference" ,
170+ "original_report_id" : "Original Report ID" ,
171+ "collaborating_users" : "Collaboration Users" ,
172+ "duplicate_report_ids" : "Duplicate Report IDs" ,
173+ }
174+
175+ def get_bug_bounty_program_json_findings (self , dict_list : dict , test : Test ) -> list [Finding ]:
176+ return self .parse_findings (dict_list , test )
177+
178+ def get_bug_bounty_program_csv_findings (self , dict_list : dict , test : Test ) -> list [Finding ]:
179+ return self .parse_findings (dict_list , test )
180+
181+ def parse_findings (self , dict_list : list [dict ], test : Test ) -> list [Finding ]:
182+ """Return a list of findings generated by the submitted report."""
183+ findings = []
184+ for entry in dict_list :
185+ status_dict = self .determine_status (entry )
186+ finding = Finding (
187+ title = entry .get ("title" ),
188+ severity = self .convert_severity (entry ),
189+ description = self .parse_description (entry ),
190+ date = date_parser .parse (entry .get ("reported_at" )),
191+ dynamic_finding = True ,
192+ test = test ,
193+ ** status_dict ,
194+ )
195+ # Add vulnerability IDs if they are present
196+ if (cve_str := entry .get ("cve_ids" )) is not None and len (cve_str ) > 0 :
197+ finding .unsaved_vulnerability_ids = [cve_str ]
198+ # Add the finding the the list
199+ findings .append (finding )
200+ return findings
201+
202+ def determine_status (self , row ) -> dict :
203+ """Generate a dict of status meta to fully represent that state of the finding
204+
205+ Possible states currently supported are open and closed. In the event that neither
206+ of those options are present, the open status will be the default, and returned
207+ """
208+ default_status = {
209+ "active" : True ,
210+ }
211+ # Open status -> active = True
212+ # Closed status -> is_mitigated = True + timestamp
213+ if (status := row .get ("state" )) is not None :
214+ if status == "open" :
215+ return default_status
216+ if status == "closed" :
217+ return {
218+ "is_mitigated" : True ,
219+ "active" : False ,
220+ "mitigated" : date_parser .parse (row .get ("closed_at" )),
221+ }
222+ return default_status
223+
224+ def convert_severity (self , entry : dict ) -> str :
225+ """Convert the severity from the parser from the string value, or CVSS score."""
226+ # Try to use the string severity first
227+ if (severity := entry .get ("severity_rating" )) is not None :
228+ if severity in ["critical" , "high" , "medium" , "low" ]:
229+ return severity .capitalize ()
230+ # Fall back to "severity_score" which I assume is CVSS Score
231+ if (severity_score := entry .get ("severity_score" )) is not None :
232+ with suppress (ValueError ):
233+ severity_score = float (severity_score )
234+ if severity_score >= 9.0 :
235+ return "Critical"
236+ if severity_score >= 7.0 :
237+ return "High"
238+ if severity_score >= 4.0 :
239+ return "Medium"
240+ if severity_score > 0.0 :
241+ return "Low"
242+ # Default to Info in all cases (assuming we reach this)
243+ return "Info"
244+
245+ def parse_description (self , entry : dict ) -> str :
246+ """Build the description from the mapping set in the fields_to_label var."""
247+ # Iterate over the items and build the string
248+ description = ""
249+ for field , label in self .fields_to_label .items ():
250+ if (value := entry .get (field )) is not None and len (value ) > 0 :
251+ description += f"**{ label } **: { value } \n "
252+ return description
253+
254+
255+ class H1Parser (
256+ HackerOneVulnerabilityDisclosureProgram ,
257+ HackerOneBugBountyProgram ,
258+ ):
259+ """
260+ A class that can be used to parse the Get All Reports JSON export from HackerOne API.
261+ """
262+
263+ def get_scan_types (self ):
264+ return ["HackerOne Cases" ]
265+
266+ def get_label_for_scan_types (self , scan_type ):
267+ return scan_type
268+
269+ def get_description_for_scan_types (self , scan_type ):
270+ return "Import HackerOne cases findings in JSON format."
271+
272+ def get_findings (self , file : TemporaryUploadedFile , test : Test ) -> list [Finding ]:
273+ """Return the list of findings generated from the uploaded report."""
274+ # first determine which format to pase
275+ file_name = file .name
276+ if str (file_name ).endswith (".json" ):
277+ return self .determine_json_format (file , test )
278+ elif str (file_name ).endswith (".csv" ):
279+ return self .determine_csv_format (file , test )
280+ else :
281+ msg = "Filename extension not recognized. Use .json or .csv"
282+ raise ValueError (msg )
283+
284+ def get_json_tree (self , file : TemporaryUploadedFile ) -> dict :
285+ """Extract the CSV file into a iterable that represents a dict."""
286+ data = file .read ()
287+ try :
288+ tree = json .loads (str (data , "utf-8" ))
289+ except Exception :
290+ tree = json .loads (data )
291+ return tree
292+
293+ def determine_json_format (self , file : TemporaryUploadedFile , test : Test ) -> list [Finding ]:
294+ """Evaluate the format of the JSON report that was uploaded to determine which parser to use."""
295+ tree = self .get_json_tree (file )
296+ # Check for some root elements
297+ if "findings" in tree :
298+ return self .get_bug_bounty_program_json_findings (tree .get ("findings" , []), test )
299+ if "data" in tree :
300+ return self .get_vulnerability_disclosure_json_findings (tree , test )
301+ else :
302+ msg = "This JSON format is not supported"
303+ raise ValueError (msg )
304+
305+ def get_csv_reader (self , file : TemporaryUploadedFile ) -> csv .DictReader :
306+ """Extract the CSV file into a iterable that represents a dict."""
307+ if file is None :
308+ return ()
309+ content = file .read ()
310+ if isinstance (content , bytes ):
311+ content = content .decode ("utf-8" )
312+ return csv .DictReader (io .StringIO (content ), delimiter = "," , quotechar = '"' )
313+
314+ def determine_csv_format (self , file : TemporaryUploadedFile , test : Test ) -> list [Finding ]:
315+ """Evaluate the format of the CSV report that was uploaded to determine which parser to use."""
316+ reader = self .get_csv_reader (file )
317+ # Check for some root elements
318+ if "bounty" in reader .fieldnames :
319+ return self .get_bug_bounty_program_csv_findings (reader , test )
320+ else :
321+ msg = "This CSV format is not supported"
322+ raise ValueError (msg )
0 commit comments