|
1 | 1 | import csv |
2 | 2 | import io |
| 3 | +import json |
| 4 | +import logging |
3 | 5 | import sys |
4 | 6 |
|
5 | | -from dojo.models import Finding |
| 7 | +from dateutil import parser as date_parser |
6 | 8 |
|
| 9 | +from dojo.models import SEVERITIES, Finding, Test |
7 | 10 |
|
8 | | -class WizParser: |
9 | | - def get_scan_types(self): |
10 | | - return ["Wiz Scan"] |
| 11 | +logger = logging.getLogger(__name__) |
11 | 12 |
|
12 | | - def get_label_for_scan_types(self, scan_type): |
13 | | - return "Wiz Scan" |
14 | 13 |
|
15 | | - def get_description_for_scan_types(self, scan_type): |
16 | | - return "Wiz scan results in csv file format." |
class WizParserByTitle:
    """Parse the Wiz CSV export where the "Title" column identifies a finding."""

    # CSV columns appended verbatim to the finding description, in this order.
    DESCRIPTION_FIELDS = (
        "Description",
        "Resource Type",
        "Resource external ID",
        "Subscription ID",
        "Project IDs",
        "Project Names",
        "Control ID",
        "Resource Name",
        "Resource Region",
        "Resource Status",
        "Resource Platform",
        "Resource OS",
        "Resource original JSON",
        "Issue ID",
        "Resource vertex ID",
        "Ticket URLs",
        "Note",
        "Due At",
        "Subscription Name",
        "Wiz URL",
        "Cloud Provider URL",
        "Resource Tags",
        "Kubernetes Cluster",
        "Kubernetes Namespace",
        "Container Service",
    )

    def parse_findings(self, test: "Test", reader: csv.DictReader) -> list["Finding"]:
        """Parse the CSV with the assumed format of the link below.

        test file: https://github.com/DefectDojo/django-DefectDojo/blob/master/unittests/scans/wiz/multiple_findings.csv

        Returns a list of unsaved Finding objects, one per row whose Status is "open".
        """
        findings = []
        for row in reader:
            # Skip anything that is not an open issue. Guard against a missing
            # Status column instead of raising AttributeError on None.
            status = row.get("Status") or ""
            if status.lower() != "open":
                continue
            title = row.get("Title")
            # .capitalize() already lowercases the rest of the string; default
            # to "Info" when the Severity column is absent or empty so a
            # malformed row does not crash the import.
            raw_severity = row.get("Severity")
            severity = raw_severity.capitalize() if raw_severity else "Info"
            mitigation = row.get("Remediation Recommendation")
            # Build the description from every populated description column.
            description = ""
            for field in self.DESCRIPTION_FIELDS:
                field_value = row.get(field)
                if field_value:
                    description += f"**{field}**: {field_value}\n"
            findings.append(
                Finding(
                    title=title,
                    description=description,
                    severity=severity,
                    static_finding=False,
                    dynamic_finding=True,
                    mitigation=mitigation,
                    test=test,
                ),
            )
        return findings
