From eb9d5d8554e53c5472999aafcb0a4da0275ab7e1 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Mon, 1 Jun 2026 19:14:57 +0530 Subject: [PATCH] Fix Service Bus lock renewal for long-running topic callbacks **Summary** - Adds parent-side proactive lock renewal for in-flight Azure topic messages while callback workers are still running. - Registers messages with `AutoLockRenewer` before starting callback execution. - Tracks in-flight callback tasks and releases tracking during settlement. - Cancels process workers only after the message lock has actually expired, preventing duplicate long-running processing after lock loss. - Adds configurable process fallback behavior with `TOPIC_CALLBACK_PROCESS_FALLBACK_MODE`. - Bumps package version to `0.0.26`. - Adds unit coverage for proactive renewal, lock-loss handling, fallback behavior, and in-flight cleanup. **Testing** - `python -m pytest` - `273 passed`, `1` existing warning. --- .github/workflows/unit_tests.yml | 36 ++- CHANGELOG.md | 7 + README.md | 5 +- src/python_ms_core/core/topic/azure_topic.py | 207 ++++++++++++++++- src/python_ms_core/version.py | 2 +- .../unit_tests/test_topic/test_azure_topic.py | 216 +++++++++++++++++- 6 files changed, 462 insertions(+), 11 deletions(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 094f267..1476ab0 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -2,11 +2,13 @@ name: Unit Tests on: workflow_dispatch: push: - branches-ignore: - - '**' + branches: [ develop, main ] pull_request: branches: [ main, develop ] +permissions: + contents: write + jobs: UnitTest: runs-on: ubuntu-latest @@ -59,6 +61,36 @@ jobs: run: | coverage report --fail-under=85 + - name: Generate coverage badge + run: | + pip install coverage-badge + mkdir -p badge-out + coverage-badge -f -o badge-out/coverage.svg + + - name: Publish coverage badge to badges branch + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + set -e + cp badge-out/coverage.svg /tmp/coverage.svg + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + git fetch origin badges || true + if git show-ref --verify --quiet refs/remotes/origin/badges; then + git checkout badges + else + git checkout --orphan badges + git rm -rf . >/dev/null 2>&1 || true + fi + cp /tmp/coverage.svg coverage.svg + git add coverage.svg + if ! git diff --cached --quiet; then + git commit -m "chore: update coverage badge" + git push "https://x-access-token:${GITHUB_TOKEN}@github.com/${GITHUB_REPOSITORY}.git" HEAD:badges + else + echo "No badge changes to commit." + fi + - name: Upload report to Azure uses: LanceMcCarthy/Action-AzureBlobUpload@v2 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index d1ee198..f8d2b98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Change log +# Version 0.0.26 +### Fixes +- **Configurable Process Callback Fallback:** Process-mode topic callbacks keep the existing thread fallback by default for backward compatibility, but services can set `TOPIC_CALLBACK_PROCESS_FALLBACK_MODE=error` to prevent long-running CPU-bound work from silently falling back into the receiver/lock-renewal process. +- **Lock Renewal Registration Timing:** Registers received messages with `AutoLockRenewer` before starting callback execution so renewal coverage begins before worker startup. +- **Parent-Side Proactive Lock Renewal:** The receiver loop now proactively renews locks for in-flight messages before expiry, independent of callback execution, so long-running jobs do not rely only on SDK background renewal threads. +- **Lock-Loss Worker Cancellation:** Cancels tracked callback workers only after the message lock is actually expired, preventing a long-running process worker from continuing after the message can become visible again. + # Version 0.0.25 ### Fixes - **Azure Topic Settlement Stability:** Moved Azure Service Bus message settlement back onto the receiver-owning loop instead of settling from worker callback threads. This keeps receive and complete/abandon operations on the same receiver flow for long-running jobs. diff --git a/README.md b/README.md index ecf6da7..4bd0ceb 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,8 @@ # Core package for microservice +[![python-ms-core](https://img.shields.io/pypi/v/python-ms-core?label=python-ms-core&cacheSeconds=60&t=1)](https://pypi.org/project/python-ms-core/) +[![Unit Tests](https://github.com/TaskarCenterAtUW/TDEI-Python-ms-core/actions/workflows/unit_tests.yml/badge.svg)](https://github.com/TaskarCenterAtUW/TDEI-Python-ms-core/actions/workflows/unit_tests.yml) +![Coverage](https://raw.githubusercontent.com/TaskarCenterAtUW/TDEI-Python-ms-core/actions/badges/coverage.svg) ## System requirements | Software | Version | @@ -510,4 +513,4 @@ test_subscribe_with_subscription (test_topic.test_topic.TestTopic) ... ok Ran 174 tests in 8.175s OK -``` \ No newline at end of file +``` diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index 4cf741e..55dd556 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -1,3 +1,4 @@ +import datetime import json import logging import multiprocessing as mp @@ -48,6 +49,7 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.publisher = self.client.get_topic_sender(topic_name=topic_name) self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages) self.callback_execution_mode = self._get_callback_execution_mode() + self.callback_process_fallback_mode = self._get_process_fallback_mode() self.callback_process_start_method = self._get_process_start_method() self.process_context = self._get_process_context() self.internal_count = 0 @@ -65,6 +67,9 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.wait_time_for_message = 5 self.thread_lock = threading.Lock() self.pending_tasks = [] + self.inflight_tasks = {} + self.inflight_lock_renewal_state = {} + self.lock_renew_retry_interval = 5 def publish(self, data: QueueMessage): @@ -89,6 +94,7 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): self.receiver.local_received_messages = 0 while True: try: + self._renew_inflight_message_locks() self._settle_completed_tasks() to_receive = self._get_receivable_count(max_receivable_messages=max_receivable_messages) if max_receivable_messages > 0 and self.receiver.local_received_messages >= max_receivable_messages: @@ -106,13 +112,14 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): with self.thread_lock: self.internal_count += len(messages) for message in messages: - execution_task = self._submit_processing_task(message, callback) self.lock_renewal.register( self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration, on_lock_renew_failure=self._handle_lock_renew_failure, ) + execution_task = self._submit_processing_task(message, callback) + self._track_inflight_task(message, execution_task) self.pending_tasks.append((execution_task, message)) else: if len(self.pending_tasks) > 0: @@ -152,11 +159,23 @@ def _submit_processing_task(self, message, callback): try: return self._submit_process_task(message_payload, callback) except Exception as exc: - logger.warning( - 'Falling back to thread execution for message %s because process start failed: %s', - self._get_message_id(message), + message_id = self._get_message_id(message) + if self.callback_process_fallback_mode == 'thread': + logger.warning( + 'Falling back to thread execution for message %s because process start failed: %s', + message_id, + exc, + ) + return self._submit_thread_task(message_payload, callback) + logger.error( + 'Process execution failed for message %s and thread fallback is disabled: %s', + message_id, exc, ) + return _ImmediateExecutionTask({ + 'success': False, + 'error': f'Process execution failed and thread fallback is disabled: {exc}', + }) return self._submit_thread_task(message_payload, callback) def _submit_thread_task(self, message_payload, callback): @@ -197,9 +216,11 @@ def _wait_for_pending_tasks(self, timeout=0.5): return deadline = time.time() + timeout while time.time() < deadline: + self._renew_inflight_message_locks() if any(task.done() for task, _ in self.pending_tasks): break time.sleep(min(0.1, max(deadline - time.time(), 0))) + self._renew_inflight_message_locks() self._settle_completed_tasks() def _settle_completed_tasks(self): @@ -247,6 +268,8 @@ def _settle_task(self, x, incoming_message=None): except Exception as e: logger.error(f'Error in settling message: {e}') finally: + if incoming_message is not None: + self._release_inflight_task(incoming_message) with self.thread_lock: self.internal_count = max(self.internal_count - 1, 0) return @@ -258,11 +281,130 @@ def _handle_lock_renew_failure(self, renewable, error): f'Error renewing lock for message {message_id}: {failure_reason}; ' f'locked_until_utc={getattr(renewable, "locked_until_utc", None)}' ) + if getattr(renewable, '_lock_expired', False): + self._cancel_inflight_task( + renewable, + f'message lock expired after renewal failure: {failure_reason}', + ) + + def _track_inflight_task(self, message, task): + with self.thread_lock: + message_key = self._get_message_key(message) + self.inflight_tasks[message_key] = task + self.inflight_lock_renewal_state[message_key] = { + 'message': message, + 'renew_margin': self._get_lock_renewal_margin_seconds(message), + 'last_renew_attempt': 0, + } + + def _release_inflight_task(self, message): + with self.thread_lock: + message_key = self._get_message_key(message) + self.inflight_tasks.pop(message_key, None) + self.inflight_lock_renewal_state.pop(message_key, None) + + def _renew_inflight_message_locks(self): + with self.thread_lock: + renewal_items = [ + ( + message_key, + state['message'], + self.inflight_tasks.get(message_key), + state['renew_margin'], + state['last_renew_attempt'], + ) + for message_key, state in self.inflight_lock_renewal_state.items() + ] + + for message_key, message, task, renew_margin, last_renew_attempt in renewal_items: + if task is None or task.done(): + continue + if getattr(message, '_lock_expired', False): + self._handle_lock_renew_failure(message, getattr(message, 'auto_renew_error', None)) + continue + + locked_until_utc = getattr(message, 'locked_until_utc', None) + if locked_until_utc is None: + continue + + remaining_seconds = (locked_until_utc - self._utc_now_for(locked_until_utc)).total_seconds() + if remaining_seconds > renew_margin: + continue + + current_time = time.time() + if current_time - last_renew_attempt < self.lock_renew_retry_interval: + continue + + with self.thread_lock: + state = self.inflight_lock_renewal_state.get(message_key) + if state is None: + continue + state['last_renew_attempt'] = current_time + + try: + renewed_until = self.receiver.renew_message_lock(message) + logger.info( + 'Renewed lock for message %s until %s', + self._get_message_id(message), + renewed_until, + ) + except Exception as exc: + message.auto_renew_error = exc + logger.error( + 'Error proactively renewing lock for message %s: %s; locked_until_utc=%s', + self._get_message_id(message), + exc, + getattr(message, 'locked_until_utc', None), + ) + if getattr(message, '_lock_expired', False): + self._handle_lock_renew_failure(message, exc) + + def _get_lock_renewal_margin_seconds(self, message): + locked_until_utc = getattr(message, 'locked_until_utc', None) + received_timestamp_utc = getattr(message, '_received_timestamp_utc', None) + if ( + isinstance(locked_until_utc, datetime.datetime) + and isinstance(received_timestamp_utc, datetime.datetime) + ): + lock_duration_seconds = (locked_until_utc - received_timestamp_utc).total_seconds() + if lock_duration_seconds > 0: + return max(5, min(self.lock_renewal_margin, lock_duration_seconds * 0.5)) + return min(self.lock_renewal_margin, 30) + + def _cancel_inflight_task(self, message, reason): + with self.thread_lock: + task = self.inflight_tasks.get(self._get_message_key(message)) + if task is None or task.done(): + return + message_id = self._get_message_id(message) + if task.cancel(reason): + logger.error( + 'Cancelled callback worker for message %s because %s', + message_id, + reason, + ) + else: + logger.error( + 'Unable to cancel callback worker for message %s after %s', + message_id, + reason, + ) @staticmethod def _get_message_id(message): return getattr(message, 'message_id', None) or getattr(message, 'messageId', 'unknown') + @staticmethod + def _get_message_key(message): + return id(message) + + @staticmethod + def _utc_now_for(value): + now = datetime.datetime.now(datetime.timezone.utc) + if getattr(value, 'tzinfo', None) is None: + return now.replace(tzinfo=None) + return now + @staticmethod def _get_callback_execution_mode(): value = os.environ.get('TOPIC_CALLBACK_EXECUTION_MODE', 'process') @@ -275,6 +417,18 @@ def _get_callback_execution_mode(): ) return 'process' + @staticmethod + def _get_process_fallback_mode(): + value = os.environ.get('TOPIC_CALLBACK_PROCESS_FALLBACK_MODE', 'thread') + normalized = str(value).strip().lower() + if normalized in ('error', 'thread'): + return normalized + logger.warning( + 'Invalid value for TOPIC_CALLBACK_PROCESS_FALLBACK_MODE: %s. Using thread.', + value, + ) + return 'thread' + @staticmethod def _get_process_start_method(): available_methods = mp.get_all_start_methods() @@ -318,6 +472,20 @@ def _run_callback_in_subprocess(message_payload, callbackfn, result_connection): result_connection.close() +class _ImmediateExecutionTask: + def __init__(self, result): + self._result = result + + def done(self): + return True + + def result(self): + return self._result + + def cancel(self, reason): + return False + + class _FutureExecutionTask: def __init__(self, future): self._future = future @@ -328,15 +496,19 @@ def done(self): def result(self): return self._future.result() + def cancel(self, reason): + return self._future.cancel() + class _ProcessExecutionTask: def __init__(self, process, result_connection): self._process = process self._result_connection = result_connection self._result = None + self._connection_closed = False def done(self): - return not self._process.is_alive() + return self._result is not None or not self._process.is_alive() def result(self): if self._result is not None: @@ -352,6 +524,29 @@ def result(self): 'error': f'Callback worker exited with code {self._process.exitcode} without returning a result.', } finally: - self._result_connection.close() + self._close_result_connection() return self._result + + def cancel(self, reason): + if self.done(): + return False + + self._process.terminate() + self._process.join(timeout=5) + if self._process.is_alive() and hasattr(self._process, 'kill'): + self._process.kill() + self._process.join(timeout=5) + + self._result = { + 'success': False, + 'error': f'Callback worker terminated because {reason}.', + } + self._close_result_connection() + return True + + def _close_result_connection(self): + if self._connection_closed: + return + self._connection_closed = True + self._result_connection.close() diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index 858d0a1..af2cb9a 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.0.25' +__version__ = '0.0.26' diff --git a/tests/unit_tests/test_topic/test_azure_topic.py b/tests/unit_tests/test_topic/test_azure_topic.py index 6b68937..fb914ed 100644 --- a/tests/unit_tests/test_topic/test_azure_topic.py +++ b/tests/unit_tests/test_topic/test_azure_topic.py @@ -1,3 +1,4 @@ +import datetime import os import unittest from unittest.mock import MagicMock, patch @@ -46,6 +47,7 @@ def test_init_sets_process_execution_defaults( topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) self.assertEqual(topic.callback_execution_mode, 'process') + self.assertEqual(topic.callback_process_fallback_mode, 'thread') self.assertEqual(topic.callback_process_start_method, 'fork') self.assertIs(topic.process_context, mock_process_context) mock_auto_lock_renewer.assert_called_once() @@ -87,7 +89,7 @@ def test_submit_processing_task_uses_process_runner_by_default( topic._submit_process_task.assert_called_once_with('{"message":"hello"}', mock_callback) topic._submit_thread_task.assert_not_called() - @patch.dict(os.environ, {}, clear=True) + @patch.dict(os.environ, {'TOPIC_CALLBACK_PROCESS_FALLBACK_MODE': 'thread'}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.logger') @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @@ -129,6 +131,55 @@ def test_submit_processing_task_falls_back_to_thread_when_process_start_fails( self.assertEqual(warning_args[1], 'message-1') self.assertEqual(str(warning_args[2]), 'process boom') + @patch.dict(os.environ, {'TOPIC_CALLBACK_PROCESS_FALLBACK_MODE': 'error'}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_submit_processing_task_returns_failure_when_process_fails_and_thread_fallback_disabled( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_callback = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.__str__.return_value = '{"message":"hello"}' + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._submit_process_task = MagicMock(side_effect=RuntimeError('process boom')) + topic._submit_thread_task = MagicMock(return_value='thread-task') + + task = topic._submit_processing_task(mock_message, mock_callback) + + self.assertTrue(task.done()) + self.assertEqual( + task.result(), + { + 'success': False, + 'error': 'Process execution failed and thread fallback is disabled: process boom', + }, + ) + topic._submit_thread_task.assert_not_called() + mock_logger.error.assert_called_once() + error_args = mock_logger.error.call_args[0] + self.assertEqual( + error_args[0], + 'Process execution failed for message %s and thread fallback is disabled: %s', + ) + self.assertEqual(error_args[1], 'message-1') + self.assertEqual(str(error_args[2]), 'process boom') + @patch.dict(os.environ, {}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @@ -343,6 +394,169 @@ def test_settle_task_skips_expired_message( ) self.assertEqual(topic.internal_count, 0) + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_handle_lock_renew_failure_cancels_running_inflight_task( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_task = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.locked_until_utc = '2026-03-17T09:39:28Z' + mock_message._lock_expired = True + mock_task.done.return_value = False + mock_task.cancel.return_value = True + + topic = AzureTopic(config=mock_config, topic_name='mock-topic') + topic._track_inflight_task(mock_message, mock_task) + + topic._handle_lock_renew_failure(mock_message, RuntimeError('renew failed')) + + mock_task.cancel.assert_called_once_with('message lock expired after renewal failure: renew failed') + self.assertEqual(mock_logger.error.call_count, 2) + self.assertEqual( + mock_logger.error.call_args_list[1][0], + ( + 'Cancelled callback worker for message %s because %s', + 'message-1', + 'message lock expired after renewal failure: renew failed', + ), + ) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_handle_lock_renew_failure_does_not_cancel_when_lock_is_still_active( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_task = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.locked_until_utc = '2026-03-17T09:39:28Z' + mock_message._lock_expired = False + mock_task.done.return_value = False + + topic = AzureTopic(config=mock_config, topic_name='mock-topic') + topic._track_inflight_task(mock_message, mock_task) + + topic._handle_lock_renew_failure(mock_message, RuntimeError('renew failed')) + + mock_task.cancel.assert_not_called() + mock_logger.error.assert_called_once_with( + 'Error renewing lock for message message-1: renew failed; ' + 'locked_until_utc=2026-03-17T09:39:28Z' + ) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_renew_inflight_message_locks_renews_before_expiration( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + now = datetime.datetime.now(datetime.timezone.utc) + renewed_until = now + datetime.timedelta(seconds=45) + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_task = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message._lock_expired = False + mock_message._received_timestamp_utc = now - datetime.timedelta(seconds=20) + mock_message.locked_until_utc = now + datetime.timedelta(seconds=10) + mock_task.done.return_value = False + mock_receiver.renew_message_lock.return_value = renewed_until + + topic = AzureTopic(config=mock_config, topic_name='mock-topic') + topic.receiver = mock_receiver + topic._track_inflight_task(mock_message, mock_task) + + topic._renew_inflight_message_locks() + + mock_receiver.renew_message_lock.assert_called_once_with(mock_message) + mock_logger.info.assert_called_once_with( + 'Renewed lock for message %s until %s', + 'message-1', + renewed_until, + ) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_releases_inflight_task( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_task = MagicMock() + + mock_message._lock_expired = False + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic') + topic.receiver = mock_receiver + topic.internal_count = 1 + topic._track_inflight_task(mock_message, mock_task) + + topic._settle_task( + CompletedTask({'success': True, 'error': None}), + incoming_message=mock_message, + ) + + self.assertNotIn(topic._get_message_key(mock_message), topic.inflight_tasks) + @patch.dict(os.environ, {}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.logger') @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context')