Skip to content

Commit 93fbe3d

Browse files
Reduce and optimize number of product grading calls using a Chord (DefectDojo#12914)
* test cases: fix caching of system settings * fix tests * fix caching for github * fix caching for github * simplify cache loading * post process only when needed * set tags on (re)import * rebase set tags * reduce save with options * update counts, reduce saves with options * importers: do not save again, but postprocess directly * update counts * optimize hash_code setting * fix counts * set hash code for new findings in reimport * make smaller second save work * make smaller second save work - add no_options * update query counts * update counts * remove logging * perf3b: compute hash_code on first save * fix cve for reimport * ruff * fix no async * Merge remote-tracking branch 'upstream/dev' into perf3-reduce-saves * make smaller second save work * fix cve for reimport * initial * fix counts * fix counts * simplify * simplify * refactor to await results * handle reimport and close old findings * update query and task counts * switch back to chords * simplify * respect system settings product grading enabled * finding/test delete grading only if enabled * optimize asyn_dupe_delete grading * cleanup comments * fix merge artifact * fix loop * simplify loop * fix reimport loop * revert settings changes * revert settings changes * update counts * extract product grading method call * cleanup model deletes * product grade logging fix * extract chord orchestration into method * fix model traversal
1 parent a0f31ee commit 93fbe3d

9 files changed

Lines changed: 261 additions & 63 deletions

File tree

dojo/decorators.py

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,25 +79,34 @@ def we_want_async(*args, func=None, **kwargs):
7979

8080
# Defect Dojo performs all tasks asynchronously using celery
8181
# *unless* the user initiating the task has set block_execution to True in their usercontactinfo profile
82-
def dojo_async_task(func):
83-
@wraps(func)
84-
def __wrapper__(*args, **kwargs):
85-
from dojo.utils import get_current_user # noqa: PLC0415 circular import
86-
user = get_current_user()
87-
kwargs["async_user"] = user
88-
89-
dojo_async_task_counter.incr(
90-
func.__name__,
91-
args=args,
92-
kwargs=kwargs,
93-
)
94-
95-
countdown = kwargs.pop("countdown", 0)
96-
if we_want_async(*args, func=func, **kwargs):
97-
return func.apply_async(args=args, kwargs=kwargs, countdown=countdown)
98-
return func(*args, **kwargs)
99-
100-
return __wrapper__
82+
def dojo_async_task(func=None, *, signature=False):
83+
def decorator(func):
84+
@wraps(func)
85+
def __wrapper__(*args, **kwargs):
86+
from dojo.utils import get_current_user # noqa: PLC0415 circular import
87+
user = get_current_user()
88+
kwargs["async_user"] = user
89+
90+
dojo_async_task_counter.incr(
91+
func.__name__,
92+
args=args,
93+
kwargs=kwargs,
94+
)
95+
96+
if signature:
97+
return func.si(*args, **kwargs)
98+
99+
countdown = kwargs.pop("countdown", 0)
100+
if we_want_async(*args, func=func, **kwargs):
101+
# No signature was requested, so dispatch the task for execution
102+
# Execute the task
103+
return func.apply_async(args=args, kwargs=kwargs, countdown=countdown)
104+
return func(*args, **kwargs)
105+
return __wrapper__
106+
107+
if func is None:
108+
return decorator
109+
return decorator(func)
101110

102111

103112
# decorator with parameters needs another wrapper layer

dojo/finding/helper.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,35 @@ def add_findings_to_auto_group(name, findings, group_by, *, create_finding_group
379379
finding_group.findings.add(*findings)
380380

381381

382+
@dojo_model_to_id
383+
@dojo_async_task(signature=True)
384+
@app.task
385+
@dojo_model_from_id
386+
def post_process_finding_save_signature(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
387+
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is a bit hard to fix nicely; it would be good to have this fixed universally
388+
"""
389+
Returns a task signature for post-processing a finding. This is useful for creating task signatures
390+
that can be used in chords or groups or to await results. We need this extra method because of our dojo_async decorator.
391+
If we use more of these celery features, we should probably move away from that decorator.
392+
"""
393+
return post_process_finding_save_internal(finding, dedupe_option, rules_option, product_grading_option,
394+
issue_updater_option, push_to_jira, user, *args, **kwargs)
395+
396+
382397
@dojo_model_to_id
383398
@dojo_async_task
384399
@app.task
385400
@dojo_model_from_id
386401
def post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
387402
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is a bit hard to fix nicely; it would be good to have this fixed universally
388403

404+
return post_process_finding_save_internal(finding, dedupe_option, rules_option, product_grading_option,
405+
issue_updater_option, push_to_jira, user, *args, **kwargs)
406+
407+
408+
def post_process_finding_save_internal(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
409+
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is a bit hard to fix nicely; it would be good to have this fixed universally
410+
389411
if not finding:
390412
logger.warning("post_process_finding_save called with finding==None, skipping post processing")
391413
return
@@ -477,7 +499,6 @@ def finding_post_delete(sender, instance, **kwargs):
477499
# Catch instances in async delete where a single object is deleted more than once
478500
with suppress(Finding.DoesNotExist):
479501
logger.debug("finding post_delete, sender: %s instance: %s", to_str_typed(sender), to_str_typed(instance))
480-
# calculate_grade(instance.test.engagement.product)
481502

482503

483504
def reset_duplicate_before_delete(dupe):

dojo/importers/base_importer.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import logging
33

4+
from celery import chord, group
45
from django.conf import settings
56
from django.core.exceptions import ValidationError
67
from django.core.files.base import ContentFile
@@ -10,6 +11,7 @@
1011
from django.utils.timezone import make_aware
1112

1213
import dojo.finding.helper as finding_helper
14+
from dojo import utils
1315
from dojo.importers.endpoint_manager import EndpointManager
1416
from dojo.importers.options import ImporterOptions
1517
from dojo.models import (
@@ -24,6 +26,7 @@
2426
Endpoint,
2527
FileUpload,
2628
Finding,
29+
System_Settings,
2730
Test,
2831
Test_Import,
2932
Test_Import_Finding_Action,
@@ -552,6 +555,47 @@ def update_test_type_from_internal_test(self, internal_test: ParserTest) -> None
552555
self.test.test_type.dynamic_tool = dynamic_tool
553556
self.test.test_type.save()
554557

558+
def maybe_launch_post_processing_chord(
559+
self,
560+
post_processing_task_signatures,
561+
current_batch_number: int,
562+
max_batch_size: int,
563+
*,
564+
is_final_batch: bool,
565+
) -> tuple[list, int, bool]:
566+
"""
567+
Helper to optionally launch a chord of post-processing tasks with a calculate-grade callback
568+
when async is desired. Uses exponential batch sizing up to the configured max batch size.
569+
570+
Returns a tuple of (post_processing_task_signatures, current_batch_number, launched)
571+
where launched indicates whether a chord/group was dispatched and signatures were reset.
572+
"""
573+
launched = False
574+
if not post_processing_task_signatures:
575+
return post_processing_task_signatures, current_batch_number, launched
576+
577+
current_batch_size = min(2 ** current_batch_number, max_batch_size)
578+
batch_full = len(post_processing_task_signatures) >= current_batch_size
579+
580+
if batch_full or is_final_batch:
581+
product = self.test.engagement.product
582+
system_settings = System_Settings.objects.get()
583+
if system_settings.enable_product_grade:
584+
calculate_grade_signature = utils.calculate_grade_signature(product)
585+
chord(post_processing_task_signatures)(calculate_grade_signature)
586+
else:
587+
group(post_processing_task_signatures).apply_async()
588+
589+
logger.debug(
590+
f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})",
591+
)
592+
post_processing_task_signatures = []
593+
if not is_final_batch:
594+
current_batch_number += 1
595+
launched = True
596+
597+
return post_processing_task_signatures, current_batch_number, launched
598+
555599
def verify_tool_configuration_from_test(self):
556600
"""
557601
Verify that the Tool_Configuration supplied along with the
@@ -763,6 +807,7 @@ def mitigate_finding(
763807
note_message: str,
764808
*,
765809
finding_groups_enabled: bool,
810+
product_grading_option: bool = True,
766811
) -> None:
767812
"""
768813
Mitigates a finding, all endpoint statuses, leaves a note on the finding
@@ -784,9 +829,9 @@ def mitigate_finding(
784829
# to avoid pushing a finding group multiple times, we push those outside of the loop
785830
if finding_groups_enabled and finding.finding_group:
786831
# don't try to dedupe findings that we are closing
787-
finding.save(dedupe_option=False)
832+
finding.save(dedupe_option=False, product_grading_option=product_grading_option)
788833
else:
789-
finding.save(dedupe_option=False, push_to_jira=self.push_to_jira)
834+
finding.save(dedupe_option=False, push_to_jira=self.push_to_jira, product_grading_option=product_grading_option)
790835

791836
def notify_scan_added(
792837
self,

dojo/importers/default_importer.py

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
from django.db.models.query_utils import Q
66
from django.urls import reverse
77

8-
import dojo.finding.helper as finding_helper
98
import dojo.jira_link.helper as jira_helper
9+
from dojo.decorators import we_want_async
10+
from dojo.finding import helper as finding_helper
1011
from dojo.importers.base_importer import BaseImporter, Parser
1112
from dojo.importers.options import ImporterOptions
1213
from dojo.models import (
@@ -16,6 +17,7 @@
1617
Test_Import,
1718
)
1819
from dojo.notifications.helper import create_notification
20+
from dojo.utils import perform_product_grading
1921
from dojo.validators import clean_tags
2022

2123
logger = logging.getLogger(__name__)
@@ -155,6 +157,11 @@ def process_findings(
155157
parsed_findings: list[Finding],
156158
**kwargs: dict,
157159
) -> list[Finding]:
160+
# Progressive batching for chord execution
161+
post_processing_task_signatures = []
162+
current_batch_number = 1
163+
max_batch_size = 1024
164+
158165
"""
159166
Saves findings in memory that were parsed from the scan report into the database.
160167
This process involves first saving associated objects such as endpoints, files,
@@ -166,13 +173,17 @@ def process_findings(
166173
logger.debug("starting import of %i parsed findings.", len(parsed_findings) if parsed_findings else 0)
167174
group_names_to_findings_dict = {}
168175

169-
for non_clean_unsaved_finding in parsed_findings:
170-
# make sure the severity is something is digestible
171-
unsaved_finding = self.sanitize_severity(non_clean_unsaved_finding)
172-
# Filter on minimum severity if applicable
173-
if Finding.SEVERITIES[unsaved_finding.severity] > Finding.SEVERITIES[self.minimum_severity]:
174-
# finding's severity is below the configured threshold : ignoring the finding
176+
# Pre-sanitize and filter by minimum severity
177+
cleaned_findings = []
178+
for raw_finding in parsed_findings or []:
179+
sanitized = self.sanitize_severity(raw_finding)
180+
if Finding.SEVERITIES[sanitized.severity] > Finding.SEVERITIES[self.minimum_severity]:
181+
logger.debug("skipping finding due to minimum severity filter (finding=%s severity=%s min=%s)", sanitized.title, sanitized.severity, self.minimum_severity)
175182
continue
183+
cleaned_findings.append(sanitized)
184+
185+
for idx, unsaved_finding in enumerate(cleaned_findings):
186+
is_final_finding = idx == len(cleaned_findings) - 1
176187

177188
# Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report)
178189
# Finding.mitigated is DateTimeField and it requires timezone
@@ -183,7 +194,7 @@ def process_findings(
183194
unsaved_finding.reporter = self.user
184195
unsaved_finding.last_reviewed_by = self.user
185196
unsaved_finding.last_reviewed = self.now
186-
logger.debug("process_parsed_findings: unique_id_from_tool: %s, hash_code: %s, active from report: %s, verified from report: %s", unsaved_finding.unique_id_from_tool, unsaved_finding.hash_code, unsaved_finding.active, unsaved_finding.verified)
197+
logger.debug("process_parsed_finding: unique_id_from_tool: %s, hash_code: %s, active from report: %s, verified from report: %s", unsaved_finding.unique_id_from_tool, unsaved_finding.hash_code, unsaved_finding.active, unsaved_finding.verified)
187198
# indicates an override. Otherwise, do not change the value of unsaved_finding.active
188199
if self.active is not None:
189200
unsaved_finding.active = self.active
@@ -205,7 +216,6 @@ def process_findings(
205216
# postprocessing will be done after processing related fields like endpoints, vulnerability ids, etc.
206217
unsaved_finding.save_no_options()
207218

208-
finding = unsaved_finding
209219
# Determine how the finding should be grouped
210220
self.process_finding_groups(
211221
finding,
@@ -225,9 +235,30 @@ def process_findings(
225235
new_findings.append(finding)
226236
# all data is already saved on the finding, we only need to trigger post processing
227237

228-
# to avoid pushing a finding group multiple times, we push those outside of the loop
238+
# We create a signature for the post processing task so we can decide to apply it async or sync
229239
push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
230-
finding_helper.post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, issue_updater_option=True, push_to_jira=push_to_jira)
240+
post_processing_task_signature = finding_helper.post_process_finding_save_signature(
241+
finding,
242+
dedupe_option=True,
243+
rules_option=True,
244+
product_grading_option=False,
245+
issue_updater_option=True,
246+
push_to_jira=push_to_jira,
247+
)
248+
249+
post_processing_task_signatures.append(post_processing_task_signature)
250+
251+
# Check if we should launch a chord (batch full or end of findings)
252+
if we_want_async(async_user=self.user) and post_processing_task_signatures:
253+
post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
254+
post_processing_task_signatures,
255+
current_batch_number,
256+
max_batch_size,
257+
is_final_finding,
258+
)
259+
else:
260+
# Execute task immediately for synchronous processing
261+
post_processing_task_signature()
231262

232263
for (group_name, findings) in group_names_to_findings_dict.items():
233264
finding_helper.add_findings_to_auto_group(
@@ -243,6 +274,11 @@ def process_findings(
243274
else:
244275
jira_helper.push_to_jira(findings[0])
245276

277+
# Note: All chord batching is now handled within the loop above
278+
279+
# Always perform an initial grading, even though it might get overwritten later.
280+
perform_product_grading(self.test.engagement.product)
281+
246282
sync = kwargs.get("sync", True)
247283
if not sync:
248284
return [serialize("json", [finding]) for finding in new_findings]
@@ -320,12 +356,17 @@ def close_old_findings(
320356
"as it is not present anymore in recent scans."
321357
),
322358
finding_groups_enabled=self.findings_groups_enabled,
359+
product_grading_option=False,
323360
)
324361
# push finding groups to jira since we only only want to push whole groups
325362
if self.findings_groups_enabled and self.push_to_jira:
326363
for finding_group in {finding.finding_group for finding in old_findings if finding.finding_group is not None}:
327364
jira_helper.push_to_jira(finding_group)
328365

366+
# Calculate grade once after all findings have been closed
367+
if old_findings:
368+
perform_product_grading(self.test.engagement.product)
369+
329370
return old_findings
330371

331372
def parse_findings_static_test_type(

0 commit comments

Comments
 (0)