Skip to content

Commit 3cd24d1

Browse files
watson: perform async index updates (#13152)
* watson: perform async index updates * watson: perform async index updates * watson: perform async index updates * ruff
1 parent fa958b8 commit 3cd24d1

5 files changed

Lines changed: 294 additions & 2 deletions

File tree

dojo/middleware.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from django.http import HttpResponseRedirect
1313
from django.urls import reverse
1414
from django.utils.functional import SimpleLazyObject
15+
from watson.middleware import SearchContextMiddleware
16+
from watson.search import search_context_manager
1517

1618
from dojo.models import Dojo_User
1719
from dojo.product_announcements import LongRunningRequestProductAnnouncement
@@ -210,3 +212,76 @@ def __call__(self, request):
210212
LongRunningRequestProductAnnouncement(request=request, duration=duration)
211213

212214
return response
215+
216+
217+
class AsyncSearchContextMiddleware(SearchContextMiddleware):
218+
219+
"""
220+
Ensures Watson index updates are triggered asynchronously.
221+
Inherits from watson's SearchContextMiddleware to minimize the amount of code we need to maintain.
222+
"""
223+
224+
def _close_search_context(self, request):
225+
"""Override watson's close behavior to trigger async updates when above threshold."""
226+
if search_context_manager.is_active():
227+
from django.conf import settings # noqa: PLC0415 circular import
228+
229+
# Extract tasks and check if we should trigger async update
230+
captured_tasks = self._extract_tasks_for_async()
231+
232+
# Get total number of instances across all model types
233+
total_instances = sum(len(pk_list) for pk_list in captured_tasks.values())
234+
threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_THRESHOLD", 100)
235+
236+
# If threshold is below 0, async updating is disabled
237+
if threshold < 0:
238+
logger.debug(f"AsyncSearchContextMiddleware: Async updating disabled (threshold={threshold}), using synchronous update")
239+
elif total_instances > threshold:
240+
logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances > {threshold} threshold, triggering async update")
241+
self._trigger_async_index_update(captured_tasks)
242+
# Invalidate to prevent synchronous index update by super()._close_search_context()
243+
search_context_manager.invalidate()
244+
else:
245+
logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances <= {threshold} threshold, using synchronous update")
246+
# Let watson handle synchronous update for small numbers
247+
248+
super()._close_search_context(request)
249+
250+
def _extract_tasks_for_async(self):
251+
"""Extract tasks from the search context and group by model type for async processing."""
252+
current_tasks, _is_invalid = search_context_manager._stack[-1]
253+
254+
# Group by model type for efficient batch processing
255+
model_groups = {}
256+
for _engine, obj in current_tasks:
257+
model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
258+
if model_key not in model_groups:
259+
model_groups[model_key] = []
260+
model_groups[model_key].append(obj.pk)
261+
262+
# Log what we extracted per model type
263+
for model_key, pk_list in model_groups.items():
264+
logger.debug(f"AsyncSearchContextMiddleware: Extracted {len(pk_list)} {model_key} instances for async indexing")
265+
266+
return model_groups
267+
268+
def _trigger_async_index_update(self, model_groups):
269+
"""Trigger async tasks to update search indexes, chunking large lists into batches of settings.WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE."""
270+
if not model_groups:
271+
return
272+
273+
# Import here to avoid circular import
274+
from django.conf import settings # noqa: PLC0415 circular import
275+
276+
from dojo.tasks import update_watson_search_index_for_model # noqa: PLC0415 circular import
277+
278+
# Create tasks per model type, chunking large lists into configurable batches
279+
for model_name, pk_list in model_groups.items():
280+
# Chunk into batches using configurable batch size (compatible with Python 3.11)
281+
batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
282+
batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]
283+
284+
# Create tasks for each batch and log each one
285+
for i, batch in enumerate(batches, 1):
286+
logger.debug(f"AsyncSearchContextMiddleware: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
287+
update_watson_search_index_for_model(model_name, batch)

dojo/settings/settings.dist.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@
8686
DD_CELERY_TASK_SERIALIZER=(str, "pickle"),
8787
DD_CELERY_PASS_MODEL_BY_ID=(str, True),
8888
DD_CELERY_LOG_LEVEL=(str, "INFO"),
89+
# Minimum number of model updated instances before search index updates as performaed asynchronously. Set to -1 to disable async updates.
90+
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=(int, 100),
91+
DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=(int, 1000),
8992
DD_FOOTER_VERSION=(str, ""),
9093
# models should be passed to celery by ID, default is False (for now)
9194
DD_FORCE_LOWERCASE_TAGS=(bool, True),
@@ -918,9 +921,9 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
918921
"dojo.middleware.LoginRequiredMiddleware",
919922
"dojo.middleware.AdditionalHeaderMiddleware",
920923
"social_django.middleware.SocialAuthExceptionMiddleware",
921-
"watson.middleware.SearchContextMiddleware",
922-
"dojo.middleware.AuditlogMiddleware",
923924
"crum.CurrentRequestUserMiddleware",
925+
"dojo.middleware.AuditlogMiddleware",
926+
"dojo.middleware.AsyncSearchContextMiddleware",
924927
"dojo.request_cache.middleware.RequestCacheMiddleware",
925928
"dojo.middleware.LongRunningRequestAlertMiddleware",
926929
]
@@ -1161,6 +1164,10 @@ def saml2_attrib_map_format(din):
11611164

