Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ name: Unit Tests
on:
workflow_dispatch:
push:
branches-ignore:
- '**'
branches: [ develop, main ]
pull_request:
branches: [ main, develop ]

permissions:
contents: write

jobs:
UnitTest:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -59,6 +61,36 @@ jobs:
run: |
coverage report --fail-under=85

# Render an SVG coverage badge with the coverage-badge CLI.
# NOTE(review): presumably this reads the coverage data written by the earlier test steps — confirm.
- name: Generate coverage badge
run: |
pip install coverage-badge
mkdir -p badge-out
coverage-badge -f -o badge-out/coverage.svg

# Commit the generated badge as coverage.svg on the `badges` branch and push it with the workflow token.
# NOTE(review): no `if:` guard is visible on this step while the workflow also triggers on pull_request,
# so it presumably publishes (or fails to push) for PR runs too — confirm this is intended.
# NOTE(review): the job stays on the `badges` checkout afterwards; later steps run against that work tree.
- name: Publish coverage badge to badges branch
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
set -e
# Keep a copy outside the work tree before switching branches.
cp badge-out/coverage.svg /tmp/coverage.svg
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
# A missing remote branch is tolerated here; the orphan branch is created below instead.
git fetch origin badges || true
if git show-ref --verify --quiet refs/remotes/origin/badges; then
git checkout badges
else
git checkout --orphan badges
git rm -rf . >/dev/null 2>&1 || true
fi
cp /tmp/coverage.svg coverage.svg
git add coverage.svg
# Only commit and push when the staged badge differs from what the branch already has.
if ! git diff --cached --quiet; then
git commit -m "chore: update coverage badge"
git push "https://x-access-token:${GITHUB_TOKEN}@github.com/${GITHUB_REPOSITORY}.git" HEAD:badges
else
echo "No badge changes to commit."
fi

- name: Upload report to Azure
uses: LanceMcCarthy/Action-AzureBlobUpload@v2
with:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change log

# Version 0.0.26
### Fixes
- **Configurable Process Callback Fallback:** Process-mode topic callbacks keep the existing thread fallback by default for backward compatibility, but services can set `TOPIC_CALLBACK_PROCESS_FALLBACK_MODE=error` to prevent long-running CPU-bound work from silently falling back into the receiver/lock-renewal process.
- **Lock Renewal Registration Timing:** Registers received messages with `AutoLockRenewer` before starting callback execution so renewal coverage begins before worker startup.
- **Parent-Side Proactive Lock Renewal:** The receiver loop now proactively renews locks for in-flight messages before expiry, independent of callback execution, so long-running jobs do not rely only on SDK background renewal threads.
- **Lock-Loss Worker Cancellation:** Cancels tracked callback workers only after the message lock has actually expired, preventing a long-running process worker from continuing after the message can become visible again.

# Version 0.0.25
### Fixes
- **Azure Topic Settlement Stability:** Moved Azure Service Bus message settlement back onto the receiver-owning loop instead of settling from worker callback threads. This keeps receive and complete/abandon operations on the same receiver flow for long-running jobs.
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Core package for microservice

