From cb77fc163630bf2a48aec6ab4dfb2aa6012b2813 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Wed, 6 May 2026 14:24:09 -0500 Subject: [PATCH] python(feat): Updates for new sift-stream-bindings API --- python/CHANGELOG.md | 242 ++++++++++++++++++ .../_internal/low_level_wrappers/ingestion.py | 75 ++++-- python/lib/sift_client/resources/__init__.py | 3 +- python/lib/sift_client/resources/ingestion.py | 189 ++++---------- python/pyproject.toml | 15 +- .../sift_stream_bindings.pyi | 4 +- .../src/stream/builder.rs | 8 +- 7 files changed, 356 insertions(+), 180 deletions(-) diff --git a/python/CHANGELOG.md b/python/CHANGELOG.md index eeceefe42..93734f331 100644 --- a/python/CHANGELOG.md +++ b/python/CHANGELOG.md @@ -3,6 +3,248 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## [v0.15.0] - May 7, 2026 + +### What's New + +v0.15.0 updates the streaming ingestion client to match the new `sift-stream-bindings` 0.3.0 +API. The `RecoveryStrategyConfig` class and `recovery_strategy` parameter have been replaced +with an explicit `StreamingMode` enum and discrete per-mode configuration kwargs. The +non-blocking send method has been renamed for consistency with the Rust library, and a new +`try_send` method is available for single-flow non-blocking sends. This release contains +**breaking changes** to the ingestion API — see below for details and a migration prompt. + +#### Breaking Changes + +##### 1. `RecoveryStrategyConfig` Removed — Replaced by `StreamingMode` + Per-Mode Kwargs + +`RecoveryStrategyConfig` and the `recovery_strategy` parameter on `IngestionConfigStreamingClient` +and `IngestionAPIAsync` have been removed. Transport mode is now selected via a `StreamingMode` +enum, with separate `retry_policy` and `disk_backup_policy` kwargs for per-mode configuration. 
+ +The default mode is `StreamingMode.LIVE_WITH_BACKUPS`, which matches the previous default +behavior of `RecoveryStrategyConfig.retry_with_backups()`. + +**Removed from the public API:** +- `RecoveryStrategyConfig` class (and its `retry_only()` / `retry_with_backups()` factory methods) +- `recovery_strategy` parameter on `IngestionConfigStreamingClient` and `IngestionAPIAsync` + +**Added:** +- `StreamingMode` enum — `LIVE_ONLY`, `LIVE_WITH_BACKUPS`, `FILE_BACKUP` +- `streaming_mode` parameter (default: `StreamingMode.LIVE_WITH_BACKUPS`) +- `retry_policy` parameter — applies to `LIVE_WITH_BACKUPS` mode +- `disk_backup_policy` parameter — applies to `LIVE_WITH_BACKUPS` and `FILE_BACKUP` modes +- `checkpoint_interval_seconds` parameter (pre-existing, signature unchanged) — now applies to `LIVE_WITH_BACKUPS` mode only + +**Before:** +```python +from sift_client.resources.ingestion import ( + IngestionConfigStreamingClient, + RecoveryStrategyConfig, +) + +# Default: live streaming with backups +client = await IngestionConfigStreamingClient.create( + ingestion_config=my_config, + recovery_strategy=RecoveryStrategyConfig.retry_with_backups(), +) + +# Retry only (no disk backups) +client = await IngestionConfigStreamingClient.create( + ingestion_config=my_config, + recovery_strategy=RecoveryStrategyConfig.retry_only(), +) +``` + +**After:** +```python +from sift_client.resources.ingestion import ( + IngestionConfigStreamingClient, + StreamingMode, +) + +# Default: live streaming with backups (no change needed if you were using the default) +client = await IngestionConfigStreamingClient.create( + ingestion_config=my_config, + streaming_mode=StreamingMode.LIVE_WITH_BACKUPS, +) + +# Live only — no disk backups, lowest overhead +client = await IngestionConfigStreamingClient.create( + ingestion_config=my_config, + streaming_mode=StreamingMode.LIVE_ONLY, +) + +# File backup only +client = await IngestionConfigStreamingClient.create( + ingestion_config=my_config, + streaming_mode=StreamingMode.FILE_BACKUP, +) +``` + +To 
pass a custom retry or disk backup policy: +```python +from sift_stream_bindings import DiskBackupPolicyPy, RetryPolicyPy + +client = await IngestionConfigStreamingClient.create( + ingestion_config=my_config, + streaming_mode=StreamingMode.LIVE_WITH_BACKUPS, + retry_policy=RetryPolicyPy.default(), + disk_backup_policy=DiskBackupPolicyPy.default(), + checkpoint_interval_seconds=30, +) +``` + +##### 2. `send_requests_nonblocking` Renamed to `try_send_requests` + +`IngestionConfigStreamingClient.send_requests_nonblocking` has been renamed to `try_send_requests` +to align with the Rust `sift-stream` naming convention where `try_` methods return immediately +without awaiting channel capacity. + +**Before:** +```python +client.send_requests_nonblocking(requests) +``` + +**After:** +```python +client.try_send_requests(requests) +``` + +#### New Features + +##### `try_send` — Non-Blocking Single-Flow Send + +A new `try_send(flow)` method is available on `IngestionConfigStreamingClient` for non-blocking +single-flow sends. It accepts either a `Flow` or a raw `FlowPy` object. + +```python +client.try_send(flow) +``` + +Use `try_send` in real-time loops where blocking on channel capacity is unacceptable. For +most use cases, the async `send(flow)` method (which applies backpressure) is preferred. + +##### `sift-stream-bindings` 0.3.0 + +The `sift-stream-bindings` dependency has been bumped to 0.3.0, which reflects the +`sift-stream` 0.9.0 breaking API changes (stepped builder, send rename, removed types). + +#### AI-Assisted Migration Prompt (v0.14.x → v0.15.0) + +Copy and paste the following prompt to an AI coding agent to automate the upgrade: + +``` +You are upgrading a Python project from sift_client v0.14.x to v0.15.0. The streaming ingestion +API has breaking changes. Apply ALL of the following changes precisely. Do not make any other +modifications. + +--- + +## 1. 
Update `pyproject.toml` + +Find every occurrence of `sift-stream-bindings==0.2.2` and replace it with +`sift-stream-bindings==0.3.0`. This may appear under multiple dependency groups (e.g. `all`, +`dev-all`, `sift-stream`, `sift-stream-bindings`). + +--- + +## 2. Remove all imports of `RecoveryStrategyConfig` + +Delete any line that imports `RecoveryStrategyConfig`, for example: + + from sift_client.resources.ingestion import RecoveryStrategyConfig + from sift_client.resources.ingestion import IngestionConfigStreamingClient, RecoveryStrategyConfig + +Remove only `RecoveryStrategyConfig` from those imports; keep any other names on the same line. + +--- + +## 3. Add `StreamingMode` to imports where needed + +Wherever `IngestionConfigStreamingClient` or `IngestionAPIAsync` is imported and a streaming +mode needs to be specified, add `StreamingMode` to the import: + + from sift_client.resources.ingestion import IngestionConfigStreamingClient, StreamingMode + +--- + +## 4. Replace all `recovery_strategy` call sites + +Search for every call to `IngestionConfigStreamingClient.create(...)` and +`IngestionAPIAsync.create(...)` that contains a `recovery_strategy` keyword argument. + +### Case A — `RecoveryStrategyConfig.retry_with_backups()` (or no recovery_strategy at all) + +This was (and remains) the default. Replace the kwarg: + + # BEFORE + recovery_strategy=RecoveryStrategyConfig.retry_with_backups() + + # AFTER + streaming_mode=StreamingMode.LIVE_WITH_BACKUPS + +If the call had no `recovery_strategy` argument, no change is needed — `LIVE_WITH_BACKUPS` +is the default. 
+ +If the old call passed explicit `retry_policy` or `disk_backup_policy` arguments inside +`RecoveryStrategyConfig.retry_with_backups(...)`, move them to top-level kwargs: + + # BEFORE + recovery_strategy=RecoveryStrategyConfig.retry_with_backups( + retry_policy=my_retry_policy, + disk_backup_policy=my_disk_policy, + ) + + # AFTER + streaming_mode=StreamingMode.LIVE_WITH_BACKUPS, + retry_policy=my_retry_policy, + disk_backup_policy=my_disk_policy, + +### Case B — `RecoveryStrategyConfig.retry_only()` + +Replace with `streaming_mode=StreamingMode.LIVE_ONLY`. If a `retry_policy` was passed, +drop it (`LIVE_ONLY` does not use a retry policy in this version; a top-level `retry_policy` +kwarg is still accepted but has no effect): + + # BEFORE + recovery_strategy=RecoveryStrategyConfig.retry_only(retry_policy=my_policy) + + # AFTER + streaming_mode=StreamingMode.LIVE_ONLY + +### Case C — Raw `RecoveryStrategyPy` object passed directly + +If any call passes a raw `RecoveryStrategyPy` instance as `recovery_strategy`, determine +which mode it was configured for and replace accordingly: +- `RecoveryStrategyPy.retry_only(...)` → `streaming_mode=StreamingMode.LIVE_ONLY` +- `RecoveryStrategyPy.retry_with_backups(...)` → `streaming_mode=StreamingMode.LIVE_WITH_BACKUPS` + with `retry_policy` and `disk_backup_policy` promoted to top-level kwargs. + +--- + +## 5. Rename `send_requests_nonblocking` → `try_send_requests` + +Find every call to `.send_requests_nonblocking(...)` on any ingestion client instance and +rename it to `.try_send_requests(...)`. The signature is unchanged. + + # BEFORE + client.send_requests_nonblocking(requests) + + # AFTER + client.try_send_requests(requests) + +--- + +## 6. Verify + +After applying the above changes: +1. Run `grep -r "RecoveryStrategyConfig" .` — expect zero results. +2. Run `grep -r "send_requests_nonblocking" .` — expect zero results. +3. Run `grep -r "recovery_strategy" .` — expect zero results. +4. Run your test suite to confirm no remaining references. 
+``` + ## [v0.14.1] - April 30, 2026 ### Bugfixes diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index ad248837f..6a68bd547 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -39,7 +39,7 @@ from datetime import datetime from sift_stream_bindings import ( - DurationPy, + DiskBackupPolicyPy, FlowConfigPy, FlowDescriptorPy, FlowPy, @@ -47,7 +47,7 @@ IngestWithConfigDataStreamRequestPy, IngestWithConfigDataStreamRequestWrapperPy, MetadataPy, - RecoveryStrategyPy, + RetryPolicyPy, RunFormPy, RunSelectorPy, SiftStreamBuilderPy, @@ -56,7 +56,7 @@ TimeValuePy, ) - from sift_client.resources.ingestion import TracingConfig + from sift_client.resources.ingestion import StreamingMode, TracingConfig def _to_rust_py_timestamp(time: datetime) -> TimeValuePy: @@ -250,26 +250,32 @@ async def create_sift_stream_instance( cls, api_key: str, grpc_uri: str, - ingestion_config: IngestionConfigFormPy, + ingestion_config_form: IngestionConfigFormPy, run_form: RunFormPy | None = None, run_id: str | None = None, asset_tags: list[str] | None = None, asset_metadata: list[MetadataPy] | None = None, - recovery_strategy: RecoveryStrategyPy | None = None, - checkpoint_interval: DurationPy | None = None, + streaming_mode: StreamingMode = ..., # type: ignore[assignment] + retry_policy: RetryPolicyPy | None = None, + disk_backup_policy: DiskBackupPolicyPy | None = None, + checkpoint_interval_seconds: int | None = None, enable_tls: bool = True, tracing_config: TracingConfig | None = None, ) -> IngestionConfigStreamingLowLevelClient: # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users # TODO(nathan): Fix bindings to fix mypy issues with tracing functions from sift_stream_bindings import ( # type: ignore[attr-defined] + DurationPy, SiftStreamBuilderPy, 
init_tracing, # type: ignore[attr-defined] init_tracing_with_file, # type: ignore[attr-defined] is_tracing_initialized, # type: ignore[attr-defined] - ) # type: ignore[attr-defined] + ) + + from sift_client.resources.ingestion import StreamingMode, TracingConfig - from sift_client.resources.ingestion import TracingConfig + if streaming_mode is ...: # type: ignore[comparison-overlap] + streaming_mode = StreamingMode.LIVE_WITH_BACKUPS if not is_tracing_initialized(): if tracing_config is None: @@ -287,21 +293,35 @@ async def create_sift_stream_instance( # Use stdout/stderr only init_tracing(tracing_config.level) - builder = SiftStreamBuilderPy( - uri=grpc_uri, - apikey=api_key, - ) - - builder.enable_tls = enable_tls - builder.ingestion_config = ingestion_config - builder.recovery_strategy = recovery_strategy - builder.checkpoint_interval = checkpoint_interval - builder.asset_tags = asset_tags - builder.metadata = asset_metadata - builder.run = run_form - builder.run_id = run_id - - sift_stream_instance = await builder.build() + sift_builder = SiftStreamBuilderPy(uri=grpc_uri, apikey=api_key) + sift_builder.enable_tls = enable_tls + + config_builder = sift_builder.ingestion_config(ingestion_config_form) + config_builder.run = run_form + config_builder.run_id = run_id + config_builder.asset_tags = asset_tags + config_builder.metadata = asset_metadata + + if streaming_mode == StreamingMode.LIVE_ONLY: + sift_stream_instance = await config_builder.live_only().build() + + elif streaming_mode == StreamingMode.FILE_BACKUP: + fb_builder = config_builder.file_backup() + if disk_backup_policy is not None: + fb_builder.disk_backup_policy = disk_backup_policy + sift_stream_instance = await fb_builder.build() + + else: # LIVE_WITH_BACKUPS (default) + lwb_builder = config_builder.live_with_backups() + if retry_policy is not None: + lwb_builder.retry_policy = retry_policy + if disk_backup_policy is not None: + lwb_builder.disk_backup_policy = disk_backup_policy + if 
checkpoint_interval_seconds is not None: + lwb_builder.checkpoint_interval = DurationPy( + secs=checkpoint_interval_seconds, nanos=0 + ) + sift_stream_instance = await lwb_builder.build() return cls(sift_stream_instance) @@ -314,10 +334,13 @@ async def batch_send(self, flows: Iterable[FlowPy]): async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]): await self._sift_stream_instance.send_requests(requests) - def send_requests_nonblocking( + def try_send_requests( self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] - ): - self._sift_stream_instance.send_requests_nonblocking(requests) + ) -> None: + self._sift_stream_instance.try_send_requests(requests) + + def try_send(self, flow: FlowPy) -> None: + self._sift_stream_instance.try_send(flow) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: return self._sift_stream_instance.get_flow_descriptor(flow_name) diff --git a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index 2b7a4c55b..f8c7b5af3 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -154,7 +154,7 @@ async def main(): from sift_client.resources.calculated_channels import CalculatedChannelsAPIAsync from sift_client.resources.channels import ChannelsAPIAsync from sift_client.resources.file_attachments import FileAttachmentsAPIAsync -from sift_client.resources.ingestion import IngestionAPIAsync, TracingConfig +from sift_client.resources.ingestion import IngestionAPIAsync, StreamingMode, TracingConfig from sift_client.resources.jobs import JobsAPIAsync from sift_client.resources.ping import PingAPIAsync from sift_client.resources.reports import ReportsAPIAsync @@ -200,6 +200,7 @@ async def main(): "FileAttachmentsAPI", "FileAttachmentsAPIAsync", "IngestionAPIAsync", + "StreamingMode", "JobsAPI", "JobsAPIAsync", "PingAPI", diff --git a/python/lib/sift_client/resources/ingestion.py 
b/python/lib/sift_client/resources/ingestion.py index 69ae78527..fe34eca41 100644 --- a/python/lib/sift_client/resources/ingestion.py +++ b/python/lib/sift_client/resources/ingestion.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from enum import Enum from typing import TYPE_CHECKING, Iterator from sift_client._internal.low_level_wrappers.ingestion import ( @@ -17,14 +18,12 @@ from sift_stream_bindings import ( DiskBackupPolicyPy, - DurationPy, FlowDescriptorPy, FlowPy, IngestionConfigFormPy, IngestWithConfigDataStreamRequestPy, IngestWithConfigDataStreamRequestWrapperPy, MetadataPy, - RecoveryStrategyPy, RetryPolicyPy, RunFormPy, SiftStreamMetricsSnapshotPy, @@ -36,6 +35,14 @@ logger = logging.getLogger(__name__) +class StreamingMode(str, Enum): + """Selects the SiftStream transport mode.""" + + LIVE_ONLY = "live_only" + LIVE_WITH_BACKUPS = "live_with_backups" + FILE_BACKUP = "file_backup" + + class TracingConfig: """Configuration for tracing in SiftStream. @@ -120,112 +127,6 @@ def with_file( ) -class RecoveryStrategyConfig: - """Configuration for the SiftStream recovery strategy. - - This class provides a Python-friendly interface for configuring the recovery strategy used in SiftStream. - Recovery strategies determine how SiftStream handles failures and retries when ingesting data. - - Recovery strategies control: - - How frequently to retry a failed connection to Sift. - - Whether to use per checkpoint backups to allow re-ingestion of data to Sift after a streaming failure. - - Settings to control the number and size of backup files, and whether to retain backups after verification of successful ingestion into sift. - - Most users should use one of the factory methods: - - `retry_only()` - Only attempt to reconnect to Sift after a connection failure. Any data which failed to be ingested will be lost. - - More performant, but with no guarantee of data ingestion. - - `retry_with_backups()` - Ingestion is checkpointed. 
If an ingestion issue occurs during a checkpoint, that data will be re-ingested into Sift - asynchronously along with incoming live data. Backup files are generated and by default, cleared after a successful checkpoint or re-ingestion. - """ - - def __init__(self, recovery_strategy_py: RecoveryStrategyPy | None): - """Initialize a RecoveryStrategyConfig. - - Args: - recovery_strategy_py: The underlying RecoveryStrategyPy instance. - If None, uses the default retry_with_backups strategy. - - Note: - Most users should use the factory methods (`retry_only()` or `retry_with_backups()`) - instead of calling this constructor directly. - """ - # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users - try: - from sift_stream_bindings import DiskBackupPolicyPy, RecoveryStrategyPy, RetryPolicyPy - except ImportError as e: - _sift_stream_bindings_import_error(e) - - # Default to retry_with_backups() - # This is intentionally different from SiftStream, which defaults to retry_only - self._recovery_strategy_py = recovery_strategy_py or RecoveryStrategyPy.retry_with_backups( - retry_policy=RetryPolicyPy.default(), disk_backup_policy=DiskBackupPolicyPy.default() - ) - - def _to_rust_config(self) -> RecoveryStrategyPy: - """Convert to RecoveryStrategyPy for use with the ingestion client. - - Returns: - A RecoveryStrategyPy instance that can be passed to the ingestion client. - """ - return self._recovery_strategy_py - - @classmethod - def retry_only(cls, retry_policy: RetryPolicyPy | None = None) -> RecoveryStrategyConfig: - """Create a recovery strategy that only retries connection failures. - - Args: - retry_policy: Retry policy configuration specifying retry attempts, backoff timing, etc. - If None, uses the default retry policy (5 attempts, 50ms initial backoff, - 5s max backoff, multiplier of 5). - - Returns: - A RecoveryStrategyConfig configured for retry-only strategy. 
- """ - # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users - try: - from sift_stream_bindings import RecoveryStrategyPy, RetryPolicyPy - except ImportError as e: - _sift_stream_bindings_import_error(e) - - retry_policy_py = retry_policy or RetryPolicyPy.default() - - recovery_strategy_py = RecoveryStrategyPy.retry_only(retry_policy_py) - return cls(recovery_strategy_py=recovery_strategy_py) - - @classmethod - def retry_with_backups( - cls, - retry_policy: RetryPolicyPy | None = None, - disk_backup_policy: DiskBackupPolicyPy | None = None, - ) -> RecoveryStrategyConfig: - """Create a recovery strategy with retries re-ingestion using disk based backups. - - Args: - retry_policy: Retry policy configuration specifying retry attempts, backoff timing, etc. - If None, uses the default retry policy (5 attempts, 50ms initial backoff, - 5s max backoff, multiplier of 5). - disk_backup_policy: Disk backup policy configuration specifying backup directory, - file size limits, etc. If None, uses the default disk backup policy. - - Returns: - A RecoveryStrategyConfig configured for retry with disk backups. - """ - # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users - try: - from sift_stream_bindings import DiskBackupPolicyPy, RecoveryStrategyPy, RetryPolicyPy - except ImportError as e: - _sift_stream_bindings_import_error(e) - - retry_policy_py = retry_policy or RetryPolicyPy.default() - disk_backup_policy_py = disk_backup_policy or DiskBackupPolicyPy.default() - - recovery_strategy_py = RecoveryStrategyPy.retry_with_backups( - retry_policy=retry_policy_py, - disk_backup_policy=disk_backup_policy_py, - ) - return cls(recovery_strategy_py=recovery_strategy_py) - - class IngestionAPIAsync(ResourceBase): """High-level API for interacting with ingestion services. 
@@ -252,7 +153,9 @@ async def create_ingestion_config_streaming_client( run: RunCreate | dict | str | Run | None = None, asset_tags: list[str] | list[Tag] | None = None, asset_metadata: dict[str, str | float | bool] | None = None, - recovery_strategy: RecoveryStrategyConfig | RecoveryStrategyPy | None = None, + streaming_mode: StreamingMode = StreamingMode.LIVE_WITH_BACKUPS, + retry_policy: RetryPolicyPy | None = None, + disk_backup_policy: DiskBackupPolicyPy | None = None, checkpoint_interval_seconds: int | None = None, enable_tls: bool = True, tracing_config: TracingConfig | None = None, @@ -264,11 +167,13 @@ async def create_ingestion_config_streaming_client( run: The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string. asset_tags: Tags to associate with the asset. asset_metadata: Metadata to associate with the asset. - recovery_strategy: The recovery strategy to use for ingestion. - checkpoint_interval_seconds: The checkpoint interval in seconds. + streaming_mode: Transport mode for the stream. Defaults to LIVE_WITH_BACKUPS. + retry_policy: Retry policy for LIVE_WITH_BACKUPS mode. + disk_backup_policy: Disk backup policy for LIVE_WITH_BACKUPS or FILE_BACKUP mode. + checkpoint_interval_seconds: Checkpoint interval in seconds (LIVE_WITH_BACKUPS only). enable_tls: Whether to enable TLS for the connection. - tracing_config: Configuration for SiftStream tracing. Use TracingConfig.stdout_only() - to enable tracing to stdout only, or TracingConfig.stdout_with_file() to enable + tracing_config: Configuration for SiftStream tracing. Use TracingConfig.console_only() + to enable tracing to stdout only, or TracingConfig.with_file() to enable tracing to both stdout and rolling log files. Defaults to None (tracing will be initialized with default settings if not already initialized). 
@@ -281,7 +186,9 @@ async def create_ingestion_config_streaming_client( run=run, asset_tags=asset_tags, asset_metadata=asset_metadata, - recovery_strategy=recovery_strategy, + streaming_mode=streaming_mode, + retry_policy=retry_policy, + disk_backup_policy=disk_backup_policy, checkpoint_interval_seconds=checkpoint_interval_seconds, enable_tls=enable_tls, tracing_config=tracing_config, @@ -314,7 +221,9 @@ async def _create( run: RunCreate | dict | str | Run | RunFormPy | None = None, asset_tags: list[str] | list[Tag] | None = None, asset_metadata: dict[str, str | float | bool] | None = None, - recovery_strategy: RecoveryStrategyConfig | RecoveryStrategyPy | None = None, + streaming_mode: StreamingMode = StreamingMode.LIVE_WITH_BACKUPS, + retry_policy: RetryPolicyPy | None = None, + disk_backup_policy: DiskBackupPolicyPy | None = None, checkpoint_interval_seconds: int | None = None, enable_tls: bool = True, tracing_config: TracingConfig | None = None, @@ -327,8 +236,10 @@ async def _create( run: The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string. asset_tags: Tags to associate with the asset. asset_metadata: Metadata to associate with the asset. - recovery_strategy: The recovery strategy to use for ingestion. - checkpoint_interval_seconds: The checkpoint interval in seconds. + streaming_mode: Transport mode for the stream. Defaults to LIVE_WITH_BACKUPS. + retry_policy: Retry policy for LIVE_WITH_BACKUPS mode. + disk_backup_policy: Disk backup policy for LIVE_WITH_BACKUPS or FILE_BACKUP mode. + checkpoint_interval_seconds: Checkpoint interval in seconds (LIVE_WITH_BACKUPS only). enable_tls: Whether to enable TLS for the connection. tracing_config: Configuration for SiftStream tracing. 
Use TracingConfig.console_only() to enable tracing to stdout only, or TracingConfig.with_file() to enable @@ -341,11 +252,9 @@ async def _create( # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users try: from sift_stream_bindings import ( - DurationPy, IngestionConfigFormPy, MetadataPy, MetadataValuePy, - RecoveryStrategyPy, RunFormPy, ) except ImportError as e: @@ -373,13 +282,6 @@ async def _create( else: ingestion_config_form = ingestion_config - # Convert the recovery strategy variants - recovery_strategy_py: RecoveryStrategyPy | None = None - if isinstance(recovery_strategy, RecoveryStrategyConfig): - recovery_strategy_py = recovery_strategy._to_rust_config() - elif isinstance(recovery_strategy, RecoveryStrategyPy): - recovery_strategy_py = recovery_strategy - # Convert the run variants to a run or run_id run_form: RunFormPy | None = None run_id: str | None = None @@ -408,21 +310,18 @@ async def _create( for key, value in asset_metadata.items() ] - # Convert checkpoint_interval_seconds to DurationPy - checkpoint_interval: DurationPy | None = None - if checkpoint_interval_seconds is not None: - checkpoint_interval = DurationPy(secs=checkpoint_interval_seconds, nanos=0) - low_level_client = await IngestionConfigStreamingLowLevelClient.create_sift_stream_instance( api_key=api_key, grpc_uri=grpc_uri, - ingestion_config=ingestion_config_form, + ingestion_config_form=ingestion_config_form, run_form=run_form, run_id=run_id, asset_tags=asset_tags_list, asset_metadata=asset_metadata_list, - recovery_strategy=recovery_strategy_py, - checkpoint_interval=checkpoint_interval, + streaming_mode=streaming_mode, + retry_policy=retry_policy, + disk_backup_policy=disk_backup_policy, + checkpoint_interval_seconds=checkpoint_interval_seconds, enable_tls=enable_tls, tracing_config=tracing_config, ) @@ -493,10 +392,10 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy """ await 
self._low_level_client.send_requests(requests) - def send_requests_nonblocking( + def try_send_requests( self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] - ): - """Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. + ) -> None: + """Send data non-blocking in a manner identical to the raw gRPC service for ingestion-config based streaming. This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly. @@ -506,9 +405,21 @@ def send_requests_nonblocking( building of the request. Args: - requests: List of ingestion requests to send to Sift. + requests: Iterable of ingestion requests to send to Sift. """ - self._low_level_client.send_requests_nonblocking(requests) + self._low_level_client.try_send_requests(requests) + + def try_send(self, flow: Flow | FlowPy) -> None: + """Non-blocking send — returns immediately without awaiting channel capacity. + + Args: + flow: The flow to send to Sift. + """ + if isinstance(flow, Flow): + flow_py = flow._to_rust_form() + else: + flow_py = flow + self._low_level_client.try_send(flow_py) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: """Retrieve a flow descriptor by name. 
diff --git a/python/pyproject.toml b/python/pyproject.toml index 042ec6b31..458b6a2a1 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "0.14.1" +version = "0.15.0" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } @@ -65,7 +65,7 @@ all = [ 'pyOpenSSL<24.0.0', 'pyarrow>=17.0.0', 'rosbags~=0.0', - 'sift-stream-bindings==0.2.2', + 'sift-stream-bindings==0.3.0', 'types-pyOpenSSL<24.0.0', ] build = [ @@ -106,7 +106,7 @@ dev-all = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings==0.2.2', + 'sift-stream-bindings==0.3.0', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -159,7 +159,7 @@ docs-build = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings==0.2.2', + 'sift-stream-bindings==0.3.0', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -182,10 +182,10 @@ rosbags = [ 'rosbags~=0.0', ] sift-stream = [ - 'sift-stream-bindings==0.2.2', + 'sift-stream-bindings==0.3.0', ] sift-stream-bindings = [ - 'sift-stream-bindings==0.2.2', + 'sift-stream-bindings==0.3.0', ] tdms = [ 'npTDMS~=1.9', @@ -221,7 +221,7 @@ docs = ["mkdocs==1.6.1", openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] -sift-stream = ["sift-stream-bindings==0.2.2"] +sift-stream = ["sift-stream-bindings==0.3.0"] hdf5 = ["h5py~=3.11", "polars~=1.8"] # polars is only used by sift_py; remove once sift_py is fully deprecated data-review = ["pyarrow>=17.0.0"] @@ -396,4 +396,3 @@ testpaths = [ markers = [ "integration: mark a test as an integration test (requires API)" ] - diff --git a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi index d894c3be3..a4c0d166c 100644 --- 
a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi +++ b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi @@ -539,7 +539,7 @@ class SiftStreamBuilderPy: r""" Whether TLS is enabled. Defaults to `True`. Set to `False` for local testing only. """ - ingestion_config: typing.Optional[IngestionConfigFormPy] + ingestion_config_form: typing.Optional[IngestionConfigFormPy] r""" Ingestion config form. Must be set before calling [`build()`][SiftStreamBuilderPy::build]. """ @@ -567,7 +567,7 @@ class SiftStreamBuilderPy: This is the quick path: `ingestion_config` must be set; all other fields are optional. For other modes (checkpointing, disk backups, tunable capacities), use - [`ingestion_config()`][SiftStreamBuilderPy::ingestion_config] to advance to the full + [`ingestion_config()`][SiftStreamBuilderPy::ingestion_config_form] to advance to the full builder chain. Returns a coroutine that resolves to a [`SiftStreamPy`]. diff --git a/rust/crates/sift_stream_bindings/src/stream/builder.rs b/rust/crates/sift_stream_bindings/src/stream/builder.rs index ba2286bbf..63eff8b77 100644 --- a/rust/crates/sift_stream_bindings/src/stream/builder.rs +++ b/rust/crates/sift_stream_bindings/src/stream/builder.rs @@ -38,7 +38,7 @@ pub struct SiftStreamBuilderPy { enable_tls: bool, /// Ingestion config form. Must be set before calling [`build()`][SiftStreamBuilderPy::build]. #[pyo3(get, set)] - ingestion_config: Option, + ingestion_config_form: Option, /// Optional run to associate with the stream. Mutually exclusive with `run_id`; /// if both are set, `run_id` takes precedence. #[pyo3(get, set)] @@ -64,7 +64,7 @@ impl SiftStreamBuilderPy { uri: uri.into(), apikey: apikey.into(), enable_tls: true, - ingestion_config: None, + ingestion_config_form: None, run: None, run_id: None, asset_tags: None, @@ -76,12 +76,12 @@ impl SiftStreamBuilderPy { /// /// This is the quick path: `ingestion_config` must be set; all other fields are optional. 
/// For other modes (checkpointing, disk backups, tunable capacities), use - /// [`ingestion_config()`][SiftStreamBuilderPy::ingestion_config] to advance to the full + /// [`ingestion_config()`][SiftStreamBuilderPy::ingestion_config_form] to advance to the full /// builder chain. /// /// Returns a coroutine that resolves to a [`SiftStreamPy`]. pub fn build(&mut self, py: Python) -> PyResult> { - let ingestion_config = self.ingestion_config.clone().ok_or_else(|| { + let ingestion_config = self.ingestion_config_form.clone().ok_or_else(|| { pyo3::exceptions::PyValueError::new_err( "ingestion_config must be set before calling build()", )