Skip to content

Commit 930792d

Browse files
simplify
1 parent 87e5d45 commit 930792d

3 files changed

Lines changed: 63 additions & 58 deletions

File tree

dojo/importers/default_importer.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ def process_findings(
163163
post_processing_task_signatures = []
164164
current_batch_number = 1
165165
max_batch_size = 1024
166-
pending_grade_calculations = []
167166

168167
"""
169168
Saves findings in memory that were parsed from the scan report into the database.
@@ -176,7 +175,13 @@ def process_findings(
176175
logger.debug("starting import of %i parsed findings.", len(parsed_findings) if parsed_findings else 0)
177176
group_names_to_findings_dict = {}
178177

179-
for non_clean_unsaved_finding in parsed_findings:
178+
# Create iterator over parsed findings
179+
findings_iterator = iter(parsed_findings)
180+
181+
# Get first finding to start the loop
182+
non_clean_unsaved_finding = next(findings_iterator, None)
183+
184+
while non_clean_unsaved_finding:
180185
# make sure the severity is something is digestible
181186
unsaved_finding = self.sanitize_severity(non_clean_unsaved_finding)
182187
# Filter on minimum severity if applicable
@@ -253,25 +258,31 @@ def process_findings(
253258
push_to_jira=push_to_jira,
254259
)
255260

256-
if we_want_async(async_user=self.user):
257-
# Collect signatures for progressive batch execution
258-
post_processing_task_signatures.append(post_processing_task_signature)
261+
post_processing_task_signatures.append(post_processing_task_signature)
259262

263+
# Get next finding for next iteration
264+
non_clean_unsaved_finding = next(findings_iterator, None)
265+
is_final = not non_clean_unsaved_finding
266+
267+
# Check if we should launch a chord (batch full or end of findings)
268+
if we_want_async(async_user=self.user) and post_processing_task_signatures:
260269
# Calculate current batch size: 2^batch_number, capped at max_batch_size
261270
current_batch_size = min(2 ** current_batch_number, max_batch_size)
262271

263-
# Launch chord when batch is full
264-
if len(post_processing_task_signatures) >= current_batch_size:
272+
batch_full = len(post_processing_task_signatures) >= current_batch_size
273+
274+
if batch_full or is_final:
275+
# Launch chord with current batch of signatures
265276
product = self.test.engagement.product
266277
calculate_grade_signature = utils.calculate_grade_signature(product)
267-
chord_result = chord(post_processing_task_signatures)(calculate_grade_signature)
268-
pending_grade_calculations.append(chord_result)
278+
chord(post_processing_task_signatures)(calculate_grade_signature)
269279

270-
logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {current_batch_size})")
280+
logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})")
271281

272-
# Reset for next batch
282+
# Reset for next batch (only if not final)
273283
post_processing_task_signatures = []
274-
current_batch_number += 1
284+
if not is_final:
285+
current_batch_number += 1
275286
else:
276287
# Execute task immediately for synchronous processing
277288
post_processing_task_signature()
@@ -290,18 +301,10 @@ def process_findings(
290301
else:
291302
jira_helper.push_to_jira(findings[0])
292303

293-
# Handle any remaining signatures in the final batch
294-
product = self.test.engagement.product
295-
296-
if we_want_async(async_user=self.user):
297-
if post_processing_task_signatures:
298-
# Launch final chord with remaining signatures
299-
calculate_grade_signature = utils.calculate_grade_signature(product)
300-
chord_result = chord(post_processing_task_signatures)(calculate_grade_signature)
301-
pending_grade_calculations.append(chord_result)
302-
logger.debug(f"Launched final chord with {len(post_processing_task_signatures)} remaining tasks")
304+
# Note: All chord batching is now handled within the loop above
303305

304-
# Always perform an initial grading, even though it might get overwritten alter.
306+
# Always perform an initial grading, even though it might get overwritten later.
307+
product = self.test.engagement.product
305308
calculate_grade(product)
306309

307310
sync = kwargs.get("sync", True)

dojo/importers/default_reimporter.py

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -184,13 +184,18 @@ def process_findings(
184184
post_processing_task_signatures = []
185185
current_batch_number = 1
186186
max_batch_size = 1024
187-
pending_grade_calculations = []
188187

189188
logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.")
190189
logger.debug("STEP 1: looping over findings from the reimported report and trying to match them to existing findings")
191190
deduplicationLogger.debug(f"Algorithm used for matching new findings to existing findings: {self.deduplication_algorithm}")
192191

193-
for non_clean_unsaved_finding in parsed_findings:
192+
# Create iterator over parsed findings
193+
findings_iterator = iter(parsed_findings)
194+
195+
# Get first finding to start the loop
196+
non_clean_unsaved_finding = next(findings_iterator, None)
197+
198+
while non_clean_unsaved_finding:
194199
# make sure the severity is something is digestible
195200
unsaved_finding = self.sanitize_severity(non_clean_unsaved_finding)
196201
# Filter on minimum severity if applicable
@@ -258,28 +263,33 @@ def process_findings(
258263
issue_updater_option=True,
259264
push_to_jira=push_to_jira,
260265
)
261-
if we_want_async(async_user=self.user):
262-
# Collect signatures for progressive batch execution
263-
post_processing_task_signatures.append(post_processing_task_signature)
266+
post_processing_task_signatures.append(post_processing_task_signature)
267+
268+
# Get next finding for next iteration
269+
non_clean_unsaved_finding = next(findings_iterator, None)
264270

265-
# Calculate current batch size: 2^batch_number, capped at max_batch_size
266-
current_batch_size = min(2 ** current_batch_number, max_batch_size)
271+
# Check if we should launch a chord (batch full or end of findings)
272+
if we_want_async(async_user=self.user) and post_processing_task_signatures:
273+
# Calculate current batch size: 2^batch_number, capped at max_batch_size
274+
current_batch_size = min(2 ** current_batch_number, max_batch_size)
267275

268-
# Launch chord when batch is full
269-
if len(post_processing_task_signatures) >= current_batch_size:
270-
product = self.test.engagement.product
271-
calculate_grade_signature = utils.calculate_grade_signature(product)
272-
chord_result = chord(post_processing_task_signatures)(calculate_grade_signature)
273-
pending_grade_calculations.append(chord_result)
276+
batch_full = len(post_processing_task_signatures) >= current_batch_size
277+
is_final = not non_clean_unsaved_finding
274278

275-
logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {current_batch_size})")
279+
if batch_full or is_final:
280+
# Launch chord with current batch of signatures
281+
product = self.test.engagement.product
282+
calculate_grade_signature = utils.calculate_grade_signature(product)
283+
chord(post_processing_task_signatures)(calculate_grade_signature)
276284

277-
# Reset for next batch
278-
post_processing_task_signatures = []
285+
logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})")
286+
287+
# Reset for next batch (only if not final)
288+
post_processing_task_signatures = []
289+
if not is_final:
279290
current_batch_number += 1
280-
else:
281-
# Execute task immediately for synchronous processing
282-
post_processing_task_signature()
291+
else:
292+
post_processing_task_signature()
283293

284294
self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items))
285295
# due to #3958 we can have duplicates inside the same report
@@ -292,18 +302,10 @@ def process_findings(
292302
# Process groups
293303
self.process_groups_for_all_findings(**kwargs)
294304

295-
# Handle any remaining signatures in the final batch
296-
product = self.test.engagement.product
297-
298-
if we_want_async(async_user=self.user):
299-
if post_processing_task_signatures:
300-
# Launch final chord with remaining signatures
301-
calculate_grade_signature = utils.calculate_grade_signature(product)
302-
chord_result = chord(post_processing_task_signatures)(calculate_grade_signature)
303-
pending_grade_calculations.append(chord_result)
304-
logger.debug(f"Launched final chord with {len(post_processing_task_signatures)} remaining tasks")
305+
# Note: All chord batching is now handled within the loop above
305306

306307
# Synchronous tasks were already executed during processing, just calculate grade
308+
product = self.test.engagement.product
307309
calculate_grade(product)
308310

309311
# Process the results and return them back

unittests/test_importers_performance.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,12 @@ def import_reimport_performance(self, expected_num_queries1, expected_num_async_
176176
# def test_import_reimport_reimport_performance_async(self, mock):
177177
def test_import_reimport_reimport_performance_async(self):
178178
self.import_reimport_performance(
179-
expected_num_queries1=713,
180-
expected_num_async_tasks1=11,
181-
expected_num_queries2=610,
182-
expected_num_async_tasks2=23,
183-
expected_num_queries3=292,
184-
expected_num_async_tasks3=21,
179+
expected_num_queries1=715,
180+
expected_num_async_tasks1=13,
181+
expected_num_queries2=612,
182+
expected_num_async_tasks2=25,
183+
expected_num_queries3=294,
184+
expected_num_async_tasks3=23,
185185
)
186186

187187
# @patch("dojo.decorators.we_want_async", return_value=False)

0 commit comments

Comments (0)