|
| 1 | +import hashlib |
| 2 | +from dataclasses import dataclass |
| 3 | +from typing import Optional |
| 4 | + |
| 5 | +from dojo.models import Endpoint, Finding |
| 6 | + |
| 7 | + |
@dataclass
class OpenVASFindingAuxData:

    """Dataclass to contain all information added later to a finding"""

    # Reference URLs collected while parsing; joined into Finding.references
    # by postprocess_finding
    references: list[str]
    # Summary text appended to the finding description (prefixed "**Summary**:")
    summary: str = ""
    # Quality of Detection value appended to the description (prefixed "**QoD**:")
    qod: str = ""
    # Raw OpenVAS result text; mapped (cleaned + fenced) to steps_to_reproduce
    openvas_result: str = ""
    # Applied as cvssv3_score only when no versioned CVSS score was detected
    fallback_cvss_score: float | None = None
| 18 | + |
| 19 | + |
def setup_finding(test) -> tuple[Finding, OpenVASFindingAuxData]:
    """Base setup and init for findings and auxiliary data.

    Args:
        test: the DefectDojo Test this finding belongs to; forwarded verbatim
            to the Finding constructor. (The previous ``bool`` annotation was
            incorrect — ``Finding(test=...)`` expects the Test model object.)

    Returns:
        A ``(finding, aux_info)`` pair: a dynamic Info-severity finding with no
        CWE, an empty vulnerability-id list and a single blank placeholder
        endpoint, plus an empty auxiliary-data container.
    """
    finding = Finding(
        test=test,
        dynamic_finding=True,
        static_finding=False,
        severity="Info",
        nb_occurences=1,
        cwe=None,
    )
    finding.unsaved_vulnerability_ids = []
    # Blank placeholder endpoint — presumably filled in by the parser later;
    # TODO(review): confirm against the caller.
    finding.unsaved_endpoints = [Endpoint()]

    aux_info = OpenVASFindingAuxData(references=[])

    return finding, aux_info
| 29 | + |
| 30 | + |
def is_valid_severity(severity: str) -> bool:
    """Return True if *severity* is one of DefectDojo's recognised severity labels."""
    return severity in {"Info", "Low", "Medium", "High", "Critical"}
| 34 | + |
| 35 | + |
def cleanup_openvas_text(text: str) -> str:
    """Strip the unnecessary line wrapping DefectDojo would otherwise render."""
    cleaned = text.replace("\n ", " ")
    return cleaned
| 39 | + |
| 40 | + |
def escape_restructured_text(text: str) -> str:
    """Fence *text* in a literal block so reStructuredText markup is not interpreted.

    OpenVAS likes to include markdown-like tables in some fields; DefectDojo
    renders reStructuredText, which would mangle them, so the raw text is
    wrapped in a code fence instead.
    """
    return "```\n" + text + "\n```"
| 46 | + |
| 47 | + |
def postprocess_finding(finding: Finding, aux_info: OpenVASFindingAuxData):
    """Update *finding* in place with the content gathered in *aux_info*.

    Maps the raw OpenVAS result to steps_to_reproduce (cleaned and fenced),
    appends summary and QoD to the description, joins references into a
    bulleted list, applies a fallback CVSS score when no versioned score was
    parsed, and heuristically sets fix_available from the mitigation text.
    """
    if aux_info.openvas_result:
        finding.steps_to_reproduce = escape_restructured_text(cleanup_openvas_text(aux_info.openvas_result))
    if aux_info.summary:
        finding.description += f"\n**Summary**: {cleanup_openvas_text(aux_info.summary)}"
    if aux_info.qod:
        finding.description += f"\n**QoD**: {aux_info.qod}"
    if aux_info.references:
        finding.references = "\n".join(f"- {ref}" for ref in aux_info.references)
    # Fallback in case no CVSS version is detected. Compare against None
    # explicitly so a legitimate 0.0 fallback score is not silently dropped
    # (the field is declared ``float | None``).
    if aux_info.fallback_cvss_score is not None and not finding.cvssv3_score and not finding.cvssv4_score:
        finding.cvssv3_score = aux_info.fallback_cvss_score

    # Heuristic for fix-available detection: OpenVAS has no explicit flag, so
    # look for common phrases in the mitigation text.
    if finding.mitigation:
        search_terms = ("Update to version", "The vendor has released updates")
        if any(term in finding.mitigation for term in search_terms):
            finding.fix_available = True
| 67 | + |
| 68 | + |
def deduplicate(dupes: dict[str, Finding], finding: Finding):
    """Combine multiple OpenVAS findings into one DefectDojo finding with potentially multiple endpoints."""
    finding_hash = gen_finding_hash(finding)

    if finding_hash not in dupes:
        dupes[finding_hash] = finding
    else:
        # OpenVAS does not combine multiple findings into one,
        # e.g. if 2 vulnerable java runtimes are present on the host this is reported as 2 findings.
        # The only way to differentiate these findings when they are based on the same vulnerability
        # is the data mapped to steps_to_reproduce.
        # However we cannot hash this field as it can contain data that changes between scans,
        # e.g. timestamps or packet ids.
        # We therefore combine them into one DefectDojo finding because duplicates during reimport cause
        # https://github.com/DefectDojo/django-DefectDojo/issues/3958
        org = dupes[finding_hash]
        org.nb_occurences += 1
        if org.steps_to_reproduce != finding.steps_to_reproduce:
            # NOTE(review): if steps_to_reproduce is None on one side, the
            # membership test below raises TypeError — presumably the parser
            # always sets it before deduplication; confirm.
            if "Endpoint" in org.steps_to_reproduce:
                # Already merged at least once: append a separator, the new
                # endpoint's header, then the new finding's data.
                org.steps_to_reproduce += "\n---------------------------------------\n"
                org.steps_to_reproduce += f"**Endpoint**: {finding.unsaved_endpoints[0].host}\n"
                org.steps_to_reproduce += finding.steps_to_reproduce
            else:
                # First merge: prefix the original data with its own endpoint header.
                # NOTE(review): this branch never appends finding.steps_to_reproduce,
                # so the current finding's differing data appears to be dropped —
                # verify this is intended.
                tmp = org.steps_to_reproduce
                org.steps_to_reproduce = f"**Endpoint**: {org.unsaved_endpoints[0].host}\n"
                org.steps_to_reproduce += tmp

        # combine identical findings on different hosts into one with multiple hosts
        endpoint = finding.unsaved_endpoints[0]
        if endpoint not in org.unsaved_endpoints:
            org.unsaved_endpoints += finding.unsaved_endpoints
| 100 | + |
| 101 | + |
def gen_finding_hash(finding: Finding) -> str:
    """Generate a hash for a finding that is used for deduplication of findings inside the current report.

    The digest covers the first unsaved endpoint, title, tool vulnerability id
    and severity, joined with ``|``.
    """
    first_endpoint = finding.unsaved_endpoints[0]
    key = "|".join((
        str(first_endpoint),
        finding.title,
        finding.vuln_id_from_tool,
        finding.severity,
    ))
    digest = hashlib.sha256()
    digest.update(key.encode("utf-8"))
    return digest.hexdigest()