11621165
CELERY_IMPORTS = ("dojo.tools.tool_issue_updater", )
11631166

1167+
# Watson async index update settings
1168+
WATSON_ASYNC_INDEX_UPDATE_THRESHOLD = env("DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD")
1169+
WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE = env("DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE")
1170+
11641171
# Celery beat scheduled tasks
11651172
CELERY_BEAT_SCHEDULE = {
11661173
"add-alerts": {

dojo/tasks.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
from auditlog.models import LogEntry
55
from celery.utils.log import get_task_logger
66
from dateutil.relativedelta import relativedelta
7+
from django.apps import apps
78
from django.conf import settings
89
from django.core.management import call_command
910
from django.db.models import Count, Prefetch
1011
from django.urls import reverse
1112
from django.utils import timezone
1213

1314
from dojo.celery import app
15+
from dojo.decorators import dojo_async_task
1416
from dojo.finding.helper import fix_loop_duplicates
1517
from dojo.management.commands.jira_status_reconciliation import jira_status_reconciliation
1618
from dojo.models import Alerts, Announcement, Endpoint, Engagement, Finding, Product, System_Settings, User
@@ -222,3 +224,53 @@ def evaluate_pro_proposition(*args, **kwargs):
222224
@app.task
223225
def clear_sessions(*args, **kwargs):
224226
call_command("clearsessions")
227+
228+
229+
@dojo_async_task
230+
@app.task
231+
def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
232+
"""
233+
Async task to update watson search indexes for a specific model type.
234+
235+
Args:
236+
model_key: Model identifier like 'dojo.finding'
237+
pk_list: List of primary keys for instances of this model type. it's advised to chunk the list into batches of 1000 or less.
238+
239+
"""
240+
from watson.search import SearchContextManager, default_search_engine # noqa: PLC0415 circular import
241+
242+
logger.debug(f"Starting async watson index update for {len(pk_list)} {model_name} instances")
243+
244+
try:
245+
# Create new SearchContextManager and start it
246+
context_manager = SearchContextManager()
247+
context_manager.start()
248+
249+
# Get the default engine and model class
250+
engine = default_search_engine
251+
app_label, model_name = model_name.split(".")
252+
model_class = apps.get_model(app_label, model_name)
253+
254+
# Bulk load instances and add them to the context
255+
instances = model_class.objects.filter(pk__in=pk_list)
256+
instances_added = 0
257+
instances_skipped = 0
258+
259+
for instance in instances:
260+
try:
261+
# Add to watson context (this will trigger indexing on end())
262+
context_manager.add_to_context(engine, instance)
263+
instances_added += 1
264+
except Exception as e:
265+
logger.warning(f"Skipping {model_name}:{instance.pk} - {e}")
266+
instances_skipped += 1
267+
continue
268+
269+
# Let watson handle the bulk indexing
270+
context_manager.end()
271+
272+
logger.info(f"Completed async watson index update: {instances_added} updated, {instances_skipped} skipped")
273+
274+
except Exception as e:
275+
logger.error(f"Watson async index update failed for {model_name}: {e}")
276+
raise
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?xml version="1.0"?>
2+
<ScanGroup>
3+
<Scan>
4+
<Name>WatsonUniqueTest</Name>
5+
<ShortName>Watson Unique Short Name</ShortName>
6+
<StartURL>https://watson-unique-test.com</StartURL>
7+
<StartTime>10/09/2025, 18:09:55</StartTime>
8+
<FinishTime>10/09/2025, 21:42:41</FinishTime>
9+
<ScanTime>212 minutes, 4 seconds</ScanTime>
10+
<Aborted>False</Aborted>
11+
<Responsive>True</Responsive>
12+
<Banner></Banner>
13+
<Os></Os>
14+
<WebServer>Apache-Coyote/1.1</WebServer>
15+
<ReportItems>
16+
<ReportItem>
17+
<Name>WatsonUniqueReportItem2025</Name>
18+
<ModuleName>WatsonUniqueTestModule</ModuleName>
19+
<Details>Watson Unique Test Details</Details>
20+
<Affects><![CDATA[/watson/unique/path]]></Affects>
21+
<Parameter></Parameter>
22+
<AOP_SourceFile></AOP_SourceFile>
23+
<AOP_SourceLine></AOP_SourceLine>
24+
<AOP_Additional></AOP_Additional>
25+
<IsFalsePositive></IsFalsePositive>
26+
<Severity>high</Severity>
27+
<Type>xss</Type>
28+
<Impact>Watson Unique Test Impact XYZ123</Impact>
29+
<DetailedInformation>Watson Unique Test Detailed Information ABC456</DetailedInformation>
30+
<Recommendation>Watson Unique Test Recommendation DEF789</Recommendation>
31+
<Description>Watson Unique Test Description GHI000</Description>
32+
<CWEList>
33+
<CWE id="79"><![CDATA[CWE-79]]></CWE>
34+
</CWEList>
35+
<References>
36+
<Reference>
37+
<URL>https://watson-unique-ref.com</URL>
38+
</Reference>
39+
</References>
40+
</ReportItem>
41+
</ReportItems>
42+
</Scan>
43+
</ScanGroup>
44+
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
from django.contrib.auth.models import User
2+
from django.test import override_settings
3+
from django.utils import timezone
4+
from rest_framework.authtoken.models import Token
5+
from rest_framework.test import APIClient
6+
from watson import search as watson
7+
8+
from dojo.models import Development_Environment, Engagement, Finding, Product, Product_Type, UserContactInfo
9+
10+
from .dojo_test_case import DojoAPITestCase
11+
12+
13+
class TestWatsonAsyncSearchIndex(DojoAPITestCase):
14+
15+
"""Test Watson search indexing works correctly for both sync and async updates."""
16+
17+
def setUp(self):
18+
"""Set up test data and API client."""
19+
super().setUp()
20+
21+
self.testuser = User.objects.create(username="admin", is_staff=True, is_superuser=True)
22+
UserContactInfo.objects.create(user=self.testuser, block_execution=True)
23+
24+
self.system_settings(enable_webhooks_notifications=False)
25+
self.system_settings(enable_product_grade=False)
26+
self.system_settings(enable_github=False)
27+
28+
# Create API client with authentication
29+
self.token = Token.objects.create(user=self.testuser)
30+
self.client = APIClient()
31+
self.client.force_login(self.testuser)
32+
33+
# Create test product type and product
34+
self.product_type = Product_Type.objects.create(name="Test Product Type")
35+
self.product = Product.objects.create(
36+
name="Test Product",
37+
description="Test product for Watson indexing",
38+
prod_type=self.product_type,
39+
)
40+
self.engagement = Engagement.objects.create(
41+
name="Test Engagement",
42+
product=self.product,
43+
target_start=timezone.now(),
44+
target_end=timezone.now(),
45+
)
46+
47+
# Create Development Environment
48+
Development_Environment.objects.get_or_create(name="Development")
49+
50+
def _import_acunetix_scan(self):
51+
"""Import an Acunetix scan and return the response."""
52+
return self.import_scan_with_params(
53+
filename="scans/acunetix/watson_test_unique.xml",
54+
scan_type="Acunetix Scan",
55+
engagement=self.engagement.id,
56+
)
57+
58+
def _search_watson_for_finding(self, search_term):
59+
"""Search Watson index for findings containing the search term."""
60+
# Search the Watson index
61+
return watson.search(search_term, models=(Finding,))
62+
63+
def _import_and_check_watson_index(self, expected_message):
64+
"""Common test logic for importing scan and verifying Watson indexing works."""
65+
# Verify no findings exist before import
66+
search_results = self._search_watson_for_finding("WatsonUniqueReportItem2025")
67+
found_finding_ids_before = [result.pk for result in search_results]
68+
self.assertEqual(len(found_finding_ids_before), 0, "Should have no findings before import")
69+
70+
# Import the scan
71+
response_data = self._import_acunetix_scan()
72+
73+
# Get test ID from response
74+
test_id = response_data["test_id"]
75+
76+
# Verify finding was created
77+
findings = Finding.objects.filter(test_id=test_id)
78+
self.assertEqual(findings.count(), 1, "Should have created exactly one finding")
79+
finding = findings.first()
80+
81+
self.assertIn("WatsonUniqueReportItem2025", finding.title, "Finding should contain 'WatsonUniqueReportItem2025' in title")
82+
83+
# Search Watson index for the finding
84+
search_results = self._search_watson_for_finding("WatsonUniqueReportItem2025")
85+
86+
# Verify finding is in search index
87+
found_finding_ids = [result.object.pk for result in search_results]
88+
89+
self.assertIn(finding.pk, found_finding_ids, expected_message.format(finding_id=finding.pk))
90+
91+
return finding
92+
93+
def test_sync_watson_indexing_single_finding(self):
94+
"""Test that single finding import uses sync indexing and finding is searchable."""
95+
# Default threshold is 100, so single finding should use sync indexing
96+
self._import_and_check_watson_index(
97+
"Finding {finding_id} should be found in Watson search index",
98+
)
99+
100+
@override_settings(WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=0)
101+
def test_async_watson_indexing_single_finding(self):
102+
"""Test that with threshold=0, single finding uses async indexing and is searchable."""
103+
# With threshold=0, even single finding should trigger async indexing
104+
self._import_and_check_watson_index(
105+
"Finding {finding_id} should be found in Watson search index after async update",
106+
)
107+
108+
@override_settings(WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=-1)
109+
def test_disabled_async_watson_indexing(self):
110+
"""Test that with threshold=-1, async is disabled and sync indexing works."""
111+
# With threshold=-1, async should be completely disabled
112+
self._import_and_check_watson_index(
113+
"Finding {finding_id} should be found in Watson search index with sync update",
114+
)

0 commit comments

Comments
 (0)