diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index 9ece94d..1334366 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -50,6 +50,12 @@ _INGEST_SCOPE = "diode:ingest" _LOGGER = logging.getLogger(__name__) _MAX_RETRIES_ENVVAR_NAME = "DIODE_MAX_AUTH_RETRIES" +# server policy (MinTime 10s so client pings must be >= 10s, e.g. 30s interval). +_GRPC_KEEPALIVE_TIME_MS = 30_000 +_GRPC_KEEPALIVE_TIMEOUT_MS = 10_000 +_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS = 1 +# 0 = no cap on keepalive pings without data (matches reconciler Python client). +_GRPC_HTTP2_MAX_PINGS_WITHOUT_DATA = 0 def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]: @@ -138,6 +144,32 @@ def _get_optional_config_value( return value +def _otlp_grpc_channel_options(primary_user_agent_value: str) -> list[tuple[str, Any]]: + """ + Build gRPC channel argument list for generic OTLP collectors (user-agent only). + + Avoid aggressive HTTP/2 keepalive here: many OTLP backends enforce strict ping + limits and may GOAWAY idle exporters when permit-without-stream or unlimited + pings are enabled. + """ + return [ + ("grpc.primary_user_agent", primary_user_agent_value), + ] + + +def _diode_ingest_grpc_channel_options(primary_user_agent_value: str) -> list[tuple[str, Any]]: + """Build gRPC channel argument list for the Diode ingester API (with keepalive).""" + return _otlp_grpc_channel_options(primary_user_agent_value) + [ + ("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS), + ("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS), + ( + "grpc.keepalive_permit_without_calls", + _GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS, + ), + ("grpc.http2.max_pings_without_data", _GRPC_HTTP2_MAX_PINGS_WITHOUT_DATA), + ] + + def _get_proxy_env_var(var_name: str) -> str | None: """Get proxy environment variable (case-insensitive).""" value = os.getenv(var_name.upper()) @@ -335,12 +367,9 @@ def __init__( self._authenticate(_INGEST_SCOPE) - channel_opts = [ - ( - "grpc.primary_user_agent", - f"{self._name}/{self._version} {self._app_name}/{self._app_version}", - ), - ] + channel_opts = _diode_ingest_grpc_channel_options( + f"{self._name}/{self._version} {self._app_name}/{self._app_version}" + ) proxy_url = _get_grpc_proxy_url(self._target, self._tls_verify) if proxy_url: @@ -631,12 +660,9 @@ def __init__( else None ) - channel_opts = [ - ( - "grpc.primary_user_agent", - f"{self._name}/{self._version} {self._app_name}/{self._app_version}", - ), - ] + channel_opts = _otlp_grpc_channel_options( + f"{self._name}/{self._version} {self._app_name}/{self._app_version}" + ) proxy_url = _get_grpc_proxy_url(self._target, self._tls_verify) if proxy_url: diff --git a/tests/test_client.py b/tests/test_client.py index 1f60717..622711c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -18,9 +18,11 @@ DiodeMethodClientInterceptor, DiodeOTLPClient, _ClientCallDetails, + _diode_ingest_grpc_channel_options, _DiodeAuthentication, _get_sentry_dsn, _load_certs, + _otlp_grpc_channel_options, load_dryrun_entities, parse_target, ) @@ -268,11 +270,10 @@ def test_insecure_channel_options_with_primary_user_agent(mock_diode_authenticat mock_insecure_channel.assert_called_once() _, kwargs = mock_insecure_channel.call_args - assert kwargs["options"] == ( - ( - "grpc.primary_user_agent", - f"{client.name}/{client.version} {client.app_name}/{client.app_version}", - ), + assert kwargs["options"] == tuple( + _diode_ingest_grpc_channel_options( + f"{client.name}/{client.version} {client.app_name}/{client.app_version}" + ) ) @@ -289,11 +290,10 @@ def test_secure_channel_options_with_primary_user_agent(mock_diode_authenticatio mock_secure_channel.assert_called_once() _, kwargs = mock_secure_channel.call_args - assert kwargs["options"] == ( - ( - "grpc.primary_user_agent", - f"{client.name}/{client.version} {client.app_name}/{client.app_version}", - ), + assert kwargs["options"] == tuple( + _diode_ingest_grpc_channel_options( + f"{client.name}/{client.version} {client.app_name}/{client.app_version}" + ) ) @@ -884,6 +884,30 @@ def test_otlp_client_grpcs_uses_secure_channel(): base_channel.close.assert_called_once() +def test_otlp_insecure_channel_options_exclude_diode_keepalive(): + """OTLP targets arbitrary collectors; only user-agent is forced (Codex/OBS-2873).""" + with ( + patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub"), + ): + client = DiodeOTLPClient( + target="grpc://collector:4317", + app_name="orb-producer", + app_version="1.2.3", + ) + + mock_insecure.assert_called_once() + _, kwargs = mock_insecure.call_args + ua = ( + f"{client.name}/{client.version} " + f"{client.app_name}/{client.app_version}" + ) + assert kwargs["options"] == tuple(_otlp_grpc_channel_options(ua)) + assert all( + opt[0] != "grpc.keepalive_time_ms" for opt in kwargs["options"] + ) + + def test_diode_authentication_with_custom_certificates(): """Test _DiodeAuthentication with custom certificates - covers SSL context creation.""" # Create test certificate content