|
| 1 | +import logging |
| 2 | +from unittest.mock import patch |
| 3 | + |
| 4 | +from django.contrib.auth.models import User as DjangoUser |
| 5 | +from django.test import TransactionTestCase, tag |
| 6 | +from django.utils import timezone |
| 7 | + |
| 8 | +from dojo.importers.default_importer import DefaultImporter |
| 9 | +from dojo.models import ( |
| 10 | + Development_Environment, |
| 11 | + Engagement, |
| 12 | + Finding, |
| 13 | + Product, |
| 14 | + Product_Type, |
| 15 | + SLA_Configuration, |
| 16 | + Test, |
| 17 | + Test_Import_Finding_Action, |
| 18 | +) |
| 19 | + |
| 20 | +logger = logging.getLogger(__name__) |
| 21 | + |
| 22 | + |
| 23 | +# we need to run this as a TransactionTestCase to be able to mimic the behavior of the bulk_create fallback at runtime when a FK violation occurs |
| 24 | + |
| 25 | + |
| 26 | +@tag("transactional") |
| 27 | +class UpdateImportHistoryTests(TransactionTestCase): |
| 28 | + |
| 29 | + # loading fixtures fails in TransactionTestCase, not sure why. possibly because they are not up-to-date and missing fields like sla_configuration |
| 30 | + # creating testdata via code is a better approach, at least here. |
| 31 | + def setUp(self): |
| 32 | + super().setUp() |
| 33 | + self.env, _ = Development_Environment.objects.get_or_create(name="Development") |
| 34 | + self.prod_type = Product_Type.objects.create(name="UpdateImportHistory PT") |
| 35 | + # Ensure a valid SLA configuration exists and is assigned explicitly to avoid default FK issues |
| 36 | + self.sla = SLA_Configuration.objects.create(name="UpdateImportHistory SLA") |
| 37 | + self.prod = Product.objects.create( |
| 38 | + name="UpdateImportHistory P", |
| 39 | + prod_type=self.prod_type, |
| 40 | + sla_configuration=self.sla, |
| 41 | + ) |
| 42 | + self.eng = Engagement.objects.create( |
| 43 | + name="UpdateImportHistory E", |
| 44 | + product=self.prod, |
| 45 | + target_start=timezone.now(), |
| 46 | + target_end=timezone.now(), |
| 47 | + ) |
| 48 | + # Ensure a reporter/lead user exists for FK constraints |
| 49 | + self.user = DjangoUser.objects.create(username="admin") |
| 50 | + |
| 51 | + # Minimal importer |
| 52 | + self.importer = DefaultImporter( |
| 53 | + user=self.user, |
| 54 | + lead=self.user, |
| 55 | + environment=self.env, |
| 56 | + engagement=self.eng, |
| 57 | + minimum_severity="Info", |
| 58 | + active=True, |
| 59 | + verified=True, |
| 60 | + sync=True, |
| 61 | + scan_type="StackHawk HawkScan", |
| 62 | + ) |
| 63 | + # Explicitly create the Test similar to Engagement creation |
| 64 | + self.test = Test.objects.create( |
| 65 | + title="UpdateImportHistory T", |
| 66 | + engagement=self.eng, |
| 67 | + lead=self.user, |
| 68 | + environment=self.env, |
| 69 | + test_type=self.importer.get_or_create_test_type("StackHawk HawkScan"), |
| 70 | + scan_type="StackHawk HawkScan", |
| 71 | + target_start=timezone.now(), |
| 72 | + target_end=timezone.now(), |
| 73 | + percent_complete=0, |
| 74 | + ) |
| 75 | + # Attach to importer |
| 76 | + self.importer.test = self.test |
| 77 | + |
| 78 | + def _create_findings(self, count): |
| 79 | + findings = [] |
| 80 | + for i in range(count): |
| 81 | + f = Finding( |
| 82 | + title=f"F{i}", |
| 83 | + test=self.importer.test, |
| 84 | + severity="Low", |
| 85 | + reporter=self.user, |
| 86 | + ) |
| 87 | + f.save() |
| 88 | + findings.append(f) |
| 89 | + return findings |
| 90 | + |
| 91 | + def test_success_path_creates_expected_actions(self): |
| 92 | + new_findings = self._create_findings(5) |
| 93 | + closed_findings = self._create_findings(3) |
| 94 | + |
| 95 | + test_import = self.importer.update_import_history( |
| 96 | + new_findings=new_findings, |
| 97 | + closed_findings=closed_findings, |
| 98 | + ) |
| 99 | + |
| 100 | + total_expected = len(new_findings) + len(closed_findings) |
| 101 | + created = Test_Import_Finding_Action.objects.filter(test_import=test_import).count() |
| 102 | + self.assertEqual(created, total_expected) |
| 103 | + |
| 104 | + def test_fk_violation_in_batch_results_in_partial_fallback(self): |
| 105 | + # One bad finding (deleted after pre-check) triggers IntegrityError; fallback saves the valid ones |
| 106 | + new_findings = self._create_findings(9) |
| 107 | + bad = self._create_findings(1)[0] |
| 108 | + |
| 109 | + # Patch the existence filter to return all findings as-if they exist, then delete to simulate race after check |
| 110 | + with patch("dojo.finding.helper.filter_findings_by_existence", side_effect=lambda lst: lst): |
| 111 | + bad_id = bad.id |
| 112 | + Finding.objects.filter(id=bad_id).delete() |
| 113 | + test_import = self.importer.update_import_history(new_findings=[*new_findings, bad]) |
| 114 | + |
| 115 | + created = Test_Import_Finding_Action.objects.filter(test_import=test_import).count() |
| 116 | + # Expect only the 9 valid ones to be created; the bad one is skipped/raises during fallback |
| 117 | + self.assertEqual(created, len(new_findings)) |
| 118 | + |
| 119 | + def test_fk_violation_second_batch_results_in_partial_fallback(self): |
| 120 | + # Create 300 findings so Django's bulk_create will batch internally (batch_size=100) |
| 121 | + total = 300 |
| 122 | + new_findings = self._create_findings(total) |
| 123 | + |
| 124 | + # Delete a finding in the second batch (index 150) after the existence check |
| 125 | + bad = new_findings[150] |
| 126 | + with patch("dojo.finding.helper.filter_findings_by_existence", side_effect=lambda lst: lst): |
| 127 | + Finding.objects.filter(id=bad.id).delete() |
| 128 | + test_import = self.importer.update_import_history(new_findings=new_findings) |
| 129 | + |
| 130 | + # Expect all but the deleted one to be created via fallback |
| 131 | + created = Test_Import_Finding_Action.objects.filter(test_import=test_import).count() |
| 132 | + self.assertEqual(created, total - 1) |
| 133 | + |
| 134 | + def test_precheck_filters_out_deleted_findings_allows_successful_bulk(self): |
| 135 | + # If a finding is deleted before the existence check, it should be filtered out |
| 136 | + new_findings = self._create_findings(5) |
| 137 | + closed_findings = self._create_findings(3) |
| 138 | + |
| 139 | + # Delete one from new and one from closed before calling update_import_history |
| 140 | + Finding.objects.filter(id=new_findings[0].id).delete() |
| 141 | + Finding.objects.filter(id=closed_findings[0].id).delete() |
| 142 | + |
| 143 | + test_import = self.importer.update_import_history( |
| 144 | + new_findings=new_findings, |
| 145 | + closed_findings=closed_findings, |
| 146 | + ) |
| 147 | + |
| 148 | + expected = (len(new_findings) - 1) + (len(closed_findings) - 1) |
| 149 | + created = Test_Import_Finding_Action.objects.filter(test_import=test_import).count() |
| 150 | + self.assertEqual(created, expected) |
0 commit comments