Skip to content

Commit 3627081

Browse files
dedupe reopen: continue to try all match candidates (#14011)
* dedupe reopen: add test cases that prove the bug
* remove obsolete method
* dedupe reopen: proceed with next candidate if candidate is mitigated
* rename methods
1 parent 1b235b5 commit 3627081

3 files changed

Lines changed: 439 additions & 39 deletions

File tree

dojo/finding/deduplication.py

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from collections.abc import Iterator
23
from operator import attrgetter
34

45
import hyperlink
@@ -127,7 +128,7 @@ def set_duplicate(new_finding, existing_finding):
127128
msg = "Can not add duplicate to itself"
128129
raise Exception(msg)
129130
if is_duplicate_reopen(new_finding, existing_finding):
130-
msg = "Found a regression. Ignore this so that a new duplicate chain can be made"
131+
msg = "Found a regression where a duplicate of a mitigated finding is found. We do not reopen this, but create a new duplicate chain as per @PR 9558: Deduplication: Do not reopen original finding"
131132
raise Exception(msg)
132133
if new_finding.duplicate and finding_mitigated(existing_finding):
133134
msg = "Skip this finding as we do not want to attach a new duplicate to a mitigated finding"
@@ -174,17 +175,6 @@ def finding_not_human_set_status(finding: Finding) -> bool:
174175
return finding.out_of_scope is False and finding.false_p is False
175176

176177

177-
def set_duplicate_reopen(new_finding, existing_finding):
178-
logger.debug("duplicate reopen existing finding")
179-
existing_finding.mitigated = new_finding.mitigated
180-
existing_finding.is_mitigated = new_finding.is_mitigated
181-
existing_finding.active = new_finding.active
182-
existing_finding.verified = new_finding.verified
183-
existing_finding.notes.create(author=existing_finding.reporter,
184-
entry="This finding has been automatically re-opened as it was found in recent scans.")
185-
existing_finding.save()
186-
187-
188178
def is_deduplication_on_engagement_mismatch(new_finding, to_duplicate_finding):
189179
if new_finding.test.engagement != to_duplicate_finding.test.engagement:
190180
deduplication_mismatch = new_finding.test.engagement.deduplication_on_engagement \
@@ -355,9 +345,9 @@ def _is_candidate_older(new_finding, candidate):
355345
return is_older
356346

357347

358-
def match_hash_candidate(new_finding, candidates_by_hash):
348+
def get_matches_from_hash_candidates(new_finding, candidates_by_hash) -> Iterator[Finding]:
359349
if new_finding.hash_code is None:
360-
return None
350+
return
361351
possible_matches = candidates_by_hash.get(new_finding.hash_code, [])
362352
deduplicationLogger.debug(f"Finding {new_finding.id}: Found {len(possible_matches)} findings with same hash_code, ids={[(c.id, c.hash_code) for c in possible_matches]}")
363353

@@ -368,13 +358,12 @@ def match_hash_candidate(new_finding, candidates_by_hash):
368358
deduplicationLogger.debug("deduplication_on_engagement_mismatch, skipping dedupe.")
369359
continue
370360
if are_endpoints_duplicates(new_finding, candidate):
371-
return candidate
372-
return None
361+
yield candidate
373362

374363

375-
def match_unique_id_candidate(new_finding, candidates_by_uid):
364+
def get_matches_from_unique_id_candidates(new_finding, candidates_by_uid) -> Iterator[Finding]:
376365
if new_finding.unique_id_from_tool is None:
377-
return None
366+
return
378367

379368
possible_matches = candidates_by_uid.get(new_finding.unique_id_from_tool, [])
380369
deduplicationLogger.debug(f"Finding {new_finding.id}: Found {len(possible_matches)} findings with same unique_id_from_tool, ids={[(c.id, c.unique_id_from_tool) for c in possible_matches]}")
@@ -385,11 +374,10 @@ def match_unique_id_candidate(new_finding, candidates_by_uid):
385374
if is_deduplication_on_engagement_mismatch(new_finding, candidate):
386375
deduplicationLogger.debug("deduplication_on_engagement_mismatch, skipping dedupe.")
387376
continue
388-
return candidate
389-
return None
377+
yield candidate
390378

391379

392-
def match_uid_or_hash_candidate(new_finding, candidates_by_uid, candidates_by_hash):
380+
def get_matches_from_uid_or_hash_candidates(new_finding, candidates_by_uid, candidates_by_hash) -> Iterator[Finding]:
393381
# Combine UID and hash candidates and walk oldest-first
394382
uid_list = candidates_by_uid.get(new_finding.unique_id_from_tool, []) if new_finding.unique_id_from_tool is not None else []
395383
hash_list = candidates_by_hash.get(new_finding.hash_code, []) if new_finding.hash_code is not None else []
@@ -404,15 +392,15 @@ def match_uid_or_hash_candidate(new_finding, candidates_by_uid, candidates_by_ha
404392
continue
405393
if is_deduplication_on_engagement_mismatch(new_finding, candidate):
406394
deduplicationLogger.debug("deduplication_on_engagement_mismatch, skipping dedupe.")
407-
return None
395+
continue
408396
if are_endpoints_duplicates(new_finding, candidate):
409397
deduplicationLogger.debug("UID_OR_HASH: endpoints match, returning candidate %s with test_type %s unique_id_from_tool %s hash_code %s", candidate.id, candidate.test.test_type, candidate.unique_id_from_tool, candidate.hash_code)
410-
return candidate
411-
deduplicationLogger.debug("UID_OR_HASH: endpoints mismatch, skipping candidate %s", candidate.id)
412-
return None
398+
yield candidate
399+
else:
400+
deduplicationLogger.debug("UID_OR_HASH: endpoints mismatch, skipping candidate %s", candidate.id)
413401

414402

415-
def match_legacy_candidate(new_finding, candidates_by_title, candidates_by_cwe):
403+
def get_matches_from_legacy_candidates(new_finding, candidates_by_title, candidates_by_cwe) -> Iterator[Finding]:
416404
# ---------------------------------------------------------
417405
# 1) Collects all the findings that have the same:
418406
# (title and static_finding and dynamic_finding)
@@ -469,8 +457,7 @@ def match_legacy_candidate(new_finding, candidates_by_title, candidates_by_cwe):
469457
+ " flag_endpoints: " + str(flag_endpoints) + " flag_line_path:" + str(flag_line_path) + " flag_hash:" + str(flag_hash))
470458

471459
if (flag_endpoints or flag_line_path) and flag_hash:
472-
return candidate
473-
return None
460+
yield candidate
474461

475462

476463
def _dedupe_batch_hash_code(findings):
@@ -482,10 +469,10 @@ def _dedupe_batch_hash_code(findings):
482469
return
483470
for new_finding in findings:
484471
deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_HASH_CODE")
485-
match = match_hash_candidate(new_finding, candidates_by_hash)
486-
if match:
472+
for match in get_matches_from_hash_candidates(new_finding, candidates_by_hash):
487473
try:
488474
set_duplicate(new_finding, match)
475+
break
489476
except Exception as e:
490477
deduplicationLogger.debug(str(e))
491478

@@ -499,12 +486,14 @@ def _dedupe_batch_unique_id(findings):
499486
return
500487
for new_finding in findings:
501488
deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL")
502-
match = match_unique_id_candidate(new_finding, candidates_by_uid)
503-
if match:
489+
for match in get_matches_from_unique_id_candidates(new_finding, candidates_by_uid):
490+
deduplicationLogger.debug(f"Trying to deduplicate finding {new_finding.id} against candidate {match.id}")
504491
try:
505492
set_duplicate(new_finding, match)
493+
deduplicationLogger.debug(f"Successfully deduplicated finding {new_finding.id} against candidate {match.id}")
494+
break
506495
except Exception as e:
507-
deduplicationLogger.debug(str(e))
496+
deduplicationLogger.debug(f"Exception when deduplicating finding {new_finding.id} against candidate {match.id}: {e!s}")
508497

509498

510499
def _dedupe_batch_uid_or_hash(findings):
@@ -520,13 +509,12 @@ def _dedupe_batch_uid_or_hash(findings):
520509
if new_finding.duplicate:
521510
continue
522511

523-
match = match_uid_or_hash_candidate(new_finding, candidates_by_uid, existing_by_hash)
524-
if match:
512+
for match in get_matches_from_uid_or_hash_candidates(new_finding, candidates_by_uid, existing_by_hash):
525513
try:
526514
set_duplicate(new_finding, match)
515+
break
527516
except Exception as e:
528517
deduplicationLogger.debug(str(e))
529-
continue
530518

531519

532520
def _dedupe_batch_legacy(findings):
@@ -538,10 +526,10 @@ def _dedupe_batch_legacy(findings):
538526
return
539527
for new_finding in findings:
540528
deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_LEGACY")
541-
match = match_legacy_candidate(new_finding, candidates_by_title, candidates_by_cwe)
542-
if match:
529+
for match in get_matches_from_legacy_candidates(new_finding, candidates_by_title, candidates_by_cwe):
543530
try:
544531
set_duplicate(new_finding, match)
532+
break
545533
except Exception as e:
546534
deduplicationLogger.debug(str(e))
547535

unittests/test_deduplication_logic.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from django.core import serializers
1010
from django.utils import timezone
1111

12+
from dojo.finding.deduplication import set_duplicate
1213
from dojo.importers.default_importer import DefaultImporter
1314
from dojo.models import (
1415
Development_Environment,
@@ -879,6 +880,136 @@ def test_extra_endpoints_unique_id(self):
879880
# expect duplicate: unique_id match regardless of extra endpoints
880881
self.assert_finding(finding_new, not_pk=124, duplicate=True, duplicate_finding_id=124, hash_code=finding_124.hash_code)
881882

883+
def test_regression_duplicate_reopen_unique_id(self):
884+
"""
885+
Test the is_duplicate_reopen exception in set_duplicate.
886+
When trying to attach a new active finding to a mitigated finding,
887+
an exception should be raised.
888+
889+
Related to: https://github.com/DefectDojo/django-DefectDojo/issues/14010
890+
"""
891+
# Get an existing finding with unique_id (finding 124)
892+
finding_124 = Finding.objects.get(id=124)
893+
original_unique_id = finding_124.unique_id_from_tool
894+
895+
# Mitigate the existing finding
896+
finding_124.active = False
897+
finding_124.is_mitigated = True
898+
finding_124.mitigated = timezone.now()
899+
finding_124.out_of_scope = False
900+
finding_124.false_p = False
901+
finding_124.save(dedupe_option=False)
902+
903+
# Create a new finding with the same unique_id that is active (not mitigated)
904+
finding_new, _ = self.copy_and_reset_finding(find_id=124)
905+
finding_new.active = True
906+
finding_new.is_mitigated = False
907+
finding_new.mitigated = None
908+
finding_new.unique_id_from_tool = original_unique_id
909+
finding_new.save(dedupe_option=False)
910+
911+
# Try to deduplicate - this should raise an exception
912+
with self.assertRaises(Exception) as context:
913+
set_duplicate(finding_new, finding_124)
914+
915+
self.assertIn("Found a regression", str(context.exception))
916+
917+
def test_duplicate_attached_to_mitigated_finding_unique_id(self):
918+
"""
919+
Test the exception when attaching a finding that is already marked as duplicate
920+
to a mitigated finding.
921+
922+
Related to: https://github.com/DefectDojo/django-DefectDojo/issues/14010
923+
"""
924+
# Get an existing finding with unique_id (finding 124)
925+
finding_124 = Finding.objects.get(id=124)
926+
original_unique_id = finding_124.unique_id_from_tool
927+
928+
# Mitigate the existing finding
929+
finding_124.active = False
930+
finding_124.is_mitigated = True
931+
finding_124.mitigated = timezone.now()
932+
finding_124.save(dedupe_option=False)
933+
934+
# Create a new finding that is already marked as duplicate AND also mitigated
935+
# This ensures that is_duplicate_reopen doesn't trigger first
936+
finding_new, _ = self.copy_and_reset_finding(find_id=124)
937+
finding_new.duplicate = True
938+
finding_new.active = False
939+
finding_new.is_mitigated = True
940+
finding_new.mitigated = timezone.now()
941+
finding_new.unique_id_from_tool = original_unique_id
942+
finding_new.save(dedupe_option=False)
943+
944+
# Try to deduplicate - this should raise an exception
945+
with self.assertRaises(Exception) as context:
946+
set_duplicate(finding_new, finding_124)
947+
948+
self.assertIn("Skip this finding as we do not want to attach a new duplicate to a mitigated finding", str(context.exception))
949+
950+
def test_multiple_findings_same_unique_id_mixed_states_unique_id(self):
951+
"""
952+
Test deduplication with multiple findings having the same unique_id,
953+
where some are mitigated and some are active. The deduplication should
954+
skip mitigated ones and use the first active one.
955+
956+
Related to: https://github.com/DefectDojo/django-DefectDojo/issues/14010
957+
"""
958+
# Get an existing finding with unique_id (finding 124)
959+
finding_124 = Finding.objects.get(id=124)
960+
original_unique_id = finding_124.unique_id_from_tool
961+
962+
# Mitigate the original finding so it's not a candidate
963+
finding_124.active = False
964+
finding_124.is_mitigated = True
965+
finding_124.mitigated = timezone.now()
966+
finding_124.save()
967+
968+
# Create multiple findings with the same unique_id
969+
# First: active finding (this should become the original)
970+
finding_active, _ = self.copy_and_reset_finding(find_id=124)
971+
finding_active.active = True
972+
finding_active.is_mitigated = False
973+
finding_active.mitigated = None
974+
finding_active.unique_id_from_tool = original_unique_id
975+
finding_active.save(dedupe_option=False) # Don't deduplicate so it remains active
976+
977+
# Second: mitigated finding
978+
finding_mitigated, _ = self.copy_and_reset_finding(find_id=124)
979+
finding_mitigated.active = False
980+
finding_mitigated.is_mitigated = True
981+
finding_mitigated.mitigated = timezone.now()
982+
finding_mitigated.out_of_scope = False
983+
finding_mitigated.false_p = False
984+
finding_mitigated.unique_id_from_tool = original_unique_id
985+
finding_mitigated.save(dedupe_option=False)
986+
987+
# Ensure mitigated finding has lower ID (will be checked first)
988+
if finding_mitigated.id > finding_active.id:
989+
# Swap by deleting and recreating in correct order
990+
finding_mitigated.delete()
991+
finding_mitigated, _ = self.copy_and_reset_finding(find_id=124)
992+
finding_mitigated.active = False
993+
finding_mitigated.is_mitigated = True
994+
finding_mitigated.mitigated = timezone.now()
995+
finding_mitigated.out_of_scope = False
996+
finding_mitigated.false_p = False
997+
finding_mitigated.unique_id_from_tool = original_unique_id
998+
finding_mitigated.save(dedupe_option=False)
999+
1000+
# Create a new finding with the same unique_id
1001+
# It should skip the mitigated finding and deduplicate against the active one
1002+
finding_new, _ = self.copy_and_reset_finding(find_id=124)
1003+
finding_new.active = True
1004+
finding_new.is_mitigated = False
1005+
finding_new.unique_id_from_tool = original_unique_id
1006+
finding_new.save()
1007+
1008+
# The new finding should be marked as duplicate of the active finding,
1009+
# not the mitigated one (even though mitigated has lower ID)
1010+
self.assert_finding(finding_new, duplicate=True, duplicate_finding_id=finding_active.id)
1011+
self.assertNotEqual(finding_new.duplicate_finding.id, finding_mitigated.id)
1012+
8821013
# algo unique_id_or_hash_code Veracode scan
8831014

8841015
def test_identical_unique_id_or_hash_code(self):

0 commit comments

Comments (0)