| 74 | + |
| 75 | + |
class WizParserByDetailedName:
    """Parse the Wiz CSV export where "DetailedName" and "Name" identify a finding."""

    # CSV column -> human-readable label rendered into the description.
    DESCRIPTION_FIELDS = {
        "WizURL": "Wiz URL",
        "HasExploit": "Has Exploit",
        "HasCisaKevExploit": "Has Cisa Kev Exploit",
        "LocationPath": "Location Path",
        "Version": "Version",
        "DetectionMethod": "Detection Method",
        "Link": "Link",
        "Projects": "Projects",
        "AssetID": "Asset ID",
        "AssetName": "Asset Name",
        "AssetRegion": "Asset Region",
        "ProviderUniqueId": "Provider Unique Id",
        "CloudProviderURL": "Cloud Provider URL",
        "CloudPlatform": "Cloud Platform",
        "SubscriptionExternalId": "Subscription External Id",
        "SubscriptionId": "Subscription Id",
        "SubscriptionName": "Subscription Name",
        "ExecutionControllers": "Execution Controllers",
        "ExecutionControllersSubscriptionExternalIds": "Execution Controllers Subscription External Ids",
        "ExecutionControllersSubscriptionNames": "Execution Controllers Subscription Names",
        "OperatingSystem": "Operating System",
        "IpAddresses": "Ip Addresses",
    }
    # CSV column -> human-readable label rendered into the mitigation.
    MITIGATION_FIELDS = {
        "LocationPath": "Location Path",
        "FixedVersion": "Fixed Version",
        "Remediation": "Remediation",
    }

    def parse_findings(self, test: "Test", reader: csv.DictReader) -> list["Finding"]:
        """Parse the CSV with the assumed format of the link below.

        test file: Coming soon!

        Returns a list of unsaved Finding objects, one per CSV row.
        """
        findings = []
        for row in reader:
            # Common fields
            vulnerability_id = row.get("Name")
            package_name = row.get("DetailedName")
            package_version = row.get("Version")
            severity = row.get("VendorSeverity")
            finding_id = row.get("ID")

            description = self._construct_string_field(self.DESCRIPTION_FIELDS, row)
            mitigation = self._construct_string_field(self.MITIGATION_FIELDS, row)
            status_dict = self._convert_status(row)
            # dateutil raises TypeError on None, so only parse a date when the
            # FirstDetected column is present and non-empty.
            first_detected = row.get("FirstDetected")
            # Create the finding object
            finding = Finding(
                title=f"{package_name}: {vulnerability_id}",
                description=description,
                mitigation=mitigation,
                severity=self._validate_severities(severity),
                static_finding=True,
                unique_id_from_tool=finding_id,
                component_name=package_name,
                component_version=package_version,
                date=date_parser.parse(first_detected) if first_detected else None,
                test=test,
                **status_dict,
            )
            finding.unsaved_vulnerability_ids = [vulnerability_id]
            finding.unsaved_tags = self._parse_tags(row.get("Tags", "[]"))
            findings.append(finding)
        return findings

    def _construct_string_field(self, fields: dict[str, str], row: dict) -> str:
        """Construct a formatted markdown string from the populated columns in *fields*."""
        return_string = ""
        for field, pretty_field in fields.items():
            if (field_value := row.get(field)) is not None and len(field_value) > 0:
                return_string += f"**{pretty_field}**: `{field_value}`\n"
        return return_string

    def _parse_tags(self, tags: str) -> list[str]:
        """Parse the Tag string dict, and convert to a list of "key: value" strings.

        The format of the tags is "{""key"":""value""}" format.
        Malformed or empty tag payloads yield an empty list instead of raising.
        """
        try:
            tag_dict = json.loads(tags or "{}")
        except json.JSONDecodeError:
            logger.warning("Tags column is not valid JSON: %s", tags)
            return []
        return [f"{key}: {value}" for key, value in tag_dict.items()]

    def _validate_severities(self, severity: str) -> str:
        """Ensure the supplied severity fits what DefectDojo is expecting."""
        if severity not in SEVERITIES:
            # Lazy %-formatting so the message is only built when emitted.
            logger.error("Severity is not supported: %s", severity)
            # Default to Info severity
            return "Info"
        return severity

    def _convert_status(self, row: dict) -> dict:
        """Convert the "FindingStatus" column to a dict of Finding kwargs.

        Every status currently maps to active (the original branched on "Open"
        but returned the same dict either way). Extend this table when other
        Wiz statuses need dedicated handling (e.g. Resolved -> is_mitigated).
        """
        status_map = {"Open": {"active": True}}
        # Default status is active
        return status_map.get(row.get("FindingStatus"), {"active": True})
| 181 | + |
| 182 | + |
class WizParser(
    WizParserByTitle,
    WizParserByDetailedName,
):
    """DefectDojo entry point: dispatches a Wiz CSV to the matching sub-parser."""

    def get_scan_types(self):
        return ["Wiz Scan"]

    def get_label_for_scan_types(self, scan_type):
        return "Wiz Scan"

    def get_description_for_scan_types(self, scan_type):
        return "Wiz scan results in csv file format."

    def get_findings(self, filename, test):
        """Read the uploaded CSV and dispatch on its header columns.

        Raises ValueError when the header matches neither supported format
        (including an empty upload, where DictReader.fieldnames is None).
        """
        content = filename.read()
        if isinstance(content, bytes):
            content = content.decode("utf-8")
        csv.field_size_limit(int(sys.maxsize / 10))  # the request/resp are big
        reader = csv.DictReader(io.StringIO(content))
        # fieldnames is None for an empty file; fall through to the
        # unsupported-format error instead of raising TypeError on `in None`.
        fieldnames = reader.fieldnames or []
        # Determine which parser to use
        if "Title" in fieldnames:
            return WizParserByTitle().parse_findings(test, reader)
        if all(field in fieldnames for field in ("Name", "DetailedName")):
            return WizParserByDetailedName().parse_findings(test, reader)
        msg = "This CSV format of Wiz is not supported"
        raise ValueError(msg)
0 commit comments