ENG-3925: Add Celery on_failure handler for worker-level DSR task deaths #8252
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| type: Changed | ||
| description: Added Celery on_failure handler to log worker-level DSR task deaths (OOM, hard timeout, broker disconnect) with error execution logs | ||
| pr: 8252 | ||
| labels: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,61 @@ class DatabaseTask(Task): # pylint: disable=W0223 | |
| _task_engine = None | ||
| _sessionmaker = None | ||
|
|
||
| def on_failure( | ||
| self, exc: BaseException, task_id: str, args: tuple, kwargs: dict, einfo: Any | ||
| ) -> None: | ||
| """Log an execution log when a privacy request task fails at the worker level. | ||
|
|
||
| Catches failures that bypass the task's own exception handling: hard time | ||
| limit exceeded, worker killed, broker disconnect, etc. Skips if the | ||
| in-task BaseException catch-all already handled it (status already error). | ||
| Only applies to tasks with a privacy_request_id kwarg; other tasks are ignored. | ||
| """ | ||
| privacy_request_id = kwargs.get("privacy_request_id") | ||
| if not privacy_request_id: | ||
| return | ||
|
|
||
| try: | ||
| session = self.get_new_session() | ||
| try: | ||
| from fides.api.models.privacy_request import PrivacyRequest | ||
| from fides.api.schemas.privacy_request import PrivacyRequestStatus | ||
|
Comment on lines +78 to +79 (Contributor, Author): Inline to avoid circular deps 😢
||
|
|
||
| privacy_request = ( | ||
| session.query(PrivacyRequest) | ||
| .filter(PrivacyRequest.id == privacy_request_id) | ||
| .first() | ||
| ) | ||
| if not privacy_request: | ||
| return | ||
|
|
||
| if privacy_request.status == PrivacyRequestStatus.error: | ||
|
Comment (Contributor, Author): These are already handled by the in-task handler.
||
| return | ||
|
|
||
| logger.error( | ||
| "Privacy request '{}' failed at worker level: {}", | ||
| privacy_request_id, | ||
| str(exc), | ||
| ) | ||
| privacy_request.add_error_execution_log( | ||
| session, | ||
| connection_key=None, | ||
| dataset_name="Worker task failure", | ||
| collection_name=None, | ||
| message=f"Task failed at worker level: {type(exc).__name__}: {exc}", | ||
|
Comment (Contributor, Author): Some example scenarios, each with a corresponding error message:
- OOM / hard time limit (Celery kills the worker)
- Broker disconnect
- DB connection lost mid-task (if it escapes the catch-all)
- Memory watchdog (if enabled)
||
| action_type=privacy_request.policy.get_action_type(), # type: ignore[arg-type] | ||
| ) | ||
| privacy_request.error_processing(db=session) | ||
| session.commit() | ||
| finally: | ||
| session.close() | ||
| except Exception as log_exc: # pylint: disable=broad-except | ||
| # Report the error raised while logging, not the original task failure | ||
| logger.error( | ||
| "Failed to log worker-level failure for privacy request '{}': {}", | ||
| privacy_request_id, | ||
| str(log_exc), | ||
| ) | ||
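The branching in `on_failure` is easier to review when separated from the session and model plumbing. The sketch below is a minimal stand-alone model of that flow, not the code in this PR: `Status`, `FakeRequest`, `handle_worker_failure`, and the in-memory `store` are hypothetical stand-ins for `PrivacyRequestStatus`, `PrivacyRequest`, the handler, and the database session.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class Status(Enum):
    """Hypothetical stand-in for PrivacyRequestStatus."""

    in_processing = "in_processing"
    error = "error"


@dataclass
class FakeRequest:
    """Hypothetical stand-in for PrivacyRequest."""

    id: str
    status: Status
    logs: List[str] = field(default_factory=list)


def handle_worker_failure(
    exc: BaseException, kwargs: dict, store: Dict[str, FakeRequest]
) -> str:
    """Mirror the handler's guards and return the branch that was taken."""
    request_id = kwargs.get("privacy_request_id")
    if not request_id:
        # Tasks without a privacy_request_id kwarg are ignored.
        return "ignored: no privacy_request_id"

    request = store.get(request_id)
    if request is None:
        return "ignored: request not found"

    if request.status is Status.error:
        # The in-task catch-all already recorded this failure.
        return "skipped: already in error"

    # Same message shape as the diff: exception type name, then its text.
    request.logs.append(f"Task failed at worker level: {type(exc).__name__}: {exc}")
    request.status = Status.error
    return "logged"


if __name__ == "__main__":
    store = {
        "pr_1": FakeRequest("pr_1", Status.in_processing),
        "pr_2": FakeRequest("pr_2", Status.error),
    }
    failure = TimeoutError("hard limit")
    print(handle_worker_failure(failure, {}, store))
    print(handle_worker_failure(failure, {"privacy_request_id": "pr_2"}, store))
    print(handle_worker_failure(failure, {"privacy_request_id": "pr_1"}, store))
```

Returning the branch name is only a convenience for the sketch; the real handler returns `None` in every case.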
|
|
||
| # This retry will attempt to connect 5 times with an exponential backoff (2, 4, 8, 16 seconds between each attempt). | ||
| # The original error will be re-raised if the retries are unsuccessful. All attempts are shown in the logs. | ||
| @retry( | ||
|
|
||
Review comment: This is not going to help if the worker process is killed by the OS; it only covers "softer" failures where Celery is still alive.
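The distinction in this comment can be reproduced without Celery. The sketch below is a rough illustration on a POSIX system using only the standard library: a child process either raises an exception, which an in-process handler can observe, or sends itself `SIGKILL`, which no in-process handler can observe. `CHILD` and `run_child` are hypothetical names, and the child script merely stands in for a process that owns a failure handler.

```python
import subprocess
import sys
import textwrap
from typing import Tuple

# Child script: fail either with an exception or by being killed outright.
CHILD = textwrap.dedent(
    """
    import os, signal, sys
    mode = sys.argv[1]
    try:
        if mode == "raise":
            raise RuntimeError("soft failure")
        os.kill(os.getpid(), signal.SIGKILL)  # POSIX only; cannot be caught
    except BaseException as exc:
        print("handler ran:", type(exc).__name__)
    """
)


def run_child(mode: str) -> Tuple[int, str]:
    """Run the child in the given mode and return (return code, stdout)."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD, mode],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return proc.returncode, proc.stdout.strip()


if __name__ == "__main__":
    print(run_child("raise"))  # handler output is present
    print(run_child("kill"))   # no handler output; non-zero return code
```

In the killed case only the parent sees what happened, through the child's return code. That suggests any record of a hard kill has to come from a process that survives it, rather than from a handler inside the killed process.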