Skip to content

Commit 6a2c516

Browse files
extract chord orchestration into method
1 parent 360b788 commit 6a2c516

3 files changed

Lines changed: 56 additions & 54 deletions

File tree

dojo/importers/base_importer.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import logging
33

4+
from celery import chord, group
45
from django.conf import settings
56
from django.core.exceptions import ValidationError
67
from django.core.files.base import ContentFile
@@ -10,6 +11,7 @@
1011
from django.utils.timezone import make_aware
1112

1213
import dojo.finding.helper as finding_helper
14+
from dojo import utils
1315
from dojo.importers.endpoint_manager import EndpointManager
1416
from dojo.importers.options import ImporterOptions
1517
from dojo.models import (
@@ -24,6 +26,7 @@
2426
Endpoint,
2527
FileUpload,
2628
Finding,
29+
System_Settings,
2730
Test,
2831
Test_Import,
2932
Test_Import_Finding_Action,
@@ -532,6 +535,47 @@ def update_test_type_from_internal_test(self, internal_test: ParserTest) -> None
532535
self.test.test_type.dynamic_tool = dynamic_tool
533536
self.test.test_type.save()
534537

538+
def maybe_launch_post_processing_chord(
539+
self,
540+
post_processing_task_signatures,
541+
current_batch_number: int,
542+
max_batch_size: int,
543+
*
544+
is_final_batch: bool,
545+
) -> tuple[list, int, bool]:
546+
"""
547+
Helper to optionally launch a chord of post-processing tasks with a calculate-grade callback
548+
when async is desired. Uses exponential batch sizing up to the configured max batch size.
549+
550+
Returns a tuple of (post_processing_task_signatures, current_batch_number, launched)
551+
where launched indicates whether a chord/group was dispatched and signatures were reset.
552+
"""
553+
launched = False
554+
if not post_processing_task_signatures:
555+
return post_processing_task_signatures, current_batch_number, launched
556+
557+
current_batch_size = min(2 ** current_batch_number, max_batch_size)
558+
batch_full = len(post_processing_task_signatures) >= current_batch_size
559+
560+
if batch_full or is_final_batch:
561+
product = self.test.engagement.product
562+
system_settings = System_Settings.objects.get()
563+
if system_settings.enable_product_grade:
564+
calculate_grade_signature = utils.calculate_grade_signature(product)
565+
chord(post_processing_task_signatures)(calculate_grade_signature)
566+
else:
567+
group(post_processing_task_signatures).apply_async()
568+
569+
logger.debug(
570+
f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})",
571+
)
572+
post_processing_task_signatures = []
573+
if not is_final_batch:
574+
current_batch_number += 1
575+
launched = True
576+
577+
return post_processing_task_signatures, current_batch_number, launched
578+
535579
def verify_tool_configuration_from_test(self):
536580
"""
537581
Verify that the Tool_Configuration supplied along with the

dojo/importers/default_importer.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
import logging
22

3-
from celery import chord, group
43
from django.core.files.uploadedfile import TemporaryUploadedFile
54
from django.core.serializers import serialize
65
from django.db.models.query_utils import Q
76
from django.urls import reverse
87

98
import dojo.jira_link.helper as jira_helper
10-
from dojo import utils
119
from dojo.decorators import we_want_async
1210
from dojo.finding import helper as finding_helper
1311
from dojo.importers.base_importer import BaseImporter, Parser
1412
from dojo.importers.options import ImporterOptions
1513
from dojo.models import (
1614
Engagement,
1715
Finding,
18-
System_Settings,
1916
Test,
2017
Test_Import,
2118
)
@@ -253,30 +250,12 @@ def process_findings(
253250

254251
# Check if we should launch a chord (batch full or end of findings)
255252
if we_want_async(async_user=self.user) and post_processing_task_signatures:
256-
# Calculate current batch size: 2^batch_number, capped at max_batch_size
257-
# We do this because post processing only starts after all tasks have been added to the chord
258-
# So we start with small batches to minmize the delay
259-
current_batch_size = min(2 ** current_batch_number, max_batch_size)
260-
261-
batch_full = len(post_processing_task_signatures) >= current_batch_size
262-
263-
if batch_full or is_final_finding:
264-
# Launch chord with current batch of signatures
265-
product = self.test.engagement.product
266-
system_settings = System_Settings.objects.get()
267-
if system_settings.enable_product_grade:
268-
calculate_grade_signature = utils.calculate_grade_signature(product)
269-
chord(post_processing_task_signatures)(calculate_grade_signature)
270-
elif post_processing_task_signatures:
271-
# If product grading is disabled, just run the post-processing tasks without the grade calculation callback
272-
group(post_processing_task_signatures).apply_async()
273-
274-
logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})")
275-
276-
# Reset for next batch (only if not final)
277-
post_processing_task_signatures = []
278-
if not is_final_finding:
279-
current_batch_number += 1
253+
post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
254+
post_processing_task_signatures,
255+
current_batch_number,
256+
max_batch_size,
257+
is_final_finding,
258+
)
280259
else:
281260
# Execute task immediately for synchronous processing
282261
post_processing_task_signature()

dojo/importers/default_reimporter.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
import logging
22

3-
from celery import chord, group
43
from django.core.files.uploadedfile import TemporaryUploadedFile
54
from django.core.serializers import serialize
65
from django.db.models.query_utils import Q
76

87
import dojo.finding.helper as finding_helper
98
import dojo.jira_link.helper as jira_helper
10-
from dojo import utils
119
from dojo.decorators import we_want_async
1210
from dojo.importers.base_importer import BaseImporter, Parser
1311
from dojo.importers.options import ImporterOptions
1412
from dojo.models import (
1513
Development_Environment,
1614
Finding,
1715
Notes,
18-
System_Settings,
1916
Test,
2017
Test_Import,
2118
)
@@ -271,30 +268,12 @@ def process_findings(
271268

272269
# Check if we should launch a chord (batch full or end of findings)
273270
if we_want_async(async_user=self.user) and post_processing_task_signatures:
274-
# Calculate current batch size: 2^batch_number, capped at max_batch_size
275-
# We do this because post processing only starts after all tasks have been added to the chord
276-
# So we start with small batches to minmize the delay
277-
current_batch_size = min(2 ** current_batch_number, max_batch_size)
278-
279-
batch_full = len(post_processing_task_signatures) >= current_batch_size
280-
281-
if batch_full or is_final:
282-
# Launch chord with current batch of signatures
283-
product = self.test.engagement.product
284-
system_settings = System_Settings.objects.get()
285-
if system_settings.enable_product_grade:
286-
calculate_grade_signature = utils.calculate_grade_signature(product)
287-
chord(post_processing_task_signatures)(calculate_grade_signature)
288-
elif post_processing_task_signatures:
289-
# If product grading is disabled, just run the post-processing tasks without the grade calculation callback
290-
group(post_processing_task_signatures).apply_async()
291-
292-
logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})")
293-
294-
# Reset for next batch (only if not final)
295-
post_processing_task_signatures = []
296-
if not is_final:
297-
current_batch_number += 1
271+
post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
272+
post_processing_task_signatures,
273+
current_batch_number,
274+
max_batch_size,
275+
is_final,
276+
)
298277
else:
299278
post_processing_task_signature()
300279

0 commit comments

Comments
 (0)