[![python-ms-core](https://img.shields.io/pypi/v/python-ms-core?label=python-ms-core&cacheSeconds=60&t=1)](https://pypi.org/project/python-ms-core/)
[![Unit Tests](https://github.com/TaskarCenterAtUW/TDEI-Python-ms-core/actions/workflows/unit_tests.yml/badge.svg)](https://github.com/TaskarCenterAtUW/TDEI-Python-ms-core/actions/workflows/unit_tests.yml)
![Coverage](https://raw.githubusercontent.com/TaskarCenterAtUW/TDEI-Python-ms-core/badges/coverage.svg)

## System requirements
| Software | Version |
Expand Down Expand Up @@ -510,4 +513,4 @@ test_subscribe_with_subscription (test_topic.test_topic.TestTopic) ... ok
Ran 174 tests in 8.175s

OK
```
```
207 changes: 201 additions & 6 deletions src/python_ms_core/core/topic/azure_topic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import json
import logging
import multiprocessing as mp
Expand Down Expand Up @@ -48,6 +49,7 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
self.publisher = self.client.get_topic_sender(topic_name=topic_name)
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages)
self.callback_execution_mode = self._get_callback_execution_mode()
self.callback_process_fallback_mode = self._get_process_fallback_mode()
self.callback_process_start_method = self._get_process_start_method()
self.process_context = self._get_process_context()
self.internal_count = 0
Expand All @@ -65,6 +67,9 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
self.wait_time_for_message = 5
self.thread_lock = threading.Lock()
self.pending_tasks = []
self.inflight_tasks = {}
self.inflight_lock_renewal_state = {}
self.lock_renew_retry_interval = 5


def publish(self, data: QueueMessage):
Expand All @@ -89,6 +94,7 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
self.receiver.local_received_messages = 0
while True:
try:
self._renew_inflight_message_locks()
self._settle_completed_tasks()
to_receive = self._get_receivable_count(max_receivable_messages=max_receivable_messages)
if max_receivable_messages > 0 and self.receiver.local_received_messages >= max_receivable_messages:
Expand All @@ -106,13 +112,14 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
with self.thread_lock:
self.internal_count += len(messages)
for message in messages:
execution_task = self._submit_processing_task(message, callback)
self.lock_renewal.register(
self.receiver,
message,
max_lock_renewal_duration=self.max_renewal_duration,
on_lock_renew_failure=self._handle_lock_renew_failure,
)
execution_task = self._submit_processing_task(message, callback)
self._track_inflight_task(message, execution_task)
self.pending_tasks.append((execution_task, message))
else:
if len(self.pending_tasks) > 0:
Expand Down Expand Up @@ -152,11 +159,23 @@ def _submit_processing_task(self, message, callback):
try:
return self._submit_process_task(message_payload, callback)
except Exception as exc:
logger.warning(
'Falling back to thread execution for message %s because process start failed: %s',
self._get_message_id(message),
message_id = self._get_message_id(message)
if self.callback_process_fallback_mode == 'thread':
logger.warning(
'Falling back to thread execution for message %s because process start failed: %s',
message_id,
exc,
)
return self._submit_thread_task(message_payload, callback)
logger.error(
'Process execution failed for message %s and thread fallback is disabled: %s',
message_id,
exc,
)
return _ImmediateExecutionTask({
'success': False,
'error': f'Process execution failed and thread fallback is disabled: {exc}',
})
return self._submit_thread_task(message_payload, callback)

def _submit_thread_task(self, message_payload, callback):
Expand Down Expand Up @@ -197,9 +216,11 @@ def _wait_for_pending_tasks(self, timeout=0.5):
return
deadline = time.time() + timeout
while time.time() < deadline:
self._renew_inflight_message_locks()
if any(task.done() for task, _ in self.pending_tasks):
break
time.sleep(min(0.1, max(deadline - time.time(), 0)))
self._renew_inflight_message_locks()
self._settle_completed_tasks()

def _settle_completed_tasks(self):
Expand Down Expand Up @@ -247,6 +268,8 @@ def _settle_task(self, x, incoming_message=None):
except Exception as e:
logger.error(f'Error in settling message: {e}')
finally:
if incoming_message is not None:
self._release_inflight_task(incoming_message)
with self.thread_lock:
self.internal_count = max(self.internal_count - 1, 0)
return
Expand All @@ -258,11 +281,130 @@ def _handle_lock_renew_failure(self, renewable, error):
f'Error renewing lock for message {message_id}: {failure_reason}; '
f'locked_until_utc={getattr(renewable, "locked_until_utc", None)}'
)
if getattr(renewable, '_lock_expired', False):
self._cancel_inflight_task(
renewable,
f'message lock expired after renewal failure: {failure_reason}',
)

def _track_inflight_task(self, message, task):
    """Register ``task`` as the in-flight worker for ``message``.

    Stores the task in ``inflight_tasks`` and creates the matching entry in
    ``inflight_lock_renewal_state`` (message, renewal margin, last renewal
    attempt), both keyed by ``_get_message_key(message)``. The whole update
    happens while holding ``thread_lock``.
    """
    with self.thread_lock:
        key = self._get_message_key(message)
        self.inflight_tasks[key] = task
        margin_seconds = self._get_lock_renewal_margin_seconds(message)
        # last_renew_attempt starts at 0, meaning "no renewal attempted yet".
        self.inflight_lock_renewal_state[key] = dict(
            message=message,
            renew_margin=margin_seconds,
            last_renew_attempt=0,
        )

def _release_inflight_task(self, message):
    """Forget the in-flight task and lock-renewal state tracked for ``message``.

    Unknown messages are ignored. Both registries are updated while holding
    ``thread_lock``.
    """
    with self.thread_lock:
        key = self._get_message_key(message)
        for registry in (self.inflight_tasks, self.inflight_lock_renewal_state):
            registry.pop(key, None)

def _renew_inflight_message_locks(self):
    """Proactively renew the locks of tracked in-flight messages.

    A snapshot of ``inflight_lock_renewal_state`` is taken under
    ``thread_lock``. For every message whose task is still running,
    ``receiver.renew_message_lock`` is called once the remaining lock time is
    at or below the message's renewal margin. Renewal attempts for one message
    are spaced at least ``lock_renew_retry_interval`` seconds apart.

    A message whose lock is flagged as expired is handed to
    ``_handle_lock_renew_failure`` at most once per tracked message. Without
    that guard, a worker that cannot be cancelled would trigger the failure
    handler (and its error logs) again on every call of this method.

    Renewal errors are logged and stored on ``message.auto_renew_error``; they
    are not raised to the caller.
    """

    def report_lock_loss(message_key, message, error):
        # Mark the lock loss as reported under the lock, then call the handler
        # outside of it: thread_lock is a plain (non-reentrant) lock and the
        # handler's cancellation path acquires it again.
        with self.thread_lock:
            state = self.inflight_lock_renewal_state.get(message_key)
            if state is None or state.get('lock_loss_reported'):
                return
            state['lock_loss_reported'] = True
        self._handle_lock_renew_failure(message, error)

    # Snapshot the tracked state so the renewal calls below run without the lock held.
    with self.thread_lock:
        renewal_items = [
            (
                message_key,
                state['message'],
                self.inflight_tasks.get(message_key),
                state['renew_margin'],
                state['last_renew_attempt'],
            )
            for message_key, state in self.inflight_lock_renewal_state.items()
        ]

    for message_key, message, task, renew_margin, last_renew_attempt in renewal_items:
        if task is None or task.done():
            continue
        if getattr(message, '_lock_expired', False):
            report_lock_loss(message_key, message, getattr(message, 'auto_renew_error', None))
            continue

        locked_until_utc = getattr(message, 'locked_until_utc', None)
        if locked_until_utc is None:
            continue

        remaining_seconds = (locked_until_utc - self._utc_now_for(locked_until_utc)).total_seconds()
        if remaining_seconds > renew_margin:
            continue

        # Throttle attempts so a failing renewal is not retried on every poll.
        current_time = time.time()
        if current_time - last_renew_attempt < self.lock_renew_retry_interval:
            continue

        with self.thread_lock:
            state = self.inflight_lock_renewal_state.get(message_key)
            if state is None:
                # The message was released since the snapshot was taken.
                continue
            state['last_renew_attempt'] = current_time

        try:
            renewed_until = self.receiver.renew_message_lock(message)
            logger.info(
                'Renewed lock for message %s until %s',
                self._get_message_id(message),
                renewed_until,
            )
        except Exception as exc:
            message.auto_renew_error = exc
            logger.error(
                'Error proactively renewing lock for message %s: %s; locked_until_utc=%s',
                self._get_message_id(message),
                exc,
                getattr(message, 'locked_until_utc', None),
            )
            if getattr(message, '_lock_expired', False):
                report_lock_loss(message_key, message, exc)

def _get_lock_renewal_margin_seconds(self, message):
    """Return how many seconds before lock expiry ``message`` should be renewed.

    When both ``locked_until_utc`` and ``_received_timestamp_utc`` are
    datetimes and describe a positive lock duration, the margin is
    ``lock_renewal_margin`` capped at half of that duration, but never below
    5 seconds. Otherwise the margin is ``lock_renewal_margin`` capped at 30.
    """
    lock_expiry = getattr(message, 'locked_until_utc', None)
    received_at = getattr(message, '_received_timestamp_utc', None)
    timestamps_usable = (
        isinstance(lock_expiry, datetime.datetime)
        and isinstance(received_at, datetime.datetime)
    )
    if timestamps_usable:
        lock_seconds = (lock_expiry - received_at).total_seconds()
        if lock_seconds > 0:
            half_lock = lock_seconds * 0.5
            capped_margin = min(self.lock_renewal_margin, half_lock)
            return max(5, capped_margin)
    return min(self.lock_renewal_margin, 30)

def _cancel_inflight_task(self, message, reason):
    """Cancel the worker tracked for ``message``, if it is still running.

    Does nothing when no task is tracked for the message or the task has
    already finished. The outcome of the cancellation attempt is logged at
    error level in both cases.
    """
    # Only the registry lookup needs the lock.
    with self.thread_lock:
        tracked_task = self.inflight_tasks.get(self._get_message_key(message))

    if tracked_task is None or tracked_task.done():
        return

    message_id = self._get_message_id(message)
    was_cancelled = tracked_task.cancel(reason)
    if not was_cancelled:
        logger.error(
            'Unable to cancel callback worker for message %s after %s',
            message_id,
            reason,
        )
        return
    logger.error(
        'Cancelled callback worker for message %s because %s',
        message_id,
        reason,
    )

@staticmethod
def _get_message_id(message):
    """Return an identifier for ``message`` that is safe to put in log lines.

    ``message_id`` is preferred, then ``messageId``. When neither attribute
    carries a truthy value the literal ``'unknown'`` is returned, including
    the case where ``messageId`` exists but is ``None`` or empty.
    """
    for attribute in ('message_id', 'messageId'):
        candidate = getattr(message, attribute, None)
        if candidate:
            return candidate
    return 'unknown'

@staticmethod
def _get_message_key(message):
    """Return the registry key for ``message``: the identity of the object itself."""
    message_key = id(message)
    return message_key

@staticmethod
def _utc_now_for(value):
    """Return the current UTC time in a form that can be subtracted from ``value``.

    The result is timezone-aware when ``value`` has a ``tzinfo`` and naive
    otherwise, so naive and aware datetimes are never mixed.
    """
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    value_is_aware = getattr(value, 'tzinfo', None) is not None
    return aware_now if value_is_aware else aware_now.replace(tzinfo=None)

@staticmethod
def _get_callback_execution_mode():
value = os.environ.get('TOPIC_CALLBACK_EXECUTION_MODE', 'process')
Expand All @@ -275,6 +417,18 @@ def _get_callback_execution_mode():
)
return 'process'

@staticmethod
def _get_process_fallback_mode():
    """Read ``TOPIC_CALLBACK_PROCESS_FALLBACK_MODE`` from the environment.

    The value is trimmed and lower-cased; ``'error'`` and ``'thread'`` are
    accepted. An unset variable yields ``'thread'``, and any other value is
    reported with a warning and also treated as ``'thread'``.
    """
    raw_value = os.environ.get('TOPIC_CALLBACK_PROCESS_FALLBACK_MODE', 'thread')
    mode = str(raw_value).strip().lower()
    if mode not in ('error', 'thread'):
        logger.warning(
            'Invalid value for TOPIC_CALLBACK_PROCESS_FALLBACK_MODE: %s. Using thread.',
            raw_value,
        )
        return 'thread'
    return mode

@staticmethod
def _get_process_start_method():
available_methods = mp.get_all_start_methods()
Expand Down Expand Up @@ -318,6 +472,20 @@ def _run_callback_in_subprocess(message_payload, callbackfn, result_connection):
result_connection.close()


class _ImmediateExecutionTask:
    """Execution task that is already finished when it is created.

    Wraps a precomputed result behind the ``done`` / ``result`` / ``cancel``
    methods so it can be handled like a task that actually ran.
    """

    def __init__(self, result):
        # Outcome handed back unchanged by result().
        self._result = result

    def done(self):
        """Report completion; this task never has pending work."""
        return True

    def result(self):
        """Return the result supplied at construction time."""
        return self._result

    def cancel(self, reason):
        """Refuse cancellation; ``reason`` is accepted and ignored."""
        return False


class _FutureExecutionTask:
def __init__(self, future):
self._future = future
Expand All @@ -328,15 +496,19 @@ def done(self):
def result(self):
return self._future.result()

def cancel(self, reason):
return self._future.cancel()


class _ProcessExecutionTask:
def __init__(self, process, result_connection):
self._process = process
self._result_connection = result_connection
self._result = None
self._connection_closed = False

def done(self):
return not self._process.is_alive()
return self._result is not None or not self._process.is_alive()

def result(self):
if self._result is not None:
Expand All @@ -352,6 +524,29 @@ def result(self):
'error': f'Callback worker exited with code {self._process.exitcode} without returning a result.',
}
finally:
self._result_connection.close()
self._close_result_connection()

return self._result

def cancel(self, reason):
if self.done():
return False

self._process.terminate()
self._process.join(timeout=5)
if self._process.is_alive() and hasattr(self._process, 'kill'):
self._process.kill()
self._process.join(timeout=5)

self._result = {
'success': False,
'error': f'Callback worker terminated because {reason}.',
}
self._close_result_connection()
return True

def _close_result_connection(self):
if self._connection_closed:
return
self._connection_closed = True
self._result_connection.close()
Loading
Loading