Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import collections
import functools
import inspect
import itertools
import logging
import threading
Expand Down Expand Up @@ -62,14 +63,22 @@
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
exceptions.Aborted,
exceptions.DeadlineExceeded,
exceptions.ServiceUnavailable,
exceptions.GatewayTimeout,
exceptions.InternalServerError,
exceptions.ResourceExhausted,
exceptions.ServiceUnavailable,
exceptions.Unknown,
exceptions.GatewayTimeout,
exceptions.Aborted,
)
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
_TERMINATING_STREAM_ERRORS = (
exceptions.Cancelled,
exceptions.InvalidArgument,
exceptions.NotFound,
exceptions.PermissionDenied,
exceptions.Unauthenticated,
exceptions.Unauthorized,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

Expand Down Expand Up @@ -98,6 +107,13 @@
code_pb2.UNAVAILABLE,
}

# `on_fatal_exception` was added in `google-api-core v2.25.1``, which allows us to inform
# callers on unrecoverable errors. We can only pass this arg if it's available in the
# `BackgroundConsumer` spec.
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec(
Comment thread
abbrowne126 marked this conversation as resolved.
bidi.BackgroundConsumer
)


def _wrap_as_exception(maybe_exception: Any) -> BaseException:
"""Wrap an object as a Python exception, if needed.
Expand Down Expand Up @@ -876,7 +892,18 @@ def open(
assert self._scheduler is not None
scheduler_queue = self._scheduler.queue
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)

# `on_fatal_exception` is only available in more recent library versions.
# For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
self._consumer = bidi.BackgroundConsumer(
self._rpc,
self._on_response,
on_fatal_exception=self._on_fatal_exception,
)
else:
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)

self._leaser = leaser.Leaser(self)
self._heartbeater = heartbeater.Heartbeater(self)

Expand Down Expand Up @@ -1247,6 +1274,17 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:

self.maybe_pause_consumer()

def _on_fatal_exception(self, exception: BaseException) -> None:
"""
Called whenever `self.consumer` receives a non-retryable exception.
We close the manager on such non-retryable cases.
"""
_LOGGER.exception(
"Streaming pull terminating after receiving non-recoverable error: %s",
exception,
)
self.close(exception)

def _should_recover(self, exception: BaseException) -> bool:
"""Determine if an error on the RPC stream should be recovered.

Expand Down Expand Up @@ -1283,8 +1321,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
in a list of terminating exceptions.
"""
exception = _wrap_as_exception(exception)
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.debug("Observed terminating stream error %s", exception)
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.error("Observed terminating stream error %s", exception)
return True
_LOGGER.debug("Observed non-terminating stream error %s", exception)
return False
Expand Down
53 changes: 45 additions & 8 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,13 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
leaser.return_value.start.assert_called_once()
assert manager.leaser == leaser.return_value

background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
background_consumer.assert_called_once_with(
manager._rpc, manager._on_response, manager._on_fatal_exception
)
else:
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)

background_consumer.return_value.start.assert_called_once()
assert manager._consumer == background_consumer.return_value

Expand Down Expand Up @@ -1432,6 +1438,31 @@ def test_close():
assert manager.is_active is False


def test_closes_on_fatal_consumer_error():
(
manager,
consumer,
dispatcher,
leaser,
heartbeater,
scheduler,
) = make_running_manager()

if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
error = ValueError("some fatal exception")
manager._on_fatal_exception(error)

await_manager_shutdown(manager, timeout=3)

consumer.stop.assert_called_once()
leaser.stop.assert_called_once()
dispatcher.stop.assert_called_once()
heartbeater.stop.assert_called_once()
scheduler.shutdown.assert_called_once()

assert manager.is_active is False


def test_close_inactive_consumer():
(
manager,
Expand Down Expand Up @@ -2270,18 +2301,24 @@ def test__should_recover_false():
def test__should_terminate_true():
manager = make_manager()

details = "Cancelled. Go away, before I taunt you a second time."
exc = exceptions.Cancelled(details)

assert manager._should_terminate(exc) is True
for exc in [
exceptions.Cancelled(""),
exceptions.PermissionDenied(""),
TypeError(),
ValueError(),
]:
assert manager._should_terminate(exc)


def test__should_terminate_false():
manager = make_manager()

exc = TypeError("wahhhhhh")

assert manager._should_terminate(exc) is False
for exc in [
exceptions.ResourceExhausted(""),
exceptions.ServiceUnavailable(""),
exceptions.DeadlineExceeded(""),
]:
assert not manager._should_terminate(exc)


@mock.patch("threading.Thread", autospec=True)
Expand Down