diff --git a/dojo/decorators.py b/dojo/decorators.py
index 1d1f6aac67c..b7b84d59430 100644
--- a/dojo/decorators.py
+++ b/dojo/decorators.py
@@ -79,25 +79,34 @@ def we_want_async(*args, func=None, **kwargs):
 
 # Defect Dojo performs all tasks asynchrnonously using celery
 # *unless* the user initiating the task has set block_execution to True in their usercontactinfo profile
-def dojo_async_task(func):
-    @wraps(func)
-    def __wrapper__(*args, **kwargs):
-        from dojo.utils import get_current_user  # noqa: PLC0415 circular import
-        user = get_current_user()
-        kwargs["async_user"] = user
-
-        dojo_async_task_counter.incr(
-            func.__name__,
-            args=args,
-            kwargs=kwargs,
-        )
-
-        countdown = kwargs.pop("countdown", 0)
-        if we_want_async(*args, func=func, **kwargs):
-            return func.apply_async(args=args, kwargs=kwargs, countdown=countdown)
-        return func(*args, **kwargs)
-
-    return __wrapper__
+def dojo_async_task(func=None, *, signature=False):
+    def decorator(func):
+        @wraps(func)
+        def __wrapper__(*args, **kwargs):
+            from dojo.utils import get_current_user  # noqa: PLC0415 circular import
+            user = get_current_user()
+            kwargs["async_user"] = user
+
+            dojo_async_task_counter.incr(
+                func.__name__,
+                args=args,
+                kwargs=kwargs,
+            )
+
+            # Return a signature for use in a chord/group if requested
+            if signature:
+                return func.si(*args, **kwargs)
+
+            countdown = kwargs.pop("countdown", 0)
+            if we_want_async(*args, func=func, **kwargs):
+                # Execute the task asynchronously via celery
+                return func.apply_async(args=args, kwargs=kwargs, countdown=countdown)
+            return func(*args, **kwargs)
+        return __wrapper__
+
+    if func is None:
+        return decorator
+    return decorator(func)
 
 
 # decorator with parameters needs another wrapper layer
diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py
index 36249415693..6e82e596905 100644
--- a/dojo/finding/helper.py
+++ b/dojo/finding/helper.py
@@ -379,6 +379,21 @@ def add_findings_to_auto_group(name, findings, group_by, *, create_finding_group
     finding_group.findings.add(*findings)
 
 
+@dojo_model_to_id
+@dojo_async_task(signature=True)
+@app.task
+@dojo_model_from_id
+def post_process_finding_save_signature(finding, dedupe_option=True, rules_option=True, product_grading_option=True,  # noqa: FBT002
+                              issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs):  # noqa: FBT002 - this is bit hard to fix nice have this universally fixed
+    """
+    Returns a task signature for post-processing a finding. This is useful for creating task signatures
+    that can be used in chords or groups, or to await results. We need this extra method because of our dojo_async_task decorator.
+    If we use more of these celery features, we should probably move away from that decorator.
+ """ + return post_process_finding_save_internal(finding, dedupe_option, rules_option, product_grading_option, + issue_updater_option, push_to_jira, user, *args, **kwargs) + + @dojo_model_to_id @dojo_async_task @app.task @@ -386,6 +401,13 @@ def add_findings_to_auto_group(name, findings, group_by, *, create_finding_group def post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002 issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is bit hard to fix nice have this universally fixed + return post_process_finding_save_internal(finding, dedupe_option, rules_option, product_grading_option, + issue_updater_option, push_to_jira, user, *args, **kwargs) + + +def post_process_finding_save_internal(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002 + issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is bit hard to fix nice have this universally fixed + if not finding: logger.warning("post_process_finding_save called with finding==None, skipping post processing") return @@ -477,7 +499,6 @@ def finding_post_delete(sender, instance, **kwargs): # Catch instances in async delete where a single object is deleted more than once with suppress(Finding.DoesNotExist): logger.debug("finding post_delete, sender: %s instance: %s", to_str_typed(sender), to_str_typed(instance)) - # calculate_grade(instance.test.engagement.product) def reset_duplicate_before_delete(dupe): diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index 1108a7b3354..257411d1c70 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -1,6 +1,7 @@ import base64 import logging +from celery import chord, group from django.conf import settings from django.core.exceptions import ValidationError from django.core.files.base import ContentFile @@ -10,6 +11,7 @@ from django.utils.timezone import make_aware import dojo.finding.helper as finding_helper +from dojo import utils from dojo.importers.endpoint_manager import EndpointManager from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -24,6 +26,7 @@ Endpoint, FileUpload, Finding, + System_Settings, Test, Test_Import, Test_Import_Finding_Action, @@ -552,6 +555,47 @@ def update_test_type_from_internal_test(self, internal_test: ParserTest) -> None self.test.test_type.dynamic_tool = dynamic_tool self.test.test_type.save() + def maybe_launch_post_processing_chord( + self, + post_processing_task_signatures, + current_batch_number: int, + max_batch_size: int, + * + is_final_batch: bool, + ) -> tuple[list, int, bool]: + """ + Helper to optionally launch a chord of post-processing tasks with a calculate-grade callback + when async is desired. Uses exponential batch sizing up to the configured max batch size. + + Returns a tuple of (post_processing_task_signatures, current_batch_number, launched) + where launched indicates whether a chord/group was dispatched and signatures were reset. 
+ """ + launched = False + if not post_processing_task_signatures: + return post_processing_task_signatures, current_batch_number, launched + + current_batch_size = min(2 ** current_batch_number, max_batch_size) + batch_full = len(post_processing_task_signatures) >= current_batch_size + + if batch_full or is_final_batch: + product = self.test.engagement.product + system_settings = System_Settings.objects.get() + if system_settings.enable_product_grade: + calculate_grade_signature = utils.calculate_grade_signature(product) + chord(post_processing_task_signatures)(calculate_grade_signature) + else: + group(post_processing_task_signatures).apply_async() + + logger.debug( + f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})", + ) + post_processing_task_signatures = [] + if not is_final_batch: + current_batch_number += 1 + launched = True + + return post_processing_task_signatures, current_batch_number, launched + def verify_tool_configuration_from_test(self): """ Verify that the Tool_Configuration supplied along with the @@ -763,6 +807,7 @@ def mitigate_finding( note_message: str, *, finding_groups_enabled: bool, + product_grading_option: bool = True, ) -> None: """ Mitigates a finding, all endpoint statuses, leaves a note on the finding @@ -784,9 +829,9 @@ def mitigate_finding( # to avoid pushing a finding group multiple times, we push those outside of the loop if finding_groups_enabled and finding.finding_group: # don't try to dedupe findings that we are closing - finding.save(dedupe_option=False) + finding.save(dedupe_option=False, product_grading_option=product_grading_option) else: - finding.save(dedupe_option=False, push_to_jira=self.push_to_jira) + finding.save(dedupe_option=False, push_to_jira=self.push_to_jira, product_grading_option=product_grading_option) def notify_scan_added( self, diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index ed4106971b7..d127ed33f6a 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -5,8 +5,9 @@ from django.db.models.query_utils import Q from django.urls import reverse -import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper +from dojo.decorators import we_want_async +from dojo.finding import helper as finding_helper from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -16,6 +17,7 @@ Test_Import, ) from dojo.notifications.helper import create_notification +from dojo.utils import perform_product_grading from dojo.validators import clean_tags logger = logging.getLogger(__name__) @@ -155,6 +157,11 @@ def process_findings( parsed_findings: list[Finding], **kwargs: dict, ) -> list[Finding]: + # Progressive batching for chord execution + post_processing_task_signatures = [] + current_batch_number = 1 + max_batch_size = 1024 + """ Saves findings in memory that were parsed from the scan report into the database. 
This process involves first saving associated objects such as endpoints, files, @@ -166,13 +173,17 @@ def process_findings( logger.debug("starting import of %i parsed findings.", len(parsed_findings) if parsed_findings else 0) group_names_to_findings_dict = {} - for non_clean_unsaved_finding in parsed_findings: - # make sure the severity is something is digestible - unsaved_finding = self.sanitize_severity(non_clean_unsaved_finding) - # Filter on minimum severity if applicable - if Finding.SEVERITIES[unsaved_finding.severity] > Finding.SEVERITIES[self.minimum_severity]: - # finding's severity is below the configured threshold : ignoring the finding + # Pre-sanitize and filter by minimum severity + cleaned_findings = [] + for raw_finding in parsed_findings or []: + sanitized = self.sanitize_severity(raw_finding) + if Finding.SEVERITIES[sanitized.severity] > Finding.SEVERITIES[self.minimum_severity]: + logger.debug("skipping finding due to minimum severity filter (finding=%s severity=%s min=%s)", sanitized.title, sanitized.severity, self.minimum_severity) continue + cleaned_findings.append(sanitized) + + for idx, unsaved_finding in enumerate(cleaned_findings): + is_final_finding = idx == len(cleaned_findings) - 1 # Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report) # Finding.mitigated is DateTimeField and it requires timezone @@ -183,7 +194,7 @@ def process_findings( unsaved_finding.reporter = self.user unsaved_finding.last_reviewed_by = self.user unsaved_finding.last_reviewed = self.now - logger.debug("process_parsed_findings: unique_id_from_tool: %s, hash_code: %s, active from report: %s, verified from report: %s", unsaved_finding.unique_id_from_tool, unsaved_finding.hash_code, unsaved_finding.active, unsaved_finding.verified) + logger.debug("process_parsed_finding: unique_id_from_tool: %s, hash_code: %s, active from report: %s, verified from report: %s", unsaved_finding.unique_id_from_tool, unsaved_finding.hash_code, unsaved_finding.active, unsaved_finding.verified) # indicates an override. Otherwise, do not change the value of unsaved_finding.active if self.active is not None: unsaved_finding.active = self.active @@ -205,7 +216,6 @@ def process_findings( # postprocessing will be done after processing related fields like endpoints, vulnerability ids, etc. 
             unsaved_finding.save_no_options()
-            finding = unsaved_finding
             # Determine how the finding should be grouped
             self.process_finding_groups(
                 finding,
@@ -225,9 +235,30 @@ def process_findings(
             new_findings.append(finding)
             # all data is already saved on the finding, we only need to trigger post processing
-            # to avoid pushing a finding group multiple times, we push those outside of the loop
+            # We create a signature for the post processing task so we can decide to apply it async or sync
             push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
-            finding_helper.post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, issue_updater_option=True, push_to_jira=push_to_jira)
+            post_processing_task_signature = finding_helper.post_process_finding_save_signature(
+                finding,
+                dedupe_option=True,
+                rules_option=True,
+                product_grading_option=False,
+                issue_updater_option=True,
+                push_to_jira=push_to_jira,
+            )
+
+            post_processing_task_signatures.append(post_processing_task_signature)
+
+            # Check if we should launch a chord (batch full or end of findings)
+            if we_want_async(async_user=self.user) and post_processing_task_signatures:
+                post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
+                    post_processing_task_signatures,
+                    current_batch_number,
+                    max_batch_size,
+                    is_final_batch=is_final_finding,
+                )
+            else:
+                # Execute task immediately for synchronous processing
+                post_processing_task_signature()
 
         for (group_name, findings) in group_names_to_findings_dict.items():
             finding_helper.add_findings_to_auto_group(
@@ -243,6 +274,11 @@ def process_findings(
             else:
                 jira_helper.push_to_jira(findings[0])
 
+        # Note: All chord batching is now handled within the loop above
+
+        # Always perform an initial grading, even though it might get overwritten later.
+        perform_product_grading(self.test.engagement.product)
+
         sync = kwargs.get("sync", True)
         if not sync:
             return [serialize("json", [finding]) for finding in new_findings]
@@ -320,12 +356,17 @@ def close_old_findings(
                     "as it is not present anymore in recent scans."
                 ),
                 finding_groups_enabled=self.findings_groups_enabled,
+                product_grading_option=False,
             )
         # push finding groups to jira since we only only want to push whole groups
         if self.findings_groups_enabled and self.push_to_jira:
             for finding_group in {finding.finding_group for finding in old_findings if finding.finding_group is not None}:
                 jira_helper.push_to_jira(finding_group)
 
+        # Calculate grade once after all findings have been closed
+        if old_findings:
+            perform_product_grading(self.test.engagement.product)
+
         return old_findings
 
     def parse_findings_static_test_type(
diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py
index 455730a647b..7adb2c65c48 100644
--- a/dojo/importers/default_reimporter.py
+++ b/dojo/importers/default_reimporter.py
@@ -6,6 +6,7 @@
 
 import dojo.finding.helper as finding_helper
 import dojo.jira_link.helper as jira_helper
+from dojo.decorators import we_want_async
 from dojo.importers.base_importer import BaseImporter, Parser
 from dojo.importers.options import ImporterOptions
 from dojo.models import (
@@ -15,6 +16,7 @@
     Test,
     Test_Import,
 )
+from dojo.utils import perform_product_grading
 from dojo.validators import clean_tags
 
 logger = logging.getLogger(__name__)
@@ -176,18 +178,31 @@ def process_findings(
         self.reactivated_items = []
         self.unchanged_items = []
         self.group_names_to_findings_dict = {}
+        # Progressive batching for chord execution
+        post_processing_task_signatures = []
+        current_batch_number = 1
+        max_batch_size = 1024
 
         logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.")
         logger.debug("STEP 1: looping over findings from the reimported report and trying to match them to existing findings")
         deduplicationLogger.debug(f"Algorithm used for matching new findings to existing findings: {self.deduplication_algorithm}")
-        for non_clean_unsaved_finding in parsed_findings:
-            # make sure the severity is something is digestible
-            unsaved_finding = self.sanitize_severity(non_clean_unsaved_finding)
-            # Filter on minimum severity if applicable
-            if Finding.SEVERITIES[unsaved_finding.severity] > Finding.SEVERITIES[self.minimum_severity]:
-                # finding's severity is below the configured threshold : ignoring the finding
+        # Pre-sanitize and filter by minimum severity to avoid loop control pitfalls
+        cleaned_findings = []
+        for raw_finding in parsed_findings or []:
+            sanitized = self.sanitize_severity(raw_finding)
+            if Finding.SEVERITIES[sanitized.severity] > Finding.SEVERITIES[self.minimum_severity]:
+                logger.debug(
+                    "skipping finding due to minimum severity filter (finding=%s severity=%s min=%s)",
+                    getattr(sanitized, "title", ""),
+                    sanitized.severity,
+                    self.minimum_severity,
+                )
                 continue
+            cleaned_findings.append(sanitized)
+
+        for idx, unsaved_finding in enumerate(cleaned_findings):
+            is_final = idx == len(cleaned_findings) - 1
 
             # Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report)
             # Finding.mitigated is DateTimeField and it requires timezone
             if unsaved_finding.mitigated and not unsaved_finding.mitigated.tzinfo:
@@ -238,9 +253,29 @@ def process_findings(
             )
             # all data is already saved on the finding, we only need to trigger post processing
-            # to avoid pushing a finding group multiple times, we push those outside of the loop
+            # We create a signature for the post processing task so we can decide to apply it async or sync
             push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
-            finding_helper.post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, issue_updater_option=True, push_to_jira=push_to_jira)
+
+            post_processing_task_signature = finding_helper.post_process_finding_save_signature(
+                finding,
+                dedupe_option=True,
+                rules_option=True,
+                product_grading_option=False,
+                issue_updater_option=True,
+                push_to_jira=push_to_jira,
+            )
+            post_processing_task_signatures.append(post_processing_task_signature)
+
+            # Check if we should launch a chord (batch full or end of findings)
+            if we_want_async(async_user=self.user) and post_processing_task_signatures:
+                post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
+                    post_processing_task_signatures,
+                    current_batch_number,
+                    max_batch_size,
+                    is_final_batch=is_final,
+                )
+            else:
+                post_processing_task_signature()
 
         self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items))
         # due to #3958 we can have duplicates inside the same report
@@ -252,6 +287,12 @@ def process_findings(
         self.untouched = set(self.unchanged_items) - set(self.to_mitigate) - set(self.new_items) - set(self.reactivated_items)
         # Process groups
         self.process_groups_for_all_findings(**kwargs)
+
+        # Note: All chord batching is now handled within the loop above
+
+        # Synchronous tasks were already executed during processing, just calculate grade
+        perform_product_grading(self.test.engagement.product)
+
         # Process the results and return them back
         return self.process_results(**kwargs)
 
@@ -286,6 +327,7 @@ def close_old_findings(
                 finding,
                 f"Mitigated by {self.test.test_type} re-upload.",
                 finding_groups_enabled=self.findings_groups_enabled,
+                product_grading_option=False,
             )
             mitigated_findings.append(finding)
         # push finding groups to jira since we only only want to push whole groups
@@ -293,6 +335,10 @@
             for finding_group in {finding.finding_group for finding in findings if finding.finding_group is not None}:
                 jira_helper.push_to_jira(finding_group)
 
+        # Calculate grade once after all findings have been closed
+        if mitigated_findings:
+            perform_product_grading(self.test.engagement.product)
+
         return mitigated_findings
 
     def parse_findings_static_test_type(
diff --git a/dojo/models.py b/dojo/models.py
index 1adb0dfd797..5bdf195bedb 100644
--- a/dojo/models.py
+++ b/dojo/models.py
@@ -1652,7 +1652,8 @@ def delete(self, *args, **kwargs):
         with suppress(Engagement.DoesNotExist, Product.DoesNotExist):
             # Suppressing a potential issue created from async delete removing
             # related objects in a separate task
-            calculate_grade(self.product)
+            from dojo.utils import perform_product_grading  # noqa: PLC0415 circular import
+            perform_product_grading(self.product)
 
     def inherit_tags(self, potentially_existing_tags):
         # get a copy of the tags to be inherited
@@ -2251,13 +2252,15 @@ def hash_code_allows_null_cwe(self):
         deduplicationLogger.debug(f"HASHCODE_ALLOWS_NULL_CWE is: {hashCodeAllowsNullCwe}")
         return hashCodeAllowsNullCwe
 
-    def delete(self, *args, **kwargs):
+    def delete(self, *args, product_grading_option=True, **kwargs):
         logger.debug("%d test delete", self.id)
         super().delete(*args, **kwargs)
-        with suppress(Test.DoesNotExist, Engagement.DoesNotExist, Product.DoesNotExist):
-            # Suppressing a potential issue created from async delete removing
-            # related objects in a separate task
-            calculate_grade(self.engagement.product)
+        if product_grading_option:
+            with suppress(Test.DoesNotExist, Engagement.DoesNotExist, Product.DoesNotExist):
+                # 
Suppressing a potential issue created from async delete removing + # related objects in a separate task + from dojo.utils import perform_product_grading # noqa: PLC0415 circular import + perform_product_grading(self.engagement.product) @property def statistics(self): @@ -2856,15 +2859,17 @@ def copy(self, test=None): return copy - def delete(self, *args, **kwargs): + def delete(self, *args, product_grading_option=True, **kwargs): logger.debug("%d finding delete", self.id) from dojo.finding import helper as finding_helper # noqa: PLC0415 circular import finding_helper.finding_delete(self) super().delete(*args, **kwargs) - with suppress(Finding.DoesNotExist, Test.DoesNotExist, Engagement.DoesNotExist, Product.DoesNotExist): - # Suppressing a potential issue created from async delete removing - # related objects in a separate task - calculate_grade(self.test.engagement.product) + if product_grading_option: + with suppress(Finding.DoesNotExist, Test.DoesNotExist, Engagement.DoesNotExist, Product.DoesNotExist): + # Suppressing a potential issue created from async delete removing + # related objects in a separate task + from dojo.utils import perform_product_grading # noqa: PLC0415 circular import + perform_product_grading(self.test.engagement.product) # only used by bulk risk acceptance api @classmethod @@ -4707,7 +4712,6 @@ def __str__(self): from dojo.utils import ( # noqa: E402 # there is issue due to a circular import - calculate_grade, parse_cvss_data, to_str_typed, ) diff --git a/dojo/tasks.py b/dojo/tasks.py index 8d8de62117b..ee6f2aec3bc 100644 --- a/dojo/tasks.py +++ b/dojo/tasks.py @@ -131,7 +131,7 @@ def async_dupe_delete(*args, **kwargs): logger.info("delete excess duplicates (max_dupes per finding: %s, max deletes per run: %s)", dupe_max, total_duplicate_delete_count_max_per_run) deduplicationLogger.info("delete excess duplicates (max_dupes per finding: %s, max deletes per run: %s)", dupe_max, total_duplicate_delete_count_max_per_run) - # limit to 100 to prevent overlapping jobs + # limit to settings.DUPE_DELETE_MAX_PER_RUN to prevent overlapping jobs results = Finding.objects \ .filter(duplicate=True) \ .order_by() \ @@ -148,13 +148,17 @@ def async_dupe_delete(*args, **kwargs): queryset=Finding.objects.filter(duplicate=True).order_by("date"))) total_deleted_count = 0 + affected_products = set() for original in originals_with_too_many_duplicates: duplicate_list = original.original_finding.all() dupe_count = len(duplicate_list) - dupe_max for finding in duplicate_list: deduplicationLogger.debug(f"deleting finding {finding.id}:{finding.title} ({finding.hash_code}))") - finding.delete() + # Collect the product for batch grading later + affected_products.add(finding.test.engagement.product) + # Skip individual product grading during deletion + finding.delete(product_grading_option=False) total_deleted_count += 1 dupe_count -= 1 if dupe_count <= 0: @@ -167,6 +171,14 @@ def async_dupe_delete(*args, **kwargs): logger.info("total number of excess duplicates deleted: %s", total_deleted_count) + # Batch product grading for all affected products + if affected_products: + system_settings = System_Settings.objects.get() + if system_settings.enable_product_grade: + logger.info("performing batch product grading for %s products", len(affected_products)) + for product in affected_products: + calculate_grade(product) + @app.task(ignore_result=False) def celery_status(): diff --git a/dojo/utils.py b/dojo/utils.py index 20d8a8ff82d..c8cd1edd2aa 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ 
-1555,11 +1555,25 @@ def get_setting(setting): return getattr(settings, setting) +@dojo_model_to_id +@dojo_async_task(signature=True) +@app.task +@dojo_model_from_id(model=Product) +def calculate_grade_signature(product, *args, **kwargs): + """Returns a signature for calculating product grade that can be used in chords or groups.""" + return calculate_grade_internal(product, *args, **kwargs) + + @dojo_model_to_id @dojo_async_task @app.task @dojo_model_from_id(model=Product) def calculate_grade(product, *args, **kwargs): + return calculate_grade_internal(product, *args, **kwargs) + + +def calculate_grade_internal(product, *args, **kwargs): + """Internal function for calculating product grade.""" system_settings = System_Settings.objects.get() if not product: logger.warning("ignoring calculate product for product None!") @@ -1606,6 +1620,12 @@ def calculate_grade(product, *args, **kwargs): logger.debug("Product %s grade %s is up to date", product.id, prod_numeric_grade) +def perform_product_grading(product): + system_settings = System_Settings.objects.get() + if system_settings.enable_product_grade: + calculate_grade(product) + + def get_celery_worker_status(): from .tasks import celery_status # noqa: PLC0415 circular import res = celery_status.apply_async() diff --git a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index 7cd0d626fd2..2c8754ff874 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -176,7 +176,7 @@ def import_reimport_performance(self, expected_num_queries1, expected_num_async_ # def test_import_reimport_reimport_performance_async(self, mock): def test_import_reimport_reimport_performance_async(self): self.import_reimport_performance( - expected_num_queries1=679, + expected_num_queries1=680, expected_num_async_tasks1=10, expected_num_queries2=606, expected_num_async_tasks2=22, @@ -198,7 +198,7 @@ def test_import_reimport_reimport_performance_no_async(self): testuser.usercontactinfo.block_execution = True testuser.usercontactinfo.save() self.import_reimport_performance( - expected_num_queries1=679, + expected_num_queries1=680, expected_num_async_tasks1=10, expected_num_queries2=611, expected_num_async_tasks2=22, @@ -222,10 +222,10 @@ def test_import_reimport_reimport_performance_no_async_with_product_grading(self self.system_settings(enable_product_grade=True) self.import_reimport_performance( - expected_num_queries1=684, - expected_num_async_tasks1=15, - expected_num_queries2=617, - expected_num_async_tasks2=28, - expected_num_queries3=299, - expected_num_async_tasks3=25, + expected_num_queries1=681, + expected_num_async_tasks1=11, + expected_num_queries2=612, + expected_num_async_tasks2=23, + expected_num_queries3=295, + expected_num_async_tasks3=21, )